1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.pj.cluster;
41
42 import java.io.Externalizable;
43 import java.io.File;
44 import java.io.IOException;
45 import java.io.ObjectInput;
46 import java.io.ObjectOutput;
47 import java.net.InetSocketAddress;
48
49
50
51
52
53
54
55
56 public abstract class JobFrontendMessage
57 extends Message
58 implements Externalizable {
59
60
61 private static final long serialVersionUID = -6601793901631997673L;
62
63
64
65
66
67 public JobFrontendMessage() {
68 }
69
70
71
72
73
74
75 public JobFrontendMessage(int theTag) {
76 super(theTag);
77 }
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public static JobFrontendMessage assignBackend(JobSchedulerRef theJobScheduler,
94 String name,
95 String host,
96 String jvm,
97 String classpath,
98 String[] jvmflags,
99 String shellCommand,
100 int Nt) {
101 return new AssignBackendMessage(theJobScheduler, name, host, jvm, classpath, jvmflags,
102 shellCommand, Nt);
103 }
104
105
106
107
108
109
110
111
112
113 public static JobFrontendMessage assignJobNumber(JobSchedulerRef theJobScheduler,
114 int jobnum,
115 String pjhost) {
116 return new AssignJobNumberMessage(theJobScheduler, jobnum, pjhost);
117 }
118
119
120
121
122
123
124
125
126 public static JobFrontendMessage cancelJob(JobSchedulerRef theJobScheduler,
127 String errmsg) {
128 return new CancelJobMessage(theJobScheduler, errmsg);
129 }
130
131
132
133
134
135
136
137 public static JobFrontendMessage renewLease(JobSchedulerRef theJobScheduler) {
138 return new RenewLeaseMessage(theJobScheduler);
139 }
140
141
142
143
144
145
146
147 public static JobFrontendMessage backendFinished(JobBackendRef theJobBackend) {
148 return new BackendFinishedMessage(theJobBackend);
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165 public static JobFrontendMessage backendReady(JobBackendRef theJobBackend,
166 int rank,
167 InetSocketAddress middlewareAddress,
168 InetSocketAddress worldAddress,
169 InetSocketAddress frontendAddress) {
170 return new BackendReadyMessage(theJobBackend, rank, middlewareAddress,
171 worldAddress, frontendAddress);
172 }
173
174
175
176
177
178
179
180
181 public static JobFrontendMessage cancelJob(JobBackendRef theJobBackend,
182 String errmsg) {
183 return new CancelJobMessage(theJobBackend, errmsg);
184 }
185
186
187
188
189
190
191
192 public static JobFrontendMessage renewLease(JobBackendRef theJobBackend) {
193 return new RenewLeaseMessage(theJobBackend);
194 }
195
196
197
198
199
200
201
202
203 public static JobFrontendMessage requestResource(JobBackendRef theJobBackend,
204 String resourceName) {
205 return new RequestResourceMessage(theJobBackend, resourceName);
206 }
207
208
209
210
211
212
213
214
215
216
217 public static JobFrontendMessage outputFileOpen(JobBackendRef theJobBackend,
218 int bfd,
219 File file,
220 boolean append) {
221 return new OutputFileOpenMessage(theJobBackend, bfd, file, append);
222 }
223
224
225
226
227
228
229
230
231
232 public static JobFrontendMessage outputFileWrite(JobBackendRef theJobBackend,
233 int ffd,
234 int len) {
235 return new OutputFileWriteMessage(theJobBackend, ffd, len);
236 }
237
238
239
240
241
242
243
244
245 public static JobFrontendMessage outputFileFlush(JobBackendRef theJobBackend,
246 int ffd) {
247 return new OutputFileFlushMessage(theJobBackend, ffd);
248 }
249
250
251
252
253
254
255
256
257 public static JobFrontendMessage outputFileClose(JobBackendRef theJobBackend,
258 int ffd) {
259 return new OutputFileCloseMessage(theJobBackend, ffd);
260 }
261
262
263
264
265
266
267
268
269
270 public static JobFrontendMessage inputFileOpen(JobBackendRef theJobBackend,
271 int bfd,
272 File file) {
273 return new InputFileOpenMessage(theJobBackend, bfd, file);
274 }
275
276
277
278
279
280
281
282
283
284 public static JobFrontendMessage inputFileRead(JobBackendRef theJobBackend,
285 int ffd,
286 int len) {
287 return new InputFileReadMessage(theJobBackend, ffd, len);
288 }
289
290
291
292
293
294
295
296
297
298 public static JobFrontendMessage inputFileSkip(JobBackendRef theJobBackend,
299 int ffd,
300 long len) {
301 return new InputFileSkipMessage(theJobBackend, ffd, len);
302 }
303
304
305
306
307
308
309
310
311 public static JobFrontendMessage inputFileClose(JobBackendRef theJobBackend,
312 int ffd) {
313 return new InputFileCloseMessage(theJobBackend, ffd);
314 }
315
316
317
318
319
320
321
322
323
324 public static JobFrontendMessage reportComment(JobBackendRef theJobBackend,
325 int rank,
326 String comment) {
327 return new ReportCommentMessage(theJobBackend, rank, comment);
328 }
329
330
331
332
333
334
335
336
337
338
339
340 public void invoke(JobFrontendRef theJobFrontend,
341 JobSchedulerRef theJobScheduler)
342 throws IOException {
343 throw new UnsupportedOperationException();
344 }
345
346
347
348
349
350
351
352
353
354
355
356 public void invoke(JobFrontendRef theJobFrontend,
357 JobBackendRef theJobBackend)
358 throws IOException {
359 throw new UnsupportedOperationException();
360 }
361
362
363
364
365
366
367
368 public void writeExternal(ObjectOutput out)
369 throws IOException {
370 }
371
372
373
374
375
376
377
378
379
380 public void readExternal(ObjectInput in)
381 throws IOException, ClassNotFoundException {
382 }
383
384
385
386
387
388
389
390
391
392 private static class AssignBackendMessage
393 extends JobFrontendMessage {
394
395 private static final long serialVersionUID = 4426930548078115598L;
396
397 private String name;
398 private String host;
399 private String jvm;
400 private String classpath;
401 private String[] jvmflags;
402 private String shellCommand;
403 private int Nt;
404
405 public AssignBackendMessage() {
406 }
407
408 public AssignBackendMessage(JobSchedulerRef theJobScheduler,
409 String name,
410 String host,
411 String jvm,
412 String classpath,
413 String[] jvmflags,
414 String shellCommand,
415 int Nt) {
416 super(Message.FROM_JOB_SCHEDULER);
417 this.name = name;
418 this.host = host;
419 this.jvm = jvm;
420 this.classpath = classpath;
421 this.jvmflags = jvmflags;
422 this.shellCommand = shellCommand;
423 this.Nt = Nt;
424 }
425
426 public void invoke(JobFrontendRef theJobFrontend,
427 JobSchedulerRef theJobScheduler)
428 throws IOException {
429 theJobFrontend.assignBackend(theJobScheduler, name, host, jvm, classpath, jvmflags,
430 shellCommand, Nt);
431 }
432
433 public void writeExternal(ObjectOutput out)
434 throws IOException {
435 out.writeUTF(name);
436 out.writeUTF(host);
437 out.writeUTF(jvm);
438 out.writeUTF(classpath);
439 int n = jvmflags.length;
440 out.writeInt(n);
441 for (int i = 0; i < n; ++i) {
442 out.writeUTF(jvmflags[i]);
443 }
444 out.writeUTF(shellCommand);
445 out.writeInt(Nt);
446 }
447
448 public void readExternal(ObjectInput in)
449 throws IOException {
450 name = in.readUTF();
451 host = in.readUTF();
452 jvm = in.readUTF();
453 classpath = in.readUTF();
454 int n = in.readInt();
455 jvmflags = new String[n];
456 for (int i = 0; i < n; ++i) {
457 jvmflags[i] = in.readUTF();
458 }
459 shellCommand = in.readUTF();
460 Nt = in.readInt();
461 }
462 }
463
464
465
466
467
468
469
470
471 private static class AssignJobNumberMessage
472 extends JobFrontendMessage {
473
474 private static final long serialVersionUID = -197099388467750972L;
475
476 private int jobnum;
477 private String pjhost;
478
479 public AssignJobNumberMessage() {
480 }
481
482 public AssignJobNumberMessage(JobSchedulerRef theJobScheduler,
483 int jobnum,
484 String pjhost) {
485 super(Message.FROM_JOB_SCHEDULER);
486 this.jobnum = jobnum;
487 this.pjhost = pjhost;
488 }
489
490 public void invoke(JobFrontendRef theJobFrontend,
491 JobSchedulerRef theJobScheduler)
492 throws IOException {
493 theJobFrontend.assignJobNumber(theJobScheduler, jobnum, pjhost);
494 }
495
496 public void writeExternal(ObjectOutput out)
497 throws IOException {
498 out.writeInt(jobnum);
499 out.writeUTF(pjhost);
500 }
501
502 public void readExternal(ObjectInput in)
503 throws IOException {
504 jobnum = in.readInt();
505 pjhost = in.readUTF();
506 }
507 }
508
509
510
511
512
513
514
515
516 private static class BackendFinishedMessage
517 extends JobFrontendMessage {
518
519 private static final long serialVersionUID = 759872032212034107L;
520
521 public BackendFinishedMessage() {
522 }
523
524 public BackendFinishedMessage(JobBackendRef theJobBackend) {
525 super(Message.FROM_JOB_BACKEND);
526 }
527
528 public void invoke(JobFrontendRef theJobFrontend,
529 JobBackendRef theJobBackend)
530 throws IOException {
531 theJobFrontend.backendFinished(theJobBackend);
532 }
533 }
534
535
536
537
538
539
540
541
542 private static class BackendReadyMessage
543 extends JobFrontendMessage {
544
545 private static final long serialVersionUID = -8872540352133660209L;
546
547 private int rank;
548 private InetSocketAddress middlewareAddress;
549 private InetSocketAddress worldAddress;
550 private InetSocketAddress frontendAddress;
551
552 public BackendReadyMessage() {
553 }
554
555 public BackendReadyMessage(JobBackendRef theJobBackend,
556 int rank,
557 InetSocketAddress middlewareAddress,
558 InetSocketAddress worldAddress,
559 InetSocketAddress frontendAddress) {
560 super(Message.FROM_JOB_BACKEND);
561 this.rank = rank;
562 this.middlewareAddress = middlewareAddress;
563 this.worldAddress = worldAddress;
564 this.frontendAddress = frontendAddress;
565 }
566
567 public void invoke(JobFrontendRef theJobFrontend,
568 JobBackendRef theJobBackend)
569 throws IOException {
570 theJobFrontend.backendReady(theJobBackend, rank, middlewareAddress,
571 worldAddress, frontendAddress);
572 }
573
574 public void writeExternal(ObjectOutput out)
575 throws IOException {
576 out.writeInt(rank);
577 out.writeObject(middlewareAddress);
578 out.writeObject(worldAddress);
579 out.writeObject(frontendAddress);
580 }
581
582 public void readExternal(ObjectInput in)
583 throws IOException, ClassNotFoundException {
584 rank = in.readInt();
585 middlewareAddress = (InetSocketAddress) in.readObject();
586 worldAddress = (InetSocketAddress) in.readObject();
587 frontendAddress = (InetSocketAddress) in.readObject();
588 }
589 }
590
591
592
593
594
595
596
597
598 private static class CancelJobMessage
599 extends JobFrontendMessage {
600
601 private static final long serialVersionUID = -2009079595329812496L;
602
603 private String errmsg;
604
605 public CancelJobMessage() {
606 }
607
608 public CancelJobMessage(JobSchedulerRef theJobScheduler,
609 String errmsg) {
610 super(Message.FROM_JOB_SCHEDULER);
611 this.errmsg = errmsg;
612 }
613
614 public CancelJobMessage(JobBackendRef theJobBackend,
615 String errmsg) {
616 super(Message.FROM_JOB_BACKEND);
617 this.errmsg = errmsg;
618 }
619
620 public void invoke(JobFrontendRef theJobFrontend,
621 JobSchedulerRef theJobScheduler)
622 throws IOException {
623 theJobFrontend.cancelJob(theJobScheduler, errmsg);
624 }
625
626 public void invoke(JobFrontendRef theJobFrontend,
627 JobBackendRef theJobBackend)
628 throws IOException {
629 theJobFrontend.cancelJob(theJobBackend, errmsg);
630 }
631
632 public void writeExternal(ObjectOutput out)
633 throws IOException {
634 out.writeUTF(errmsg);
635 }
636
637 public void readExternal(ObjectInput in)
638 throws IOException {
639 errmsg = in.readUTF();
640 }
641 }
642
643
644
645
646
647
648
649
650 private static class RenewLeaseMessage
651 extends JobFrontendMessage {
652
653 private static final long serialVersionUID = -9030468939160436622L;
654
655 public RenewLeaseMessage() {
656 }
657
658 public RenewLeaseMessage(JobSchedulerRef theJobScheduler) {
659 super(Message.FROM_JOB_SCHEDULER);
660 }
661
662 public RenewLeaseMessage(JobBackendRef theJobBackend) {
663 super(Message.FROM_JOB_BACKEND);
664 }
665
666 public void invoke(JobFrontendRef theJobFrontend,
667 JobSchedulerRef theJobScheduler)
668 throws IOException {
669 theJobFrontend.renewLease(theJobScheduler);
670 }
671
672 public void invoke(JobFrontendRef theJobFrontend,
673 JobBackendRef theJobBackend)
674 throws IOException {
675 theJobFrontend.renewLease(theJobBackend);
676 }
677 }
678
679
680
681
682
683
684
685
686 private static class RequestResourceMessage
687 extends JobFrontendMessage {
688
689 private static final long serialVersionUID = 9184806604713339263L;
690
691 private String resourceName;
692
693 public RequestResourceMessage() {
694 }
695
696 public RequestResourceMessage(JobBackendRef theJobBackend,
697 String resourceName) {
698 super(Message.FROM_JOB_BACKEND);
699 this.resourceName = resourceName;
700 }
701
702 public void invoke(JobFrontendRef theJobFrontend,
703 JobBackendRef theJobBackend)
704 throws IOException {
705 theJobFrontend.requestResource(theJobBackend, resourceName);
706 }
707
708 public void writeExternal(ObjectOutput out)
709 throws IOException {
710 out.writeUTF(resourceName);
711 }
712
713 public void readExternal(ObjectInput in)
714 throws IOException {
715 resourceName = in.readUTF();
716 }
717 }
718
719
720
721
722
723
724
725
726 private static class OutputFileOpenMessage
727 extends JobFrontendMessage {
728
729 private static final long serialVersionUID = -4987754930269039852L;
730
731 private int bfd;
732 private File file;
733 private boolean append;
734
735 public OutputFileOpenMessage() {
736 }
737
738 public OutputFileOpenMessage(JobBackendRef theJobBackend,
739 int bfd,
740 File file,
741 boolean append) {
742 super(Message.FROM_JOB_BACKEND);
743 this.bfd = bfd;
744 this.file = file;
745 this.append = append;
746 }
747
748 public void invoke(JobFrontendRef theJobFrontend,
749 JobBackendRef theJobBackend)
750 throws IOException {
751 theJobFrontend.outputFileOpen(theJobBackend, bfd, file, append);
752 }
753
754 public void writeExternal(ObjectOutput out)
755 throws IOException {
756 out.writeInt(bfd);
757 out.writeBoolean(append);
758 out.writeObject(file);
759 }
760
761 public void readExternal(ObjectInput in)
762 throws IOException, ClassNotFoundException {
763 bfd = in.readInt();
764 append = in.readBoolean();
765 file = (File) in.readObject();
766 }
767 }
768
769
770
771
772
773
774
775
776 private static class OutputFileWriteMessage
777 extends JobFrontendMessage {
778
779 private static final long serialVersionUID = -4460426636308841602L;
780
781 private int ffd;
782 private int len;
783
784 public OutputFileWriteMessage() {
785 }
786
787 public OutputFileWriteMessage(JobBackendRef theJobBackend,
788 int ffd,
789 int len) {
790 super(Message.FROM_JOB_BACKEND);
791 this.ffd = ffd;
792 this.len = len;
793 }
794
795 public void invoke(JobFrontendRef theJobFrontend,
796 JobBackendRef theJobBackend)
797 throws IOException {
798 theJobFrontend.outputFileWrite(theJobBackend, ffd, null, 0, len);
799 }
800
801 public void writeExternal(ObjectOutput out)
802 throws IOException {
803 out.writeInt(ffd);
804 out.writeInt(len);
805 }
806
807 public void readExternal(ObjectInput in)
808 throws IOException {
809 ffd = in.readInt();
810 len = in.readInt();
811 }
812 }
813
814
815
816
817
818
819
820
821 private static class OutputFileFlushMessage
822 extends JobFrontendMessage {
823
824 private static final long serialVersionUID = 7074849708663078210L;
825
826 private int ffd;
827
828 public OutputFileFlushMessage() {
829 }
830
831 public OutputFileFlushMessage(JobBackendRef theJobBackend,
832 int ffd) {
833 super(Message.FROM_JOB_BACKEND);
834 this.ffd = ffd;
835 }
836
837 public void invoke(JobFrontendRef theJobFrontend,
838 JobBackendRef theJobBackend)
839 throws IOException {
840 theJobFrontend.outputFileFlush(theJobBackend, ffd);
841 }
842
843 public void writeExternal(ObjectOutput out)
844 throws IOException {
845 out.writeInt(ffd);
846 }
847
848 public void readExternal(ObjectInput in)
849 throws IOException {
850 ffd = in.readInt();
851 }
852 }
853
854
855
856
857
858
859
860
861 private static class OutputFileCloseMessage
862 extends JobFrontendMessage {
863
864 private static final long serialVersionUID = -5637017577338427153L;
865
866 private int ffd;
867
868 public OutputFileCloseMessage() {
869 }
870
871 public OutputFileCloseMessage(JobBackendRef theJobBackend,
872 int ffd) {
873 super(Message.FROM_JOB_BACKEND);
874 this.ffd = ffd;
875 }
876
877 public void invoke(JobFrontendRef theJobFrontend,
878 JobBackendRef theJobBackend)
879 throws IOException {
880 theJobFrontend.outputFileClose(theJobBackend, ffd);
881 }
882
883 public void writeExternal(ObjectOutput out)
884 throws IOException {
885 out.writeInt(ffd);
886 }
887
888 public void readExternal(ObjectInput in)
889 throws IOException {
890 ffd = in.readInt();
891 }
892 }
893
894
895
896
897
898
899
900
901 private static class InputFileOpenMessage
902 extends JobFrontendMessage {
903
904 private static final long serialVersionUID = -791306998166025239L;
905
906 private int bfd;
907 private File file;
908
909 public InputFileOpenMessage() {
910 }
911
912 public InputFileOpenMessage(JobBackendRef theJobBackend,
913 int bfd,
914 File file) {
915 super(Message.FROM_JOB_BACKEND);
916 this.bfd = bfd;
917 this.file = file;
918 }
919
920 public void invoke(JobFrontendRef theJobFrontend,
921 JobBackendRef theJobBackend)
922 throws IOException {
923 theJobFrontend.inputFileOpen(theJobBackend, bfd, file);
924 }
925
926 public void writeExternal(ObjectOutput out)
927 throws IOException {
928 out.writeInt(bfd);
929 out.writeObject(file);
930 }
931
932 public void readExternal(ObjectInput in)
933 throws IOException, ClassNotFoundException {
934 bfd = in.readInt();
935 file = (File) in.readObject();
936 }
937 }
938
939
940
941
942
943
944
945
946 private static class InputFileReadMessage
947 extends JobFrontendMessage {
948
949 private static final long serialVersionUID = 7727558874589005187L;
950
951 private int ffd;
952 private int len;
953
954 public InputFileReadMessage() {
955 }
956
957 public InputFileReadMessage(JobBackendRef theJobBackend,
958 int ffd,
959 int len) {
960 super(Message.FROM_JOB_BACKEND);
961 this.ffd = ffd;
962 this.len = len;
963 }
964
965 public void invoke(JobFrontendRef theJobFrontend,
966 JobBackendRef theJobBackend)
967 throws IOException {
968 theJobFrontend.inputFileRead(theJobBackend, ffd, len);
969 }
970
971 public void writeExternal(ObjectOutput out)
972 throws IOException {
973 out.writeInt(ffd);
974 out.writeInt(len);
975 }
976
977 public void readExternal(ObjectInput in)
978 throws IOException {
979 ffd = in.readInt();
980 len = in.readInt();
981 }
982 }
983
984
985
986
987
988
989
990
991 private static class InputFileSkipMessage
992 extends JobFrontendMessage {
993
994 private static final long serialVersionUID = 7867427744015166954L;
995
996 private int ffd;
997 private long len;
998
999 public InputFileSkipMessage() {
1000 }
1001
1002 public InputFileSkipMessage(JobBackendRef theJobBackend,
1003 int ffd,
1004 long len) {
1005 super(Message.FROM_JOB_BACKEND);
1006 this.ffd = ffd;
1007 this.len = len;
1008 }
1009
1010 public void invoke(JobFrontendRef theJobFrontend,
1011 JobBackendRef theJobBackend)
1012 throws IOException {
1013 theJobFrontend.inputFileSkip(theJobBackend, ffd, len);
1014 }
1015
1016 public void writeExternal(ObjectOutput out)
1017 throws IOException {
1018 out.writeInt(ffd);
1019 out.writeLong(len);
1020 }
1021
1022 public void readExternal(ObjectInput in)
1023 throws IOException {
1024 ffd = in.readInt();
1025 len = in.readLong();
1026 }
1027 }
1028
1029
1030
1031
1032
1033
1034
1035
1036 private static class InputFileCloseMessage
1037 extends JobFrontendMessage {
1038
1039 private static final long serialVersionUID = 1223549294646718409L;
1040
1041 private int ffd;
1042
1043 public InputFileCloseMessage() {
1044 }
1045
1046 public InputFileCloseMessage(JobBackendRef theJobBackend,
1047 int ffd) {
1048 super(Message.FROM_JOB_BACKEND);
1049 this.ffd = ffd;
1050 }
1051
1052 public void invoke(JobFrontendRef theJobFrontend,
1053 JobBackendRef theJobBackend)
1054 throws IOException {
1055 theJobFrontend.inputFileClose(theJobBackend, ffd);
1056 }
1057
1058 public void writeExternal(ObjectOutput out)
1059 throws IOException {
1060 out.writeInt(ffd);
1061 }
1062
1063 public void readExternal(ObjectInput in)
1064 throws IOException {
1065 ffd = in.readInt();
1066 }
1067 }
1068
1069
1070
1071
1072
1073
1074
1075
1076 private static class ReportCommentMessage
1077 extends JobFrontendMessage {
1078
1079 private static final long serialVersionUID = 1092254806177598252L;
1080
1081 private int rank;
1082 private String comment;
1083
1084 public ReportCommentMessage() {
1085 }
1086
1087 public ReportCommentMessage(JobBackendRef theJobBackend,
1088 int rank,
1089 String comment) {
1090 super(Message.FROM_JOB_BACKEND);
1091 this.rank = rank;
1092 this.comment = comment == null ? "" : comment;
1093 }
1094
1095 public void invoke(JobFrontendRef theJobFrontend,
1096 JobBackendRef theJobBackend)
1097 throws IOException {
1098 theJobFrontend.reportComment(theJobBackend, rank, comment);
1099 }
1100
1101 public void writeExternal(ObjectOutput out)
1102 throws IOException {
1103 out.writeInt(rank);
1104 out.writeUTF(comment);
1105 }
1106
1107 public void readExternal(ObjectInput in)
1108 throws IOException {
1109 rank = in.readInt();
1110 comment = in.readUTF();
1111 }
1112 }
1113
1114 }