1 //******************************************************************************
2 //
3 // File: JobBackend.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.JobBackend
6 //
7 // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import static java.lang.String.format;
43
44 import java.io.File;
45 import java.io.IOException;
46 import java.io.PrintStream;
47 import java.lang.reflect.Method;
48 import java.net.InetSocketAddress;
49 import java.util.Map;
50 import java.util.Properties;
51 import java.util.concurrent.CountDownLatch;
52 import java.util.logging.FileHandler;
53 import java.util.logging.Handler;
54 import java.util.logging.Level;
55 import java.util.logging.LogManager;
56 import java.util.logging.Logger;
57 import java.util.logging.SimpleFormatter;
58
59 import edu.rit.mp.ChannelGroup;
60 import edu.rit.mp.ChannelGroupClosedException;
61 import edu.rit.mp.ObjectBuf;
62 import edu.rit.mp.Status;
63 import edu.rit.mp.buf.ObjectItemBuf;
64 import edu.rit.util.ByteSequence;
65 import edu.rit.util.Timer;
66 import edu.rit.util.TimerThread;
67
68 /**
69 * Class JobBackend is the main program for a job backend process in the PJ
70 * cluster middleware. The job backend process is launched by an SSH remote
71 * login from the job frontend process (class {@linkplain JobFrontend}).
72 * <p>
73 * The command line for the job backend main program is:
74 * <p>
75 * java edu.rit.pj.cluster.JobBackend <I>username</I> <I>jobnum</I> <I>K</I>
76 * <I>rank</I> <I>hasFrontendComm</I> <I>frontendHost</I> <I>frontendPort</I>
77 * <I>backendHost</I>
78 * <BR><I>username</I> = User name
79 * <BR><I>jobnum</I> = Job number
 * <BR><I>K</I> = Number of backend processes (&gt;= 1)
81 * <BR><I>rank</I> = Rank of this backend process (0 .. <I>K</I>-1)
82 * <BR><I>hasFrontendComm</I> = Whether the frontend communicator exists
83 * (<code>true</code> or <code>false</code>)
84 * <BR><I>frontendHost</I> = Job frontend's middleware channel group host name
85 * <BR><I>frontendPort</I> = Job frontend's middleware channel group port number
86 * <BR><I>backendHost</I> = Job backend's middleware channel group host name
87 *
88 * @author Alan Kaminsky
89 * @version 24-Jan-2012
90 */
91 public class JobBackend
92 implements Runnable, JobBackendRef {
93
94 // Logger
95 private static final Logger logger = Logger.getLogger(JobBackend.class.getName());
96
97 // Hidden class-wide data members.
98 private static JobBackend theJobBackend;
99
100 // Hidden data members.
101 // Command line arguments.
102 private final String username;
103 private final int jobnum;
104 private final int K;
105 private final int rank;
106 private final boolean hasFrontendComm;
107 private final String frontendHost;
108 private final int frontendPort;
109 private final String backendHost;
110
111 // Logging
112 FileHandler fileHandler = null;
113
114 // Timer thread for lease renewals and expirations.
115 private final TimerThread myLeaseTimerThread;
116
117 // Timers for the lease with the job frontend.
118 private final Timer myFrontendRenewTimer;
119 private final Timer myFrontendExpireTimer;
120
121 // Middleware channel group and address array.
122 private final ChannelGroup myMiddlewareChannelGroup;
123 private InetSocketAddress[] myMiddlewareAddress;
124
125 // Job frontend proxy.
126 private final JobFrontendRef myJobFrontend;
127
128 // For loading classes from the job frontend process.
129 private final ResourceCache myResourceCache;
130 private final BackendClassLoader myClassLoader;
131
132 // World communicator channel group and address array.
133 private final ChannelGroup myWorldChannelGroup;
134 private InetSocketAddress[] myWorldAddress;
135
136 // Frontend communicator channel group and address array.
137 private ChannelGroup myFrontendChannelGroup;
138 private InetSocketAddress[] myFrontendAddress;
139
140 // Java system properties.
141 private Properties myProperties;
142
143 // Main class name.
144 private String myMainClassName;
145
146 // Command line arguments.
147 private String[] myArgs;
148
149 // Flag set true to commence job.
150 private boolean commence;
151
152 // Buffer for receiving job backend messages.
153 private final ObjectItemBuf<JobBackendMessage> myBuffer
154 = ObjectBuf.buffer((JobBackendMessage) null);
155
156 // Flags for shutting down the run() method.
157 private boolean continueRun = true;
158 private final CountDownLatch runFinished = new CountDownLatch(1);
159
160 // State of this job backend.
161 private State myState = State.RUNNING;
162
163 private static enum State {
164 RUNNING,
165 TERMINATE_CANCEL_JOB,
166 TERMINATE_NO_REPORT,
167 TERMINATING
168 }
169
170 // Error message if job canceled, or null if job finished normally.
171 private String myCancelMessage;
172
173 // Original standard error stream; goes to the Job Launcher's log file.
174 private final PrintStream myJobLauncherLog;
175
176 // For writing and reading files in the job frontend.
177 private final BackendFileWriter myFileWriter;
178 private final BackendFileReader myFileReader;
179
180 // Hidden constructors.
181
182 /**
183 * Construct a new Job Backend.
184 *
185 * @param username User name.
186 * @param jobnum Job number.
187 * @param K Number of backend processes.
188 * @param rank Rank of this backend process.
189 * @param hasFrontendComm Whether the frontend communicator exists.
190 * @param frontendHost Host name of job frontend's middleware channel group.
191 * @param frontendPort Port number of job frontend's middleware channel group.
192 * @param backendHost Host name of job backend's middleware channel group.
193 * @throws IOException Thrown if an I/O error occurred.
194 */
195 private JobBackend(String username,
196 int jobnum,
197 int K,
198 int rank,
199 boolean hasFrontendComm,
200 String frontendHost,
201 int frontendPort,
202 String backendHost)
203 throws IOException {
204
205 // Record command line arguments.
206 this.username = username;
207 this.jobnum = jobnum;
208 this.K = K;
209 this.rank = rank;
210 this.hasFrontendComm = hasFrontendComm;
211 this.frontendHost = frontendHost;
212 this.frontendPort = frontendPort;
213 this.backendHost = backendHost;
214
215 // Turn on verbose start-up logging.
216 StringBuilder sb = new StringBuilder();
217 boolean verbose = Boolean.parseBoolean(System.getProperty("pj.verbose", "false"));
218 if (verbose) {
219 try {
220 // Remove all log handlers from the default logger.
221 Logger defaultLogger = LogManager.getLogManager().getLogger("");
222 Handler[] defaultHandlers = defaultLogger.getHandlers();
223 for (Handler h : defaultHandlers) {
224 defaultLogger.removeHandler(h);
225 }
226
227 // Create a FileHandler that logs messages with a SimpleFormatter.
228 File dir = new File(Integer.toString(this.rank));
229 if (!dir.exists()) {
230 boolean success = dir.mkdir();
231 sb.append(format("\n Creation of logging directory %s: %b", dir, success));
232 }
233
234 String logFile = dir.getAbsolutePath() + File.separator + "backend.log";
235 fileHandler = new FileHandler(logFile);
236 sb.append(format("\n Log file: %s", logFile));
237 fileHandler.setFormatter(new SimpleFormatter());
238 logger.addHandler(fileHandler);
239 logger.setLevel(Level.INFO);
240 } catch (Exception e) {
241 logger.setLevel(Level.OFF);
242 }
243 } else {
244 logger.setLevel(Level.OFF);
245 }
246
247 // Note that logging is OFF if the "pj.verbose" property was not true.
248 sb.append(format("\n Username: %s", this.username));
249 sb.append(format("\n Job number: %d", this.jobnum));
250 sb.append(format("\n Nodes: %d", this.K));
251 sb.append(format("\n Rank: %d", this.rank));
252 sb.append(format("\n Has Frontend Comm: %b", this.hasFrontendComm));
253 sb.append(format("\n Frontend Host: %s", this.frontendHost));
254 sb.append(format("\n Frontend Port: %d", this.frontendPort));
255 sb.append(format("\n Backend Host: %s", this.backendHost));
256 logger.log(Level.INFO, sb.toString());
257
258 // Set up shutdown hook.
259 Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));
260
261 // Set up lease timer thread.
262 logger.log(Level.INFO, " Set up lease timer thread.");
263 myLeaseTimerThread = new TimerThread();
264 myLeaseTimerThread.setDaemon(true);
265 myLeaseTimerThread.start();
266
267 // Set up job frontend lease timers.
268 logger.log(Level.INFO, format(" Create frontend renew timer (%d sec)." , Constants.LEASE_RENEW_INTERVAL / 1000));
269 myFrontendRenewTimer = myLeaseTimerThread.createTimer(timer -> {
270 try {
271 frontendRenewTimeout();
272 } catch (Throwable ignored) {
273 }
274 });
275 logger.log(Level.INFO, format(" Create frontend expire timer (%d sec)." , Constants.LEASE_EXPIRE_INTERVAL / 1000));
276 myFrontendExpireTimer = myLeaseTimerThread.createTimer(timer -> {
277 try {
278 frontendExpireTimeout();
279 } catch (Throwable ignored) {
280 }
281 });
282
283 // Start job frontend lease expiration timer regardless of whether the
284 // job frontend proxy gets set up.
285 myFrontendExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
286
287 // Set up middleware channel group.
288 logger.log(Level.INFO, " Set up middleware channel group.");
289 myMiddlewareChannelGroup = new ChannelGroup(new InetSocketAddress(backendHost, 0));
290 myMiddlewareChannelGroup.startListening();
291
292 // Set up job frontend proxy.
293 logger.log(Level.INFO, " Set up job frontend proxy.");
294 myJobFrontend = new JobFrontendProxy(myMiddlewareChannelGroup,
295 myMiddlewareChannelGroup.connect(new InetSocketAddress(frontendHost, frontendPort)));
296
297 // If we get here, the job frontend proxy has been set up.
298 logger.log(Level.INFO, " The job frontend proxy has been set up.");
299
300 // Start job frontend lease renewal timer.
301 logger.log(Level.INFO, " Start frontend lease renewal timer.");
302 myFrontendRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
303 Constants.LEASE_RENEW_INTERVAL);
304
305 // Set up backend class loader.
306 myResourceCache = new ResourceCache();
307 myClassLoader = new BackendClassLoader(
308 /*parent */ getClass().getClassLoader(),
309 /*theJobBackend */ this,
310 /*theJobFrontend*/ myJobFrontend,
311 /*theCache */ myResourceCache);
312
313 // Set up world communicator channel group.
314 logger.log(Level.INFO, " Set up world communicator channel group.");
315 myWorldChannelGroup= new ChannelGroup(new InetSocketAddress(backendHost, 0));
316 myWorldChannelGroup.setAlternateClassLoader(myClassLoader);
317
318 // Set up frontend communicator channel group.
319 if (hasFrontendComm) {
320 logger.log(Level.INFO, " Set up frontend communicator channel group.");
321 myFrontendChannelGroup = new ChannelGroup(new InetSocketAddress(backendHost, 0));
322 myFrontendChannelGroup.setAlternateClassLoader(myClassLoader);
323 }
324
325 // Set up backend file writer and reader.
326 logger.log(Level.INFO, " Set up backend file writer and reader.");
327 myFileWriter = new BackendFileWriter(myJobFrontend, this);
328 myFileReader = new BackendFileReader(myJobFrontend, this);
329
330 // Redirect standard input, standard output, and standard error to job frontend.
331 logger.log(Level.INFO, " Redirect standard input, standard output, and standard error to job frontend.");
332 System.in.close();
333 System.out.close();
334 myJobLauncherLog = System.err;
335 System.setIn(myFileReader.in);
336 System.setOut(myFileWriter.out);
337 System.setErr(myFileWriter.err);
338
339 // Tell job frontend we're ready!
340 logger.log(Level.INFO, " Tell job frontend we're ready.");
341 myJobFrontend.backendReady(
342 /*theJobBackend */ this,
343 /*rank */ rank,
344 /*middlewareAddress*/ myMiddlewareChannelGroup.listenAddress(),
345 /*worldAddress */ myWorldChannelGroup.listenAddress(),
346 /*frontendAddress */ hasFrontendComm ? myFrontendChannelGroup.listenAddress() : null);
347 }
348 // Exported operations.
349
350 /**
351 * Run this Job Backend.
352 */
353 public void run() {
354 Status status = null;
355 JobBackendMessage message = null;
356
357 try {
358 while (continueRun) {
359 // Receive a message from any channel.
360 status
361 = myMiddlewareChannelGroup.receive(null, null, myBuffer);
362 message = myBuffer.item;
363
364 // Process message.
365 message.invoke(this, myJobFrontend);
366
367 // Enable garbage collection of no-longer-needed objects while
368 // waiting to receive next message.
369 myBuffer.item = null;
370 status = null;
371 message = null;
372 }
373
374 // Allow shutdown hook to proceed.
375 reportRunFinished();
376 } catch (ChannelGroupClosedException exc) {
377 // Allow shutdown hook to proceed.
378 reportRunFinished();
379 } catch (Throwable exc) {
380 // Allow shutdown hook to proceed.
381 reportRunFinished();
382 terminateCancelJob(exc);
383 }
384
385 // Exit process if necessary.
386 switch (myState) {
387 case TERMINATE_CANCEL_JOB:
388 case TERMINATE_NO_REPORT:
389 System.exit(1);
390 break;
391 case RUNNING:
392 case TERMINATING:
393 break;
394 }
395 }
396
397 /**
398 * {@inheritDoc}
399 * <p>
400 * Cancel the job.
401 */
402 public synchronized void cancelJob(JobFrontendRef theJobFrontend,
403 String errmsg)
404 throws IOException {
405 terminateNoReport();
406 }
407
408 /**
409 * Commence the job.
410 *
411 * @param theJobFrontend Job Frontend that is calling this method.
412 * @param middlewareAddress Array of hosts/ports for middleware messages.
413 * The first <I>K</I>
414 * elements are for the job backend processes in rank order, the
415 * <I>K</I>+1st element is for the job frontend process. If the
416 * @param worldAddress Array of hosts/ports for the world communicator. The
417 * <I>K</I>
418 * elements are for the job backend processes in rank order.
419 * @param frontendAddress Array of hosts/ports for the frontend
420 * communicator. The first
421 * <I>K</I> elements are for the job backend processes in rank order, the
422 * <I>K</I>+1st element is for the job frontend process. If the frontend
423 * communicator does not exist, <code>frontendAddress</code> is null.
424 * @param properties Java system properties.
425 * @param mainClassName Fully qualified class name of the Java main program
426 * class to execute.
427 * @param args Array of 0 or more Java command line arguments.
428 * @throws java.io.IOException Thrown if an I/O error occurred.
429 * @throws java.io.IOException if any.
430 */
431 public synchronized void commenceJob(JobFrontendRef theJobFrontend,
432 InetSocketAddress[] middlewareAddress,
433 InetSocketAddress[] worldAddress,
434 InetSocketAddress[] frontendAddress,
435 Properties properties,
436 String mainClassName,
437 String[] args)
438 throws IOException {
439 // Record information.
440 myMiddlewareAddress = middlewareAddress;
441 myWorldAddress = worldAddress;
442 myFrontendAddress = frontendAddress;
443 myProperties = properties;
444 myMainClassName = mainClassName;
445 myArgs = args;
446
447 // Notify main program to commence job.
448 commence = true;
449 notifyAll();
450 }
451
452 /**
453 * {@inheritDoc}
454 * <p>
455 * Report that the job finished.
456 */
457 public synchronized void jobFinished(JobFrontendRef theJobFrontend)
458 throws IOException {
459 continueRun = false;
460 }
461
462 /**
463 * {@inheritDoc}
464 * <p>
465 * Renew the lease on the job.
466 */
467 public synchronized void renewLease(JobFrontendRef theJobFrontend)
468 throws IOException {
469 myFrontendExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
470 }
471
472 /**
473 * Report the content for a previously-requested resource.
474 *
475 * @param theJobFrontend Job Frontend that is calling this method.
476 * @param resourceName Resource name.
477 * @param content Resource content, or null if resource not found.
478 * @throws java.io.IOException Thrown if an I/O error occurred.
479 * @throws java.io.IOException if any.
480 */
481 public synchronized void reportResource(JobFrontendRef theJobFrontend,
482 String resourceName,
483 byte[] content)
484 throws IOException {
485 myResourceCache.put(resourceName, content);
486 }
487
488 /**
489 * {@inheritDoc}
490 * <p>
491 * Report the content for a previously-requested resource.
492 *
493 * @throws java.io.IOException Thrown if an I/O error occurred.
494 * @param theJobFrontend a {@link edu.rit.pj.cluster.JobFrontendRef} object
495 * @param resourceName a {@link java.lang.String} object
496 * @param content a {@link edu.rit.util.ByteSequence} object
497 */
498 public synchronized void reportResource(JobFrontendRef theJobFrontend,
499 String resourceName,
500 ByteSequence content)
501 throws IOException {
502 myResourceCache.put(resourceName,
503 content == null ? null : content.toByteArray());
504 }
505
506 /**
507 * {@inheritDoc}
508 * <p>
509 * Report the result of opening the given output file.
510 */
511 public synchronized void outputFileOpenResult(JobFrontendRef theJobFrontend,
512 int bfd,
513 int ffd,
514 IOException exc)
515 throws IOException {
516 myFileWriter.outputFileOpenResult(theJobFrontend, bfd, ffd, exc);
517 }
518
519 /**
520 * {@inheritDoc}
521 * <p>
522 * Report the result of writing the given output file.
523 */
524 public synchronized void outputFileWriteResult(JobFrontendRef theJobFrontend,
525 int ffd,
526 IOException exc)
527 throws IOException {
528 myFileWriter.outputFileWriteResult(theJobFrontend, ffd, exc);
529 }
530
531 /**
532 * {@inheritDoc}
533 * <p>
534 * Report the result of flushing the given output file.
535 */
536 public synchronized void outputFileFlushResult(JobFrontendRef theJobFrontend,
537 int ffd,
538 IOException exc)
539 throws IOException {
540 myFileWriter.outputFileFlushResult(theJobFrontend, ffd, exc);
541 }
542
543 /**
544 * {@inheritDoc}
545 * <p>
546 * Report the result of closing the given output file.
547 */
548 public synchronized void outputFileCloseResult(JobFrontendRef theJobFrontend,
549 int ffd,
550 IOException exc)
551 throws IOException {
552 myFileWriter.outputFileCloseResult(theJobFrontend, ffd, exc);
553 }
554
555 /**
556 * {@inheritDoc}
557 * <p>
558 * Report the result of opening the given input file.
559 */
560 public synchronized void inputFileOpenResult(JobFrontendRef theJobFrontend,
561 int bfd,
562 int ffd,
563 IOException exc)
564 throws IOException {
565 myFileReader.inputFileOpenResult(theJobFrontend, bfd, ffd, exc);
566 }
567
568 /**
569 * {@inheritDoc}
570 * <p>
571 * Report the result of reading the given input file.
572 */
573 public synchronized void inputFileReadResult(JobFrontendRef theJobFrontend,
574 int ffd,
575 byte[] buf,
576 int len,
577 IOException exc)
578 throws IOException {
579 myFileReader.inputFileReadResult(theJobFrontend, ffd, len, exc);
580 }
581
582 /**
583 * {@inheritDoc}
584 * <p>
585 * Report the result of skipping the given input file.
586 */
587 public synchronized void inputFileSkipResult(JobFrontendRef theJobFrontend,
588 int ffd,
589 long len,
590 IOException exc)
591 throws IOException {
592 myFileReader.inputFileSkipResult(theJobFrontend, ffd, len, exc);
593 }
594
595 /**
596 * {@inheritDoc}
597 * <p>
598 * Report the result of closing the given input file.
599 */
600 public synchronized void inputFileCloseResult(JobFrontendRef theJobFrontend,
601 int ffd,
602 IOException exc)
603 throws IOException {
604 myFileReader.inputFileCloseResult(theJobFrontend, ffd, exc);
605 }
606
607 /**
608 * Close communication with this Job Backend.
609 */
610 public synchronized void close() {
611 }
612
613 /**
614 * Obtain this job's user name.
615 *
616 * @return User name.
617 */
618 public String getUserName() {
619 return username;
620 }
621
622 /**
623 * Obtain this job's job number.
624 *
625 * @return Job number.
626 */
627 public int getJobNumber() {
628 return jobnum;
629 }
630
631 /**
632 * Obtain the number of backend processes in this job.
633 *
634 * @return <I>K</I>, the number of backend processes.
635 */
636 public int getK() {
637 return K;
638 }
639
640 /**
641 * Obtain the rank of this backend process in this job.
642 *
643 * @return Rank.
644 */
645 public int getRank() {
646 return rank;
647 }
648
649 /**
650 * Obtain the backend host name on which this job is running.
651 *
652 * @return Host name.
653 */
654 public String getBackendHost() {
655 return backendHost;
656 }
657
658 /**
659 * Determine whether the frontend communicator exists in this job.
660 *
661 * @return True if the frontend communicator exists, false if it doesn't.
662 */
663 public boolean hasFrontendCommunicator() {
664 return hasFrontendComm;
665 }
666
667 /**
668 * Obtain this job's backend class loader.
669 *
670 * @return Class loader.
671 */
672 public ClassLoader getClassLoader() {
673 return myClassLoader;
674 }
675
676 /**
677 * Obtain this job's backend file writer.
678 *
679 * @return Backend file writer.
680 */
681 public BackendFileWriter getFileWriter() {
682 return myFileWriter;
683 }
684
685 /**
686 * Obtain this job's backend file reader.
687 *
688 * @return Backend file reader.
689 */
690 public BackendFileReader getFileReader() {
691 return myFileReader;
692 }
693
694 /**
695 * Wait until this job commences.
696 */
697 public synchronized void waitForCommence() {
698 while (!commence) {
699 try {
700 wait();
701 } catch (InterruptedException ignored) {
702 }
703 }
704 }
705
706 /**
707 * Obtain this job's world communicator channel group. If this job has not
708 * commenced yet, null is returned.
709 *
710 * @return Channel group.
711 */
712 public ChannelGroup getWorldChannelGroup() {
713 return myWorldChannelGroup;
714 }
715
716 /**
717 * Obtain this job's array of hosts/ports for the world communicator. The
718 * <I>K</I> elements are for the job backend processes in rank order. If
719 * this job has not commenced yet, null is returned.
720 *
721 * @return Array of world communicator addresses.
722 */
723 public InetSocketAddress[] getWorldAddress() {
724 return myWorldAddress;
725 }
726
727 /**
728 * Obtain this job's frontend communicator channel group. If the frontend
729 * communicator does not exist, or if this job has not commenced yet, null
730 * is returned.
731 *
732 * @return Channel group.
733 */
734 public ChannelGroup getFrontendChannelGroup() {
735 return myFrontendChannelGroup;
736 }
737
738 /**
739 * Obtain this job's array of hosts/ports for the frontend communicator. The
740 * first <I>K</I> elements are for the job backend processes in rank order,
741 * the <I>K</I>+1st element is for the job frontend process. If the frontend
742 * communicator does not exist, or if this job has not commenced yet, null
743 * is returned.
744 *
745 * @return Array of frontend communicator addresses.
746 */
747 public InetSocketAddress[] getFrontendAddress() {
748 return myFrontendAddress;
749 }
750
751 /**
752 * Obtain this job's Java system properties. If this job has not commenced
753 * yet, null is returned.
754 *
755 * @return Properties.
756 */
757 public Properties getProperties() {
758 return myProperties;
759 }
760
761 /**
762 * Obtain this job's main class name. If this job has not commenced yet,
763 * null is returned.
764 *
765 * @return Fully qualified class name of the Java main program class to
766 * execute.
767 */
768 public String getMainClassName() {
769 return myMainClassName;
770 }
771
772 /**
773 * Obtain this job's command line arguments. If this job has not commenced
774 * yet, null is returned.
775 *
776 * @return Array of 0 or more Java command line arguments.
777 */
778 public String[] getArgs() {
779 return myArgs;
780 }
781
782 /**
783 * Set the comment string for this job backend process. The comment string
784 * appears in the detailed job status display in the Job Scheduler's web
785 * interface. Each job backend process (rank) has its own comment string. If
786 * <code>setComment()</code> is never called, the comment string is empty. The
787 * comment string is typically used to display this job backend process's
788 * progress. The comment string is rendered by a web browser and can contain
789 * HTML tags.
790 * <p>
791 * Calling <code>setComment()</code> causes a message to be sent to the job
792 * frontend process, which in turn causes a message to be sent to the Job
793 * Scheduler. (Any I/O errors during message sending are ignored.)
794 * Consequently, don't call <code>setComment()</code> too frequently, or the
795 * program's performance will suffer.
796 *
797 * @param comment Comment string.
798 */
799 public void setComment(String comment) {
800 try {
801 myJobFrontend.reportComment(this, rank, comment);
802 } catch (IOException ignored) {
803 }
804 }
805
806 /**
807 * Obtain the Job Backend object. If the Job Backend main program is
808 * running, the job backend object for the job is returned. If some other
809 * main program is running, null is returned.
810 *
811 * @return Job backend object, or null.
812 */
813 public static JobBackend getJobBackend() {
814 return theJobBackend;
815 }
816
817 // More hidden operations.
818
819 /**
820 * Take action when the job frontend's lease renewal timer times out.
821 *
822 * @throws IOException Thrown if an I/O error occurred.
823 */
824 private synchronized void frontendRenewTimeout()
825 throws IOException {
826 if (myFrontendRenewTimer.isTriggered()) {
827 myJobFrontend.renewLease(this);
828 }
829 }
830
831 /**
832 * Take action when the job frontend's lease expiration timer times out.
833 *
834 * @throws IOException Thrown if an I/O error occurred.
835 */
836 private void frontendExpireTimeout()
837 throws IOException {
838 boolean doExit = false;
839 synchronized (this) {
840 if (myFrontendExpireTimer.isTriggered()) {
841 reportRunFinished();
842 if (myState == State.RUNNING) {
843 myState = State.TERMINATE_NO_REPORT;
844 doExit = true;
845 }
846 }
847 }
848
849 // Cannot hold the synchronization lock while calling System.exit(),
850 // otherwise a deadlock can occur between this thread (the timer thread)
851 // and the shutdown hook thread.
852 myJobLauncherLog.println("Job frontend lease expired");
853 if (doExit) {
854 System.exit(1);
855 }
856 }
857
858 /**
859 * Terminate this Job Backend immediately, sending a "cancel job" message to
860 * the Job Frontend. The error message comes from the given exception.
861 *
862 * @param exc Exception.
863 */
864 private void terminateCancelJob(Throwable exc) {
865 continueRun = false;
866 if (myState == State.RUNNING) {
867 myState = State.TERMINATE_CANCEL_JOB;
868 myCancelMessage = exc.getClass().getName();
869 String msg = exc.getMessage();
870 if (msg != null) {
871 myCancelMessage = myCancelMessage + ": " + msg;
872 }
873 //System.err.println (myCancelMessage);
874 //exc.printStackTrace (System.err);
875 }
876 }
877
878 /**
879 * Terminate this Job Backend immediately, with no report to the Job
880 * Frontend.
881 */
882 private void terminateNoReport() {
883 continueRun = false;
884 if (myState == State.RUNNING) {
885 myState = State.TERMINATE_NO_REPORT;
886 }
887 }
888
889 /**
890 * Shut down this Job Backend.
891 */
892 private void shutdown() {
893 synchronized (this) {
894 // Tell job frontend that we are terminating.
895 if (myJobFrontend != null) {
896 try {
897 switch (myState) {
898 case RUNNING:
899 // Tell job frontend we finished normally.
900 myJobFrontend.backendFinished(this);
901 break;
902 case TERMINATE_CANCEL_JOB:
903 // Tell job frontend we're canceling.
904 myJobFrontend.cancelJob(this, myCancelMessage);
905 break;
906 case TERMINATE_NO_REPORT:
907 case TERMINATING:
908 // Tell job frontend nothing.
909 break;
910 }
911 } catch (IOException ignored) {
912 }
913 }
914
915 // Record that we are terminating.
916 myState = State.TERMINATING;
917 }
918
919 // Wait until the run() method thread terminates.
920 waitForRunFinished();
921
922 // Shut down job frontend lease timers.
923 synchronized (this) {
924 myFrontendRenewTimer.stop();
925 myFrontendExpireTimer.stop();
926 }
927
928 // All proxies, channels, and channel groups will close when the process
929 // exits.
930 }
931
932 /**
933 * Wait for the run() method to finish.
934 */
935 private void waitForRunFinished() {
936 for (; ; ) {
937 try {
938 runFinished.await();
939 break;
940 } catch (InterruptedException ignored) {
941 }
942 }
943 }
944
945 /**
946 * Report that the run() method finished.
947 */
948 private void reportRunFinished() {
949 runFinished.countDown();
950 }
951
952 /**
953 * Dump this job backend to the standard output, for debugging.
954 */
955 private synchronized void dump() {
956 System.out.println("********************************");
957 System.out.println("username = " + username);
958 System.out.println("jobnum = " + jobnum);
959 System.out.println("K = " + K);
960 System.out.println("rank = " + rank);
961 System.out.println("hasFrontendComm = " + hasFrontendComm);
962 for (int i = 0; i <= K; ++i) {
963 System.out.println("myMiddlewareAddress[" + i + "] = " + myMiddlewareAddress[i]);
964 }
965 for (int i = 0; i < K; ++i) {
966 System.out.println("myWorldAddress[" + i + "] = " + myWorldAddress[i]);
967 }
968 if (hasFrontendComm) {
969 for (int i = 0; i <= K; ++i) {
970 System.out.println("myFrontendAddress[" + i + "] = " + myFrontendAddress[i]);
971 }
972 }
973 myProperties.list(System.out);
974 System.out.println("myMainClassName = " + myMainClassName);
975 for (int i = 0; i < myArgs.length; ++i) {
976 System.out.println("myArgs[" + i + "] = \"" + myArgs[i] + "\"");
977 }
978 }
979
980 // Main program.
981
982 /**
983 * Job Backend main program.
984 *
985 * @param args an array of {@link java.lang.String} objects.
986 * @throws java.lang.Exception if any.
987 */
988 public static void main(String[] args)
989 throws Exception {
990 try {
991 // Parse command line arguments.
992 if (args.length != 8) {
993 usage();
994 }
995 String username = args[0];
996 int jobnum = Integer.parseInt(args[1]);
997 int K = Integer.parseInt(args[2]);
998 int rank = Integer.parseInt(args[3]);
999 boolean hasFrontendComm = Boolean.parseBoolean(args[4]);
1000 String frontendHost = args[5];
1001 int frontendPort = Integer.parseInt(args[6]);
1002 String backendHost = args[7];
1003
1004 // Set up job backend object.
1005 theJobBackend
1006 = new JobBackend(username, jobnum, K, rank, hasFrontendComm,
1007 frontendHost, frontendPort, backendHost);
1008 } catch (Throwable exc) {
1009 exc.printStackTrace(System.err);
1010 System.exit(1);
1011 }
1012
1013 // Set the main thread's context class loader to be the job backend's
1014 // class loader.
1015 Thread.currentThread().setContextClassLoader(theJobBackend.getClassLoader());
1016
1017 // Run job backend object in a separate thread.
1018 logger.log(Level.INFO, " Starting backend Daemon thread.");
1019 Thread thr = new Thread(theJobBackend);
1020 thr.setDaemon(true);
1021 thr.start();
1022
1023 // Wait until job commences.
1024 logger.log(Level.INFO, " Waiting for commence.");
1025 theJobBackend.waitForCommence();
1026 logger.log(Level.INFO, " Commencing.");
1027
1028 // Add any Java system properties from the job frontend process that do
1029 // not exist in this job backend process.
1030 Properties backendProperties = System.getProperties();
1031 Properties frontendProperties = theJobBackend.getProperties();
1032 for (Map.Entry<Object, Object> entry : frontendProperties.entrySet()) {
1033 String name = (String) entry.getKey();
1034 String value = (String) entry.getValue();
1035 if (backendProperties.getProperty(name) == null) {
1036 backendProperties.setProperty(name, value);
1037 }
1038 }
1039
1040 // Turn on headless mode. This allows graphics drawing operations (that
1041 // do not require a screen, keyboard, or mouse) to work.
1042 System.setProperty("java.awt.headless", "true");
1043
1044 // Call the job's main() method, passing in the job's command line
1045 // arguments.
1046 Class<?> mainclass
1047 = Class.forName(theJobBackend.getMainClassName(),
1048 true,
1049 // Force Field X modification to use our SystemClassLoader.
1050 ClassLoader.getSystemClassLoader());
1051 // Original:
1052 // theJobBackend.getClassLoader());
1053 Method mainmethod = mainclass.getMethod("main", String[].class);
1054
1055 StringBuilder stringBuilder = new StringBuilder(format(" Preparing to invoke main method:\n %s\n With args: ", mainmethod));
1056
1057 for (String arg : theJobBackend.getArgs()) {
1058 stringBuilder.append(format(" %s", arg));
1059 }
1060 stringBuilder.append("\n Backend start-up completed.");
1061 logger.log(Level.INFO, stringBuilder.toString());
1062
1063 // Close down the FileHandler if it exists.
1064 if (theJobBackend.fileHandler != null) {
1065 theJobBackend.fileHandler.flush();
1066 logger.setLevel(Level.OFF);
1067 theJobBackend.fileHandler.close();
1068 }
1069
1070 mainmethod.invoke(null, (Object) theJobBackend.getArgs());
1071
1072 // After the main() method returns and all non-daemon threads have
1073 // terminated, the process will exit, and the shutdown hook will call
1074 // the shutdown() method.
1075 }
1076
1077 /**
1078 * Print a usage message and exit.
1079 */
1080 private static void usage() {
1081 System.err.println("Usage: java edu.rit.pj.cluster.JobBackend <username> <jobnum> <K> <rank> <hasFrontendComm> <frontendHost> <frontendPort> <backendHost>");
1082 System.exit(1);
1083 }
1084
1085 }