View Javadoc
1   //******************************************************************************
2   //
3   // File:    NonPjJobFrontend.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.NonPjJobFrontend
6   //
7   // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import java.io.File;
43  import java.io.IOException;
44  import java.net.InetSocketAddress;
45  import java.util.Collections;
46  import java.util.LinkedList;
47  import java.util.List;
48  
49  import edu.rit.mp.Channel;
50  import edu.rit.mp.ChannelGroup;
51  import edu.rit.mp.ChannelGroupClosedException;
52  import edu.rit.mp.ObjectBuf;
53  import edu.rit.mp.Status;
54  import edu.rit.mp.buf.ObjectItemBuf;
55  import edu.rit.pj.PJProperties;
56  import edu.rit.util.Timer;
57  import edu.rit.util.TimerTask;
58  import edu.rit.util.TimerThread;
59  
60  /**
61   * Class NonPjJobFrontend provides the message handler for a job frontend
62   * process that communicates with the Job Scheduler to allocate backend nodes
63   * but does not run a PJ program.
64   *
65   * @author Alan Kaminsky
66   * @version 20-Jun-2012
67   */
68  public class NonPjJobFrontend
69          implements Runnable, JobFrontendRef {
70  
71  // Hidden data members.
72      // User name.
73      private String username;
74  
75      // Job number.
76      private int jobnum;
77  
78      // Job resources.
79      private int Np;
80  
81      // Timer thread for lease renewals and expirations.
82      private TimerThread myLeaseTimerThread;
83  
84      // Timers for the lease with the Job Scheduler.
85      private Timer mySchedulerRenewTimer;
86      private Timer mySchedulerExpireTimer;
87  
88      // Timer for the job timeout if any.
89      private Timer myJobTimer;
90  
91      // Middleware channel group.
92      private ChannelGroup myMiddlewareChannelGroup;
93  
94      // Proxy for Job Scheduler Daemon.
95      private JobSchedulerRef myJobScheduler;
96  
97      // Flag for shutting down the run() method.
98      private boolean continueRun = true;
99  
100     // State of this job frontend.
101     private State myState = State.RUNNING;
102 
103     private static enum State {
104 
105         RUNNING,
106         TERMINATE_CANCEL_JOB,
107         TERMINATING
108     };
109 
110     // Error message if job canceled, or null if job finished normally.
111     private String myCancelMessage = "User canceled job";
112 
113     // List of backend names assigned to the job.
114     private LinkedList<String> myBackendNames = new LinkedList<String>();
115 
116 // Exported constructors.
117     /**
118      * Construct a new non-PJ job frontend object. The job frontend object will
119      * contact the Job Scheduler Daemon specified by the <code>"pj.host"</code> and
120      * <code>"pj.port"</code> Java system properties. See class {@linkplain
121      * edu.rit.pj.PJProperties} for further information.
122      * <P>
123      * The non-PJ job frontend object will ask the Job Scheduler Daemon to run
124      * one process per node and to use all CPUs on each node. Other
125      * possibilities are not supported.
126      *
127      * @param username User name.
128      * @param Np Number of processes (&gt;= 1).
129      * @exception JobSchedulerException (subclass of IOException) Thrown if the
130      * job frontend object could not contact the Job Scheduler Daemon.
131      * @exception IOException Thrown if an I/O error occurred.
132      * @throws java.io.IOException if any.
133      */
134     public NonPjJobFrontend(String username,
135             int Np)
136             throws IOException {
137         // Record arguments.
138         this.username = username;
139         this.Np = Np;
140 
141         // Set up shutdown hook.
142         Runtime.getRuntime().addShutdownHook(new Thread() {
143             public void run() {
144                 shutdown();
145             }
146         });
147 
148         // Set up lease timer thread.
149         myLeaseTimerThread = new TimerThread();
150         myLeaseTimerThread.setDaemon(true);
151         myLeaseTimerThread.start();
152 
153         // Set up Job Scheduler lease timers.
154         mySchedulerRenewTimer
155                 = myLeaseTimerThread.createTimer(new TimerTask() {
156                     public void action(Timer timer) {
157                         try {
158                             schedulerRenewTimeout();
159                         } catch (Throwable ignored) {
160                         }
161                     }
162                 });
163         mySchedulerExpireTimer
164                 = myLeaseTimerThread.createTimer(new TimerTask() {
165                     public void action(Timer timer) {
166                         try {
167                             schedulerExpireTimeout();
168                         } catch (Throwable ignored) {
169                         }
170                     }
171                 });
172 
173         // Set up job timer.
174         myJobTimer
175                 = myLeaseTimerThread.createTimer(new TimerTask() {
176                     public void action(Timer timer) {
177                         try {
178                             jobTimeout();
179                         } catch (Throwable ignored) {
180                         }
181                     }
182                 });
183 
184         // Set up middleware channel group.
185         myMiddlewareChannelGroup = new ChannelGroup();
186 
187         // Set up Job Scheduler proxy.
188         InetSocketAddress js_address = null;
189         Channel js_channel = null;
190         try {
191             js_address
192                     = new InetSocketAddress(PJProperties.getPjHost(),
193                             PJProperties.getPjPort());
194             js_channel = myMiddlewareChannelGroup.connect(js_address);
195         } catch (IOException exc) {
196             throw new JobSchedulerException("JobFrontend(): Cannot contact Job Scheduler Daemon at "
197                     + js_address,
198                     exc);
199         }
200         myJobScheduler
201                 = new JobSchedulerProxy(myMiddlewareChannelGroup, js_channel);
202 
203         // Start Job Scheduler lease timers.
204         mySchedulerRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
205                 Constants.LEASE_RENEW_INTERVAL);
206         mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
207 
208         // Kick off the job!
209         myJobScheduler.requestJob(this, username, Np, Np, 0);
210     }
211 
212 // Exported operations.
213     /**
214      * Obtain the job number assigned to this Non-PJ Job Frontend. This method
215      * blocks until the requested number of backend processes (a constructor
216      * parameter) have been assigned.
217      *
218      * @return Job number.
219      * @exception InterruptedException Thrown if the calling thread is
220      * interrupted while blocked in this method.
221      * @throws java.lang.InterruptedException if any.
222      */
223     public synchronized int getJobNumber()
224             throws InterruptedException {
225         while (myBackendNames.size() < Np) {
226             wait();
227         }
228         return jobnum;
229     }
230 
231     /**
232      * Obtain a list of the backend names assigned to this Non-PJ Job Frontend.
233      * This method blocks until the requested number of backend processes (a
234      * constructor parameter) have been assigned. The returned list is
235      * unmodifiable.
236      *
237      * @return List of backend names.
238      * @exception InterruptedException Thrown if the calling thread is
239      * interrupted while blocked in this method.
240      * @throws java.lang.InterruptedException if any.
241      */
242     public synchronized List<String> getBackendNames()
243             throws InterruptedException {
244         while (myBackendNames.size() < Np) {
245             wait();
246         }
247         return Collections.unmodifiableList(myBackendNames);
248     }
249 
250     /**
251      * Run this Non-PJ Job Frontend.
252      */
253     public void run() {
254         ObjectItemBuf<JobFrontendMessage> buf
255                 = ObjectBuf.buffer((JobFrontendMessage) null);
256         Status status = null;
257         JobFrontendMessage message = null;
258 
259         try {
260             while (continueRun) {
261                 // Receive a message from any channel.
262                 status = myMiddlewareChannelGroup.receive(null, null, buf);
263                 message = buf.item;
264 
265                 // Process a message from the Job Scheduler.
266                 if (status.tag == Message.FROM_JOB_SCHEDULER) {
267                     message.invoke(this, myJobScheduler);
268                 }
269 
270                 // Enable garbage collection of no-longer-needed objects while
271                 // waiting to receive next message.
272                 buf.item = null;
273                 status = null;
274                 message = null;
275             }
276         } catch (ChannelGroupClosedException ignored) {
277         } catch (Throwable exc) {
278             terminateCancelJob(exc);
279         }
280 
281         // Exit process if necessary.
282         switch (myState) {
283             case TERMINATE_CANCEL_JOB:
284                 System.exit(1);
285                 break;
286             case RUNNING:
287             case TERMINATING:
288                 break;
289         }
290     }
291 
292     /**
293      * Terminate this Non-PJ Job Frontend immediately, sending a "job finished"
294      * message to the Job Scheduler. This method must only be called by a thread
295      * other than the thread calling <code>run()</code>. This method calls
296      * <code>System.exit(status)</code> to terminate the process.
297      *
298      * @param status Status value for <code>System.exit()</code>.
299      */
300     public void terminateJobFinished(int status) {
301         boolean doExit = false;
302         synchronized (this) {
303             continueRun = false;
304             if (myState == State.RUNNING) {
305                 myCancelMessage = null;
306                 doExit = true;
307             }
308         }
309 
310         // Cannot hold the synchronization lock while calling System.exit(),
311         // otherwise a deadlock can occur between this thread and the shutdown
312         // hook thread.
313         if (doExit) {
314             System.exit(status);
315         }
316     }
317 
318     /**
319      * {@inheritDoc}
320      *
321      * Assign a backend process to the job.
322      * @exception IOException Thrown if an I/O error occurred.
323      */
324     public synchronized void assignBackend(JobSchedulerRef theJobScheduler,
325             String name,
326             String host,
327             String jvm,
328             String classpath,
329             String[] jvmflags,
330             String shellCommand,
331             int Nt)
332             throws IOException {
333         // Record backend name.
334         myBackendNames.add(name);
335 
336         // If all backends have been assigned, start job timer.
337         if (myBackendNames.size() == Np) {
338             int jobtime = PJProperties.getPjJobTime();
339             if (jobtime > 0) {
340                 myJobTimer.start(jobtime * 1000L);
341             }
342         }
343 
344         notifyAll();
345     }
346 
347     /**
348      * {@inheritDoc}
349      *
350      * Assign a job number to the job. The host name for the job frontend's
351      * middleware channel group is also specified.
352      * @exception IOException Thrown if an I/O error occurred.
353      */
354     public synchronized void assignJobNumber(JobSchedulerRef theJobScheduler,
355             int jobnum,
356             String pjhost)
357             throws IOException {
358         // Record job number.
359         this.jobnum = jobnum;
360         notifyAll();
361     }
362 
363     /**
364      * {@inheritDoc}
365      *
366      * Cancel the job.
367      * @exception IOException Thrown if an I/O error occurred.
368      */
369     public synchronized void cancelJob(JobSchedulerRef theJobScheduler,
370             String errmsg)
371             throws IOException {
372         terminateCancelJob(errmsg);
373     }
374 
375     /**
376      * Renew the lease on the job.
377      *
378      * @param theJobScheduler Job Scheduler that is calling this method.
379      * @exception IOException Thrown if an I/O error occurred.
380      * @throws java.io.IOException if any.
381      */
382     public synchronized void renewLease(JobSchedulerRef theJobScheduler)
383             throws IOException {
384         mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
385     }
386 
387     /**
388      * {@inheritDoc}
389      *
390      * Report that a backend process has finished executing the job.
391      * @exception IOException Thrown if an I/O error occurred.
392      */
393     public synchronized void backendFinished(JobBackendRef theJobBackend)
394             throws IOException {
395     }
396 
397     /**
398      * {@inheritDoc}
399      *
400      * Report that a backend process is ready to commence executing the job.
401      * @exception IOException Thrown if an I/O error occurred.
402      */
403     public synchronized void backendReady(JobBackendRef theJobBackend,
404             int rank,
405             InetSocketAddress middlewareAddress,
406             InetSocketAddress worldAddress,
407             InetSocketAddress frontendAddress)
408             throws IOException {
409     }
410 
411     /**
412      * {@inheritDoc}
413      *
414      * Cancel the job.
415      * @exception IOException Thrown if an I/O error occurred.
416      */
417     public synchronized void cancelJob(JobBackendRef theJobBackend,
418             String errmsg)
419             throws IOException {
420     }
421 
422     /**
423      * {@inheritDoc}
424      *
425      * Renew the lease on the job.
426      * @exception IOException Thrown if an I/O error occurred.
427      */
428     public synchronized void renewLease(JobBackendRef theJobBackend)
429             throws IOException {
430     }
431 
432     /**
433      * {@inheritDoc}
434      *
435      * Request the given resource from this job frontend's class loader.
436      * @exception IOException Thrown if an I/O error occurred.
437      */
438     public synchronized void requestResource(JobBackendRef theJobBackend,
439             String resourceName)
440             throws IOException {
441     }
442 
443     /**
444      * {@inheritDoc}
445      *
446      * Open the given output file for writing or appending.
447      * @exception IOException Thrown if an I/O error occurred.
448      */
449     public synchronized void outputFileOpen(JobBackendRef theJobBackend,
450             int bfd,
451             File file,
452             boolean append)
453             throws IOException {
454     }
455 
456     /**
457      * {@inheritDoc}
458      *
459      * Write the given bytes to the given output file. <code>ffd</code> = 1 refers
460      * to the job's standard output stream; <code>ffd</code> = 2 refers to the job's
461      * standard error stream; other values refer to a previously opened file.
462      * @exception IOException Thrown if an I/O error occurred.
463      */
464     public synchronized void outputFileWrite(JobBackendRef theJobBackend,
465             int ffd,
466             byte[] buf,
467             int off,
468             int len)
469             throws IOException {
470     }
471 
472     /**
473      * {@inheritDoc}
474      *
475      * Flush accumulated bytes to the given output file.
476      * @exception IOException Thrown if an I/O error occurred.
477      */
478     public synchronized void outputFileFlush(JobBackendRef theJobBackend,
479             int ffd)
480             throws IOException {
481     }
482 
483     /**
484      * {@inheritDoc}
485      *
486      * Close the given output file.
487      * @exception IOException Thrown if an I/O error occurred.
488      */
489     public synchronized void outputFileClose(JobBackendRef theJobBackend,
490             int ffd)
491             throws IOException {
492     }
493 
494     /**
495      * {@inheritDoc}
496      *
497      * Open the given input file for reading.
498      * @exception IOException Thrown if an I/O error occurred.
499      */
500     public synchronized void inputFileOpen(JobBackendRef theJobBackend,
501             int bfd,
502             File file)
503             throws IOException {
504     }
505 
506     /**
507      * {@inheritDoc}
508      *
509      * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
510      * job's standard input stream; other values refer to a previously opened
511      * file.
512      * @exception IOException Thrown if an I/O error occurred.
513      */
514     public synchronized void inputFileRead(JobBackendRef theJobBackend,
515             int ffd,
516             int len)
517             throws IOException {
518     }
519 
520     /**
521      * {@inheritDoc}
522      *
523      * Skip bytes from the given input file.
524      * @exception IOException Thrown if an I/O error occurred.
525      */
526     public synchronized void inputFileSkip(JobBackendRef theJobBackend,
527             int ffd,
528             long len)
529             throws IOException {
530     }
531 
532     /**
533      * {@inheritDoc}
534      *
535      * Close the given input file.
536      * @exception IOException Thrown if an I/O error occurred.
537      */
538     public synchronized void inputFileClose(JobBackendRef theJobBackend,
539             int ffd)
540             throws IOException {
541     }
542 
543     /**
544      * {@inheritDoc}
545      *
546      * Report a comment for a process.
547      * @exception IOException Thrown if an I/O error occurred.
548      */
549     public synchronized void reportComment(JobBackendRef theJobBackend,
550             int rank,
551             String comment)
552             throws IOException {
553         myJobScheduler.reportComment(this, rank, comment);
554     }
555 
556     /**
557      * Close communication with this Job Frontend.
558      */
559     public void close() {
560     }
561 
562 // Hidden operations.
563     /**
564      * Take action when the Job Scheduler's lease renewal timer times out.
565      *
566      * @exception IOException Thrown if an I/O error occurred.
567      */
568     private synchronized void schedulerRenewTimeout()
569             throws IOException {
570         if (mySchedulerRenewTimer.isTriggered()) {
571             myJobScheduler.renewLease(this);
572         }
573     }
574 
575     /**
576      * Take action when the Job Scheduler's lease expiration timer times out.
577      *
578      * @exception IOException Thrown if an I/O error occurred.
579      */
580     private void schedulerExpireTimeout()
581             throws IOException {
582         boolean doExit = false;
583         synchronized (this) {
584             if (mySchedulerExpireTimer.isTriggered()) {
585                 continueRun = false;
586                 if (myState == State.RUNNING) {
587                     myState = State.TERMINATE_CANCEL_JOB;
588                     myCancelMessage = "Job Scheduler failed";
589                     System.err.println(myCancelMessage);
590                     doExit = true;
591                 }
592             }
593         }
594 
595         // Cannot hold the synchronization lock while calling System.exit(),
596         // otherwise a deadlock can occur between this thread (the timer thread)
597         // and the shutdown hook thread.
598         if (doExit) {
599             System.exit(1);
600         }
601     }
602 
603     /**
604      * Take action when the job timer times out.
605      *
606      * @exception IOException Thrown if an I/O error occurred.
607      */
608     private void jobTimeout()
609             throws IOException {
610         boolean doExit = false;
611         synchronized (this) {
612             if (myJobTimer.isTriggered()) {
613                 continueRun = false;
614                 if (myState == State.RUNNING) {
615                     myState = State.TERMINATE_CANCEL_JOB;
616                     myCancelMessage = "Job exceeded maximum running time";
617                     System.err.println(myCancelMessage);
618                     doExit = true;
619                 }
620             }
621         }
622 
623         // Cannot hold the synchronization lock while calling System.exit(),
624         // otherwise a deadlock can occur between this thread (the timer thread)
625         // and the shutdown hook thread.
626         if (doExit) {
627             System.exit(1);
628         }
629     }
630 
631     /**
632      * Terminate this Job Frontend immediately, sending a "cancel job" message
633      * to the Job Scheduler. The error message is <code>msg</code>. This method must
634      * only be called by the thread calling <code>run()</code>.
635      *
636      * @param msg Error message.
637      */
638     private void terminateCancelJob(String msg) {
639         continueRun = false;
640         if (myState == State.RUNNING) {
641             myState = State.TERMINATE_CANCEL_JOB;
642             myCancelMessage = msg;
643             System.err.println(myCancelMessage);
644         }
645     }
646 
647     /**
648      * Terminate this Job Frontend immediately, sending a "cancel job" message
649      * to the Job Scheduler. The error message comes from the given exception.
650      * This method must only be called by the thread calling <code>run()</code>.
651      *
652      * @param exc Exception.
653      */
654     private void terminateCancelJob(Throwable exc) {
655         continueRun = false;
656         if (myState == State.RUNNING) {
657             myCancelMessage = exc.getClass().getName();
658             String msg = exc.getMessage();
659             if (msg != null) {
660                 myCancelMessage = myCancelMessage + ": " + msg;
661             }
662             System.err.println(myCancelMessage);
663             exc.printStackTrace(System.err);
664         }
665     }
666 
667     /**
668      * Terminate this Job Frontend immediately, sending a "cancel job" message
669      * to the Job Scheduler. The error message comes from the given exception.
670      * This method must only be called by a thread other than the thread calling
671      * <code>run()</code>.
672      *
673      * @param exc Exception.
674      */
675     void terminateCancelJobOther(Throwable exc) {
676         boolean doExit = false;
677         synchronized (this) {
678             continueRun = false;
679             if (myState == State.RUNNING) {
680                 myCancelMessage = exc.getClass().getName();
681                 String msg = exc.getMessage();
682                 if (msg != null) {
683                     myCancelMessage = myCancelMessage + ": " + msg;
684                 }
685                 System.err.println(myCancelMessage);
686                 exc.printStackTrace(System.err);
687                 doExit = true;
688             }
689         }
690 
691         // Cannot hold the synchronization lock while calling System.exit(),
692         // otherwise a deadlock can occur between this thread and the shutdown
693         // hook thread.
694         if (doExit) {
695             System.exit(1);
696         }
697     }
698 
699     /**
700      * Shut down this Job Frontend.
701      */
702     private void shutdown() {
703         synchronized (this) {
704             // Stop all lease timers.
705             mySchedulerRenewTimer.stop();
706             mySchedulerExpireTimer.stop();
707 
708             // If state is RUNNING but myCancelMessage is not null, it means the
709             // user canceled the job (e.g., by hitting CTRL-C).
710             if (myState == State.RUNNING && myCancelMessage != null) {
711                 myState = State.TERMINATE_CANCEL_JOB;
712             }
713 
714             // Inform Job Scheduler.
715             switch (myState) {
716                 case RUNNING:
717                     // Send "job finished" message.
718                     if (myJobScheduler != null) {
719                         try {
720                             myJobScheduler.jobFinished(this);
721                         } catch (IOException ignored) {
722                         }
723                     }
724                     break;
725                 case TERMINATE_CANCEL_JOB:
726                     // Send "cancel job" message.
727                     if (myJobScheduler != null) {
728                         try {
729                             myJobScheduler.cancelJob(this, myCancelMessage);
730                         } catch (IOException ignored) {
731                         }
732                     }
733                     break;
734                 case TERMINATING:
735                     // Send nothing.
736                     break;
737             }
738 
739             // Record that we are terminating.
740             myState = State.TERMINATING;
741         }
742 
743         // All proxies, channels, and channel groups will close when the process
744         // exits.
745     }
746 
747 }