1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.pj.cluster;
41
42 import static java.lang.String.format;
43
44 import java.io.File;
45 import java.io.IOException;
46 import java.io.PrintStream;
47 import java.lang.reflect.Method;
48 import java.net.InetSocketAddress;
49 import java.util.Map;
50 import java.util.Properties;
51 import java.util.concurrent.CountDownLatch;
52 import java.util.logging.FileHandler;
53 import java.util.logging.Handler;
54 import java.util.logging.Level;
55 import java.util.logging.LogManager;
56 import java.util.logging.Logger;
57 import java.util.logging.SimpleFormatter;
58
59 import edu.rit.mp.ChannelGroup;
60 import edu.rit.mp.ChannelGroupClosedException;
61 import edu.rit.mp.ObjectBuf;
62 import edu.rit.mp.Status;
63 import edu.rit.mp.buf.ObjectItemBuf;
64 import edu.rit.util.ByteSequence;
65 import edu.rit.util.Timer;
66 import edu.rit.util.TimerThread;
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 public class JobBackend
92 implements Runnable, JobBackendRef {
93
94
95 private static final Logger logger = Logger.getLogger(JobBackend.class.getName());
96
97
98 private static JobBackend theJobBackend;
99
100
101
102 private final String username;
103 private final int jobnum;
104 private final int K;
105 private final int rank;
106 private final boolean hasFrontendComm;
107 private final String frontendHost;
108 private final int frontendPort;
109 private final String backendHost;
110
111
112 FileHandler fileHandler = null;
113
114
115 private final TimerThread myLeaseTimerThread;
116
117
118 private final Timer myFrontendRenewTimer;
119 private final Timer myFrontendExpireTimer;
120
121
122 private final ChannelGroup myMiddlewareChannelGroup;
123 private InetSocketAddress[] myMiddlewareAddress;
124
125
126 private final JobFrontendRef myJobFrontend;
127
128
129 private final ResourceCache myResourceCache;
130 private final BackendClassLoader myClassLoader;
131
132
133 private final ChannelGroup myWorldChannelGroup;
134 private InetSocketAddress[] myWorldAddress;
135
136
137 private ChannelGroup myFrontendChannelGroup;
138 private InetSocketAddress[] myFrontendAddress;
139
140
141 private Properties myProperties;
142
143
144 private String myMainClassName;
145
146
147 private String[] myArgs;
148
149
150 private boolean commence;
151
152
153 private final ObjectItemBuf<JobBackendMessage> myBuffer
154 = ObjectBuf.buffer((JobBackendMessage) null);
155
156
157 private boolean continueRun = true;
158 private final CountDownLatch runFinished = new CountDownLatch(1);
159
160
161 private State myState = State.RUNNING;
162
163 private static enum State {
164 RUNNING,
165 TERMINATE_CANCEL_JOB,
166 TERMINATE_NO_REPORT,
167 TERMINATING
168 }
169
170
171 private String myCancelMessage;
172
173
174 private final PrintStream myJobLauncherLog;
175
176
177 private final BackendFileWriter myFileWriter;
178 private final BackendFileReader myFileReader;
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 private JobBackend(String username,
196 int jobnum,
197 int K,
198 int rank,
199 boolean hasFrontendComm,
200 String frontendHost,
201 int frontendPort,
202 String backendHost)
203 throws IOException {
204
205
206 this.username = username;
207 this.jobnum = jobnum;
208 this.K = K;
209 this.rank = rank;
210 this.hasFrontendComm = hasFrontendComm;
211 this.frontendHost = frontendHost;
212 this.frontendPort = frontendPort;
213 this.backendHost = backendHost;
214
215
216 StringBuilder sb = new StringBuilder();
217 boolean verbose = Boolean.parseBoolean(System.getProperty("pj.verbose", "false"));
218 if (verbose) {
219 try {
220
221 Logger defaultLogger = LogManager.getLogManager().getLogger("");
222 Handler[] defaultHandlers = defaultLogger.getHandlers();
223 for (Handler h : defaultHandlers) {
224 defaultLogger.removeHandler(h);
225 }
226
227
228 File dir = new File(Integer.toString(this.rank));
229 if (!dir.exists()) {
230 boolean success = dir.mkdir();
231 sb.append(format("\n Creation of logging directory %s: %b", dir, success));
232 }
233
234 String logFile = dir.getAbsolutePath() + File.separator + "backend.log";
235 fileHandler = new FileHandler(logFile);
236 sb.append(format("\n Log file: %s", logFile));
237 fileHandler.setFormatter(new SimpleFormatter());
238 logger.addHandler(fileHandler);
239 logger.setLevel(Level.INFO);
240 } catch (Exception e) {
241 logger.setLevel(Level.OFF);
242 }
243 } else {
244 logger.setLevel(Level.OFF);
245 }
246
247
248 sb.append(format("\n Username: %s", this.username));
249 sb.append(format("\n Job number: %d", this.jobnum));
250 sb.append(format("\n Nodes: %d", this.K));
251 sb.append(format("\n Rank: %d", this.rank));
252 sb.append(format("\n Has Frontend Comm: %b", this.hasFrontendComm));
253 sb.append(format("\n Frontend Host: %s", this.frontendHost));
254 sb.append(format("\n Frontend Port: %d", this.frontendPort));
255 sb.append(format("\n Backend Host: %s", this.backendHost));
256 logger.log(Level.INFO, sb.toString());
257
258
259 Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));
260
261
262 logger.log(Level.INFO, " Set up lease timer thread.");
263 myLeaseTimerThread = new TimerThread();
264 myLeaseTimerThread.setDaemon(true);
265 myLeaseTimerThread.start();
266
267
268 logger.log(Level.INFO, format(" Create frontend renew timer (%d sec)." , Constants.LEASE_RENEW_INTERVAL / 1000));
269 myFrontendRenewTimer = myLeaseTimerThread.createTimer(timer -> {
270 try {
271 frontendRenewTimeout();
272 } catch (Throwable ignored) {
273 }
274 });
275 logger.log(Level.INFO, format(" Create frontend expire timer (%d sec)." , Constants.LEASE_EXPIRE_INTERVAL / 1000));
276 myFrontendExpireTimer = myLeaseTimerThread.createTimer(timer -> {
277 try {
278 frontendExpireTimeout();
279 } catch (Throwable ignored) {
280 }
281 });
282
283
284
285 myFrontendExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
286
287
288 logger.log(Level.INFO, " Set up middleware channel group.");
289 myMiddlewareChannelGroup = new ChannelGroup(new InetSocketAddress(backendHost, 0));
290 myMiddlewareChannelGroup.startListening();
291
292
293 logger.log(Level.INFO, " Set up job frontend proxy.");
294 myJobFrontend = new JobFrontendProxy(myMiddlewareChannelGroup,
295 myMiddlewareChannelGroup.connect(new InetSocketAddress(frontendHost, frontendPort)));
296
297
298 logger.log(Level.INFO, " The job frontend proxy has been set up.");
299
300
301 logger.log(Level.INFO, " Start frontend lease renewal timer.");
302 myFrontendRenewTimer.start(Constants.LEASE_RENEW_INTERVAL,
303 Constants.LEASE_RENEW_INTERVAL);
304
305
306 myResourceCache = new ResourceCache();
307 myClassLoader = new BackendClassLoader(
308 getClass().getClassLoader(),
309 this,
310 myJobFrontend,
311 myResourceCache);
312
313
314 logger.log(Level.INFO, " Set up world communicator channel group.");
315 myWorldChannelGroup= new ChannelGroup(new InetSocketAddress(backendHost, 0));
316 myWorldChannelGroup.setAlternateClassLoader(myClassLoader);
317
318
319 if (hasFrontendComm) {
320 logger.log(Level.INFO, " Set up frontend communicator channel group.");
321 myFrontendChannelGroup = new ChannelGroup(new InetSocketAddress(backendHost, 0));
322 myFrontendChannelGroup.setAlternateClassLoader(myClassLoader);
323 }
324
325
326 logger.log(Level.INFO, " Set up backend file writer and reader.");
327 myFileWriter = new BackendFileWriter(myJobFrontend, this);
328 myFileReader = new BackendFileReader(myJobFrontend, this);
329
330
331 logger.log(Level.INFO, " Redirect standard input, standard output, and standard error to job frontend.");
332 System.in.close();
333 System.out.close();
334 myJobLauncherLog = System.err;
335 System.setIn(myFileReader.in);
336 System.setOut(myFileWriter.out);
337 System.setErr(myFileWriter.err);
338
339
340 logger.log(Level.INFO, " Tell job frontend we're ready.");
341 myJobFrontend.backendReady(
342 this,
343 rank,
344 myMiddlewareChannelGroup.listenAddress(),
345 myWorldChannelGroup.listenAddress(),
346 hasFrontendComm ? myFrontendChannelGroup.listenAddress() : null);
347 }
348
349
350
351
352
353 public void run() {
354 Status status = null;
355 JobBackendMessage message = null;
356
357 try {
358 while (continueRun) {
359
360 status
361 = myMiddlewareChannelGroup.receive(null, null, myBuffer);
362 message = myBuffer.item;
363
364
365 message.invoke(this, myJobFrontend);
366
367
368
369 myBuffer.item = null;
370 status = null;
371 message = null;
372 }
373
374
375 reportRunFinished();
376 } catch (ChannelGroupClosedException exc) {
377
378 reportRunFinished();
379 } catch (Throwable exc) {
380
381 reportRunFinished();
382 terminateCancelJob(exc);
383 }
384
385
386 switch (myState) {
387 case TERMINATE_CANCEL_JOB:
388 case TERMINATE_NO_REPORT:
389 System.exit(1);
390 break;
391 case RUNNING:
392 case TERMINATING:
393 break;
394 }
395 }
396
397
398
399
400
401
402 public synchronized void cancelJob(JobFrontendRef theJobFrontend,
403 String errmsg)
404 throws IOException {
405 terminateNoReport();
406 }
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431 public synchronized void commenceJob(JobFrontendRef theJobFrontend,
432 InetSocketAddress[] middlewareAddress,
433 InetSocketAddress[] worldAddress,
434 InetSocketAddress[] frontendAddress,
435 Properties properties,
436 String mainClassName,
437 String[] args)
438 throws IOException {
439
440 myMiddlewareAddress = middlewareAddress;
441 myWorldAddress = worldAddress;
442 myFrontendAddress = frontendAddress;
443 myProperties = properties;
444 myMainClassName = mainClassName;
445 myArgs = args;
446
447
448 commence = true;
449 notifyAll();
450 }
451
452
453
454
455
456
457 public synchronized void jobFinished(JobFrontendRef theJobFrontend)
458 throws IOException {
459 continueRun = false;
460 }
461
462
463
464
465
466
467 public synchronized void renewLease(JobFrontendRef theJobFrontend)
468 throws IOException {
469 myFrontendExpireTimer.start(Constants.LEASE_EXPIRE_INTERVAL);
470 }
471
472
473
474
475
476
477
478
479
480
481 public synchronized void reportResource(JobFrontendRef theJobFrontend,
482 String resourceName,
483 byte[] content)
484 throws IOException {
485 myResourceCache.put(resourceName, content);
486 }
487
488
489
490
491
492
493
494
495
496
497
498 public synchronized void reportResource(JobFrontendRef theJobFrontend,
499 String resourceName,
500 ByteSequence content)
501 throws IOException {
502 myResourceCache.put(resourceName,
503 content == null ? null : content.toByteArray());
504 }
505
506
507
508
509
510
511 public synchronized void outputFileOpenResult(JobFrontendRef theJobFrontend,
512 int bfd,
513 int ffd,
514 IOException exc)
515 throws IOException {
516 myFileWriter.outputFileOpenResult(theJobFrontend, bfd, ffd, exc);
517 }
518
519
520
521
522
523
524 public synchronized void outputFileWriteResult(JobFrontendRef theJobFrontend,
525 int ffd,
526 IOException exc)
527 throws IOException {
528 myFileWriter.outputFileWriteResult(theJobFrontend, ffd, exc);
529 }
530
531
532
533
534
535
536 public synchronized void outputFileFlushResult(JobFrontendRef theJobFrontend,
537 int ffd,
538 IOException exc)
539 throws IOException {
540 myFileWriter.outputFileFlushResult(theJobFrontend, ffd, exc);
541 }
542
543
544
545
546
547
548 public synchronized void outputFileCloseResult(JobFrontendRef theJobFrontend,
549 int ffd,
550 IOException exc)
551 throws IOException {
552 myFileWriter.outputFileCloseResult(theJobFrontend, ffd, exc);
553 }
554
555
556
557
558
559
560 public synchronized void inputFileOpenResult(JobFrontendRef theJobFrontend,
561 int bfd,
562 int ffd,
563 IOException exc)
564 throws IOException {
565 myFileReader.inputFileOpenResult(theJobFrontend, bfd, ffd, exc);
566 }
567
568
569
570
571
572
573 public synchronized void inputFileReadResult(JobFrontendRef theJobFrontend,
574 int ffd,
575 byte[] buf,
576 int len,
577 IOException exc)
578 throws IOException {
579 myFileReader.inputFileReadResult(theJobFrontend, ffd, len, exc);
580 }
581
582
583
584
585
586
587 public synchronized void inputFileSkipResult(JobFrontendRef theJobFrontend,
588 int ffd,
589 long len,
590 IOException exc)
591 throws IOException {
592 myFileReader.inputFileSkipResult(theJobFrontend, ffd, len, exc);
593 }
594
595
596
597
598
599
600 public synchronized void inputFileCloseResult(JobFrontendRef theJobFrontend,
601 int ffd,
602 IOException exc)
603 throws IOException {
604 myFileReader.inputFileCloseResult(theJobFrontend, ffd, exc);
605 }
606
607
608
609
610 public synchronized void close() {
611 }
612
613
614
615
616
617
618 public String getUserName() {
619 return username;
620 }
621
622
623
624
625
626
627 public int getJobNumber() {
628 return jobnum;
629 }
630
631
632
633
634
635
636 public int getK() {
637 return K;
638 }
639
640
641
642
643
644
645 public int getRank() {
646 return rank;
647 }
648
649
650
651
652
653
654 public String getBackendHost() {
655 return backendHost;
656 }
657
658
659
660
661
662
663 public boolean hasFrontendCommunicator() {
664 return hasFrontendComm;
665 }
666
667
668
669
670
671
672 public ClassLoader getClassLoader() {
673 return myClassLoader;
674 }
675
676
677
678
679
680
681 public BackendFileWriter getFileWriter() {
682 return myFileWriter;
683 }
684
685
686
687
688
689
690 public BackendFileReader getFileReader() {
691 return myFileReader;
692 }
693
694
695
696
697 public synchronized void waitForCommence() {
698 while (!commence) {
699 try {
700 wait();
701 } catch (InterruptedException ignored) {
702 }
703 }
704 }
705
706
707
708
709
710
711
712 public ChannelGroup getWorldChannelGroup() {
713 return myWorldChannelGroup;
714 }
715
716
717
718
719
720
721
722
723 public InetSocketAddress[] getWorldAddress() {
724 return myWorldAddress;
725 }
726
727
728
729
730
731
732
733
734 public ChannelGroup getFrontendChannelGroup() {
735 return myFrontendChannelGroup;
736 }
737
738
739
740
741
742
743
744
745
746
747 public InetSocketAddress[] getFrontendAddress() {
748 return myFrontendAddress;
749 }
750
751
752
753
754
755
756
757 public Properties getProperties() {
758 return myProperties;
759 }
760
761
762
763
764
765
766
767
768 public String getMainClassName() {
769 return myMainClassName;
770 }
771
772
773
774
775
776
777
778 public String[] getArgs() {
779 return myArgs;
780 }
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799 public void setComment(String comment) {
800 try {
801 myJobFrontend.reportComment(this, rank, comment);
802 } catch (IOException ignored) {
803 }
804 }
805
806
807
808
809
810
811
812
813 public static JobBackend getJobBackend() {
814 return theJobBackend;
815 }
816
817
818
819
820
821
822
823
824 private synchronized void frontendRenewTimeout()
825 throws IOException {
826 if (myFrontendRenewTimer.isTriggered()) {
827 myJobFrontend.renewLease(this);
828 }
829 }
830
831
832
833
834
835
836 private void frontendExpireTimeout()
837 throws IOException {
838 boolean doExit = false;
839 synchronized (this) {
840 if (myFrontendExpireTimer.isTriggered()) {
841 reportRunFinished();
842 if (myState == State.RUNNING) {
843 myState = State.TERMINATE_NO_REPORT;
844 doExit = true;
845 }
846 }
847 }
848
849
850
851
852 myJobLauncherLog.println("Job frontend lease expired");
853 if (doExit) {
854 System.exit(1);
855 }
856 }
857
858
859
860
861
862
863
864 private void terminateCancelJob(Throwable exc) {
865 continueRun = false;
866 if (myState == State.RUNNING) {
867 myState = State.TERMINATE_CANCEL_JOB;
868 myCancelMessage = exc.getClass().getName();
869 String msg = exc.getMessage();
870 if (msg != null) {
871 myCancelMessage = myCancelMessage + ": " + msg;
872 }
873
874
875 }
876 }
877
878
879
880
881
882 private void terminateNoReport() {
883 continueRun = false;
884 if (myState == State.RUNNING) {
885 myState = State.TERMINATE_NO_REPORT;
886 }
887 }
888
889
890
891
892 private void shutdown() {
893 synchronized (this) {
894
895 if (myJobFrontend != null) {
896 try {
897 switch (myState) {
898 case RUNNING:
899
900 myJobFrontend.backendFinished(this);
901 break;
902 case TERMINATE_CANCEL_JOB:
903
904 myJobFrontend.cancelJob(this, myCancelMessage);
905 break;
906 case TERMINATE_NO_REPORT:
907 case TERMINATING:
908
909 break;
910 }
911 } catch (IOException ignored) {
912 }
913 }
914
915
916 myState = State.TERMINATING;
917 }
918
919
920 waitForRunFinished();
921
922
923 synchronized (this) {
924 myFrontendRenewTimer.stop();
925 myFrontendExpireTimer.stop();
926 }
927
928
929
930 }
931
932
933
934
935 private void waitForRunFinished() {
936 for (; ; ) {
937 try {
938 runFinished.await();
939 break;
940 } catch (InterruptedException ignored) {
941 }
942 }
943 }
944
945
946
947
948 private void reportRunFinished() {
949 runFinished.countDown();
950 }
951
952
953
954
955 private synchronized void dump() {
956 System.out.println("********************************");
957 System.out.println("username = " + username);
958 System.out.println("jobnum = " + jobnum);
959 System.out.println("K = " + K);
960 System.out.println("rank = " + rank);
961 System.out.println("hasFrontendComm = " + hasFrontendComm);
962 for (int i = 0; i <= K; ++i) {
963 System.out.println("myMiddlewareAddress[" + i + "] = " + myMiddlewareAddress[i]);
964 }
965 for (int i = 0; i < K; ++i) {
966 System.out.println("myWorldAddress[" + i + "] = " + myWorldAddress[i]);
967 }
968 if (hasFrontendComm) {
969 for (int i = 0; i <= K; ++i) {
970 System.out.println("myFrontendAddress[" + i + "] = " + myFrontendAddress[i]);
971 }
972 }
973 myProperties.list(System.out);
974 System.out.println("myMainClassName = " + myMainClassName);
975 for (int i = 0; i < myArgs.length; ++i) {
976 System.out.println("myArgs[" + i + "] = \"" + myArgs[i] + "\"");
977 }
978 }
979
980
981
982
983
984
985
986
987
988 public static void main(String[] args)
989 throws Exception {
990 try {
991
992 if (args.length != 8) {
993 usage();
994 }
995 String username = args[0];
996 int jobnum = Integer.parseInt(args[1]);
997 int K = Integer.parseInt(args[2]);
998 int rank = Integer.parseInt(args[3]);
999 boolean hasFrontendComm = Boolean.parseBoolean(args[4]);
1000 String frontendHost = args[5];
1001 int frontendPort = Integer.parseInt(args[6]);
1002 String backendHost = args[7];
1003
1004
1005 theJobBackend
1006 = new JobBackend(username, jobnum, K, rank, hasFrontendComm,
1007 frontendHost, frontendPort, backendHost);
1008 } catch (Throwable exc) {
1009 exc.printStackTrace(System.err);
1010 System.exit(1);
1011 }
1012
1013
1014
1015 Thread.currentThread().setContextClassLoader(theJobBackend.getClassLoader());
1016
1017
1018 logger.log(Level.INFO, " Starting backend Daemon thread.");
1019 Thread thr = new Thread(theJobBackend);
1020 thr.setDaemon(true);
1021 thr.start();
1022
1023
1024 logger.log(Level.INFO, " Waiting for commence.");
1025 theJobBackend.waitForCommence();
1026 logger.log(Level.INFO, " Commencing.");
1027
1028
1029
1030 Properties backendProperties = System.getProperties();
1031 Properties frontendProperties = theJobBackend.getProperties();
1032 for (Map.Entry<Object, Object> entry : frontendProperties.entrySet()) {
1033 String name = (String) entry.getKey();
1034 String value = (String) entry.getValue();
1035 if (backendProperties.getProperty(name) == null) {
1036 backendProperties.setProperty(name, value);
1037 }
1038 }
1039
1040
1041
1042 System.setProperty("java.awt.headless", "true");
1043
1044
1045
1046 Class<?> mainclass
1047 = Class.forName(theJobBackend.getMainClassName(),
1048 true,
1049
1050 ClassLoader.getSystemClassLoader());
1051
1052
1053 Method mainmethod = mainclass.getMethod("main", String[].class);
1054
1055 StringBuilder stringBuilder = new StringBuilder(format(" Preparing to invoke main method:\n %s\n With args: ", mainmethod));
1056
1057 for (String arg : theJobBackend.getArgs()) {
1058 stringBuilder.append(format(" %s", arg));
1059 }
1060 stringBuilder.append("\n Backend start-up completed.");
1061 logger.log(Level.INFO, stringBuilder.toString());
1062
1063
1064 if (theJobBackend.fileHandler != null) {
1065 theJobBackend.fileHandler.flush();
1066 logger.setLevel(Level.OFF);
1067 theJobBackend.fileHandler.close();
1068 }
1069
1070 mainmethod.invoke(null, (Object) theJobBackend.getArgs());
1071
1072
1073
1074
1075 }
1076
1077
1078
1079
1080 private static void usage() {
1081 System.err.println("Usage: java edu.rit.pj.cluster.JobBackend <username> <jobnum> <K> <rank> <hasFrontendComm> <frontendHost> <frontendPort> <backendHost>");
1082 System.exit(1);
1083 }
1084
1085 }