1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.pj.cluster;
41
42 import java.io.*;
43 import java.net.InetSocketAddress;
44 import java.util.Map;
45 import java.util.Properties;
46
47 import edu.rit.util.ByteSequence;
48
49
50
51
52
53
54
55
56 @SuppressWarnings("serial")
57 public abstract class JobBackendMessage
58 extends Message
59 implements Externalizable {
60
61
/** Serialization version identifier of this class. */
@Serial
private static final long serialVersionUID = 3747140854797048519L;

/**
 * Constructs a new job backend message through the superclass's
 * no-argument constructor.
 */
public JobBackendMessage() {
    super();
}

/**
 * Constructs a new job backend message with the given message tag.
 *
 * @param theTag tag value handed to the {@code Message} constructor
 */
public JobBackendMessage(final int theTag) {
    super(theTag);
}
80
81
82
83
84
85
86
87
88
89 public static JobBackendMessage cancelJob(JobFrontendRef theJobFrontend,
90 String errmsg) {
91 return new CancelJobMessage(theJobFrontend, errmsg);
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 public static JobBackendMessage commenceJob(JobFrontendRef theJobFrontend,
117 InetSocketAddress[] middlewareAddress,
118 InetSocketAddress[] worldAddress,
119 InetSocketAddress[] frontendAddress,
120 Properties properties,
121 String mainClassName,
122 String[] args) {
123 return new CommenceJobMessage(theJobFrontend, middlewareAddress, worldAddress, frontendAddress,
124 properties, mainClassName, args);
125 }
126
127
128
129
130
131
132
133 public static JobBackendMessage jobFinished(JobFrontendRef theJobFrontend) {
134 return new JobFinishedMessage(theJobFrontend);
135 }
136
137
138
139
140
141
142
143 public static JobBackendMessage renewLease(JobFrontendRef theJobFrontend) {
144 return new RenewLeaseMessage(theJobFrontend);
145 }
146
147
148
149
150
151
152
153
154
155 public static JobBackendMessage reportResource(JobFrontendRef theJobFrontend,
156 String resourceName,
157 byte[] content) {
158 return new ReportResourceMessage(theJobFrontend, resourceName, content);
159 }
160
161
162
163
164
165
166
167
168
169 public static JobBackendMessage reportResource(JobFrontendRef theJobFrontend,
170 String resourceName,
171 ByteSequence content) {
172 return new ReportResourceMessage(theJobFrontend, resourceName, content);
173 }
174
175
176
177
178
179
180
181
182
183
184 public static JobBackendMessage outputFileOpenResult(JobFrontendRef theJobFrontend,
185 int bfd,
186 int ffd,
187 IOException exc) {
188 return new OutputFileOpenResultMessage(theJobFrontend, bfd, ffd, exc);
189 }
190
191
192
193
194
195
196
197
198
199 public static JobBackendMessage outputFileWriteResult(JobFrontendRef theJobFrontend,
200 int ffd,
201 IOException exc) {
202 return new OutputFileWriteResultMessage(theJobFrontend, ffd, exc);
203 }
204
205
206
207
208
209
210
211
212
213 public static JobBackendMessage outputFileFlushResult(JobFrontendRef theJobFrontend,
214 int ffd,
215 IOException exc) {
216 return new OutputFileFlushResultMessage(theJobFrontend, ffd, exc);
217 }
218
219
220
221
222
223
224
225
226
227 public static JobBackendMessage outputFileCloseResult(JobFrontendRef theJobFrontend,
228 int ffd,
229 IOException exc) {
230 return new OutputFileCloseResultMessage(theJobFrontend, ffd, exc);
231 }
232
233
234
235
236
237
238
239
240
241
242 public static JobBackendMessage inputFileOpenResult(JobFrontendRef theJobFrontend,
243 int bfd,
244 int ffd,
245 IOException exc) {
246 return new InputFileOpenResultMessage(theJobFrontend, bfd, ffd, exc);
247 }
248
249
250
251
252
253
254
255
256
257
258 public static JobBackendMessage inputFileReadResult(JobFrontendRef theJobFrontend,
259 int ffd,
260 int len,
261 IOException exc) {
262 return new InputFileReadResultMessage(theJobFrontend, ffd, len, exc);
263 }
264
265
266
267
268
269
270
271
272
273
274 public static JobBackendMessage inputFileSkipResult(JobFrontendRef theJobFrontend,
275 int ffd,
276 long len,
277 IOException exc) {
278 return new InputFileSkipResultMessage(theJobFrontend, ffd, len, exc);
279 }
280
281
282
283
284
285
286
287
288
289 public static JobBackendMessage inputFileCloseResult(JobFrontendRef theJobFrontend,
290 int ffd,
291 IOException exc) {
292 return new InputFileCloseResultMessage(theJobFrontend, ffd, exc);
293 }
294
295
296
297
298
299
300
301
302
303
304
305 public void invoke(JobBackendRef theJobBackend,
306 JobFrontendRef theJobFrontend)
307 throws IOException {
308 throw new UnsupportedOperationException();
309 }
310
311
312
313
314
315
316
317 public void writeExternal(ObjectOutput out)
318 throws IOException {
319 }
320
321
322
323
324
325
326
327
328
329 public void readExternal(ObjectInput in)
330 throws IOException, ClassNotFoundException {
331 }
332
333
334
335
336
337
338
339
340
341 private static class CancelJobMessage
342 extends JobBackendMessage {
343
344 @Serial
345 private static final long serialVersionUID = -1706674774429384654L;
346
347 private String errmsg;
348
349 public CancelJobMessage() {
350 }
351
352 public CancelJobMessage(JobFrontendRef theJobFrontend,
353 String errmsg) {
354 super(Message.FROM_JOB_FRONTEND);
355 this.errmsg = errmsg;
356 }
357
358 public void invoke(JobBackendRef theJobBackend,
359 JobFrontendRef theJobFrontend)
360 throws IOException {
361 theJobBackend.cancelJob(theJobFrontend, errmsg);
362 }
363
364 public void writeExternal(ObjectOutput out)
365 throws IOException {
366 out.writeUTF(errmsg);
367 }
368
369 public void readExternal(ObjectInput in)
370 throws IOException {
371 errmsg = in.readUTF();
372 }
373 }
374
375
376
377
378
379
380
381
382 private static class CommenceJobMessage
383 extends JobBackendMessage {
384
385 @Serial
386 private static final long serialVersionUID = -8262872991140404870L;
387
388 private InetSocketAddress[] middlewareAddress;
389 private InetSocketAddress[] worldAddress;
390 private InetSocketAddress[] frontendAddress;
391 private Properties properties;
392 private String mainClassName;
393 private String[] args;
394
395 public CommenceJobMessage() {
396 }
397
398 public CommenceJobMessage(JobFrontendRef theJobFrontend,
399 InetSocketAddress[] middlewareAddress,
400 InetSocketAddress[] worldAddress,
401 InetSocketAddress[] frontendAddress,
402 Properties properties,
403 String mainClassName,
404 String[] args) {
405 super(Message.FROM_JOB_FRONTEND);
406 this.middlewareAddress = middlewareAddress;
407 this.worldAddress = worldAddress;
408 this.frontendAddress = frontendAddress;
409 this.properties = properties;
410 this.mainClassName = mainClassName;
411 this.args = args;
412 }
413
414 public void invoke(JobBackendRef theJobBackend,
415 JobFrontendRef theJobFrontend)
416 throws IOException {
417 theJobBackend.commenceJob(theJobFrontend, middlewareAddress, worldAddress,
418 frontendAddress, properties, mainClassName, args);
419 }
420
421 public void writeExternal(ObjectOutput out)
422 throws IOException {
423 out.writeInt(properties.size());
424 for (Map.Entry<Object, Object> entry : properties.entrySet()) {
425 out.writeUTF((String) entry.getKey());
426 out.writeUTF((String) entry.getValue());
427 }
428 out.writeUTF(mainClassName);
429 out.writeInt(args.length);
430 for (String arg : args) {
431 out.writeUTF(arg);
432 }
433 int n1 = middlewareAddress.length;
434 int n2 = worldAddress.length;
435 int n3 = frontendAddress == null ? 0 : frontendAddress.length;
436 out.writeInt(n1);
437 out.writeInt(n2);
438 out.writeInt(n3);
439 for (int i = 0; i < n1; ++i) {
440 out.writeObject(middlewareAddress[i]);
441 }
442 for (int i = 0; i < n2; ++i) {
443 out.writeObject(worldAddress[i]);
444 }
445 for (int i = 0; i < n3; ++i) {
446 out.writeObject(frontendAddress[i]);
447 }
448 }
449
450 public void readExternal(ObjectInput in)
451 throws IOException, ClassNotFoundException {
452 int n = in.readInt();
453 properties = new Properties();
454 for (int i = 0; i < n; ++i) {
455 properties.setProperty(in.readUTF(), in.readUTF());
456 }
457 mainClassName = in.readUTF();
458 n = in.readInt();
459 args = new String[n];
460 for (int i = 0; i < n; ++i) {
461 args[i] = in.readUTF();
462 }
463 int n1 = in.readInt();
464 int n2 = in.readInt();
465 int n3 = in.readInt();
466 middlewareAddress = new InetSocketAddress[n1];
467 for (int i = 0; i < n1; ++i) {
468 middlewareAddress[i] = (InetSocketAddress) in.readObject();
469 }
470 worldAddress = new InetSocketAddress[n2];
471 for (int i = 0; i < n2; ++i) {
472 worldAddress[i] = (InetSocketAddress) in.readObject();
473 }
474 if (n3 > 0) {
475 frontendAddress = new InetSocketAddress[n3];
476 for (int i = 0; i < n3; ++i) {
477 frontendAddress[i] = (InetSocketAddress) in.readObject();
478 }
479 }
480 }
481 }
482
483
484
485
486
487
488
489
490 private static class JobFinishedMessage
491 extends JobBackendMessage {
492
493 @Serial
494 private static final long serialVersionUID = 1363549433797859519L;
495
496 public JobFinishedMessage() {
497 }
498
499 public JobFinishedMessage(JobFrontendRef theJobFrontend) {
500 super(Message.FROM_JOB_FRONTEND);
501 }
502
503 public void invoke(JobBackendRef theJobBackend,
504 JobFrontendRef theJobFrontend)
505 throws IOException {
506 theJobBackend.jobFinished(theJobFrontend);
507 }
508 }
509
510
511
512
513
514
515
516
517 private static class RenewLeaseMessage
518 extends JobBackendMessage {
519
520 private static final long serialVersionUID = -5146916532326180730L;
521
522 public RenewLeaseMessage() {
523 }
524
525 public RenewLeaseMessage(JobFrontendRef theJobFrontend) {
526 super(Message.FROM_JOB_FRONTEND);
527 }
528
529 public void invoke(JobBackendRef theJobBackend,
530 JobFrontendRef theJobFrontend)
531 throws IOException {
532 theJobBackend.renewLease(theJobFrontend);
533 }
534 }
535
536
537
538
539
540
541
542
543 private static class ReportResourceMessage
544 extends JobBackendMessage {
545
546 private static final long serialVersionUID = 8709827200639757997L;
547
548 private String resourceName;
549 private ByteSequence contentSeq;
550 private byte[] content;
551
552 public ReportResourceMessage() {
553 }
554
555 public ReportResourceMessage(JobFrontendRef theJobFrontend,
556 String resourceName,
557 byte[] content) {
558 super(Message.FROM_JOB_FRONTEND);
559 this.resourceName = resourceName;
560 this.content = content;
561 }
562
563 public ReportResourceMessage(JobFrontendRef theJobFrontend,
564 String resourceName,
565 ByteSequence content) {
566 super(Message.FROM_JOB_FRONTEND);
567 this.resourceName = resourceName;
568 this.contentSeq = content;
569 }
570
571 public void invoke(JobBackendRef theJobBackend,
572 JobFrontendRef theJobFrontend)
573 throws IOException {
574 if (content != null) {
575 theJobBackend.reportResource(theJobFrontend, resourceName, content);
576 } else {
577 theJobBackend.reportResource(theJobFrontend, resourceName, contentSeq);
578 }
579 }
580
581 public void writeExternal(ObjectOutput out)
582 throws IOException {
583 out.writeUTF(resourceName);
584 if (content != null) {
585 out.writeInt(content.length);
586 out.write(content);
587 } else if (contentSeq != null) {
588 out.writeInt(contentSeq.length());
589 contentSeq.write(out);
590 } else {
591 out.writeInt(-1);
592 }
593 }
594
595 public void readExternal(ObjectInput in)
596 throws IOException {
597 resourceName = in.readUTF();
598 int n = in.readInt();
599 if (n < 0) {
600 content = null;
601 } else {
602 content = new byte[n];
603 in.readFully(content);
604 }
605 }
606 }
607
608
609
610
611
612
613
614
615 private static class OutputFileOpenResultMessage
616 extends JobBackendMessage {
617
618 private static final long serialVersionUID = 1460222094425830409L;
619
620 private int bfd;
621 private int ffd;
622 private IOException exc;
623
624 public OutputFileOpenResultMessage() {
625 }
626
627 public OutputFileOpenResultMessage(JobFrontendRef theJobFrontend,
628 int bfd,
629 int ffd,
630 IOException exc) {
631 super(Message.FROM_JOB_FRONTEND);
632 this.bfd = bfd;
633 this.ffd = ffd;
634 this.exc = exc;
635 }
636
637 public void invoke(JobBackendRef theJobBackend,
638 JobFrontendRef theJobFrontend)
639 throws IOException {
640 theJobBackend.outputFileOpenResult(theJobFrontend, bfd, ffd, exc);
641 }
642
643 public void writeExternal(ObjectOutput out)
644 throws IOException {
645 out.writeInt(bfd);
646 out.writeInt(ffd);
647 out.writeObject(exc);
648 }
649
650 public void readExternal(ObjectInput in)
651 throws IOException, ClassNotFoundException {
652 bfd = in.readInt();
653 ffd = in.readInt();
654 exc = (IOException) in.readObject();
655 }
656 }
657
658
659
660
661
662
663
664
665 private static class OutputFileWriteResultMessage
666 extends JobBackendMessage {
667
668 private static final long serialVersionUID = -4734876024127370851L;
669
670 private int ffd;
671 private IOException exc;
672
673 public OutputFileWriteResultMessage() {
674 }
675
676 public OutputFileWriteResultMessage(JobFrontendRef theJobFrontend,
677 int ffd,
678 IOException exc) {
679 super(Message.FROM_JOB_FRONTEND);
680 this.ffd = ffd;
681 this.exc = exc;
682 }
683
684 public void invoke(JobBackendRef theJobBackend,
685 JobFrontendRef theJobFrontend)
686 throws IOException {
687 theJobBackend.outputFileWriteResult(theJobFrontend, ffd, exc);
688 }
689
690 public void writeExternal(ObjectOutput out)
691 throws IOException {
692 out.writeInt(ffd);
693 out.writeObject(exc);
694 }
695
696 public void readExternal(ObjectInput in)
697 throws IOException, ClassNotFoundException {
698 ffd = in.readInt();
699 exc = (IOException) in.readObject();
700 }
701 }
702
703
704
705
706
707
708
709
710 private static class OutputFileFlushResultMessage
711 extends JobBackendMessage {
712
713 private static final long serialVersionUID = 8921581871627030306L;
714
715 private int ffd;
716 private IOException exc;
717
718 public OutputFileFlushResultMessage() {
719 }
720
721 public OutputFileFlushResultMessage(JobFrontendRef theJobFrontend,
722 int ffd,
723 IOException exc) {
724 super(Message.FROM_JOB_FRONTEND);
725 this.ffd = ffd;
726 this.exc = exc;
727 }
728
729 public void invoke(JobBackendRef theJobBackend,
730 JobFrontendRef theJobFrontend)
731 throws IOException {
732 theJobBackend.outputFileFlushResult(theJobFrontend, ffd, exc);
733 }
734
735 public void writeExternal(ObjectOutput out)
736 throws IOException {
737 out.writeInt(ffd);
738 out.writeObject(exc);
739 }
740
741 public void readExternal(ObjectInput in)
742 throws IOException, ClassNotFoundException {
743 ffd = in.readInt();
744 exc = (IOException) in.readObject();
745 }
746 }
747
748
749
750
751
752
753
754
755 private static class OutputFileCloseResultMessage
756 extends JobBackendMessage {
757
758 private static final long serialVersionUID = 5976163600237430235L;
759
760 private int ffd;
761 private IOException exc;
762
763 public OutputFileCloseResultMessage() {
764 }
765
766 public OutputFileCloseResultMessage(JobFrontendRef theJobFrontend,
767 int ffd,
768 IOException exc) {
769 super(Message.FROM_JOB_FRONTEND);
770 this.ffd = ffd;
771 this.exc = exc;
772 }
773
774 public void invoke(JobBackendRef theJobBackend,
775 JobFrontendRef theJobFrontend)
776 throws IOException {
777 theJobBackend.outputFileCloseResult(theJobFrontend, ffd, exc);
778 }
779
780 public void writeExternal(ObjectOutput out)
781 throws IOException {
782 out.writeInt(ffd);
783 out.writeObject(exc);
784 }
785
786 public void readExternal(ObjectInput in)
787 throws IOException, ClassNotFoundException {
788 ffd = in.readInt();
789 exc = (IOException) in.readObject();
790 }
791 }
792
793
794
795
796
797
798
799
800 private static class InputFileOpenResultMessage
801 extends JobBackendMessage {
802
803 private static final long serialVersionUID = -1082499193559062581L;
804
805 private int bfd;
806 private int ffd;
807 private IOException exc;
808
809 public InputFileOpenResultMessage() {
810 }
811
812 public InputFileOpenResultMessage(JobFrontendRef theJobFrontend,
813 int bfd,
814 int ffd,
815 IOException exc) {
816 super(Message.FROM_JOB_FRONTEND);
817 this.bfd = bfd;
818 this.ffd = ffd;
819 this.exc = exc;
820 }
821
822 public void invoke(JobBackendRef theJobBackend,
823 JobFrontendRef theJobFrontend)
824 throws IOException {
825 theJobBackend.inputFileOpenResult(theJobFrontend, bfd, ffd, exc);
826 }
827
828 public void writeExternal(ObjectOutput out)
829 throws IOException {
830 out.writeInt(bfd);
831 out.writeInt(ffd);
832 out.writeObject(exc);
833 }
834
835 public void readExternal(ObjectInput in)
836 throws IOException, ClassNotFoundException {
837 bfd = in.readInt();
838 ffd = in.readInt();
839 exc = (IOException) in.readObject();
840 }
841 }
842
843
844
845
846
847
848
849
850 private static class InputFileReadResultMessage
851 extends JobBackendMessage {
852
853 private static final long serialVersionUID = 4542261695089387333L;
854
855 private int ffd;
856 private int len;
857 private IOException exc;
858
859 public InputFileReadResultMessage() {
860 }
861
862 public InputFileReadResultMessage(JobFrontendRef theJobFrontend,
863 int ffd,
864 int len,
865 IOException exc) {
866 super(Message.FROM_JOB_FRONTEND);
867 this.ffd = ffd;
868 this.len = len;
869 this.exc = exc;
870 }
871
872 public void invoke(JobBackendRef theJobBackend,
873 JobFrontendRef theJobFrontend)
874 throws IOException {
875 theJobBackend.inputFileReadResult(theJobFrontend, ffd, null, len, exc);
876 }
877
878 public void writeExternal(ObjectOutput out)
879 throws IOException {
880 out.writeInt(ffd);
881 out.writeInt(len);
882 out.writeObject(exc);
883 }
884
885 public void readExternal(ObjectInput in)
886 throws IOException, ClassNotFoundException {
887 ffd = in.readInt();
888 len = in.readInt();
889 exc = (IOException) in.readObject();
890 }
891 }
892
893
894
895
896
897
898
899
900 private static class InputFileSkipResultMessage
901 extends JobBackendMessage {
902
903 private static final long serialVersionUID = 5050948958179612039L;
904
905 private int ffd;
906 private long len;
907 private IOException exc;
908
909 public InputFileSkipResultMessage() {
910 }
911
912 public InputFileSkipResultMessage(JobFrontendRef theJobFrontend,
913 int ffd,
914 long len,
915 IOException exc) {
916 super(Message.FROM_JOB_FRONTEND);
917 this.ffd = ffd;
918 this.len = len;
919 this.exc = exc;
920 }
921
922 public void invoke(JobBackendRef theJobBackend,
923 JobFrontendRef theJobFrontend)
924 throws IOException {
925 theJobBackend.inputFileSkipResult(theJobFrontend, ffd, len, exc);
926 }
927
928 public void writeExternal(ObjectOutput out)
929 throws IOException {
930 out.writeInt(ffd);
931 out.writeLong(len);
932 out.writeObject(exc);
933 }
934
935 public void readExternal(ObjectInput in)
936 throws IOException, ClassNotFoundException {
937 ffd = in.readInt();
938 len = in.readLong();
939 exc = (IOException) in.readObject();
940 }
941 }
942
943
944
945
946
947
948
949
950 private static class InputFileCloseResultMessage
951 extends JobBackendMessage {
952
953 private static final long serialVersionUID = -5645040374361899565L;
954
955 private int ffd;
956 private IOException exc;
957
958 public InputFileCloseResultMessage() {
959 }
960
961 public InputFileCloseResultMessage(JobFrontendRef theJobFrontend,
962 int ffd,
963 IOException exc) {
964 super(Message.FROM_JOB_FRONTEND);
965 this.ffd = ffd;
966 this.exc = exc;
967 }
968
969 public void invoke(JobBackendRef theJobBackend,
970 JobFrontendRef theJobFrontend)
971 throws IOException {
972 theJobBackend.inputFileCloseResult(theJobFrontend, ffd, exc);
973 }
974
975 public void writeExternal(ObjectOutput out)
976 throws IOException {
977 out.writeInt(ffd);
978 out.writeObject(exc);
979 }
980
981 public void readExternal(ObjectInput in)
982 throws IOException, ClassNotFoundException {
983 ffd = in.readInt();
984 exc = (IOException) in.readObject();
985 }
986 }
987
988 }