1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.IOException;
44 import java.net.InetSocketAddress;
45 import java.util.Collections;
46 import java.util.LinkedList;
47 import java.util.List;
48
49 import edu.rit.mp.Channel;
50 import edu.rit.mp.ChannelGroup;
51 import edu.rit.mp.ChannelGroupClosedException;
52 import edu.rit.mp.ObjectBuf;
53 import edu.rit.mp.Status;
54 import edu.rit.mp.buf.ObjectItemBuf;
55 import edu.rit.pj.PJProperties;
56 import edu.rit.util.Timer;
57 import edu.rit.util.TimerTask;
58 import edu.rit.util.TimerThread;
59
60
61
62
63
64
65
66
67
68 public class NonPjJobFrontend
69 implements Runnable, JobFrontendRef {
70
71
72
73 private String username;
74
75
76 private int jobnum;
77
78
79 private int Np;
80
81
82 private TimerThread myLeaseTimerThread;
83
84
85 private Timer mySchedulerRenewTimer;
86 private Timer mySchedulerExpireTimer;
87
88
89 private Timer myJobTimer;
90
91
92 private ChannelGroup myMiddlewareChannelGroup;
93
94
95 private JobSchedulerRef myJobScheduler;
96
97
98 private boolean continueRun = true;
99
100
101 private State myState = State.RUNNING;
102
103 private static enum State {
104
105 RUNNING,
106 TERMINATE_CANCEL_JOB,
107 TERMINATING
108 };
109
110
111 private String myCancelMessage = "User canceled job";
112
113
114 private LinkedList<String> myBackendNames = new LinkedList<String>();
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 public NonPjJobFrontend(String username,
135 int Np)
136 throws IOException {
137
138 this.username = username;
139 this.Np = Np;
140
141
142 Runtime.getRuntime().addShutdownHook(new Thread() {
143 public void run() {
144 shutdown();
145 }
146 });
147
148
149 myLeaseTimerThread = new TimerThread();
150 myLeaseTimerThread.setDaemon(true);
151 myLeaseTimerThread.start();
152
153
154 mySchedulerRenewTimer
155 = myLeaseTimerThread.createTimer(new TimerTask() {
156 public void action(Timer timer) {
157 try {
158 schedulerRenewTimeout();
159 } catch (Throwable ignored) {
160 }
161 }
162 });
163 mySchedulerExpireTimer
164 = myLeaseTimerThread.createTimer(new TimerTask() {
165 public void action(Timer timer) {
166 try {
167 schedulerExpireTimeout();
168 } catch (Throwable ignored) {
169 }
170 }
171 });
172
173
174 myJobTimer
175 = myLeaseTimerThread.createTimer(new TimerTask() {
176 public void action(Timer timer) {
177 try {
178 jobTimeout();
179 } catch (Throwable ignored) {
180 }
181 }
182 });
183
184
185 myMiddlewareChannelGroup = new ChannelGroup();
186
187
188 InetSocketAddress js_address = null;
189 Channel js_channel = null;
190 try {
191 js_address
192 = new InetSocketAddress(PJProperties.getPjHost(),
193 PJProperties.getPjPort());
194 js_channel = myMiddlewareChannelGroup.connect(js_address);
195 } catch (IOException exc) {
196 throw new JobSchedulerException("JobFrontend(): Cannot contact Job Scheduler Daemon at "
197 + js_address,
198 exc);
199 }
200 myJobScheduler
201 = new JobSchedulerProxy(myMiddlewareChannelGroup, js_channel);
202
203
204 mySchedulerRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
205 Constants.LEASE_RENEW_INTERVAL);
206 mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
207
208
209 myJobScheduler.requestJob(this, username, Np, Np, 0);
210 }
211
212
213
214
215
216
217
218
219
220
221
222
223 public synchronized int getJobNumber()
224 throws InterruptedException {
225 while (myBackendNames.size() < Np) {
226 wait();
227 }
228 return jobnum;
229 }
230
231
232
233
234
235
236
237
238
239
240
241
242 public synchronized List<String> getBackendNames()
243 throws InterruptedException {
244 while (myBackendNames.size() < Np) {
245 wait();
246 }
247 return Collections.unmodifiableList(myBackendNames);
248 }
249
250
251
252
253 public void run() {
254 ObjectItemBuf<JobFrontendMessage> buf
255 = ObjectBuf.buffer((JobFrontendMessage) null);
256 Status status = null;
257 JobFrontendMessage message = null;
258
259 try {
260 while (continueRun) {
261
262 status = myMiddlewareChannelGroup.receive(null, null, buf);
263 message = buf.item;
264
265
266 if (status.tag == Message.FROM_JOB_SCHEDULER) {
267 message.invoke(this, myJobScheduler);
268 }
269
270
271
272 buf.item = null;
273 status = null;
274 message = null;
275 }
276 } catch (ChannelGroupClosedException ignored) {
277 } catch (Throwable exc) {
278 terminateCancelJob(exc);
279 }
280
281
282 switch (myState) {
283 case TERMINATE_CANCEL_JOB:
284 System.exit(1);
285 break;
286 case RUNNING:
287 case TERMINATING:
288 break;
289 }
290 }
291
292
293
294
295
296
297
298
299
300 public void terminateJobFinished(int status) {
301 boolean doExit = false;
302 synchronized (this) {
303 continueRun = false;
304 if (myState == State.RUNNING) {
305 myCancelMessage = null;
306 doExit = true;
307 }
308 }
309
310
311
312
313 if (doExit) {
314 System.exit(status);
315 }
316 }
317
318
319
320
321
322
323
324 public synchronized void assignBackend(JobSchedulerRef theJobScheduler,
325 String name,
326 String host,
327 String jvm,
328 String classpath,
329 String[] jvmflags,
330 String shellCommand,
331 int Nt)
332 throws IOException {
333
334 myBackendNames.add(name);
335
336
337 if (myBackendNames.size() == Np) {
338 int jobtime = PJProperties.getPjJobTime();
339 if (jobtime > 0) {
340 myJobTimer.start(jobtime * 1000L);
341 }
342 }
343
344 notifyAll();
345 }
346
347
348
349
350
351
352
353
354 public synchronized void assignJobNumber(JobSchedulerRef theJobScheduler,
355 int jobnum,
356 String pjhost)
357 throws IOException {
358
359 this.jobnum = jobnum;
360 notifyAll();
361 }
362
363
364
365
366
367
368
369 public synchronized void cancelJob(JobSchedulerRef theJobScheduler,
370 String errmsg)
371 throws IOException {
372 terminateCancelJob(errmsg);
373 }
374
375
376
377
378
379
380
381
382 public synchronized void renewLease(JobSchedulerRef theJobScheduler)
383 throws IOException {
384 mySchedulerExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
385 }
386
387
388
389
390
391
392
393 public synchronized void backendFinished(JobBackendRef theJobBackend)
394 throws IOException {
395 }
396
397
398
399
400
401
402
403 public synchronized void backendReady(JobBackendRef theJobBackend,
404 int rank,
405 InetSocketAddress middlewareAddress,
406 InetSocketAddress worldAddress,
407 InetSocketAddress frontendAddress)
408 throws IOException {
409 }
410
411
412
413
414
415
416
417 public synchronized void cancelJob(JobBackendRef theJobBackend,
418 String errmsg)
419 throws IOException {
420 }
421
422
423
424
425
426
427
428 public synchronized void renewLease(JobBackendRef theJobBackend)
429 throws IOException {
430 }
431
432
433
434
435
436
437
438 public synchronized void requestResource(JobBackendRef theJobBackend,
439 String resourceName)
440 throws IOException {
441 }
442
443
444
445
446
447
448
449 public synchronized void outputFileOpen(JobBackendRef theJobBackend,
450 int bfd,
451 File file,
452 boolean append)
453 throws IOException {
454 }
455
456
457
458
459
460
461
462
463
464 public synchronized void outputFileWrite(JobBackendRef theJobBackend,
465 int ffd,
466 byte[] buf,
467 int off,
468 int len)
469 throws IOException {
470 }
471
472
473
474
475
476
477
478 public synchronized void outputFileFlush(JobBackendRef theJobBackend,
479 int ffd)
480 throws IOException {
481 }
482
483
484
485
486
487
488
489 public synchronized void outputFileClose(JobBackendRef theJobBackend,
490 int ffd)
491 throws IOException {
492 }
493
494
495
496
497
498
499
500 public synchronized void inputFileOpen(JobBackendRef theJobBackend,
501 int bfd,
502 File file)
503 throws IOException {
504 }
505
506
507
508
509
510
511
512
513
514 public synchronized void inputFileRead(JobBackendRef theJobBackend,
515 int ffd,
516 int len)
517 throws IOException {
518 }
519
520
521
522
523
524
525
526 public synchronized void inputFileSkip(JobBackendRef theJobBackend,
527 int ffd,
528 long len)
529 throws IOException {
530 }
531
532
533
534
535
536
537
538 public synchronized void inputFileClose(JobBackendRef theJobBackend,
539 int ffd)
540 throws IOException {
541 }
542
543
544
545
546
547
548
549 public synchronized void reportComment(JobBackendRef theJobBackend,
550 int rank,
551 String comment)
552 throws IOException {
553 myJobScheduler.reportComment(this, rank, comment);
554 }
555
556
557
558
559 public void close() {
560 }
561
562
563
564
565
566
567
568 private synchronized void schedulerRenewTimeout()
569 throws IOException {
570 if (mySchedulerRenewTimer.isTriggered()) {
571 myJobScheduler.renewLease(this);
572 }
573 }
574
575
576
577
578
579
580 private void schedulerExpireTimeout()
581 throws IOException {
582 boolean doExit = false;
583 synchronized (this) {
584 if (mySchedulerExpireTimer.isTriggered()) {
585 continueRun = false;
586 if (myState == State.RUNNING) {
587 myState = State.TERMINATE_CANCEL_JOB;
588 myCancelMessage = "Job Scheduler failed";
589 System.err.println(myCancelMessage);
590 doExit = true;
591 }
592 }
593 }
594
595
596
597
598 if (doExit) {
599 System.exit(1);
600 }
601 }
602
603
604
605
606
607
608 private void jobTimeout()
609 throws IOException {
610 boolean doExit = false;
611 synchronized (this) {
612 if (myJobTimer.isTriggered()) {
613 continueRun = false;
614 if (myState == State.RUNNING) {
615 myState = State.TERMINATE_CANCEL_JOB;
616 myCancelMessage = "Job exceeded maximum running time";
617 System.err.println(myCancelMessage);
618 doExit = true;
619 }
620 }
621 }
622
623
624
625
626 if (doExit) {
627 System.exit(1);
628 }
629 }
630
631
632
633
634
635
636
637
638 private void terminateCancelJob(String msg) {
639 continueRun = false;
640 if (myState == State.RUNNING) {
641 myState = State.TERMINATE_CANCEL_JOB;
642 myCancelMessage = msg;
643 System.err.println(myCancelMessage);
644 }
645 }
646
647
648
649
650
651
652
653
654 private void terminateCancelJob(Throwable exc) {
655 continueRun = false;
656 if (myState == State.RUNNING) {
657 myCancelMessage = exc.getClass().getName();
658 String msg = exc.getMessage();
659 if (msg != null) {
660 myCancelMessage = myCancelMessage + ": " + msg;
661 }
662 System.err.println(myCancelMessage);
663 exc.printStackTrace(System.err);
664 }
665 }
666
667
668
669
670
671
672
673
674
675 void terminateCancelJobOther(Throwable exc) {
676 boolean doExit = false;
677 synchronized (this) {
678 continueRun = false;
679 if (myState == State.RUNNING) {
680 myCancelMessage = exc.getClass().getName();
681 String msg = exc.getMessage();
682 if (msg != null) {
683 myCancelMessage = myCancelMessage + ": " + msg;
684 }
685 System.err.println(myCancelMessage);
686 exc.printStackTrace(System.err);
687 doExit = true;
688 }
689 }
690
691
692
693
694 if (doExit) {
695 System.exit(1);
696 }
697 }
698
699
700
701
702 private void shutdown() {
703 synchronized (this) {
704
705 mySchedulerRenewTimer.stop();
706 mySchedulerExpireTimer.stop();
707
708
709
710 if (myState == State.RUNNING && myCancelMessage != null) {
711 myState = State.TERMINATE_CANCEL_JOB;
712 }
713
714
715 switch (myState) {
716 case RUNNING:
717
718 if (myJobScheduler != null) {
719 try {
720 myJobScheduler.jobFinished(this);
721 } catch (IOException ignored) {
722 }
723 }
724 break;
725 case TERMINATE_CANCEL_JOB:
726
727 if (myJobScheduler != null) {
728 try {
729 myJobScheduler.cancelJob(this, myCancelMessage);
730 } catch (IOException ignored) {
731 }
732 }
733 break;
734 case TERMINATING:
735
736 break;
737 }
738
739
740 myState = State.TERMINATING;
741 }
742
743
744
745 }
746
747 }