View Javadoc
1   //******************************************************************************
2   //
3   // File:    JobFrontend.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.JobFrontend
6   //
7   // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import static java.lang.String.format;
43  
44  import java.io.File;
45  import java.io.IOException;
46  import java.io.InputStream;
47  import java.net.InetSocketAddress;
48  import java.util.ArrayList;
49  import java.util.Arrays;
50  import java.util.HashMap;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Properties;
54  import java.util.logging.FileHandler;
55  import java.util.logging.Level;
56  import java.util.logging.Logger;
57  
58  import edu.rit.mp.Channel;
59  import edu.rit.mp.ChannelGroup;
60  import edu.rit.mp.ChannelGroupClosedException;
61  import edu.rit.mp.ObjectBuf;
62  import edu.rit.mp.Status;
63  import edu.rit.mp.buf.ObjectItemBuf;
64  import edu.rit.pj.PJProperties;
65  import edu.rit.util.ByteSequence;
66  import edu.rit.util.Timer;
67  import edu.rit.util.TimerTask;
68  import edu.rit.util.TimerThread;
69  
70  /**
71   * Class JobFrontend provides the message handler for the PJ job frontend
72   * process.
73   *
74   * @author Alan Kaminsky
75   * @version 20-Jun-2012
76   */
77  public class JobFrontend
78          implements Runnable, JobFrontendRef {
79  
80  // Hidden data members.
81  
82      // Logger
83      private static final Logger logger = Logger.getLogger(JobFrontend.class.getName());
84  
85      // Logging
86      private FileHandler fileHandler = null;
87  
88      // User name.
89      private String username;
90  
91      // Job number.
92      private int jobnum;
93  
94      // Job resources.
95      private int Nn;
96      private int Np;
97      private int Nt;
98  
99      // Whether the frontend communicator exists, true or false.
100     private boolean hasFrontendComm;
101 
102     // Main class name.
103     private String myMainClassName;
104 
105     // Command line arguments.
106     private String[] myArgs;
107 
108     // Rank of next backend process to be assigned.
109     private int myNextRank;
110 
111     // Timer thread for lease renewals and expirations.
112     private TimerThread myLeaseTimerThread;
113 
114     // Timers for the lease with the Job Scheduler.
115     private Timer mySchedulerRenewTimer;
116     private Timer mySchedulerExpireTimer;
117 
118     // Timer for the job timeout if any.
119     private Timer myJobTimer;
120 
121     // Array of job backend process info records, indexed by rank.
122     private ProcessInfo[] myProcessInfo;
123 
124     // Mapping from job backend reference to job backend process info record.
125     private Map<JobBackendRef, ProcessInfo> myProcessMap
126             = new HashMap<JobBackendRef, ProcessInfo>();
127 
128     // Number of running job backend processes.
129     private int myRunningCount;
130 
131     // Number of finished job backend processes.
132     private int myFinishedCount;
133 
134     // Middleware channel group and address array.
135     private ChannelGroup myMiddlewareChannelGroup;
136     private InetSocketAddress[] myMiddlewareAddress;
137 
138     // Proxy for Job Scheduler Daemon.
139     private JobSchedulerRef myJobScheduler;
140 
141     // World communicator channel group address array.
142     private InetSocketAddress[] myWorldAddress;
143 
144     // Frontend communicator channel group and address array.
145     private ChannelGroup myFrontendChannelGroup;
146     private InetSocketAddress[] myFrontendAddress;
147 
148     // JVM flags.
149     private String userJvmFlags = PJProperties.getPjJvmFlags();
150 
151     // Resource contents that have been reported to job backend processes.
152     private ResourceCache myResourceCache = new ResourceCache();
153 
154     // Flag for shutting down the run() method.
155     private boolean continueRun = true;
156 
157     // State of this job frontend.
158     private State myState = State.RUNNING;
159 
160     private static enum State {
161 
162         RUNNING,
163         TERMINATE_CANCEL_JOB,
164         TERMINATING
165     }
166 
167     ;
168 
169     // Error message if job canceled, or null if job finished normally.
170     private String myCancelMessage = "User canceled job";
171 
172     // For writing and reading files on the job frontend's node.
173     private FrontendFileWriter myFrontendFileWriter;
174     private FrontendFileReader myFrontendFileReader;
175 
176 // Exported constructors.
177 
178     /**
179      * Construct a new job frontend object. The job frontend object will contact
180      * the Job Scheduler Daemon specified by the <code>"pj.host"</code> and
181      * <code>"pj.port"</code> Java system properties. See class {@linkplain
182      * edu.rit.pj.PJProperties} for further information.
183      *
184      * @param username        User name.
185      * @param Nn              Number of backend nodes (&gt;= 1).
186      * @param Np              Number of processes (&gt;= 1).
187      * @param Nt              Number of CPUs per process (&gt;= 0). 0 means "all CPUs."
188      * @param hasFrontendComm True if the job has the frontend communicator,
189      *                        false if it doesn't.
190      * @param mainClassName   Main class name.
191      * @param args            Command line arguments.
192      * @throws java.io.IOException           Thrown if an I/O error occurred.
193      * @throws java.io.IOException   if any.
194      */
195     public JobFrontend(String username,
196                        int Nn,
197                        int Np,
198                        int Nt,
199                        boolean hasFrontendComm,
200                        String mainClassName,
201                        String[] args)
202             throws IOException {
203         // Record arguments.
204         this.username = username;
205         this.Nn = Nn;
206         this.Np = Np;
207         this.Nt = Nt;
208         this.hasFrontendComm = hasFrontendComm;
209         this.myMainClassName = mainClassName;
210         this.myArgs = args;
211 
212         // Set up shutdown hook.
213         Runtime.getRuntime().addShutdownHook(new Thread() {
214             public void run() {
215                 shutdown();
216             }
217         });
218 
219         // Set up lease timer thread.
220         myLeaseTimerThread = new TimerThread();
221         myLeaseTimerThread.setDaemon(true);
222         myLeaseTimerThread.start();
223 
224         // Set up Job Scheduler lease timers.
225         mySchedulerRenewTimer
226                 = myLeaseTimerThread.createTimer(new TimerTask() {
227             public void action(Timer timer) {
228                 try {
229                     schedulerRenewTimeout();
230                 } catch (Throwable ignored) {
231                 }
232             }
233         });
234         mySchedulerExpireTimer
235                 = myLeaseTimerThread.createTimer(new TimerTask() {
236             public void action(Timer timer) {
237                 try {
238                     schedulerExpireTimeout();
239                 } catch (Throwable ignored) {
240                 }
241             }
242         });
243 
244         // Set up job timer.
245         myJobTimer
246                 = myLeaseTimerThread.createTimer(new TimerTask() {
247             public void action(Timer timer) {
248                 try {
249                     jobTimeout();
250                 } catch (Throwable ignored) {
251                 }
252             }
253         });
254 
255         // Set up array of job backend process info records.
256         myProcessInfo = new ProcessInfo[Np];
257         for (int i = 0; i < Np; ++i) {
258             final int rank = i;
259             ProcessInfo processinfo
260                     = new ProcessInfo(/*state            */ProcessInfo.State.NOT_STARTED,
261                     /*name             */ null,
262                     /*rank             */ rank,
263                     /*backend          */ null,
264                     /*middlewareAddress*/ null,
265                     /*worldAddress     */ null,
266                     /*frontendAddress  */ null,
267                     /*renewTimer       */
268                     myLeaseTimerThread.createTimer(new TimerTask() {
269                         public void action(Timer timer) {
270                             try {
271                                 backendRenewTimeout(rank);
272                             } catch (Throwable ignored) {
273                             }
274                         }
275                     }),
276                     /*expireTimer      */
277                     myLeaseTimerThread.createTimer(new TimerTask() {
278                         public void action(Timer timer) {
279                             try {
280                                 backendExpireTimeout(rank);
281                             } catch (Throwable ignored) {
282                             }
283                         }
284                     }),
285                     /*Nt               */ 0);
286             myProcessInfo[rank] = processinfo;
287         }
288 
289         // Set up middleware channel group and address array.
290         myMiddlewareChannelGroup = new ChannelGroup();
291         myMiddlewareAddress = new InetSocketAddress[Np + 1];
292 
293         // Set up world communicator address array.
294         myWorldAddress = new InetSocketAddress[Np];
295 
296         // Set up frontend communicator channel group and address array.
297         if (hasFrontendComm) {
298             myFrontendChannelGroup = new ChannelGroup();
299             myFrontendAddress = new InetSocketAddress[Np + 1];
300         }
301 
302         // Set up frontend file writer and reader.
303         myFrontendFileWriter = new FrontendFileWriter(this);
304         myFrontendFileReader = new FrontendFileReader(this);
305 
306         // Set up Job Scheduler proxy.
307         InetSocketAddress js_address = null;
308         Channel js_channel = null;
309         try {
310             js_address
311                     = new InetSocketAddress(PJProperties.getPjHost(),
312                     PJProperties.getPjPort());
313             js_channel = myMiddlewareChannelGroup.connect(js_address);
314         } catch (IOException exc) {
315             throw new JobSchedulerException("JobFrontend(): Cannot contact Job Scheduler Daemon at "
316                     + js_address,
317                     exc);
318         }
319         myJobScheduler
320                 = new JobSchedulerProxy(myMiddlewareChannelGroup, js_channel);
321 
322         // Start Job Scheduler lease timers.
323         mySchedulerRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
324                 Constants.LEASE_RENEW_INTERVAL);
325         mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
326 
327         // Kick off the job!
328         myJobScheduler.requestJob(this, username, Nn, Np, Nt);
329     }
330 
331 // Exported operations.
332 
333     /**
334      * Run this Job Frontend.
335      */
336     public void run() {
337         ObjectItemBuf<JobFrontendMessage> buf
338                 = ObjectBuf.buffer((JobFrontendMessage) null);
339         Status status = null;
340         JobFrontendMessage message = null;
341         JobBackendRef backend = null;
342 
343         try {
344             while (continueRun) {
345                 // Receive a message from any channel.
346                 status = myMiddlewareChannelGroup.receive(null, null, buf);
347                 message = buf.item;
348 
349                 // Process a message from the Job Scheduler.
350                 if (status.tag == Message.FROM_JOB_SCHEDULER) {
351                     message.invoke(this, myJobScheduler);
352                 } // Process a message from a job backend.
353                 else if (status.tag == Message.FROM_JOB_BACKEND) {
354                     // Get job backend associated with channel. If none, set up
355                     // a new job backend proxy.
356                     backend = (JobBackendRef) status.channel.info();
357                     if (backend == null) {
358                         backend
359                                 = new JobBackendProxy(myMiddlewareChannelGroup, status.channel);
360                         status.channel.info(backend);
361                     }
362 
363                     // Process message.
364                     message.invoke(this, backend);
365                 }
366 
367                 // Enable garbage collection of no-longer-needed objects while
368                 // waiting to receive next message.
369                 buf.item = null;
370                 status = null;
371                 message = null;
372                 backend = null;
373             }
374         } catch (ChannelGroupClosedException ignored) {
375         } catch (Throwable exc) {
376             terminateCancelJob(exc);
377         }
378 
379         // Exit process if necessary.
380         switch (myState) {
381             case TERMINATE_CANCEL_JOB:
382                 System.exit(1);
383                 break;
384             case RUNNING:
385             case TERMINATING:
386                 break;
387         }
388     }
389 
390     /**
391      * {@inheritDoc}
392      * <p>
393      * Assign a backend process to the job.
394      */
395     public void assignBackend(JobSchedulerRef theJobScheduler,
396                               String name,
397                               String host,
398                               String jvm,
399                               String classpath,
400                               String[] jvmflags,
401                               String shellCommand,
402                               int Nt)
403             throws IOException {
404         // Record backend name and number of CPUs.
405         int rank = myNextRank++;
406         ProcessInfo processinfo = myProcessInfo[rank];
407         processinfo.name = name;
408         processinfo.Nt = Nt;
409 
410         // Display backend.
411         System.err.print(", ");
412         System.err.print(name);
413         System.err.flush();
414         if (myNextRank == Np) {
415             System.err.println();
416         }
417 
418         /**
419          * Without this pause of 10 msec, the SSH command below will often fail when using more than 10 to 12 nodes.
420          */
421         try {
422             Thread.sleep(10);
423         } catch (InterruptedException e) {
424             //
425         }
426 
427 //        if (System.getProperty("pj.log", "false").equalsIgnoreCase("true")) {
428 //            try {
429 //                // Remove all log handlers from the default logger.
430 //                java.util.logging.Logger defaultLogger = java.util.logging.LogManager.getLogManager().getLogger("");
431 //                java.util.logging.Handler defaultHandlers[] = defaultLogger.getHandlers();
432 //                for (java.util.logging.Handler h : defaultHandlers) {
433 //                    defaultLogger.removeHandler(h);
434 //                }
435 //
436 //                // Create a FileHandler that logs messages with a SimpleFormatter.
437 //                File file = new File(Integer.toString(rank));
438 //                file.mkdir();
439 //                fileHandler = new FileHandler(file.getAbsolutePath() + "/frontend.log");
440 //                fileHandler.setFormatter(new SimpleFormatter());
441 //                logger.addHandler(fileHandler);
442 //                logger.setLevel(Level.INFO);
443 //            } catch (Exception e) {
444 //                logger.setLevel(Level.OFF);
445 //            }
446 //        } else {
447 //            logger.setLevel(Level.OFF);
448 //        }
449 //
450 //        logger.log(Level.INFO, " Username: " + username);
451 //        logger.log(Level.INFO, " Job number: " + jobnum);
452 //        logger.log(Level.INFO, " Nodes: " + Np);
453 //        logger.log(Level.INFO, " Rank: " + rank);
454 //        logger.log(Level.INFO, " Has Frontend Comm: " + hasFrontendComm);
455 //        logger.log(Level.INFO, " Frontend Host: " + myMiddlewareChannelGroup.listenAddress().getHostName());
456 //        logger.log(Level.INFO, " Frontend Port: " + myMiddlewareChannelGroup.listenAddress().getPort());
457 //        logger.log(Level.INFO, " Backend Host: " + host);
458 
459         try {
460             String listenHost = myMiddlewareChannelGroup.listenAddress().getHostName();
461             if (listenHost.equalsIgnoreCase(host)) {
462                 // Build a command to run on a backend node via a local ProcessBuilder.
463                 List<String> command = new ArrayList<>();
464                 command.add(jvm);
465                 command.add("-classpath");
466                 command.add(classpath);
467                 if (jvmflags != null && jvmflags.length > 0) {
468                     command.addAll(Arrays.asList(jvmflags));
469                 }
470                 if (userJvmFlags != null && !userJvmFlags.trim().equalsIgnoreCase("")) {
471                     command.add(userJvmFlags.trim());
472                 }
473                 command.add("edu.rit.pj.cluster.JobBackend");
474                 command.add(username);
475                 command.add(Integer.toString(jobnum));
476                 command.add(Integer.toString(Np));
477                 command.add(Integer.toString(rank));
478                 command.add(Boolean.toString(hasFrontendComm));
479                 command.add(myMiddlewareChannelGroup.listenAddress().getHostName());
480                 command.add(Integer.toString(myMiddlewareChannelGroup.listenAddress().getPort()));
481                 command.add(host);
482                 ProcessBuilder pb = new ProcessBuilder(command);
483                 String cwd = System.getProperty("user.dir");
484                 if (cwd != null) {
485                     pb.directory(new File(cwd));
486                 }
487                 Process ssh = pb.start();
488             } else {
489                 // Build a command to run on the backend node via ssh.
490                 StringBuilder command = new StringBuilder();
491                 command.append(shellCommand);
492                 command.append(" \"");
493                 String cwd = System.getProperty("user.dir");
494                 if (cwd != null) {
495                     command.append("cd '");
496                     command.append(cwd);
497                     command.append("'; ");
498                 }
499                 command.append("nohup ");
500                 command.append(jvm);
501                 command.append(" -classpath '");
502                 command.append(classpath);
503                 command.append("'");
504                 for (String flag : jvmflags) {
505                     command.append(" ");
506                     command.append(flag);
507                 }
508                 command.append(" ");
509                 command.append(userJvmFlags);
510                 command.append(" edu.rit.pj.cluster.JobBackend '");
511                 command.append(username);
512                 command.append("' ");
513                 command.append(jobnum);
514                 command.append(" ");
515                 command.append(Np);
516                 command.append(" ");
517                 command.append(rank);
518                 command.append(" ");
519                 command.append(hasFrontendComm);
520                 command.append(" '");
521                 command.append(myMiddlewareChannelGroup.listenAddress().getHostName());
522                 command.append("' ");
523                 command.append(myMiddlewareChannelGroup.listenAddress().getPort());
524                 command.append(" '");
525                 command.append(host);
526                 command.append("' >/dev/null 2>/dev/null &\"");
527 
528                 // So an SSH remote login and execute the above command.
529                 Process ssh = Runtime.getRuntime().exec(new String[]{"ssh", host, command.toString()});
530             }
531 
532             // Start lease timers for the backend node.
533             processinfo.renewTimer.start(Constants.LEASE_RENEW_INTERVAL,
534                     Constants.LEASE_RENEW_INTERVAL);
535             processinfo.expireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
536 
537         } // If an I/O error occurs, treat it as a backend node failure.
538         catch (IOException exc) {
539             if (myNextRank != Np) {
540                 System.err.println();
541             }
542             System.err.println(" Exception executing SSH command:\n" + exc);
543             logger.log(Level.SEVERE, " Exception executing SSH command:\n" + exc);
544             terminateCancelJob(backendFailed(processinfo, "SSH command failed"));
545         }
546     }
547 
548     /**
549      * {@inheritDoc}
550      * <p>
551      * Assign a job number to the job. The host name for the job frontend's
552      * middleware channel group is also specified.
553      */
554     public synchronized void assignJobNumber(JobSchedulerRef theJobScheduler,
555                                              int jobnum,
556                                              String pjhost)
557             throws IOException {
558         // Record job number.
559         this.jobnum = jobnum;
560 
561         // Start listening for connections to the middleware channel group.
562         myMiddlewareChannelGroup.listen(new InetSocketAddress(pjhost, 0));
563         myMiddlewareChannelGroup.startListening();
564         myMiddlewareAddress[Np] = myMiddlewareChannelGroup.listenAddress();
565 
566         // Start listening for connections to the frontend communicator channel
567         // group.
568         if (hasFrontendComm) {
569             myFrontendChannelGroup.listen(new InetSocketAddress(pjhost, 0));
570             myFrontendChannelGroup.startListening();
571             myFrontendAddress[Np] = myFrontendChannelGroup.listenAddress();
572         }
573 
574         // Report job number.
575         System.err.print("Job " + jobnum);
576         System.err.flush();
577     }
578 
579     /**
580      * {@inheritDoc}
581      * <p>
582      * Cancel the job.
583      */
584     public synchronized void cancelJob(JobSchedulerRef theJobScheduler,
585                                        String errmsg)
586             throws IOException {
587         terminateCancelJob(errmsg);
588     }
589 
590     /**
591      * Renew the lease on the job.
592      *
593      * @param theJobScheduler Job Scheduler that is calling this method.
594      * @throws java.io.IOException         Thrown if an I/O error occurred.
595      * @throws java.io.IOException if any.
596      */
597     public synchronized void renewLease(JobSchedulerRef theJobScheduler)
598             throws IOException {
599         mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
600     }
601 
602     /**
603      * {@inheritDoc}
604      * <p>
605      * Report that a backend process has finished executing the job.
606      */
607     public synchronized void backendFinished(JobBackendRef theJobBackend)
608             throws IOException {
609         ProcessInfo processinfo = myProcessMap.get(theJobBackend);
610         if (processinfo == null) {
611             return;
612         }
613 
614         // Verify that this backend has not finished already.
615         if (processinfo.state != ProcessInfo.State.RUNNING) {
616             terminateCancelJob("Unexpected \"backend finished\" message, rank="
617                     + processinfo.rank);
618         }
619 
620         // Update job backend process state.
621         processinfo.state = ProcessInfo.State.FINISHED;
622 
623         // Increase count of finished processes.
624         ++myFinishedCount;
625 
626         // If all job backend processes have finished, terminate the run()
627         // method. This will cause the job frontend process to exit when all
628         // other non-daemon threads have also terminated.
629         if (myFinishedCount == Np) {
630             continueRun = false;
631             myCancelMessage = null;
632         }
633     }
634 
635     /**
636      * {@inheritDoc}
637      * <p>
638      * Report that a backend process is ready to commence executing the job.
639      */
640     public synchronized void backendReady(JobBackendRef theJobBackend,
641                                           int rank,
642                                           InetSocketAddress middlewareAddress,
643                                           InetSocketAddress worldAddress,
644                                           InetSocketAddress frontendAddress)
645             throws IOException {
646         // Verify that rank is in range.
647         if (0 > rank || rank >= Np) {
648             terminateCancelJob("Illegal \"backend ready\" message, rank=" + rank);
649         }
650 
651         // Verify that this backend has not started already.
652         ProcessInfo processinfo = myProcessInfo[rank];
653         if (processinfo.state != ProcessInfo.State.NOT_STARTED) {
654             terminateCancelJob("Unexpected \"backend ready\" message, rank=" + rank);
655         }
656 
657         // Record information in job backend process info record.
658         processinfo.state = ProcessInfo.State.RUNNING;
659         processinfo.backend = theJobBackend;
660         processinfo.middlewareAddress = middlewareAddress;
661         processinfo.worldAddress = worldAddress;
662         processinfo.frontendAddress = frontendAddress;
663         myProcessMap.put(theJobBackend, processinfo);
664 
665         // Record channel group addresses.
666         myMiddlewareAddress[rank] = middlewareAddress;
667         myWorldAddress[rank] = worldAddress;
668         if (hasFrontendComm) {
669             myFrontendAddress[rank] = frontendAddress;
670         }
671 
672         // Increase count of running processes.
673         ++myRunningCount;
674 
675         // If all job backend processes have reported ready, commence job.
676         if (myRunningCount == Np) {
677             // Start job timer if necessary.
678             int jobtime = PJProperties.getPjJobTime();
679             if (jobtime > 0) {
680                 myJobTimer.start(jobtime * 1000L);
681             }
682 
683             // Get the system properties.
684             Properties props = System.getProperties();
685 
686             // Send "commence job" message to each job backend, with system
687             // property "pj.nt" set to the proper number of CPUs.
688             for (ProcessInfo info : myProcessMap.values()) {
689                 props.setProperty("pj.nt", "" + info.Nt);
690                 info.backend.commenceJob(/*theJobFrontend   */this,
691                         /*middlewareAddress*/ myMiddlewareAddress,
692                         /*worldAddress     */ myWorldAddress,
693                         /*frontendAddress  */ myFrontendAddress,
694                         /*properties       */ props,
695                         /*mainClassName    */ myMainClassName,
696                         /*args             */ myArgs);
697             }
698         }
699     }
700 
701     /**
702      * {@inheritDoc}
703      * <p>
704      * Cancel the job.
705      */
706     public synchronized void cancelJob(JobBackendRef theJobBackend,
707                                        String errmsg)
708             throws IOException {
709         terminateCancelJob(errmsg);
710     }
711 
712     /**
713      * {@inheritDoc}
714      * <p>
715      * Renew the lease on the job.
716      */
717     public synchronized void renewLease(JobBackendRef theJobBackend)
718             throws IOException {
719         ProcessInfo processinfo = myProcessMap.get(theJobBackend);
720         if (processinfo != null) {
721             processinfo.expireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
722         }
723     }
724 
725     /**
726      * {@inheritDoc}
727      * <p>
728      * Request the given resource from this job frontend's class loader.
729      */
730     public synchronized void requestResource(JobBackendRef theJobBackend,
731                                              String resourceName)
732             throws IOException {
733         // To hold resource content.
734         byte[] content = null;
735 
736         // Get resource content. If resource not found, content is null.
737         if (myResourceCache.contains(resourceName)) {
738             // Get resource content from cache.
739             content = myResourceCache.getNoWait(resourceName);
740         } else {
741             // Get resource content from class loader, save it in cache.
742             InputStream stream
743                     = getClass().getClassLoader().getResourceAsStream(resourceName);
744             if (stream != null) {
745                 content = new ByteSequence(stream).toByteArray();
746             }
747             myResourceCache.put(resourceName, content);
748         }
749 
750         // Send resource to job backend.
751         theJobBackend.reportResource(this, resourceName, content);
752     }
753 
754     /**
755      * {@inheritDoc}
756      * <p>
757      * Open the given output file for writing or appending.
758      */
759     public synchronized void outputFileOpen(JobBackendRef theJobBackend,
760                                             int bfd,
761                                             File file,
762                                             boolean append)
763             throws IOException {
764         myFrontendFileWriter.outputFileOpen(theJobBackend, bfd, file, append);
765     }
766 
767     /**
768      * {@inheritDoc}
769      * <p>
770      * Write the given bytes to the given output file. <code>ffd</code> = 1 refers
771      * to the job's standard output stream; <code>ffd</code> = 2 refers to the job's
772      * standard error stream; other values refer to a previously opened file.
773      */
774     public synchronized void outputFileWrite(JobBackendRef theJobBackend,
775                                              int ffd,
776                                              byte[] buf,
777                                              int off,
778                                              int len)
779             throws IOException {
780         myFrontendFileWriter.outputFileWrite(theJobBackend, ffd, len);
781     }
782 
783     /**
784      * {@inheritDoc}
785      * <p>
786      * Flush accumulated bytes to the given output file.
787      */
788     public synchronized void outputFileFlush(JobBackendRef theJobBackend,
789                                              int ffd)
790             throws IOException {
791         myFrontendFileWriter.outputFileFlush(theJobBackend, ffd);
792     }
793 
794     /**
795      * {@inheritDoc}
796      * <p>
797      * Close the given output file.
798      */
799     public synchronized void outputFileClose(JobBackendRef theJobBackend,
800                                              int ffd)
801             throws IOException {
802         myFrontendFileWriter.outputFileClose(theJobBackend, ffd);
803     }
804 
805     /**
806      * {@inheritDoc}
807      * <p>
808      * Open the given input file for reading.
809      */
810     public synchronized void inputFileOpen(JobBackendRef theJobBackend,
811                                            int bfd,
812                                            File file)
813             throws IOException {
814         myFrontendFileReader.inputFileOpen(theJobBackend, bfd, file);
815     }
816 
817     /**
818      * {@inheritDoc}
819      * <p>
820      * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
821      * job's standard input stream; other values refer to a previously opened
822      * file.
823      */
824     public synchronized void inputFileRead(JobBackendRef theJobBackend,
825                                            int ffd,
826                                            int len)
827             throws IOException {
828         myFrontendFileReader.inputFileRead(theJobBackend, ffd, len);
829     }
830 
831     /**
832      * {@inheritDoc}
833      * <p>
834      * Skip bytes from the given input file.
835      */
836     public synchronized void inputFileSkip(JobBackendRef theJobBackend,
837                                            int ffd,
838                                            long len)
839             throws IOException {
840         myFrontendFileReader.inputFileSkip(theJobBackend, ffd, len);
841     }
842 
843     /**
844      * {@inheritDoc}
845      * <p>
846      * Close the given input file.
847      */
848     public synchronized void inputFileClose(JobBackendRef theJobBackend,
849                                             int ffd)
850             throws IOException {
851         myFrontendFileReader.inputFileClose(theJobBackend, ffd);
852     }
853 
854     /**
855      * {@inheritDoc}
856      * <p>
857      * Report a comment for a process.
858      */
859     public synchronized void reportComment(JobBackendRef theJobBackend,
860                                            int rank,
861                                            String comment)
862             throws IOException {
863         myJobScheduler.reportComment(this, rank, comment);
864     }
865 
866     /**
867      * Close communication with this Job Frontend.
868      */
869     public void close() {
870     }
871 
872 // Hidden operations.
873 
874     /**
875      * Take action when the Job Scheduler's lease renewal timer times out.
876      *
877      * @throws IOException Thrown if an I/O error occurred.
878      */
879     private synchronized void schedulerRenewTimeout()
880             throws IOException {
881         if (mySchedulerRenewTimer.isTriggered()) {
882             myJobScheduler.renewLease(this);
883         }
884     }
885 
886     /**
887      * Take action when the Job Scheduler's lease expiration timer times out.
888      *
889      * @throws IOException Thrown if an I/O error occurred.
890      */
891     private void schedulerExpireTimeout()
892             throws IOException {
893         boolean doExit = false;
894         synchronized (this) {
895             if (mySchedulerExpireTimer.isTriggered()) {
896                 continueRun = false;
897                 if (myState == State.RUNNING) {
898                     myState = State.TERMINATE_CANCEL_JOB;
899                     myCancelMessage = "Job Scheduler failed";
900                     System.err.println(myCancelMessage);
901                     doExit = true;
902                 }
903             }
904         }
905 
906         // Cannot hold the synchronization lock while calling System.exit(),
907         // otherwise a deadlock can occur between this thread (the timer thread)
908         // and the shutdown hook thread.
909         if (doExit) {
910             System.exit(1);
911         }
912     }
913 
914     /**
915      * Take action when the job timer times out.
916      *
917      * @throws IOException Thrown if an I/O error occurred.
918      */
919     private void jobTimeout()
920             throws IOException {
921         boolean doExit = false;
922         synchronized (this) {
923             if (myJobTimer.isTriggered()) {
924                 continueRun = false;
925                 if (myState == State.RUNNING) {
926                     myState = State.TERMINATE_CANCEL_JOB;
927                     myCancelMessage = "Job exceeded maximum running time";
928                     System.err.println(myCancelMessage);
929                     doExit = true;
930                 }
931             }
932         }
933 
934         // Cannot hold the synchronization lock while calling System.exit(),
935         // otherwise a deadlock can occur between this thread (the timer thread)
936         // and the shutdown hook thread.
937         if (doExit) {
938             System.exit(1);
939         }
940     }
941 
942     /**
943      * Take action when a job backend process's lease renewal timer times out.
944      *
945      * @param rank Job backend process's rank.
946      * @throws IOException Thrown if an I/O error occurred.
947      */
948     private synchronized void backendRenewTimeout(int rank)
949             throws IOException {
950         ProcessInfo processinfo = myProcessInfo[rank];
951         if (processinfo.renewTimer.isTriggered()) {
952             processinfo.backend.renewLease(this);
953         }
954     }
955 
956     /**
957      * Take action when a job backend process's lease expiration timer times
958      * out.
959      *
960      * @param rank Job backend process's rank.
961      * @throws IOException Thrown if an I/O error occurred.
962      */
963     private void backendExpireTimeout(int rank)
964             throws IOException {
965         boolean doExit = false;
966         synchronized (this) {
967             ProcessInfo processinfo = myProcessInfo[rank];
968             if (processinfo.expireTimer.isTriggered()) {
969                 // Terminate the Job Frontend.
970                 String msg = backendFailed(processinfo, "Expire Timer Triggered");
971                 continueRun = false;
972                 if (myState == State.RUNNING) {
973                     myState = State.TERMINATE_CANCEL_JOB;
974                     myCancelMessage = msg;
975                     System.err.println(myCancelMessage);
976                     doExit = true;
977                 }
978             }
979         }
980 
981         // Cannot hold the synchronization lock while calling System.exit(),
982         // otherwise a deadlock can occur between this thread (the timer thread)
983         // and the shutdown hook thread.
984         if (doExit) {
985             System.exit(1);
986         }
987     }
988 
989     /**
990      * Take action when a backend process fails.
991      *
992      * @param processinfo Process info.
993      * @param reason Reason for the failure.
994      * @return Error message.
995      */
996     private String backendFailed(ProcessInfo processinfo, String reason) {
997         // Mark the backend process as failed.
998         processinfo.state = ProcessInfo.State.FAILED;
999 
1000         // Tell the Job Scheduler that the backend process failed.
1001         try {
1002             myJobScheduler.backendFailed(this, processinfo.name);
1003         } catch (IOException ignored) {
1004         }
1005 
1006         // Set up error message.
1007         String message = format("Job backend process failed (%s), node %s, rank %d",
1008             reason, processinfo.name, processinfo.rank);
1009         return message;
1010     }
1011 
1012     /**
1013      * Terminate this Job Frontend immediately, sending a "cancel job" message
1014      * to the Job Scheduler and all Job Backends. The error message is
1015      * <code>msg</code>. This method must only be called by the thread calling
1016      * <code>run()</code>.
1017      *
1018      * @param msg Error message.
1019      */
1020     private void terminateCancelJob(String msg) {
1021         continueRun = false;
1022         if (myState == State.RUNNING) {
1023             myState = State.TERMINATE_CANCEL_JOB;
1024             myCancelMessage = msg;
1025             System.err.println(myCancelMessage);
1026         }
1027     }
1028 
1029     /**
1030      * Terminate this Job Frontend immediately, sending a "cancel job" message
1031      * to the Job Scheduler and all Job Backends. The error message comes from
1032      * the given exception. This method must only be called by the thread
1033      * calling <code>run()</code>.
1034      *
1035      * @param exc Exception.
1036      */
1037     private void terminateCancelJob(Throwable exc) {
1038         continueRun = false;
1039         if (myState == State.RUNNING) {
1040             myCancelMessage = exc.getClass().getName();
1041             String msg = exc.getMessage();
1042             if (msg != null) {
1043                 myCancelMessage = myCancelMessage + ": " + msg;
1044             }
1045             System.err.println(myCancelMessage);
1046             exc.printStackTrace(System.err);
1047         }
1048     }
1049 
1050     /**
1051      * Terminate this Job Frontend immediately, sending a "cancel job" message
1052      * to the Job Scheduler and all Job Backends. The error message comes from
1053      * the given exception. This method must only be called by a thread other
1054      * than the thread calling <code>run()</code>.
1055      *
1056      * @param exc Exception.
1057      */
1058     void terminateCancelJobOther(Throwable exc) {
1059         boolean doExit = false;
1060         synchronized (this) {
1061             continueRun = false;
1062             if (myState == State.RUNNING) {
1063                 myCancelMessage = exc.getClass().getName();
1064                 String msg = exc.getMessage();
1065                 if (msg != null) {
1066                     myCancelMessage = myCancelMessage + ": " + msg;
1067                 }
1068                 System.err.println(myCancelMessage);
1069                 exc.printStackTrace(System.err);
1070                 doExit = true;
1071             }
1072         }
1073 
1074         // Cannot hold the synchronization lock while calling System.exit(),
1075         // otherwise a deadlock can occur between this thread and the shutdown
1076         // hook thread.
1077         if (doExit) {
1078             System.exit(1);
1079         }
1080     }
1081 
1082     /**
1083      * Shut down this Job Frontend.
1084      */
1085     private void shutdown() {
1086         synchronized (this) {
1087             // Stop all lease timers.
1088             mySchedulerRenewTimer.stop();
1089             mySchedulerExpireTimer.stop();
1090             for (ProcessInfo processinfo : myProcessInfo) {
1091                 processinfo.renewTimer.stop();
1092                 processinfo.expireTimer.stop();
1093             }
1094 
1095             // If state is RUNNING but myCancelMessage is not null, it means the
1096             // user canceled the job (e.g., by hitting CTRL-C).
1097             if (myState == State.RUNNING && myCancelMessage != null) {
1098                 myState = State.TERMINATE_CANCEL_JOB;
1099             }
1100 
1101             // Inform Job Scheduler and Job Backends.
1102             switch (myState) {
1103                 case RUNNING:
1104                     // Send "job finished" messages.
1105                     for (ProcessInfo processinfo : myProcessInfo) {
1106                         if (processinfo.backend != null) {
1107                             try {
1108                                 processinfo.backend.jobFinished(this);
1109                             } catch (IOException ignored) {
1110                             }
1111                         }
1112                     }
1113                     if (myJobScheduler != null) {
1114                         try {
1115                             myJobScheduler.jobFinished(this);
1116                         } catch (IOException ignored) {
1117                         }
1118                     }
1119                     break;
1120                 case TERMINATE_CANCEL_JOB:
1121                     // Send "cancel job" messages.
1122                     for (ProcessInfo processinfo : myProcessInfo) {
1123                         if (processinfo.backend != null
1124                                 && processinfo.state != ProcessInfo.State.FAILED) {
1125                             try {
1126                                 processinfo.backend.cancelJob(this, myCancelMessage);
1127                             } catch (IOException ignored) {
1128                             }
1129                         }
1130                     }
1131                     if (myJobScheduler != null) {
1132                         try {
1133                             myJobScheduler.cancelJob(this, myCancelMessage);
1134                         } catch (IOException ignored) {
1135                         }
1136                     }
1137                     break;
1138                 case TERMINATING:
1139                     // Send nothing.
1140                     break;
1141             }
1142 
1143             // Record that we are terminating.
1144             myState = State.TERMINATING;
1145         }
1146 
1147         // All proxies, channels, and channel groups will close when the process
1148         // exits.
1149     }
1150 
1151 // Unit test main program.
1152 //	/**
1153 //	 * Unit test main program.
1154 //	 * <P>
1155 //	 * Usage: java edu.rit.pj.cluster.JobFrontend <I>username</I> <I>K</I>
1156 //	 * <I>hasFrontendComm</I> <I>mainClassName</I> [ <I>arg</I> . . . ]
1157 //	 */
1158 //	public static void main
1159 //		(String[] args)
1160 //		throws Exception
1161 //		{
1162 //		if (args.length < 4) usage();
1163 //		String username = args[0];
1164 //		int K = Integer.parseInt (args[1]);
1165 //		boolean hasFrontendComm = Boolean.parseBoolean (args[2]);
1166 //		String mainClassName = args[3];
1167 //		int n = args.length - 4;
1168 //		String[] cmdargs = new String [n];
1169 //		System.arraycopy (args, 4, cmdargs, 0, n);
1170 //
1171 //		new JobFrontend (username, K, hasFrontendComm, mainClassName, cmdargs)
1172 //					.run();
1173 //		}
1174 //
1175 //	/**
1176 //	 * Print a usage message and exit.
1177 //	 */
1178 //	private static void usage()
1179 //		{
1180 //		System.err.println ("Usage: java edu.rit.pj.cluster.JobFrontend <username> <K> <hasFrontendComm> <mainClassName> [<arg>...]");
1181 //		System.exit (1);
1182 //		}
1183 }