1 //******************************************************************************
2 //
3 // File: JobFrontend.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.JobFrontend
6 //
7 // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import static java.lang.String.format;
43
44 import java.io.File;
45 import java.io.IOException;
46 import java.io.InputStream;
47 import java.net.InetSocketAddress;
48 import java.util.ArrayList;
49 import java.util.Arrays;
50 import java.util.HashMap;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Properties;
54 import java.util.logging.FileHandler;
55 import java.util.logging.Level;
56 import java.util.logging.Logger;
57
58 import edu.rit.mp.Channel;
59 import edu.rit.mp.ChannelGroup;
60 import edu.rit.mp.ChannelGroupClosedException;
61 import edu.rit.mp.ObjectBuf;
62 import edu.rit.mp.Status;
63 import edu.rit.mp.buf.ObjectItemBuf;
64 import edu.rit.pj.PJProperties;
65 import edu.rit.util.ByteSequence;
66 import edu.rit.util.Timer;
67 import edu.rit.util.TimerTask;
68 import edu.rit.util.TimerThread;
69
70 /**
71 * Class JobFrontend provides the message handler for the PJ job frontend
72 * process.
73 *
74 * @author Alan Kaminsky
75 * @version 20-Jun-2012
76 */
77 public class JobFrontend
78 implements Runnable, JobFrontendRef {
79
80 // Hidden data members.
81
82 // Logger
83 private static final Logger logger = Logger.getLogger(JobFrontend.class.getName());
84
85 // Logging
86 private FileHandler fileHandler = null;
87
88 // User name.
89 private String username;
90
91 // Job number.
92 private int jobnum;
93
94 // Job resources.
95 private int Nn;
96 private int Np;
97 private int Nt;
98
99 // Whether the frontend communicator exists, true or false.
100 private boolean hasFrontendComm;
101
102 // Main class name.
103 private String myMainClassName;
104
105 // Command line arguments.
106 private String[] myArgs;
107
108 // Rank of next backend process to be assigned.
109 private int myNextRank;
110
111 // Timer thread for lease renewals and expirations.
112 private TimerThread myLeaseTimerThread;
113
114 // Timers for the lease with the Job Scheduler.
115 private Timer mySchedulerRenewTimer;
116 private Timer mySchedulerExpireTimer;
117
118 // Timer for the job timeout if any.
119 private Timer myJobTimer;
120
121 // Array of job backend process info records, indexed by rank.
122 private ProcessInfo[] myProcessInfo;
123
124 // Mapping from job backend reference to job backend process info record.
125 private Map<JobBackendRef, ProcessInfo> myProcessMap
126 = new HashMap<JobBackendRef, ProcessInfo>();
127
128 // Number of running job backend processes.
129 private int myRunningCount;
130
131 // Number of finished job backend processes.
132 private int myFinishedCount;
133
134 // Middleware channel group and address array.
135 private ChannelGroup myMiddlewareChannelGroup;
136 private InetSocketAddress[] myMiddlewareAddress;
137
138 // Proxy for Job Scheduler Daemon.
139 private JobSchedulerRef myJobScheduler;
140
141 // World communicator channel group address array.
142 private InetSocketAddress[] myWorldAddress;
143
144 // Frontend communicator channel group and address array.
145 private ChannelGroup myFrontendChannelGroup;
146 private InetSocketAddress[] myFrontendAddress;
147
148 // JVM flags.
149 private String userJvmFlags = PJProperties.getPjJvmFlags();
150
151 // Resource contents that have been reported to job backend processes.
152 private ResourceCache myResourceCache = new ResourceCache();
153
154 // Flag for shutting down the run() method.
155 private boolean continueRun = true;
156
157 // State of this job frontend.
158 private State myState = State.RUNNING;
159
160 private static enum State {
161
162 RUNNING,
163 TERMINATE_CANCEL_JOB,
164 TERMINATING
165 }
166
167 ;
168
169 // Error message if job canceled, or null if job finished normally.
170 private String myCancelMessage = "User canceled job";
171
172 // For writing and reading files on the job frontend's node.
173 private FrontendFileWriter myFrontendFileWriter;
174 private FrontendFileReader myFrontendFileReader;
175
176 // Exported constructors.
177
/**
 * Construct a new job frontend object. The job frontend object will contact
 * the Job Scheduler Daemon specified by the <code>"pj.host"</code> and
 * <code>"pj.port"</code> Java system properties. See class {@linkplain
 * edu.rit.pj.PJProperties} for further information.
 * <p>
 * Construction registers a JVM shutdown hook, starts the daemon lease timer
 * thread, connects to the Job Scheduler, starts the scheduler lease timers
 * and finally sends the "request job" message.
 *
 * @param username User name.
 * @param Nn Number of backend nodes (&gt;= 1).
 * @param Np Number of processes (&gt;= 1).
 * @param Nt Number of CPUs per process (&gt;= 0). 0 means "all CPUs."
 * @param hasFrontendComm True if the job has the frontend communicator,
 * false if it doesn't.
 * @param mainClassName Main class name.
 * @param args Command line arguments.
 * @throws java.io.IOException Thrown if an I/O error occurred, including
 * failure to contact the Job Scheduler Daemon.
 */
public JobFrontend(String username,
        int Nn,
        int Np,
        int Nt,
        boolean hasFrontendComm,
        String mainClassName,
        String[] args)
        throws IOException {
    // Remember the job parameters.
    this.username = username;
    this.Nn = Nn;
    this.Np = Np;
    this.Nt = Nt;
    this.hasFrontendComm = hasFrontendComm;
    this.myMainClassName = mainClassName;
    this.myArgs = args;

    // Run shutdown() when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            shutdown();
        }
    });

    // Daemon thread that drives every lease and job timer.
    myLeaseTimerThread = new TimerThread();
    myLeaseTimerThread.setDaemon(true);
    myLeaseTimerThread.start();

    // Job Scheduler lease timers. Anything thrown by a timer action is
    // discarded so the timer thread keeps running.
    mySchedulerRenewTimer = myLeaseTimerThread.createTimer(new TimerTask() {
        @Override
        public void action(Timer timer) {
            try {
                schedulerRenewTimeout();
            } catch (Throwable ignored) {
            }
        }
    });
    mySchedulerExpireTimer = myLeaseTimerThread.createTimer(new TimerTask() {
        @Override
        public void action(Timer timer) {
            try {
                schedulerExpireTimeout();
            } catch (Throwable ignored) {
            }
        }
    });

    // Job timeout timer.
    myJobTimer = myLeaseTimerThread.createTimer(new TimerTask() {
        @Override
        public void action(Timer timer) {
            try {
                jobTimeout();
            } catch (Throwable ignored) {
            }
        }
    });

    // One process info record per rank, each with its own lease timers.
    myProcessInfo = new ProcessInfo[Np];
    for (int i = 0; i < Np; ++i) {
        final int rank = i;
        Timer backendRenewTimer = myLeaseTimerThread.createTimer(new TimerTask() {
            @Override
            public void action(Timer timer) {
                try {
                    backendRenewTimeout(rank);
                } catch (Throwable ignored) {
                }
            }
        });
        Timer backendExpireTimer = myLeaseTimerThread.createTimer(new TimerTask() {
            @Override
            public void action(Timer timer) {
                try {
                    backendExpireTimeout(rank);
                } catch (Throwable ignored) {
                }
            }
        });
        myProcessInfo[rank] = new ProcessInfo(
                /*state            */ ProcessInfo.State.NOT_STARTED,
                /*name             */ null,
                /*rank             */ rank,
                /*backend          */ null,
                /*middlewareAddress*/ null,
                /*worldAddress     */ null,
                /*frontendAddress  */ null,
                /*renewTimer       */ backendRenewTimer,
                /*expireTimer      */ backendExpireTimer,
                /*Nt               */ 0);
    }

    // Middleware channel group; the extra address slot (index Np) is
    // filled in by assignJobNumber().
    myMiddlewareChannelGroup = new ChannelGroup();
    myMiddlewareAddress = new InetSocketAddress[Np + 1];

    // World communicator addresses, one per backend process.
    myWorldAddress = new InetSocketAddress[Np];

    // Frontend communicator channel group, only when requested.
    if (hasFrontendComm) {
        myFrontendChannelGroup = new ChannelGroup();
        myFrontendAddress = new InetSocketAddress[Np + 1];
    }

    // Frontend file writer and reader.
    myFrontendFileWriter = new FrontendFileWriter(this);
    myFrontendFileReader = new FrontendFileReader(this);

    // Connect to the Job Scheduler Daemon and wrap the channel in a proxy.
    InetSocketAddress schedulerAddress = null;
    Channel schedulerChannel = null;
    try {
        schedulerAddress = new InetSocketAddress(PJProperties.getPjHost(),
                PJProperties.getPjPort());
        schedulerChannel = myMiddlewareChannelGroup.connect(schedulerAddress);
    } catch (IOException exc) {
        throw new JobSchedulerException("JobFrontend(): Cannot contact Job Scheduler Daemon at "
                + schedulerAddress,
                exc);
    }
    myJobScheduler = new JobSchedulerProxy(myMiddlewareChannelGroup, schedulerChannel);

    // Start the Job Scheduler lease timers.
    mySchedulerRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
            Constants.LEASE_RENEW_INTERVAL);
    mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);

    // Ask the Job Scheduler for the job's resources.
    myJobScheduler.requestJob(this, username, Nn, Np, Nt);
}
330
331 // Exported operations.
332
333 /**
334 * Run this Job Frontend.
335 */
336 public void run() {
337 ObjectItemBuf<JobFrontendMessage> buf
338 = ObjectBuf.buffer((JobFrontendMessage) null);
339 Status status = null;
340 JobFrontendMessage message = null;
341 JobBackendRef backend = null;
342
343 try {
344 while (continueRun) {
345 // Receive a message from any channel.
346 status = myMiddlewareChannelGroup.receive(null, null, buf);
347 message = buf.item;
348
349 // Process a message from the Job Scheduler.
350 if (status.tag == Message.FROM_JOB_SCHEDULER) {
351 message.invoke(this, myJobScheduler);
352 } // Process a message from a job backend.
353 else if (status.tag == Message.FROM_JOB_BACKEND) {
354 // Get job backend associated with channel. If none, set up
355 // a new job backend proxy.
356 backend = (JobBackendRef) status.channel.info();
357 if (backend == null) {
358 backend
359 = new JobBackendProxy(myMiddlewareChannelGroup, status.channel);
360 status.channel.info(backend);
361 }
362
363 // Process message.
364 message.invoke(this, backend);
365 }
366
367 // Enable garbage collection of no-longer-needed objects while
368 // waiting to receive next message.
369 buf.item = null;
370 status = null;
371 message = null;
372 backend = null;
373 }
374 } catch (ChannelGroupClosedException ignored) {
375 } catch (Throwable exc) {
376 terminateCancelJob(exc);
377 }
378
379 // Exit process if necessary.
380 switch (myState) {
381 case TERMINATE_CANCEL_JOB:
382 System.exit(1);
383 break;
384 case RUNNING:
385 case TERMINATING:
386 break;
387 }
388 }
389
390 /**
391 * {@inheritDoc}
392 * <p>
393 * Assign a backend process to the job.
394 */
395 public void assignBackend(JobSchedulerRef theJobScheduler,
396 String name,
397 String host,
398 String jvm,
399 String classpath,
400 String[] jvmflags,
401 String shellCommand,
402 int Nt)
403 throws IOException {
404 // Record backend name and number of CPUs.
405 int rank = myNextRank++;
406 ProcessInfo processinfo = myProcessInfo[rank];
407 processinfo.name = name;
408 processinfo.Nt = Nt;
409
410 // Display backend.
411 System.err.print(", ");
412 System.err.print(name);
413 System.err.flush();
414 if (myNextRank == Np) {
415 System.err.println();
416 }
417
418 /**
419 * Without this pause of 10 msec, the SSH command below will often fail when using more than 10 to 12 nodes.
420 */
421 try {
422 Thread.sleep(10);
423 } catch (InterruptedException e) {
424 //
425 }
426
427 // if (System.getProperty("pj.log", "false").equalsIgnoreCase("true")) {
428 // try {
429 // // Remove all log handlers from the default logger.
430 // java.util.logging.Logger defaultLogger = java.util.logging.LogManager.getLogManager().getLogger("");
431 // java.util.logging.Handler defaultHandlers[] = defaultLogger.getHandlers();
432 // for (java.util.logging.Handler h : defaultHandlers) {
433 // defaultLogger.removeHandler(h);
434 // }
435 //
436 // // Create a FileHandler that logs messages with a SimpleFormatter.
437 // File file = new File(Integer.toString(rank));
438 // file.mkdir();
439 // fileHandler = new FileHandler(file.getAbsolutePath() + "/frontend.log");
440 // fileHandler.setFormatter(new SimpleFormatter());
441 // logger.addHandler(fileHandler);
442 // logger.setLevel(Level.INFO);
443 // } catch (Exception e) {
444 // logger.setLevel(Level.OFF);
445 // }
446 // } else {
447 // logger.setLevel(Level.OFF);
448 // }
449 //
450 // logger.log(Level.INFO, " Username: " + username);
451 // logger.log(Level.INFO, " Job number: " + jobnum);
452 // logger.log(Level.INFO, " Nodes: " + Np);
453 // logger.log(Level.INFO, " Rank: " + rank);
454 // logger.log(Level.INFO, " Has Frontend Comm: " + hasFrontendComm);
455 // logger.log(Level.INFO, " Frontend Host: " + myMiddlewareChannelGroup.listenAddress().getHostName());
456 // logger.log(Level.INFO, " Frontend Port: " + myMiddlewareChannelGroup.listenAddress().getPort());
457 // logger.log(Level.INFO, " Backend Host: " + host);
458
459 try {
460 String listenHost = myMiddlewareChannelGroup.listenAddress().getHostName();
461 if (listenHost.equalsIgnoreCase(host)) {
462 // Build a command to run on a backend node via a local ProcessBuilder.
463 List<String> command = new ArrayList<>();
464 command.add(jvm);
465 command.add("-classpath");
466 command.add(classpath);
467 if (jvmflags != null && jvmflags.length > 0) {
468 command.addAll(Arrays.asList(jvmflags));
469 }
470 if (userJvmFlags != null && !userJvmFlags.trim().equalsIgnoreCase("")) {
471 command.add(userJvmFlags.trim());
472 }
473 command.add("edu.rit.pj.cluster.JobBackend");
474 command.add(username);
475 command.add(Integer.toString(jobnum));
476 command.add(Integer.toString(Np));
477 command.add(Integer.toString(rank));
478 command.add(Boolean.toString(hasFrontendComm));
479 command.add(myMiddlewareChannelGroup.listenAddress().getHostName());
480 command.add(Integer.toString(myMiddlewareChannelGroup.listenAddress().getPort()));
481 command.add(host);
482 ProcessBuilder pb = new ProcessBuilder(command);
483 String cwd = System.getProperty("user.dir");
484 if (cwd != null) {
485 pb.directory(new File(cwd));
486 }
487 Process ssh = pb.start();
488 } else {
489 // Build a command to run on the backend node via ssh.
490 StringBuilder command = new StringBuilder();
491 command.append(shellCommand);
492 command.append(" \"");
493 String cwd = System.getProperty("user.dir");
494 if (cwd != null) {
495 command.append("cd '");
496 command.append(cwd);
497 command.append("'; ");
498 }
499 command.append("nohup ");
500 command.append(jvm);
501 command.append(" -classpath '");
502 command.append(classpath);
503 command.append("'");
504 for (String flag : jvmflags) {
505 command.append(" ");
506 command.append(flag);
507 }
508 command.append(" ");
509 command.append(userJvmFlags);
510 command.append(" edu.rit.pj.cluster.JobBackend '");
511 command.append(username);
512 command.append("' ");
513 command.append(jobnum);
514 command.append(" ");
515 command.append(Np);
516 command.append(" ");
517 command.append(rank);
518 command.append(" ");
519 command.append(hasFrontendComm);
520 command.append(" '");
521 command.append(myMiddlewareChannelGroup.listenAddress().getHostName());
522 command.append("' ");
523 command.append(myMiddlewareChannelGroup.listenAddress().getPort());
524 command.append(" '");
525 command.append(host);
526 command.append("' >/dev/null 2>/dev/null &\"");
527
528 // So an SSH remote login and execute the above command.
529 Process ssh = Runtime.getRuntime().exec(new String[]{"ssh", host, command.toString()});
530 }
531
532 // Start lease timers for the backend node.
533 processinfo.renewTimer.start(Constants.LEASE_RENEW_INTERVAL,
534 Constants.LEASE_RENEW_INTERVAL);
535 processinfo.expireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
536
537 } // If an I/O error occurs, treat it as a backend node failure.
538 catch (IOException exc) {
539 if (myNextRank != Np) {
540 System.err.println();
541 }
542 System.err.println(" Exception executing SSH command:\n" + exc);
543 logger.log(Level.SEVERE, " Exception executing SSH command:\n" + exc);
544 terminateCancelJob(backendFailed(processinfo, "SSH command failed"));
545 }
546 }
547
548 /**
549 * {@inheritDoc}
550 * <p>
551 * Assign a job number to the job. The host name for the job frontend's
552 * middleware channel group is also specified.
553 */
554 public synchronized void assignJobNumber(JobSchedulerRef theJobScheduler,
555 int jobnum,
556 String pjhost)
557 throws IOException {
558 // Record job number.
559 this.jobnum = jobnum;
560
561 // Start listening for connections to the middleware channel group.
562 myMiddlewareChannelGroup.listen(new InetSocketAddress(pjhost, 0));
563 myMiddlewareChannelGroup.startListening();
564 myMiddlewareAddress[Np] = myMiddlewareChannelGroup.listenAddress();
565
566 // Start listening for connections to the frontend communicator channel
567 // group.
568 if (hasFrontendComm) {
569 myFrontendChannelGroup.listen(new InetSocketAddress(pjhost, 0));
570 myFrontendChannelGroup.startListening();
571 myFrontendAddress[Np] = myFrontendChannelGroup.listenAddress();
572 }
573
574 // Report job number.
575 System.err.print("Job " + jobnum);
576 System.err.flush();
577 }
578
579 /**
580 * {@inheritDoc}
581 * <p>
582 * Cancel the job.
583 */
584 public synchronized void cancelJob(JobSchedulerRef theJobScheduler,
585 String errmsg)
586 throws IOException {
587 terminateCancelJob(errmsg);
588 }
589
590 /**
591 * Renew the lease on the job.
592 *
593 * @param theJobScheduler Job Scheduler that is calling this method.
594 * @throws java.io.IOException Thrown if an I/O error occurred.
595 * @throws java.io.IOException if any.
596 */
597 public synchronized void renewLease(JobSchedulerRef theJobScheduler)
598 throws IOException {
599 mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
600 }
601
602 /**
603 * {@inheritDoc}
604 * <p>
605 * Report that a backend process has finished executing the job.
606 */
607 public synchronized void backendFinished(JobBackendRef theJobBackend)
608 throws IOException {
609 ProcessInfo processinfo = myProcessMap.get(theJobBackend);
610 if (processinfo == null) {
611 return;
612 }
613
614 // Verify that this backend has not finished already.
615 if (processinfo.state != ProcessInfo.State.RUNNING) {
616 terminateCancelJob("Unexpected \"backend finished\" message, rank="
617 + processinfo.rank);
618 }
619
620 // Update job backend process state.
621 processinfo.state = ProcessInfo.State.FINISHED;
622
623 // Increase count of finished processes.
624 ++myFinishedCount;
625
626 // If all job backend processes have finished, terminate the run()
627 // method. This will cause the job frontend process to exit when all
628 // other non-daemon threads have also terminated.
629 if (myFinishedCount == Np) {
630 continueRun = false;
631 myCancelMessage = null;
632 }
633 }
634
635 /**
636 * {@inheritDoc}
637 * <p>
638 * Report that a backend process is ready to commence executing the job.
639 */
640 public synchronized void backendReady(JobBackendRef theJobBackend,
641 int rank,
642 InetSocketAddress middlewareAddress,
643 InetSocketAddress worldAddress,
644 InetSocketAddress frontendAddress)
645 throws IOException {
646 // Verify that rank is in range.
647 if (0 > rank || rank >= Np) {
648 terminateCancelJob("Illegal \"backend ready\" message, rank=" + rank);
649 }
650
651 // Verify that this backend has not started already.
652 ProcessInfo processinfo = myProcessInfo[rank];
653 if (processinfo.state != ProcessInfo.State.NOT_STARTED) {
654 terminateCancelJob("Unexpected \"backend ready\" message, rank=" + rank);
655 }
656
657 // Record information in job backend process info record.
658 processinfo.state = ProcessInfo.State.RUNNING;
659 processinfo.backend = theJobBackend;
660 processinfo.middlewareAddress = middlewareAddress;
661 processinfo.worldAddress = worldAddress;
662 processinfo.frontendAddress = frontendAddress;
663 myProcessMap.put(theJobBackend, processinfo);
664
665 // Record channel group addresses.
666 myMiddlewareAddress[rank] = middlewareAddress;
667 myWorldAddress[rank] = worldAddress;
668 if (hasFrontendComm) {
669 myFrontendAddress[rank] = frontendAddress;
670 }
671
672 // Increase count of running processes.
673 ++myRunningCount;
674
675 // If all job backend processes have reported ready, commence job.
676 if (myRunningCount == Np) {
677 // Start job timer if necessary.
678 int jobtime = PJProperties.getPjJobTime();
679 if (jobtime > 0) {
680 myJobTimer.start(jobtime * 1000L);
681 }
682
683 // Get the system properties.
684 Properties props = System.getProperties();
685
686 // Send "commence job" message to each job backend, with system
687 // property "pj.nt" set to the proper number of CPUs.
688 for (ProcessInfo info : myProcessMap.values()) {
689 props.setProperty("pj.nt", "" + info.Nt);
690 info.backend.commenceJob(/*theJobFrontend */this,
691 /*middlewareAddress*/ myMiddlewareAddress,
692 /*worldAddress */ myWorldAddress,
693 /*frontendAddress */ myFrontendAddress,
694 /*properties */ props,
695 /*mainClassName */ myMainClassName,
696 /*args */ myArgs);
697 }
698 }
699 }
700
701 /**
702 * {@inheritDoc}
703 * <p>
704 * Cancel the job.
705 */
706 public synchronized void cancelJob(JobBackendRef theJobBackend,
707 String errmsg)
708 throws IOException {
709 terminateCancelJob(errmsg);
710 }
711
712 /**
713 * {@inheritDoc}
714 * <p>
715 * Renew the lease on the job.
716 */
717 public synchronized void renewLease(JobBackendRef theJobBackend)
718 throws IOException {
719 ProcessInfo processinfo = myProcessMap.get(theJobBackend);
720 if (processinfo != null) {
721 processinfo.expireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
722 }
723 }
724
725 /**
726 * {@inheritDoc}
727 * <p>
728 * Request the given resource from this job frontend's class loader.
729 */
730 public synchronized void requestResource(JobBackendRef theJobBackend,
731 String resourceName)
732 throws IOException {
733 // To hold resource content.
734 byte[] content = null;
735
736 // Get resource content. If resource not found, content is null.
737 if (myResourceCache.contains(resourceName)) {
738 // Get resource content from cache.
739 content = myResourceCache.getNoWait(resourceName);
740 } else {
741 // Get resource content from class loader, save it in cache.
742 InputStream stream
743 = getClass().getClassLoader().getResourceAsStream(resourceName);
744 if (stream != null) {
745 content = new ByteSequence(stream).toByteArray();
746 }
747 myResourceCache.put(resourceName, content);
748 }
749
750 // Send resource to job backend.
751 theJobBackend.reportResource(this, resourceName, content);
752 }
753
754 /**
755 * {@inheritDoc}
756 * <p>
757 * Open the given output file for writing or appending.
758 */
759 public synchronized void outputFileOpen(JobBackendRef theJobBackend,
760 int bfd,
761 File file,
762 boolean append)
763 throws IOException {
764 myFrontendFileWriter.outputFileOpen(theJobBackend, bfd, file, append);
765 }
766
767 /**
768 * {@inheritDoc}
769 * <p>
770 * Write the given bytes to the given output file. <code>ffd</code> = 1 refers
771 * to the job's standard output stream; <code>ffd</code> = 2 refers to the job's
772 * standard error stream; other values refer to a previously opened file.
773 */
774 public synchronized void outputFileWrite(JobBackendRef theJobBackend,
775 int ffd,
776 byte[] buf,
777 int off,
778 int len)
779 throws IOException {
780 myFrontendFileWriter.outputFileWrite(theJobBackend, ffd, len);
781 }
782
783 /**
784 * {@inheritDoc}
785 * <p>
786 * Flush accumulated bytes to the given output file.
787 */
788 public synchronized void outputFileFlush(JobBackendRef theJobBackend,
789 int ffd)
790 throws IOException {
791 myFrontendFileWriter.outputFileFlush(theJobBackend, ffd);
792 }
793
794 /**
795 * {@inheritDoc}
796 * <p>
797 * Close the given output file.
798 */
799 public synchronized void outputFileClose(JobBackendRef theJobBackend,
800 int ffd)
801 throws IOException {
802 myFrontendFileWriter.outputFileClose(theJobBackend, ffd);
803 }
804
805 /**
806 * {@inheritDoc}
807 * <p>
808 * Open the given input file for reading.
809 */
810 public synchronized void inputFileOpen(JobBackendRef theJobBackend,
811 int bfd,
812 File file)
813 throws IOException {
814 myFrontendFileReader.inputFileOpen(theJobBackend, bfd, file);
815 }
816
817 /**
818 * {@inheritDoc}
819 * <p>
820 * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
821 * job's standard input stream; other values refer to a previously opened
822 * file.
823 */
824 public synchronized void inputFileRead(JobBackendRef theJobBackend,
825 int ffd,
826 int len)
827 throws IOException {
828 myFrontendFileReader.inputFileRead(theJobBackend, ffd, len);
829 }
830
831 /**
832 * {@inheritDoc}
833 * <p>
834 * Skip bytes from the given input file.
835 */
836 public synchronized void inputFileSkip(JobBackendRef theJobBackend,
837 int ffd,
838 long len)
839 throws IOException {
840 myFrontendFileReader.inputFileSkip(theJobBackend, ffd, len);
841 }
842
843 /**
844 * {@inheritDoc}
845 * <p>
846 * Close the given input file.
847 */
848 public synchronized void inputFileClose(JobBackendRef theJobBackend,
849 int ffd)
850 throws IOException {
851 myFrontendFileReader.inputFileClose(theJobBackend, ffd);
852 }
853
854 /**
855 * {@inheritDoc}
856 * <p>
857 * Report a comment for a process.
858 */
859 public synchronized void reportComment(JobBackendRef theJobBackend,
860 int rank,
861 String comment)
862 throws IOException {
863 myJobScheduler.reportComment(this, rank, comment);
864 }
865
866 /**
867 * Close communication with this Job Frontend.
868 */
869 public void close() {
870 }
871
872 // Hidden operations.
873
874 /**
875 * Take action when the Job Scheduler's lease renewal timer times out.
876 *
877 * @throws IOException Thrown if an I/O error occurred.
878 */
879 private synchronized void schedulerRenewTimeout()
880 throws IOException {
881 if (mySchedulerRenewTimer.isTriggered()) {
882 myJobScheduler.renewLease(this);
883 }
884 }
885
886 /**
887 * Take action when the Job Scheduler's lease expiration timer times out.
888 *
889 * @throws IOException Thrown if an I/O error occurred.
890 */
891 private void schedulerExpireTimeout()
892 throws IOException {
893 boolean doExit = false;
894 synchronized (this) {
895 if (mySchedulerExpireTimer.isTriggered()) {
896 continueRun = false;
897 if (myState == State.RUNNING) {
898 myState = State.TERMINATE_CANCEL_JOB;
899 myCancelMessage = "Job Scheduler failed";
900 System.err.println(myCancelMessage);
901 doExit = true;
902 }
903 }
904 }
905
906 // Cannot hold the synchronization lock while calling System.exit(),
907 // otherwise a deadlock can occur between this thread (the timer thread)
908 // and the shutdown hook thread.
909 if (doExit) {
910 System.exit(1);
911 }
912 }
913
914 /**
915 * Take action when the job timer times out.
916 *
917 * @throws IOException Thrown if an I/O error occurred.
918 */
919 private void jobTimeout()
920 throws IOException {
921 boolean doExit = false;
922 synchronized (this) {
923 if (myJobTimer.isTriggered()) {
924 continueRun = false;
925 if (myState == State.RUNNING) {
926 myState = State.TERMINATE_CANCEL_JOB;
927 myCancelMessage = "Job exceeded maximum running time";
928 System.err.println(myCancelMessage);
929 doExit = true;
930 }
931 }
932 }
933
934 // Cannot hold the synchronization lock while calling System.exit(),
935 // otherwise a deadlock can occur between this thread (the timer thread)
936 // and the shutdown hook thread.
937 if (doExit) {
938 System.exit(1);
939 }
940 }
941
942 /**
943 * Take action when a job backend process's lease renewal timer times out.
944 *
945 * @param rank Job backend process's rank.
946 * @throws IOException Thrown if an I/O error occurred.
947 */
948 private synchronized void backendRenewTimeout(int rank)
949 throws IOException {
950 ProcessInfo processinfo = myProcessInfo[rank];
951 if (processinfo.renewTimer.isTriggered()) {
952 processinfo.backend.renewLease(this);
953 }
954 }
955
956 /**
957 * Take action when a job backend process's lease expiration timer times
958 * out.
959 *
960 * @param rank Job backend process's rank.
961 * @throws IOException Thrown if an I/O error occurred.
962 */
963 private void backendExpireTimeout(int rank)
964 throws IOException {
965 boolean doExit = false;
966 synchronized (this) {
967 ProcessInfo processinfo = myProcessInfo[rank];
968 if (processinfo.expireTimer.isTriggered()) {
969 // Terminate the Job Frontend.
970 String msg = backendFailed(processinfo, "Expire Timer Triggered");
971 continueRun = false;
972 if (myState == State.RUNNING) {
973 myState = State.TERMINATE_CANCEL_JOB;
974 myCancelMessage = msg;
975 System.err.println(myCancelMessage);
976 doExit = true;
977 }
978 }
979 }
980
981 // Cannot hold the synchronization lock while calling System.exit(),
982 // otherwise a deadlock can occur between this thread (the timer thread)
983 // and the shutdown hook thread.
984 if (doExit) {
985 System.exit(1);
986 }
987 }
988
989 /**
990 * Take action when a backend process fails.
991 *
992 * @param processinfo Process info.
993 * @param reason Reason for the failure.
994 * @return Error message.
995 */
996 private String backendFailed(ProcessInfo processinfo, String reason) {
997 // Mark the backend process as failed.
998 processinfo.state = ProcessInfo.State.FAILED;
999
1000 // Tell the Job Scheduler that the backend process failed.
1001 try {
1002 myJobScheduler.backendFailed(this, processinfo.name);
1003 } catch (IOException ignored) {
1004 }
1005
1006 // Set up error message.
1007 String message = format("Job backend process failed (%s), node %s, rank %d",
1008 reason, processinfo.name, processinfo.rank);
1009 return message;
1010 }
1011
1012 /**
1013 * Terminate this Job Frontend immediately, sending a "cancel job" message
1014 * to the Job Scheduler and all Job Backends. The error message is
1015 * <code>msg</code>. This method must only be called by the thread calling
1016 * <code>run()</code>.
1017 *
1018 * @param msg Error message.
1019 */
1020 private void terminateCancelJob(String msg) {
1021 continueRun = false;
1022 if (myState == State.RUNNING) {
1023 myState = State.TERMINATE_CANCEL_JOB;
1024 myCancelMessage = msg;
1025 System.err.println(myCancelMessage);
1026 }
1027 }
1028
1029 /**
1030 * Terminate this Job Frontend immediately, sending a "cancel job" message
1031 * to the Job Scheduler and all Job Backends. The error message comes from
1032 * the given exception. This method must only be called by the thread
1033 * calling <code>run()</code>.
1034 *
1035 * @param exc Exception.
1036 */
1037 private void terminateCancelJob(Throwable exc) {
1038 continueRun = false;
1039 if (myState == State.RUNNING) {
1040 myCancelMessage = exc.getClass().getName();
1041 String msg = exc.getMessage();
1042 if (msg != null) {
1043 myCancelMessage = myCancelMessage + ": " + msg;
1044 }
1045 System.err.println(myCancelMessage);
1046 exc.printStackTrace(System.err);
1047 }
1048 }
1049
1050 /**
1051 * Terminate this Job Frontend immediately, sending a "cancel job" message
1052 * to the Job Scheduler and all Job Backends. The error message comes from
1053 * the given exception. This method must only be called by a thread other
1054 * than the thread calling <code>run()</code>.
1055 *
1056 * @param exc Exception.
1057 */
1058 void terminateCancelJobOther(Throwable exc) {
1059 boolean doExit = false;
1060 synchronized (this) {
1061 continueRun = false;
1062 if (myState == State.RUNNING) {
1063 myCancelMessage = exc.getClass().getName();
1064 String msg = exc.getMessage();
1065 if (msg != null) {
1066 myCancelMessage = myCancelMessage + ": " + msg;
1067 }
1068 System.err.println(myCancelMessage);
1069 exc.printStackTrace(System.err);
1070 doExit = true;
1071 }
1072 }
1073
1074 // Cannot hold the synchronization lock while calling System.exit(),
1075 // otherwise a deadlock can occur between this thread and the shutdown
1076 // hook thread.
1077 if (doExit) {
1078 System.exit(1);
1079 }
1080 }
1081
1082 /**
1083 * Shut down this Job Frontend.
1084 */
1085 private void shutdown() {
1086 synchronized (this) {
1087 // Stop all lease timers.
1088 mySchedulerRenewTimer.stop();
1089 mySchedulerExpireTimer.stop();
1090 for (ProcessInfo processinfo : myProcessInfo) {
1091 processinfo.renewTimer.stop();
1092 processinfo.expireTimer.stop();
1093 }
1094
1095 // If state is RUNNING but myCancelMessage is not null, it means the
1096 // user canceled the job (e.g., by hitting CTRL-C).
1097 if (myState == State.RUNNING && myCancelMessage != null) {
1098 myState = State.TERMINATE_CANCEL_JOB;
1099 }
1100
1101 // Inform Job Scheduler and Job Backends.
1102 switch (myState) {
1103 case RUNNING:
1104 // Send "job finished" messages.
1105 for (ProcessInfo processinfo : myProcessInfo) {
1106 if (processinfo.backend != null) {
1107 try {
1108 processinfo.backend.jobFinished(this);
1109 } catch (IOException ignored) {
1110 }
1111 }
1112 }
1113 if (myJobScheduler != null) {
1114 try {
1115 myJobScheduler.jobFinished(this);
1116 } catch (IOException ignored) {
1117 }
1118 }
1119 break;
1120 case TERMINATE_CANCEL_JOB:
1121 // Send "cancel job" messages.
1122 for (ProcessInfo processinfo : myProcessInfo) {
1123 if (processinfo.backend != null
1124 && processinfo.state != ProcessInfo.State.FAILED) {
1125 try {
1126 processinfo.backend.cancelJob(this, myCancelMessage);
1127 } catch (IOException ignored) {
1128 }
1129 }
1130 }
1131 if (myJobScheduler != null) {
1132 try {
1133 myJobScheduler.cancelJob(this, myCancelMessage);
1134 } catch (IOException ignored) {
1135 }
1136 }
1137 break;
1138 case TERMINATING:
1139 // Send nothing.
1140 break;
1141 }
1142
1143 // Record that we are terminating.
1144 myState = State.TERMINATING;
1145 }
1146
1147 // All proxies, channels, and channel groups will close when the process
1148 // exits.
1149 }
1150
1151 // Unit test main program.
1152 // /**
1153 // * Unit test main program.
1154 // * <P>
1155 // * Usage: java edu.rit.pj.cluster.JobFrontend <I>username</I> <I>K</I>
1156 // * <I>hasFrontendComm</I> <I>mainClassName</I> [ <I>arg</I> . . . ]
1157 // */
1158 // public static void main
1159 // (String[] args)
1160 // throws Exception
1161 // {
1162 // if (args.length < 4) usage();
1163 // String username = args[0];
1164 // int K = Integer.parseInt (args[1]);
1165 // boolean hasFrontendComm = Boolean.parseBoolean (args[2]);
1166 // String mainClassName = args[3];
1167 // int n = args.length - 4;
1168 // String[] cmdargs = new String [n];
1169 // System.arraycopy (args, 4, cmdargs, 0, n);
1170 //
1171 // new JobFrontend (username, K, hasFrontendComm, mainClassName, cmdargs)
1172 // .run();
1173 // }
1174 //
1175 // /**
1176 // * Print a usage message and exit.
1177 // */
1178 // private static void usage()
1179 // {
1180 // System.err.println ("Usage: java edu.rit.pj.cluster.JobFrontend <username> <K> <hasFrontendComm> <mainClassName> [<arg>...]");
1181 // System.exit (1);
1182 // }
1183 }