View Javadoc
1   //******************************************************************************
2   //
3   // File:    JobBackend.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.JobBackend
6   //
7   // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import static java.lang.String.format;
43  
44  import java.io.File;
45  import java.io.IOException;
46  import java.io.PrintStream;
47  import java.lang.reflect.Method;
48  import java.net.InetSocketAddress;
49  import java.util.Map;
50  import java.util.Properties;
51  import java.util.concurrent.CountDownLatch;
52  import java.util.logging.FileHandler;
53  import java.util.logging.Handler;
54  import java.util.logging.Level;
55  import java.util.logging.LogManager;
56  import java.util.logging.Logger;
57  import java.util.logging.SimpleFormatter;
58  
59  import edu.rit.mp.ChannelGroup;
60  import edu.rit.mp.ChannelGroupClosedException;
61  import edu.rit.mp.ObjectBuf;
62  import edu.rit.mp.Status;
63  import edu.rit.mp.buf.ObjectItemBuf;
64  import edu.rit.util.ByteSequence;
65  import edu.rit.util.Timer;
66  import edu.rit.util.TimerThread;
67  
68  /**
69   * Class JobBackend is the main program for a job backend process in the PJ
70   * cluster middleware. The job backend process is launched by an SSH remote
71   * login from the job frontend process (class {@linkplain JobFrontend}).
72   * <p>
73   * The command line for the job backend main program is:
74   * <p>
75   * java edu.rit.pj.cluster.JobBackend <I>username</I> <I>jobnum</I> <I>K</I>
76   * <I>rank</I> <I>hasFrontendComm</I> <I>frontendHost</I> <I>frontendPort</I>
77   * <I>backendHost</I>
78   * <BR><I>username</I> = User name
79   * <BR><I>jobnum</I> = Job number
80   * <BR><I>K</I> = Number of backend processes (&gt;= 1)
81   * <BR><I>rank</I> = Rank of this backend process (0 .. <I>K</I>-1)
82   * <BR><I>hasFrontendComm</I> = Whether the frontend communicator exists
83   * (<code>true</code> or <code>false</code>)
84   * <BR><I>frontendHost</I> = Job frontend's middleware channel group host name
85   * <BR><I>frontendPort</I> = Job frontend's middleware channel group port number
86   * <BR><I>backendHost</I> = Job backend's middleware channel group host name
87   *
88   * @author Alan Kaminsky
89   * @version 24-Jan-2012
90   */
91  public class JobBackend
92          implements Runnable, JobBackendRef {
93  
    // Logger for this class. Its level is set in the constructor: INFO when
    // the "pj.verbose" system property is true and the log file could be
    // opened, OFF otherwise.
    private static final Logger logger = Logger.getLogger(JobBackend.class.getName());

    // Hidden class-wide data members.
    // The job backend object handed out by getJobBackend().
    private static JobBackend theJobBackend;

    // Hidden data members.
    // Command line arguments.
    private final String username;
    private final int jobnum;
    private final int K;
    private final int rank;
    private final boolean hasFrontendComm;
    private final String frontendHost;
    private final int frontendPort;
    private final String backendHost;

    // Logging
    // File handler for verbose start-up logging. Stays null unless the
    // constructor creates it (only attempted when "pj.verbose" is true).
    FileHandler fileHandler = null;

    // Timer thread for lease renewals and expirations.
    private final TimerThread myLeaseTimerThread;

    // Timers for the lease with the job frontend.
    private final Timer myFrontendRenewTimer;
    private final Timer myFrontendExpireTimer;

    // Middleware channel group and address array.
    // The address array is recorded by commenceJob().
    private final ChannelGroup myMiddlewareChannelGroup;
    private InetSocketAddress[] myMiddlewareAddress;

    // Job frontend proxy.
    private final JobFrontendRef myJobFrontend;

    // For loading classes from the job frontend process.
    private final ResourceCache myResourceCache;
    private final BackendClassLoader myClassLoader;

    // World communicator channel group and address array.
    // The address array is recorded by commenceJob().
    private final ChannelGroup myWorldChannelGroup;
    private InetSocketAddress[] myWorldAddress;

    // Frontend communicator channel group and address array.
    // The channel group is created in the constructor only when
    // hasFrontendComm is true; the address array is recorded by commenceJob().
    private ChannelGroup myFrontendChannelGroup;
    private InetSocketAddress[] myFrontendAddress;

    // Java system properties. Recorded by commenceJob().
    private Properties myProperties;

    // Main class name. Recorded by commenceJob().
    private String myMainClassName;

    // Command line arguments. Recorded by commenceJob().
    private String[] myArgs;

    // Flag set true to commence job. Set by commenceJob(); waitForCommence()
    // blocks until it is true.
    private boolean commence;

    // Buffer for receiving job backend messages.
    private final ObjectItemBuf<JobBackendMessage> myBuffer
            = ObjectBuf.buffer((JobBackendMessage) null);

    // Flags for shutting down the run() method.
    // continueRun is the loop condition of run(); jobFinished() clears it.
    // runFinished is presumably counted down by reportRunFinished() so that
    // the shutdown hook can proceed -- verify against that method.
    private boolean continueRun = true;
    private final CountDownLatch runFinished = new CountDownLatch(1);

    // State of this job backend.
    private State myState = State.RUNNING;

    // Possible states of this job backend. When run() leaves its message
    // loop it calls System.exit(1) if the state is TERMINATE_CANCEL_JOB or
    // TERMINATE_NO_REPORT, and does nothing further for RUNNING or TERMINATING.
    private static enum State {
        RUNNING,
        TERMINATE_CANCEL_JOB,
        TERMINATE_NO_REPORT,
        TERMINATING
    }

    // Error message if job canceled, or null if job finished normally.
    private String myCancelMessage;

    // Original standard error stream; goes to the Job Launcher's log file.
    // Captured in the constructor before System.err is redirected.
    private final PrintStream myJobLauncherLog;

    // For writing and reading files in the job frontend.
    private final BackendFileWriter myFileWriter;
    private final BackendFileReader myFileReader;
179 
180 // Hidden constructors.
181 
    /**
     * Construct a new Job Backend.
     * <p>
     * In order, the constructor: records the command line arguments,
     * configures logging according to the <code>pj.verbose</code> system
     * property, registers a JVM shutdown hook, starts the lease timer thread
     * and the lease expiration timer, opens the middleware channel group and
     * connects it to the job frontend, starts the lease renewal timer, creates
     * the backend class loader and the communicator channel groups, replaces
     * <code>System.in</code>, <code>System.out</code> and
     * <code>System.err</code> with streams backed by the job frontend, and
     * finally reports to the job frontend that this backend is ready.
     *
     * @param username        User name.
     * @param jobnum          Job number.
     * @param K               Number of backend processes.
     * @param rank            Rank of this backend process.
     * @param hasFrontendComm Whether the frontend communicator exists.
     * @param frontendHost    Host name of job frontend's middleware channel group.
     * @param frontendPort    Port number of job frontend's middleware channel group.
     * @param backendHost     Host name of job backend's middleware channel group.
     * @throws IOException Thrown if an I/O error occurred.
     */
    private JobBackend(String username,
                       int jobnum,
                       int K,
                       int rank,
                       boolean hasFrontendComm,
                       String frontendHost,
                       int frontendPort,
                       String backendHost)
            throws IOException {

        // Record command line arguments.
        this.username = username;
        this.jobnum = jobnum;
        this.K = K;
        this.rank = rank;
        this.hasFrontendComm = hasFrontendComm;
        this.frontendHost = frontendHost;
        this.frontendPort = frontendPort;
        this.backendHost = backendHost;

        // Turn on verbose start-up logging.
        // sb accumulates the start-up report that is logged once below.
        StringBuilder sb = new StringBuilder();
        boolean verbose = Boolean.parseBoolean(System.getProperty("pj.verbose", "false"));
        if (verbose) {
            try {
                // Remove all log handlers from the default logger.
                Logger defaultLogger = LogManager.getLogManager().getLogger("");
                Handler[] defaultHandlers = defaultLogger.getHandlers();
                for (Handler h : defaultHandlers) {
                    defaultLogger.removeHandler(h);
                }

                // Create a FileHandler that logs messages with a SimpleFormatter.
                // The log directory is named after this backend's rank and is
                // resolved against the process's current working directory.
                File dir = new File(Integer.toString(this.rank));
                if (!dir.exists()) {
                    boolean success = dir.mkdir();
                    sb.append(format("\n Creation of logging directory %s: %b", dir, success));
                }

                String logFile = dir.getAbsolutePath() + File.separator + "backend.log";
                fileHandler = new FileHandler(logFile);
                sb.append(format("\n Log file: %s", logFile));
                fileHandler.setFormatter(new SimpleFormatter());
                logger.addHandler(fileHandler);
                logger.setLevel(Level.INFO);
            } catch (Exception e) {
                // Any failure while setting up file logging silently disables
                // logging; the exception itself is not reported anywhere.
                logger.setLevel(Level.OFF);
            }
        } else {
            logger.setLevel(Level.OFF);
        }

        // Note that logging is OFF if the "pj.verbose" property was not true.
        sb.append(format("\n Username:          %s", this.username));
        sb.append(format("\n Job number:        %d", this.jobnum));
        sb.append(format("\n Nodes:             %d", this.K));
        sb.append(format("\n Rank:              %d", this.rank));
        sb.append(format("\n Has Frontend Comm: %b", this.hasFrontendComm));
        sb.append(format("\n Frontend Host:     %s", this.frontendHost));
        sb.append(format("\n Frontend Port:     %d", this.frontendPort));
        sb.append(format("\n Backend Host:      %s", this.backendHost));
        logger.log(Level.INFO, sb.toString());

        // Set up shutdown hook.
        // NOTE(review): the hook captures this object before the remaining
        // fields are assigned, and it stays registered if a later step in this
        // constructor throws -- confirm shutdown() copes with a partially
        // constructed instance.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));

        // Set up lease timer thread.
        logger.log(Level.INFO, " Set up lease timer thread.");
        myLeaseTimerThread = new TimerThread();
        myLeaseTimerThread.setDaemon(true);
        myLeaseTimerThread.start();

        // Set up job frontend lease timers.
        // Failures thrown by either timeout handler are discarded here;
        // nothing is logged or rethrown.
        logger.log(Level.INFO, format(" Create frontend renew timer (%d sec)." , Constants.LEASE_RENEW_INTERVAL / 1000));
        myFrontendRenewTimer = myLeaseTimerThread.createTimer(timer -> {
                    try {
                        frontendRenewTimeout();
                    } catch (Throwable ignored) {
                    }
                });
        logger.log(Level.INFO, format(" Create frontend expire timer (%d sec)." , Constants.LEASE_EXPIRE_INTERVAL / 1000));
        myFrontendExpireTimer = myLeaseTimerThread.createTimer(timer -> {
                    try {
                        frontendExpireTimeout();
                    } catch (Throwable ignored) {
                    }
                });

        // Start job frontend lease expiration timer regardless of whether the
        // job frontend proxy gets set up.
        myFrontendExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);

        // Set up middleware channel group.
        // Port 0 is passed for the backend's own address.
        logger.log(Level.INFO, " Set up middleware channel group.");
        myMiddlewareChannelGroup = new ChannelGroup(new InetSocketAddress(backendHost, 0));
        myMiddlewareChannelGroup.startListening();

        // Set up job frontend proxy.
        logger.log(Level.INFO, " Set up job frontend proxy.");
        myJobFrontend = new JobFrontendProxy(myMiddlewareChannelGroup,
                myMiddlewareChannelGroup.connect(new InetSocketAddress(frontendHost, frontendPort)));

        // If we get here, the job frontend proxy has been set up.
        logger.log(Level.INFO, " The job frontend proxy has been set up.");

        // Start job frontend lease renewal timer.
        logger.log(Level.INFO, " Start frontend lease renewal timer.");
        myFrontendRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
                Constants.LEASE_RENEW_INTERVAL);

        // Set up backend class loader.
        myResourceCache = new ResourceCache();
        myClassLoader = new BackendClassLoader(
                /*parent        */ getClass().getClassLoader(),
                /*theJobBackend */ this,
                /*theJobFrontend*/ myJobFrontend,
                /*theCache      */ myResourceCache);

        // Set up world communicator channel group.
        logger.log(Level.INFO, " Set up world communicator channel group.");
        myWorldChannelGroup= new ChannelGroup(new InetSocketAddress(backendHost, 0));
        myWorldChannelGroup.setAlternateClassLoader(myClassLoader);

        // Set up frontend communicator channel group.
        // When hasFrontendComm is false, myFrontendChannelGroup is left null.
        if (hasFrontendComm) {
            logger.log(Level.INFO, " Set up frontend communicator channel group.");
            myFrontendChannelGroup = new ChannelGroup(new InetSocketAddress(backendHost, 0));
            myFrontendChannelGroup.setAlternateClassLoader(myClassLoader);
        }

        // Set up backend file writer and reader.
        logger.log(Level.INFO, " Set up backend file writer and reader.");
        myFileWriter = new BackendFileWriter(myJobFrontend, this);
        myFileReader = new BackendFileReader(myJobFrontend, this);

        // Redirect standard input, standard output, and standard error to job frontend.
        // The original System.err is kept (not closed) in myJobLauncherLog
        // before the three standard streams are replaced.
        logger.log(Level.INFO, " Redirect standard input, standard output, and standard error to job frontend.");
        System.in.close();
        System.out.close();
        myJobLauncherLog = System.err;
        System.setIn(myFileReader.in);
        System.setOut(myFileWriter.out);
        System.setErr(myFileWriter.err);

        // Tell job frontend we're ready!
        // The frontend communicator address is null when that communicator
        // does not exist.
        logger.log(Level.INFO, " Tell job frontend we're ready.");
        myJobFrontend.backendReady(
                /*theJobBackend    */ this,
                /*rank             */ rank,
                /*middlewareAddress*/ myMiddlewareChannelGroup.listenAddress(),
                /*worldAddress     */ myWorldChannelGroup.listenAddress(),
                /*frontendAddress  */ hasFrontendComm ? myFrontendChannelGroup.listenAddress() : null);
    }
348 // Exported operations.
349 
350     /**
351      * Run this Job Backend.
352      */
353     public void run() {
354         Status status = null;
355         JobBackendMessage message = null;
356 
357         try {
358             while (continueRun) {
359                 // Receive a message from any channel.
360                 status
361                         = myMiddlewareChannelGroup.receive(null, null, myBuffer);
362                 message = myBuffer.item;
363 
364                 // Process message.
365                 message.invoke(this, myJobFrontend);
366 
367                 // Enable garbage collection of no-longer-needed objects while
368                 // waiting to receive next message.
369                 myBuffer.item = null;
370                 status = null;
371                 message = null;
372             }
373 
374             // Allow shutdown hook to proceed.
375             reportRunFinished();
376         } catch (ChannelGroupClosedException exc) {
377             // Allow shutdown hook to proceed.
378             reportRunFinished();
379         } catch (Throwable exc) {
380             // Allow shutdown hook to proceed.
381             reportRunFinished();
382             terminateCancelJob(exc);
383         }
384 
385         // Exit process if necessary.
386         switch (myState) {
387             case TERMINATE_CANCEL_JOB:
388             case TERMINATE_NO_REPORT:
389                 System.exit(1);
390                 break;
391             case RUNNING:
392             case TERMINATING:
393                 break;
394         }
395     }
396 
397     /**
398      * {@inheritDoc}
399      * <p>
400      * Cancel the job.
401      */
402     public synchronized void cancelJob(JobFrontendRef theJobFrontend,
403                                        String errmsg)
404             throws IOException {
405         terminateNoReport();
406     }
407 
408     /**
409      * Commence the job.
410      *
411      * @param theJobFrontend    Job Frontend that is calling this method.
412      * @param middlewareAddress Array of hosts/ports for middleware messages.
413      *                          The first <I>K</I>
414      *                          elements are for the job backend processes in rank order, the
415      *                          <I>K</I>+1st element is for the job frontend process. If the
416      * @param worldAddress      Array of hosts/ports for the world communicator. The
417      *                          <I>K</I>
418      *                          elements are for the job backend processes in rank order.
419      * @param frontendAddress   Array of hosts/ports for the frontend
420      *                          communicator. The first
421      *                          <I>K</I> elements are for the job backend processes in rank order, the
422      *                          <I>K</I>+1st element is for the job frontend process. If the frontend
423      *                          communicator does not exist, <code>frontendAddress</code> is null.
424      * @param properties        Java system properties.
425      * @param mainClassName     Fully qualified class name of the Java main program
426      *                          class to execute.
427      * @param args              Array of 0 or more Java command line arguments.
428      * @throws java.io.IOException         Thrown if an I/O error occurred.
429      * @throws java.io.IOException if any.
430      */
431     public synchronized void commenceJob(JobFrontendRef theJobFrontend,
432                                          InetSocketAddress[] middlewareAddress,
433                                          InetSocketAddress[] worldAddress,
434                                          InetSocketAddress[] frontendAddress,
435                                          Properties properties,
436                                          String mainClassName,
437                                          String[] args)
438             throws IOException {
439         // Record information.
440         myMiddlewareAddress = middlewareAddress;
441         myWorldAddress = worldAddress;
442         myFrontendAddress = frontendAddress;
443         myProperties = properties;
444         myMainClassName = mainClassName;
445         myArgs = args;
446 
447         // Notify main program to commence job.
448         commence = true;
449         notifyAll();
450     }
451 
452     /**
453      * {@inheritDoc}
454      * <p>
455      * Report that the job finished.
456      */
457     public synchronized void jobFinished(JobFrontendRef theJobFrontend)
458             throws IOException {
459         continueRun = false;
460     }
461 
462     /**
463      * {@inheritDoc}
464      * <p>
465      * Renew the lease on the job.
466      */
467     public synchronized void renewLease(JobFrontendRef theJobFrontend)
468             throws IOException {
469         myFrontendExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
470     }
471 
472     /**
473      * Report the content for a previously-requested resource.
474      *
475      * @param theJobFrontend Job Frontend that is calling this method.
476      * @param resourceName   Resource name.
477      * @param content        Resource content, or null if resource not found.
478      * @throws java.io.IOException         Thrown if an I/O error occurred.
479      * @throws java.io.IOException if any.
480      */
481     public synchronized void reportResource(JobFrontendRef theJobFrontend,
482                                             String resourceName,
483                                             byte[] content)
484             throws IOException {
485         myResourceCache.put(resourceName, content);
486     }
487 
488     /**
489      * {@inheritDoc}
490      * <p>
491      * Report the content for a previously-requested resource.
492      *
493      * @throws java.io.IOException Thrown if an I/O error occurred.
494      * @param theJobFrontend a {@link edu.rit.pj.cluster.JobFrontendRef} object
495      * @param resourceName a {@link java.lang.String} object
496      * @param content a {@link edu.rit.util.ByteSequence} object
497      */
498     public synchronized void reportResource(JobFrontendRef theJobFrontend,
499                                             String resourceName,
500                                             ByteSequence content)
501             throws IOException {
502         myResourceCache.put(resourceName,
503                 content == null ? null : content.toByteArray());
504     }
505 
506     /**
507      * {@inheritDoc}
508      * <p>
509      * Report the result of opening the given output file.
510      */
511     public synchronized void outputFileOpenResult(JobFrontendRef theJobFrontend,
512                                                   int bfd,
513                                                   int ffd,
514                                                   IOException exc)
515             throws IOException {
516         myFileWriter.outputFileOpenResult(theJobFrontend, bfd, ffd, exc);
517     }
518 
519     /**
520      * {@inheritDoc}
521      * <p>
522      * Report the result of writing the given output file.
523      */
524     public synchronized void outputFileWriteResult(JobFrontendRef theJobFrontend,
525                                                    int ffd,
526                                                    IOException exc)
527             throws IOException {
528         myFileWriter.outputFileWriteResult(theJobFrontend, ffd, exc);
529     }
530 
531     /**
532      * {@inheritDoc}
533      * <p>
534      * Report the result of flushing the given output file.
535      */
536     public synchronized void outputFileFlushResult(JobFrontendRef theJobFrontend,
537                                                    int ffd,
538                                                    IOException exc)
539             throws IOException {
540         myFileWriter.outputFileFlushResult(theJobFrontend, ffd, exc);
541     }
542 
543     /**
544      * {@inheritDoc}
545      * <p>
546      * Report the result of closing the given output file.
547      */
548     public synchronized void outputFileCloseResult(JobFrontendRef theJobFrontend,
549                                                    int ffd,
550                                                    IOException exc)
551             throws IOException {
552         myFileWriter.outputFileCloseResult(theJobFrontend, ffd, exc);
553     }
554 
555     /**
556      * {@inheritDoc}
557      * <p>
558      * Report the result of opening the given input file.
559      */
560     public synchronized void inputFileOpenResult(JobFrontendRef theJobFrontend,
561                                                  int bfd,
562                                                  int ffd,
563                                                  IOException exc)
564             throws IOException {
565         myFileReader.inputFileOpenResult(theJobFrontend, bfd, ffd, exc);
566     }
567 
568     /**
569      * {@inheritDoc}
570      * <p>
571      * Report the result of reading the given input file.
572      */
573     public synchronized void inputFileReadResult(JobFrontendRef theJobFrontend,
574                                                  int ffd,
575                                                  byte[] buf,
576                                                  int len,
577                                                  IOException exc)
578             throws IOException {
579         myFileReader.inputFileReadResult(theJobFrontend, ffd, len, exc);
580     }
581 
582     /**
583      * {@inheritDoc}
584      * <p>
585      * Report the result of skipping the given input file.
586      */
587     public synchronized void inputFileSkipResult(JobFrontendRef theJobFrontend,
588                                                  int ffd,
589                                                  long len,
590                                                  IOException exc)
591             throws IOException {
592         myFileReader.inputFileSkipResult(theJobFrontend, ffd, len, exc);
593     }
594 
595     /**
596      * {@inheritDoc}
597      * <p>
598      * Report the result of closing the given input file.
599      */
600     public synchronized void inputFileCloseResult(JobFrontendRef theJobFrontend,
601                                                   int ffd,
602                                                   IOException exc)
603             throws IOException {
604         myFileReader.inputFileCloseResult(theJobFrontend, ffd, exc);
605     }
606 
607     /**
608      * Close communication with this Job Backend.
609      */
610     public synchronized void close() {
611     }
612 
613     /**
614      * Obtain this job's user name.
615      *
616      * @return User name.
617      */
618     public String getUserName() {
619         return username;
620     }
621 
622     /**
623      * Obtain this job's job number.
624      *
625      * @return Job number.
626      */
627     public int getJobNumber() {
628         return jobnum;
629     }
630 
631     /**
632      * Obtain the number of backend processes in this job.
633      *
634      * @return <I>K</I>, the number of backend processes.
635      */
636     public int getK() {
637         return K;
638     }
639 
640     /**
641      * Obtain the rank of this backend process in this job.
642      *
643      * @return Rank.
644      */
645     public int getRank() {
646         return rank;
647     }
648 
649     /**
650      * Obtain the backend host name on which this job is running.
651      *
652      * @return Host name.
653      */
654     public String getBackendHost() {
655         return backendHost;
656     }
657 
658     /**
659      * Determine whether the frontend communicator exists in this job.
660      *
661      * @return True if the frontend communicator exists, false if it doesn't.
662      */
663     public boolean hasFrontendCommunicator() {
664         return hasFrontendComm;
665     }
666 
667     /**
668      * Obtain this job's backend class loader.
669      *
670      * @return Class loader.
671      */
672     public ClassLoader getClassLoader() {
673         return myClassLoader;
674     }
675 
676     /**
677      * Obtain this job's backend file writer.
678      *
679      * @return Backend file writer.
680      */
681     public BackendFileWriter getFileWriter() {
682         return myFileWriter;
683     }
684 
685     /**
686      * Obtain this job's backend file reader.
687      *
688      * @return Backend file reader.
689      */
690     public BackendFileReader getFileReader() {
691         return myFileReader;
692     }
693 
694     /**
695      * Wait until this job commences.
696      */
697     public synchronized void waitForCommence() {
698         while (!commence) {
699             try {
700                 wait();
701             } catch (InterruptedException ignored) {
702             }
703         }
704     }
705 
706     /**
707      * Obtain this job's world communicator channel group. If this job has not
708      * commenced yet, null is returned.
709      *
710      * @return Channel group.
711      */
712     public ChannelGroup getWorldChannelGroup() {
713         return myWorldChannelGroup;
714     }
715 
716     /**
717      * Obtain this job's array of hosts/ports for the world communicator. The
718      * <I>K</I> elements are for the job backend processes in rank order. If
719      * this job has not commenced yet, null is returned.
720      *
721      * @return Array of world communicator addresses.
722      */
723     public InetSocketAddress[] getWorldAddress() {
724         return myWorldAddress;
725     }
726 
727     /**
728      * Obtain this job's frontend communicator channel group. If the frontend
729      * communicator does not exist, or if this job has not commenced yet, null
730      * is returned.
731      *
732      * @return Channel group.
733      */
734     public ChannelGroup getFrontendChannelGroup() {
735         return myFrontendChannelGroup;
736     }
737 
738     /**
739      * Obtain this job's array of hosts/ports for the frontend communicator. The
740      * first <I>K</I> elements are for the job backend processes in rank order,
741      * the <I>K</I>+1st element is for the job frontend process. If the frontend
742      * communicator does not exist, or if this job has not commenced yet, null
743      * is returned.
744      *
745      * @return Array of frontend communicator addresses.
746      */
747     public InetSocketAddress[] getFrontendAddress() {
748         return myFrontendAddress;
749     }
750 
751     /**
752      * Obtain this job's Java system properties. If this job has not commenced
753      * yet, null is returned.
754      *
755      * @return Properties.
756      */
757     public Properties getProperties() {
758         return myProperties;
759     }
760 
761     /**
762      * Obtain this job's main class name. If this job has not commenced yet,
763      * null is returned.
764      *
765      * @return Fully qualified class name of the Java main program class to
766      * execute.
767      */
768     public String getMainClassName() {
769         return myMainClassName;
770     }
771 
772     /**
773      * Obtain this job's command line arguments. If this job has not commenced
774      * yet, null is returned.
775      *
776      * @return Array of 0 or more Java command line arguments.
777      */
778     public String[] getArgs() {
779         return myArgs;
780     }
781 
782     /**
783      * Set the comment string for this job backend process. The comment string
784      * appears in the detailed job status display in the Job Scheduler's web
785      * interface. Each job backend process (rank) has its own comment string. If
786      * <code>setComment()</code> is never called, the comment string is empty. The
787      * comment string is typically used to display this job backend process's
788      * progress. The comment string is rendered by a web browser and can contain
789      * HTML tags.
790      * <p>
791      * Calling <code>setComment()</code> causes a message to be sent to the job
792      * frontend process, which in turn causes a message to be sent to the Job
793      * Scheduler. (Any I/O errors during message sending are ignored.)
794      * Consequently, don't call <code>setComment()</code> too frequently, or the
795      * program's performance will suffer.
796      *
797      * @param comment Comment string.
798      */
799     public void setComment(String comment) {
800         try {
801             myJobFrontend.reportComment(this, rank, comment);
802         } catch (IOException ignored) {
803         }
804     }
805 
806     /**
807      * Obtain the Job Backend object. If the Job Backend main program is
808      * running, the job backend object for the job is returned. If some other
809      * main program is running, null is returned.
810      *
811      * @return Job backend object, or null.
812      */
813     public static JobBackend getJobBackend() {
814         return theJobBackend;
815     }
816 
817 // More hidden operations.
818 
819     /**
820      * Take action when the job frontend's lease renewal timer times out.
821      *
822      * @throws IOException Thrown if an I/O error occurred.
823      */
824     private synchronized void frontendRenewTimeout()
825             throws IOException {
826         if (myFrontendRenewTimer.isTriggered()) {
827             myJobFrontend.renewLease(this);
828         }
829     }
830 
831     /**
832      * Take action when the job frontend's lease expiration timer times out.
833      *
834      * @throws IOException Thrown if an I/O error occurred.
835      */
836     private void frontendExpireTimeout()
837             throws IOException {
838         boolean doExit = false;
839         synchronized (this) {
840             if (myFrontendExpireTimer.isTriggered()) {
841                 reportRunFinished();
842                 if (myState == State.RUNNING) {
843                     myState = State.TERMINATE_NO_REPORT;
844                     doExit = true;
845                 }
846             }
847         }
848 
849         // Cannot hold the synchronization lock while calling System.exit(),
850         // otherwise a deadlock can occur between this thread (the timer thread)
851         // and the shutdown hook thread.
852         myJobLauncherLog.println("Job frontend lease expired");
853         if (doExit) {
854             System.exit(1);
855         }
856     }
857 
858     /**
859      * Terminate this Job Backend immediately, sending a "cancel job" message to
860      * the Job Frontend. The error message comes from the given exception.
861      *
862      * @param exc Exception.
863      */
864     private void terminateCancelJob(Throwable exc) {
865         continueRun = false;
866         if (myState == State.RUNNING) {
867             myState = State.TERMINATE_CANCEL_JOB;
868             myCancelMessage = exc.getClass().getName();
869             String msg = exc.getMessage();
870             if (msg != null) {
871                 myCancelMessage = myCancelMessage + ": " + msg;
872             }
873             //System.err.println (myCancelMessage);
874             //exc.printStackTrace (System.err);
875         }
876     }
877 
878     /**
879      * Terminate this Job Backend immediately, with no report to the Job
880      * Frontend.
881      */
882     private void terminateNoReport() {
883         continueRun = false;
884         if (myState == State.RUNNING) {
885             myState = State.TERMINATE_NO_REPORT;
886         }
887     }
888 
889     /**
890      * Shut down this Job Backend.
891      */
892     private void shutdown() {
893         synchronized (this) {
894             // Tell job frontend that we are terminating.
895             if (myJobFrontend != null) {
896                 try {
897                     switch (myState) {
898                         case RUNNING:
899                             // Tell job frontend we finished normally.
900                             myJobFrontend.backendFinished(this);
901                             break;
902                         case TERMINATE_CANCEL_JOB:
903                             // Tell job frontend we're canceling.
904                             myJobFrontend.cancelJob(this, myCancelMessage);
905                             break;
906                         case TERMINATE_NO_REPORT:
907                         case TERMINATING:
908                             // Tell job frontend nothing.
909                             break;
910                     }
911                 } catch (IOException ignored) {
912                 }
913             }
914 
915             // Record that we are terminating.
916             myState = State.TERMINATING;
917         }
918 
919         // Wait until the run() method thread terminates.
920         waitForRunFinished();
921 
922         // Shut down job frontend lease timers.
923         synchronized (this) {
924             myFrontendRenewTimer.stop();
925             myFrontendExpireTimer.stop();
926         }
927 
928         // All proxies, channels, and channel groups will close when the process
929         // exits.
930     }
931 
932     /**
933      * Wait for the run() method to finish.
934      */
935     private void waitForRunFinished() {
936         for (; ; ) {
937             try {
938                 runFinished.await();
939                 break;
940             } catch (InterruptedException ignored) {
941             }
942         }
943     }
944 
945     /**
946      * Report that the run() method finished.
947      */
948     private void reportRunFinished() {
949         runFinished.countDown();
950     }
951 
952     /**
953      * Dump this job backend to the standard output, for debugging.
954      */
955     private synchronized void dump() {
956         System.out.println("********************************");
957         System.out.println("username = " + username);
958         System.out.println("jobnum = " + jobnum);
959         System.out.println("K = " + K);
960         System.out.println("rank = " + rank);
961         System.out.println("hasFrontendComm = " + hasFrontendComm);
962         for (int i = 0; i <= K; ++i) {
963             System.out.println("myMiddlewareAddress[" + i + "] = " + myMiddlewareAddress[i]);
964         }
965         for (int i = 0; i < K; ++i) {
966             System.out.println("myWorldAddress[" + i + "] = " + myWorldAddress[i]);
967         }
968         if (hasFrontendComm) {
969             for (int i = 0; i <= K; ++i) {
970                 System.out.println("myFrontendAddress[" + i + "] = " + myFrontendAddress[i]);
971             }
972         }
973         myProperties.list(System.out);
974         System.out.println("myMainClassName = " + myMainClassName);
975         for (int i = 0; i < myArgs.length; ++i) {
976             System.out.println("myArgs[" + i + "] = \"" + myArgs[i] + "\"");
977         }
978     }
979 
980 // Main program.
981 
982     /**
983      * Job Backend main program.
984      *
985      * @param args an array of {@link java.lang.String} objects.
986      * @throws java.lang.Exception if any.
987      */
988     public static void main(String[] args)
989             throws Exception {
990         try {
991             // Parse command line arguments.
992             if (args.length != 8) {
993                 usage();
994             }
995             String username = args[0];
996             int jobnum = Integer.parseInt(args[1]);
997             int K = Integer.parseInt(args[2]);
998             int rank = Integer.parseInt(args[3]);
999             boolean hasFrontendComm = Boolean.parseBoolean(args[4]);
1000             String frontendHost = args[5];
1001             int frontendPort = Integer.parseInt(args[6]);
1002             String backendHost = args[7];
1003 
1004             // Set up job backend object.
1005             theJobBackend
1006                     = new JobBackend(username, jobnum, K, rank, hasFrontendComm,
1007                     frontendHost, frontendPort, backendHost);
1008         } catch (Throwable exc) {
1009             exc.printStackTrace(System.err);
1010             System.exit(1);
1011         }
1012 
1013         // Set the main thread's context class loader to be the job backend's
1014         // class loader.
1015         Thread.currentThread().setContextClassLoader(theJobBackend.getClassLoader());
1016 
1017         // Run job backend object in a separate thread.
1018         logger.log(Level.INFO, " Starting backend Daemon thread.");
1019         Thread thr = new Thread(theJobBackend);
1020         thr.setDaemon(true);
1021         thr.start();
1022 
1023         // Wait until job commences.
1024         logger.log(Level.INFO, " Waiting for commence.");
1025         theJobBackend.waitForCommence();
1026         logger.log(Level.INFO, " Commencing.");
1027 
1028         // Add any Java system properties from the job frontend process that do
1029         // not exist in this job backend process.
1030         Properties backendProperties = System.getProperties();
1031         Properties frontendProperties = theJobBackend.getProperties();
1032         for (Map.Entry<Object, Object> entry : frontendProperties.entrySet()) {
1033             String name = (String) entry.getKey();
1034             String value = (String) entry.getValue();
1035             if (backendProperties.getProperty(name) == null) {
1036                 backendProperties.setProperty(name, value);
1037             }
1038         }
1039 
1040         // Turn on headless mode. This allows graphics drawing operations (that
1041         // do not require a screen, keyboard, or mouse) to work.
1042         System.setProperty("java.awt.headless", "true");
1043 
1044         // Call the job's main() method, passing in the job's command line
1045         // arguments.
1046         Class<?> mainclass
1047                 = Class.forName(theJobBackend.getMainClassName(),
1048                 true,
1049                 // Force Field X modification to use our SystemClassLoader.
1050                 ClassLoader.getSystemClassLoader());
1051         // Original:
1052         // theJobBackend.getClassLoader());
1053         Method mainmethod = mainclass.getMethod("main", String[].class);
1054 
1055         StringBuilder stringBuilder = new StringBuilder(format(" Preparing to invoke main method:\n %s\n With args: ", mainmethod));
1056 
1057         for (String arg : theJobBackend.getArgs()) {
1058             stringBuilder.append(format(" %s", arg));
1059         }
1060         stringBuilder.append("\n Backend start-up completed.");
1061         logger.log(Level.INFO, stringBuilder.toString());
1062 
1063         // Close down the FileHandler if it exists.
1064         if (theJobBackend.fileHandler != null) {
1065             theJobBackend.fileHandler.flush();
1066             logger.setLevel(Level.OFF);
1067             theJobBackend.fileHandler.close();
1068         }
1069 
1070         mainmethod.invoke(null, (Object) theJobBackend.getArgs());
1071 
1072         // After the main() method returns and all non-daemon threads have
1073         // terminated, the process will exit, and the shutdown hook will call
1074         // the shutdown() method.
1075     }
1076 
1077     /**
1078      * Print a usage message and exit.
1079      */
1080     private static void usage() {
1081         System.err.println("Usage: java edu.rit.pj.cluster.JobBackend <username> <jobnum> <K> <rank> <hasFrontendComm> <frontendHost> <frontendPort> <backendHost>");
1082         System.exit(1);
1083     }
1084 
1085 }