1 //******************************************************************************
2 //
3 // File: NonPjJobFrontend.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.NonPjJobFrontend
6 //
7 // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.IOException;
44 import java.net.InetSocketAddress;
45 import java.util.Collections;
46 import java.util.LinkedList;
47 import java.util.List;
48
49 import edu.rit.mp.Channel;
50 import edu.rit.mp.ChannelGroup;
51 import edu.rit.mp.ChannelGroupClosedException;
52 import edu.rit.mp.ObjectBuf;
53 import edu.rit.mp.Status;
54 import edu.rit.mp.buf.ObjectItemBuf;
55 import edu.rit.pj.PJProperties;
56 import edu.rit.util.Timer;
57 import edu.rit.util.TimerTask;
58 import edu.rit.util.TimerThread;
59
60 /**
61 * Class NonPjJobFrontend provides the message handler for a job frontend
62 * process that communicates with the Job Scheduler to allocate backend nodes
63 * but does not run a PJ program.
64 *
65 * @author Alan Kaminsky
66 * @version 20-Jun-2012
67 */
68 public class NonPjJobFrontend
69 implements Runnable, JobFrontendRef {
70
71 // Hidden data members.
72 // User name.
73 private String username;
74
75 // Job number.
76 private int jobnum;
77
78 // Job resources.
79 private int Np;
80
81 // Timer thread for lease renewals and expirations.
82 private TimerThread myLeaseTimerThread;
83
84 // Timers for the lease with the Job Scheduler.
85 private Timer mySchedulerRenewTimer;
86 private Timer mySchedulerExpireTimer;
87
88 // Timer for the job timeout if any.
89 private Timer myJobTimer;
90
91 // Middleware channel group.
92 private ChannelGroup myMiddlewareChannelGroup;
93
94 // Proxy for Job Scheduler Daemon.
95 private JobSchedulerRef myJobScheduler;
96
97 // Flag for shutting down the run() method.
98 private boolean continueRun = true;
99
100 // State of this job frontend.
101 private State myState = State.RUNNING;
102
103 private static enum State {
104
105 RUNNING,
106 TERMINATE_CANCEL_JOB,
107 TERMINATING
108 };
109
110 // Error message if job canceled, or null if job finished normally.
111 private String myCancelMessage = "User canceled job";
112
113 // List of backend names assigned to the job.
114 private LinkedList<String> myBackendNames = new LinkedList<String>();
115
116 // Exported constructors.
117 /**
118 * Construct a new non-PJ job frontend object. The job frontend object will
119 * contact the Job Scheduler Daemon specified by the <code>"pj.host"</code> and
120 * <code>"pj.port"</code> Java system properties. See class {@linkplain
121 * edu.rit.pj.PJProperties} for further information.
122 * <P>
123 * The non-PJ job frontend object will ask the Job Scheduler Daemon to run
124 * one process per node and to use all CPUs on each node. Other
125 * possibilities are not supported.
126 *
127 * @param username User name.
128 * @param Np Number of processes (>= 1).
129 * @exception JobSchedulerException (subclass of IOException) Thrown if the
130 * job frontend object could not contact the Job Scheduler Daemon.
131 * @exception IOException Thrown if an I/O error occurred.
132 * @throws java.io.IOException if any.
133 */
134 public NonPjJobFrontend(String username,
135 int Np)
136 throws IOException {
137 // Record arguments.
138 this.username = username;
139 this.Np = Np;
140
141 // Set up shutdown hook.
142 Runtime.getRuntime().addShutdownHook(new Thread() {
143 public void run() {
144 shutdown();
145 }
146 });
147
148 // Set up lease timer thread.
149 myLeaseTimerThread = new TimerThread();
150 myLeaseTimerThread.setDaemon(true);
151 myLeaseTimerThread.start();
152
153 // Set up Job Scheduler lease timers.
154 mySchedulerRenewTimer
155 = myLeaseTimerThread.createTimer(new TimerTask() {
156 public void action(Timer timer) {
157 try {
158 schedulerRenewTimeout();
159 } catch (Throwable ignored) {
160 }
161 }
162 });
163 mySchedulerExpireTimer
164 = myLeaseTimerThread.createTimer(new TimerTask() {
165 public void action(Timer timer) {
166 try {
167 schedulerExpireTimeout();
168 } catch (Throwable ignored) {
169 }
170 }
171 });
172
173 // Set up job timer.
174 myJobTimer
175 = myLeaseTimerThread.createTimer(new TimerTask() {
176 public void action(Timer timer) {
177 try {
178 jobTimeout();
179 } catch (Throwable ignored) {
180 }
181 }
182 });
183
184 // Set up middleware channel group.
185 myMiddlewareChannelGroup = new ChannelGroup();
186
187 // Set up Job Scheduler proxy.
188 InetSocketAddress js_address = null;
189 Channel js_channel = null;
190 try {
191 js_address
192 = new InetSocketAddress(PJProperties.getPjHost(),
193 PJProperties.getPjPort());
194 js_channel = myMiddlewareChannelGroup.connect(js_address);
195 } catch (IOException exc) {
196 throw new JobSchedulerException("JobFrontend(): Cannot contact Job Scheduler Daemon at "
197 + js_address,
198 exc);
199 }
200 myJobScheduler
201 = new JobSchedulerProxy(myMiddlewareChannelGroup, js_channel);
202
203 // Start Job Scheduler lease timers.
204 mySchedulerRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
205 Constants.LEASE_RENEW_INTERVAL);
206 mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
207
208 // Kick off the job!
209 myJobScheduler.requestJob(this, username, Np, Np, 0);
210 }
211
212 // Exported operations.
213 /**
214 * Obtain the job number assigned to this Non-PJ Job Frontend. This method
215 * blocks until the requested number of backend processes (a constructor
216 * parameter) have been assigned.
217 *
218 * @return Job number.
219 * @exception InterruptedException Thrown if the calling thread is
220 * interrupted while blocked in this method.
221 * @throws java.lang.InterruptedException if any.
222 */
223 public synchronized int getJobNumber()
224 throws InterruptedException {
225 while (myBackendNames.size() < Np) {
226 wait();
227 }
228 return jobnum;
229 }
230
231 /**
232 * Obtain a list of the backend names assigned to this Non-PJ Job Frontend.
233 * This method blocks until the requested number of backend processes (a
234 * constructor parameter) have been assigned. The returned list is
235 * unmodifiable.
236 *
237 * @return List of backend names.
238 * @exception InterruptedException Thrown if the calling thread is
239 * interrupted while blocked in this method.
240 * @throws java.lang.InterruptedException if any.
241 */
242 public synchronized List<String> getBackendNames()
243 throws InterruptedException {
244 while (myBackendNames.size() < Np) {
245 wait();
246 }
247 return Collections.unmodifiableList(myBackendNames);
248 }
249
250 /**
251 * Run this Non-PJ Job Frontend.
252 */
253 public void run() {
254 ObjectItemBuf<JobFrontendMessage> buf
255 = ObjectBuf.buffer((JobFrontendMessage) null);
256 Status status = null;
257 JobFrontendMessage message = null;
258
259 try {
260 while (continueRun) {
261 // Receive a message from any channel.
262 status = myMiddlewareChannelGroup.receive(null, null, buf);
263 message = buf.item;
264
265 // Process a message from the Job Scheduler.
266 if (status.tag == Message.FROM_JOB_SCHEDULER) {
267 message.invoke(this, myJobScheduler);
268 }
269
270 // Enable garbage collection of no-longer-needed objects while
271 // waiting to receive next message.
272 buf.item = null;
273 status = null;
274 message = null;
275 }
276 } catch (ChannelGroupClosedException ignored) {
277 } catch (Throwable exc) {
278 terminateCancelJob(exc);
279 }
280
281 // Exit process if necessary.
282 switch (myState) {
283 case TERMINATE_CANCEL_JOB:
284 System.exit(1);
285 break;
286 case RUNNING:
287 case TERMINATING:
288 break;
289 }
290 }
291
292 /**
293 * Terminate this Non-PJ Job Frontend immediately, sending a "job finished"
294 * message to the Job Scheduler. This method must only be called by a thread
295 * other than the thread calling <code>run()</code>. This method calls
296 * <code>System.exit(status)</code> to terminate the process.
297 *
298 * @param status Status value for <code>System.exit()</code>.
299 */
300 public void terminateJobFinished(int status) {
301 boolean doExit = false;
302 synchronized (this) {
303 continueRun = false;
304 if (myState == State.RUNNING) {
305 myCancelMessage = null;
306 doExit = true;
307 }
308 }
309
310 // Cannot hold the synchronization lock while calling System.exit(),
311 // otherwise a deadlock can occur between this thread and the shutdown
312 // hook thread.
313 if (doExit) {
314 System.exit(status);
315 }
316 }
317
318 /**
319 * {@inheritDoc}
320 *
321 * Assign a backend process to the job.
322 * @exception IOException Thrown if an I/O error occurred.
323 */
324 public synchronized void assignBackend(JobSchedulerRef theJobScheduler,
325 String name,
326 String host,
327 String jvm,
328 String classpath,
329 String[] jvmflags,
330 String shellCommand,
331 int Nt)
332 throws IOException {
333 // Record backend name.
334 myBackendNames.add(name);
335
336 // If all backends have been assigned, start job timer.
337 if (myBackendNames.size() == Np) {
338 int jobtime = PJProperties.getPjJobTime();
339 if (jobtime > 0) {
340 myJobTimer.start(jobtime * 1000L);
341 }
342 }
343
344 notifyAll();
345 }
346
347 /**
348 * {@inheritDoc}
349 *
350 * Assign a job number to the job. The host name for the job frontend's
351 * middleware channel group is also specified.
352 * @exception IOException Thrown if an I/O error occurred.
353 */
354 public synchronized void assignJobNumber(JobSchedulerRef theJobScheduler,
355 int jobnum,
356 String pjhost)
357 throws IOException {
358 // Record job number.
359 this.jobnum = jobnum;
360 notifyAll();
361 }
362
363 /**
364 * {@inheritDoc}
365 *
366 * Cancel the job.
367 * @exception IOException Thrown if an I/O error occurred.
368 */
369 public synchronized void cancelJob(JobSchedulerRef theJobScheduler,
370 String errmsg)
371 throws IOException {
372 terminateCancelJob(errmsg);
373 }
374
375 /**
376 * Renew the lease on the job.
377 *
378 * @param theJobScheduler Job Scheduler that is calling this method.
379 * @exception IOException Thrown if an I/O error occurred.
380 * @throws java.io.IOException if any.
381 */
382 public synchronized void renewLease(JobSchedulerRef theJobScheduler)
383 throws IOException {
384 mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
385 }
386
387 /**
388 * {@inheritDoc}
389 *
390 * Report that a backend process has finished executing the job.
391 * @exception IOException Thrown if an I/O error occurred.
392 */
393 public synchronized void backendFinished(JobBackendRef theJobBackend)
394 throws IOException {
395 }
396
397 /**
398 * {@inheritDoc}
399 *
400 * Report that a backend process is ready to commence executing the job.
401 * @exception IOException Thrown if an I/O error occurred.
402 */
403 public synchronized void backendReady(JobBackendRef theJobBackend,
404 int rank,
405 InetSocketAddress middlewareAddress,
406 InetSocketAddress worldAddress,
407 InetSocketAddress frontendAddress)
408 throws IOException {
409 }
410
411 /**
412 * {@inheritDoc}
413 *
414 * Cancel the job.
415 * @exception IOException Thrown if an I/O error occurred.
416 */
417 public synchronized void cancelJob(JobBackendRef theJobBackend,
418 String errmsg)
419 throws IOException {
420 }
421
422 /**
423 * {@inheritDoc}
424 *
425 * Renew the lease on the job.
426 * @exception IOException Thrown if an I/O error occurred.
427 */
428 public synchronized void renewLease(JobBackendRef theJobBackend)
429 throws IOException {
430 }
431
432 /**
433 * {@inheritDoc}
434 *
435 * Request the given resource from this job frontend's class loader.
436 * @exception IOException Thrown if an I/O error occurred.
437 */
438 public synchronized void requestResource(JobBackendRef theJobBackend,
439 String resourceName)
440 throws IOException {
441 }
442
443 /**
444 * {@inheritDoc}
445 *
446 * Open the given output file for writing or appending.
447 * @exception IOException Thrown if an I/O error occurred.
448 */
449 public synchronized void outputFileOpen(JobBackendRef theJobBackend,
450 int bfd,
451 File file,
452 boolean append)
453 throws IOException {
454 }
455
456 /**
457 * {@inheritDoc}
458 *
459 * Write the given bytes to the given output file. <code>ffd</code> = 1 refers
460 * to the job's standard output stream; <code>ffd</code> = 2 refers to the job's
461 * standard error stream; other values refer to a previously opened file.
462 * @exception IOException Thrown if an I/O error occurred.
463 */
464 public synchronized void outputFileWrite(JobBackendRef theJobBackend,
465 int ffd,
466 byte[] buf,
467 int off,
468 int len)
469 throws IOException {
470 }
471
472 /**
473 * {@inheritDoc}
474 *
475 * Flush accumulated bytes to the given output file.
476 * @exception IOException Thrown if an I/O error occurred.
477 */
478 public synchronized void outputFileFlush(JobBackendRef theJobBackend,
479 int ffd)
480 throws IOException {
481 }
482
483 /**
484 * {@inheritDoc}
485 *
486 * Close the given output file.
487 * @exception IOException Thrown if an I/O error occurred.
488 */
489 public synchronized void outputFileClose(JobBackendRef theJobBackend,
490 int ffd)
491 throws IOException {
492 }
493
494 /**
495 * {@inheritDoc}
496 *
497 * Open the given input file for reading.
498 * @exception IOException Thrown if an I/O error occurred.
499 */
500 public synchronized void inputFileOpen(JobBackendRef theJobBackend,
501 int bfd,
502 File file)
503 throws IOException {
504 }
505
506 /**
507 * {@inheritDoc}
508 *
509 * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
510 * job's standard input stream; other values refer to a previously opened
511 * file.
512 * @exception IOException Thrown if an I/O error occurred.
513 */
514 public synchronized void inputFileRead(JobBackendRef theJobBackend,
515 int ffd,
516 int len)
517 throws IOException {
518 }
519
520 /**
521 * {@inheritDoc}
522 *
523 * Skip bytes from the given input file.
524 * @exception IOException Thrown if an I/O error occurred.
525 */
526 public synchronized void inputFileSkip(JobBackendRef theJobBackend,
527 int ffd,
528 long len)
529 throws IOException {
530 }
531
532 /**
533 * {@inheritDoc}
534 *
535 * Close the given input file.
536 * @exception IOException Thrown if an I/O error occurred.
537 */
538 public synchronized void inputFileClose(JobBackendRef theJobBackend,
539 int ffd)
540 throws IOException {
541 }
542
543 /**
544 * {@inheritDoc}
545 *
546 * Report a comment for a process.
547 * @exception IOException Thrown if an I/O error occurred.
548 */
549 public synchronized void reportComment(JobBackendRef theJobBackend,
550 int rank,
551 String comment)
552 throws IOException {
553 myJobScheduler.reportComment(this, rank, comment);
554 }
555
556 /**
557 * Close communication with this Job Frontend.
558 */
559 public void close() {
560 }
561
562 // Hidden operations.
563 /**
564 * Take action when the Job Scheduler's lease renewal timer times out.
565 *
566 * @exception IOException Thrown if an I/O error occurred.
567 */
568 private synchronized void schedulerRenewTimeout()
569 throws IOException {
570 if (mySchedulerRenewTimer.isTriggered()) {
571 myJobScheduler.renewLease(this);
572 }
573 }
574
575 /**
576 * Take action when the Job Scheduler's lease expiration timer times out.
577 *
578 * @exception IOException Thrown if an I/O error occurred.
579 */
580 private void schedulerExpireTimeout()
581 throws IOException {
582 boolean doExit = false;
583 synchronized (this) {
584 if (mySchedulerExpireTimer.isTriggered()) {
585 continueRun = false;
586 if (myState == State.RUNNING) {
587 myState = State.TERMINATE_CANCEL_JOB;
588 myCancelMessage = "Job Scheduler failed";
589 System.err.println(myCancelMessage);
590 doExit = true;
591 }
592 }
593 }
594
595 // Cannot hold the synchronization lock while calling System.exit(),
596 // otherwise a deadlock can occur between this thread (the timer thread)
597 // and the shutdown hook thread.
598 if (doExit) {
599 System.exit(1);
600 }
601 }
602
603 /**
604 * Take action when the job timer times out.
605 *
606 * @exception IOException Thrown if an I/O error occurred.
607 */
608 private void jobTimeout()
609 throws IOException {
610 boolean doExit = false;
611 synchronized (this) {
612 if (myJobTimer.isTriggered()) {
613 continueRun = false;
614 if (myState == State.RUNNING) {
615 myState = State.TERMINATE_CANCEL_JOB;
616 myCancelMessage = "Job exceeded maximum running time";
617 System.err.println(myCancelMessage);
618 doExit = true;
619 }
620 }
621 }
622
623 // Cannot hold the synchronization lock while calling System.exit(),
624 // otherwise a deadlock can occur between this thread (the timer thread)
625 // and the shutdown hook thread.
626 if (doExit) {
627 System.exit(1);
628 }
629 }
630
631 /**
632 * Terminate this Job Frontend immediately, sending a "cancel job" message
633 * to the Job Scheduler. The error message is <code>msg</code>. This method must
634 * only be called by the thread calling <code>run()</code>.
635 *
636 * @param msg Error message.
637 */
638 private void terminateCancelJob(String msg) {
639 continueRun = false;
640 if (myState == State.RUNNING) {
641 myState = State.TERMINATE_CANCEL_JOB;
642 myCancelMessage = msg;
643 System.err.println(myCancelMessage);
644 }
645 }
646
647 /**
648 * Terminate this Job Frontend immediately, sending a "cancel job" message
649 * to the Job Scheduler. The error message comes from the given exception.
650 * This method must only be called by the thread calling <code>run()</code>.
651 *
652 * @param exc Exception.
653 */
654 private void terminateCancelJob(Throwable exc) {
655 continueRun = false;
656 if (myState == State.RUNNING) {
657 myCancelMessage = exc.getClass().getName();
658 String msg = exc.getMessage();
659 if (msg != null) {
660 myCancelMessage = myCancelMessage + ": " + msg;
661 }
662 System.err.println(myCancelMessage);
663 exc.printStackTrace(System.err);
664 }
665 }
666
667 /**
668 * Terminate this Job Frontend immediately, sending a "cancel job" message
669 * to the Job Scheduler. The error message comes from the given exception.
670 * This method must only be called by a thread other than the thread calling
671 * <code>run()</code>.
672 *
673 * @param exc Exception.
674 */
675 void terminateCancelJobOther(Throwable exc) {
676 boolean doExit = false;
677 synchronized (this) {
678 continueRun = false;
679 if (myState == State.RUNNING) {
680 myCancelMessage = exc.getClass().getName();
681 String msg = exc.getMessage();
682 if (msg != null) {
683 myCancelMessage = myCancelMessage + ": " + msg;
684 }
685 System.err.println(myCancelMessage);
686 exc.printStackTrace(System.err);
687 doExit = true;
688 }
689 }
690
691 // Cannot hold the synchronization lock while calling System.exit(),
692 // otherwise a deadlock can occur between this thread and the shutdown
693 // hook thread.
694 if (doExit) {
695 System.exit(1);
696 }
697 }
698
699 /**
700 * Shut down this Job Frontend.
701 */
702 private void shutdown() {
703 synchronized (this) {
704 // Stop all lease timers.
705 mySchedulerRenewTimer.stop();
706 mySchedulerExpireTimer.stop();
707
708 // If state is RUNNING but myCancelMessage is not null, it means the
709 // user canceled the job (e.g., by hitting CTRL-C).
710 if (myState == State.RUNNING && myCancelMessage != null) {
711 myState = State.TERMINATE_CANCEL_JOB;
712 }
713
714 // Inform Job Scheduler.
715 switch (myState) {
716 case RUNNING:
717 // Send "job finished" message.
718 if (myJobScheduler != null) {
719 try {
720 myJobScheduler.jobFinished(this);
721 } catch (IOException ignored) {
722 }
723 }
724 break;
725 case TERMINATE_CANCEL_JOB:
726 // Send "cancel job" message.
727 if (myJobScheduler != null) {
728 try {
729 myJobScheduler.cancelJob(this, myCancelMessage);
730 } catch (IOException ignored) {
731 }
732 }
733 break;
734 case TERMINATING:
735 // Send nothing.
736 break;
737 }
738
739 // Record that we are terminating.
740 myState = State.TERMINATING;
741 }
742
743 // All proxies, channels, and channel groups will close when the process
744 // exits.
745 }
746
747 }