View Javadoc
1   //******************************************************************************
2   //
3   // File:    JobFrontendMessage.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.JobFrontendMessage
6   //
7   // This Java source file is copyright (C) 2012 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import java.io.Externalizable;
43  import java.io.File;
44  import java.io.IOException;
45  import java.io.ObjectInput;
46  import java.io.ObjectOutput;
47  import java.net.InetSocketAddress;
48  
49  /**
50   * Class JobFrontendMessage provides a message sent to a Job Frontend process
51   * (interface {@linkplain JobFrontendRef}) in the PJ cluster middleware.
52   *
53   * @author Alan Kaminsky
54   * @version 20-Jun-2012
55   */
56  public abstract class JobFrontendMessage
57          extends Message
58          implements Externalizable {
59  
60  // Hidden data members.
61      private static final long serialVersionUID = -6601793901631997673L;
62  
63  // Exported constructors.
/**
 * Creates a job frontend message without supplying a message tag; the
 * superclass no-argument constructor is used.
 */
public JobFrontendMessage() {
    super();
}
69  
/**
 * Creates a job frontend message that carries the given message tag. The
 * tag is handed unchanged to the superclass constructor.
 *
 * @param theTag Message tag to use when sending this message.
 */
public JobFrontendMessage(final int theTag) {
    super(theTag);
}
78  
79  // Exported operations.
80      /**
81       * Construct a new "assign backend" message.
82       *
83       * @param theJobScheduler Job Scheduler that is calling this method.
84       * @param name Backend node name.
85       * @param host Host name for SSH remote login.
86       * @param jvm Full pathname of Java Virtual Machine.
87       * @param jvmflags Array of JVM command line flags.
88       * @param classpath Java class path for PJ Library.
89       * @param shellCommand Shell command string.
90       * @param Nt Number of CPUs assigned to the process.
91       * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
92       */
93      public static JobFrontendMessage assignBackend(JobSchedulerRef theJobScheduler,
94              String name,
95              String host,
96              String jvm,
97              String classpath,
98              String[] jvmflags,
99              String shellCommand,
100             int Nt) {
101         return new AssignBackendMessage(theJobScheduler, name, host, jvm, classpath, jvmflags,
102                 shellCommand, Nt);
103     }
104 
105     /**
106      * Construct a new "assign job number" message.
107      *
108      * @param theJobScheduler Job Scheduler that is calling this method.
109      * @param jobnum Job number.
110      * @param pjhost Host name for middleware channel group.
111      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
112      */
113     public static JobFrontendMessage assignJobNumber(JobSchedulerRef theJobScheduler,
114             int jobnum,
115             String pjhost) {
116         return new AssignJobNumberMessage(theJobScheduler, jobnum, pjhost);
117     }
118 
119     /**
120      * Construct a new "cancel job" message.
121      *
122      * @param theJobScheduler Job Scheduler that is calling this method.
123      * @param errmsg Error message string.
124      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
125      */
126     public static JobFrontendMessage cancelJob(JobSchedulerRef theJobScheduler,
127             String errmsg) {
128         return new CancelJobMessage(theJobScheduler, errmsg);
129     }
130 
131     /**
132      * Construct a new "renew lease" message.
133      *
134      * @param theJobScheduler Job Scheduler that is calling this method.
135      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
136      */
137     public static JobFrontendMessage renewLease(JobSchedulerRef theJobScheduler) {
138         return new RenewLeaseMessage(theJobScheduler);
139     }
140 
141     /**
142      * Construct a new "backend finished" message.
143      *
144      * @param theJobBackend Job Backend that is calling this method.
145      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
146      */
147     public static JobFrontendMessage backendFinished(JobBackendRef theJobBackend) {
148         return new BackendFinishedMessage(theJobBackend);
149     }
150 
151     /**
152      * Construct a new "backend ready" message.
153      *
154      * @param theJobBackend Job Backend that is calling this method.
155      * @param rank Rank of the job backend process.
156      * @param middlewareAddress Host/port to which the job backend process is
157      * listening for middleware messages.
158      * @param worldAddress Host/port to which the job backend process is
159      * listening for the world communicator.
160      * @param frontendAddress Host/port to which the job backend process is
161      * listening for the frontend communicator, or null if the frontend
162      * communicator does not exist.
163      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
164      */
165     public static JobFrontendMessage backendReady(JobBackendRef theJobBackend,
166             int rank,
167             InetSocketAddress middlewareAddress,
168             InetSocketAddress worldAddress,
169             InetSocketAddress frontendAddress) {
170         return new BackendReadyMessage(theJobBackend, rank, middlewareAddress,
171                 worldAddress, frontendAddress);
172     }
173 
174     /**
175      * Construct a new "cancel job" message.
176      *
177      * @param theJobBackend Job Backend that is calling this method.
178      * @param errmsg Error message string.
179      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
180      */
181     public static JobFrontendMessage cancelJob(JobBackendRef theJobBackend,
182             String errmsg) {
183         return new CancelJobMessage(theJobBackend, errmsg);
184     }
185 
186     /**
187      * Construct a new "renew lease" message.
188      *
189      * @param theJobBackend Job Backend that is calling this method.
190      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
191      */
192     public static JobFrontendMessage renewLease(JobBackendRef theJobBackend) {
193         return new RenewLeaseMessage(theJobBackend);
194     }
195 
196     /**
197      * Construct a new "request resource" message.
198      *
199      * @param theJobBackend Job Backend that is calling this method.
200      * @param resourceName Resource name.
201      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
202      */
203     public static JobFrontendMessage requestResource(JobBackendRef theJobBackend,
204             String resourceName) {
205         return new RequestResourceMessage(theJobBackend, resourceName);
206     }
207 
208     /**
209      * Construct a new "output file open" message.
210      *
211      * @param theJobBackend Job Backend that is calling this method.
212      * @param bfd Backend file descriptor.
213      * @param file File.
214      * @param append True to append, false to overwrite.
215      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
216      */
217     public static JobFrontendMessage outputFileOpen(JobBackendRef theJobBackend,
218             int bfd,
219             File file,
220             boolean append) {
221         return new OutputFileOpenMessage(theJobBackend, bfd, file, append);
222     }
223 
224     /**
225      * Construct a new "output file write" message.
226      *
227      * @param theJobBackend Job Backend that is calling this method.
228      * @param ffd Frontend file descriptor.
229      * @param len Number of bytes to write.
230      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
231      */
232     public static JobFrontendMessage outputFileWrite(JobBackendRef theJobBackend,
233             int ffd,
234             int len) {
235         return new OutputFileWriteMessage(theJobBackend, ffd, len);
236     }
237 
238     /**
239      * Construct a new "output file flush" message.
240      *
241      * @param theJobBackend Job Backend that is calling this method.
242      * @param ffd Frontend file descriptor.
243      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
244      */
245     public static JobFrontendMessage outputFileFlush(JobBackendRef theJobBackend,
246             int ffd) {
247         return new OutputFileFlushMessage(theJobBackend, ffd);
248     }
249 
250     /**
251      * Construct a new "output file close" message.
252      *
253      * @param theJobBackend Job Backend that is calling this method.
254      * @param ffd Frontend file descriptor.
255      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
256      */
257     public static JobFrontendMessage outputFileClose(JobBackendRef theJobBackend,
258             int ffd) {
259         return new OutputFileCloseMessage(theJobBackend, ffd);
260     }
261 
262     /**
263      * Construct a new "input file open" message.
264      *
265      * @param theJobBackend Job Backend that is calling this method.
266      * @param bfd Backend file descriptor.
267      * @param file File.
268      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
269      */
270     public static JobFrontendMessage inputFileOpen(JobBackendRef theJobBackend,
271             int bfd,
272             File file) {
273         return new InputFileOpenMessage(theJobBackend, bfd, file);
274     }
275 
276     /**
277      * Construct a new "input file read" message.
278      *
279      * @param theJobBackend Job Backend that is calling this method.
280      * @param ffd Frontend file descriptor.
281      * @param len Number of bytes to read.
282      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
283      */
284     public static JobFrontendMessage inputFileRead(JobBackendRef theJobBackend,
285             int ffd,
286             int len) {
287         return new InputFileReadMessage(theJobBackend, ffd, len);
288     }
289 
290     /**
291      * Construct a new "input file skip" message.
292      *
293      * @param theJobBackend Job Backend that is calling this method.
294      * @param ffd Frontend file descriptor.
295      * @param len Number of bytes to skip.
296      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
297      */
298     public static JobFrontendMessage inputFileSkip(JobBackendRef theJobBackend,
299             int ffd,
300             long len) {
301         return new InputFileSkipMessage(theJobBackend, ffd, len);
302     }
303 
304     /**
305      * Construct a new "input file close" message.
306      *
307      * @param theJobBackend Job Backend that is calling this method.
308      * @param ffd Frontend file descriptor.
309      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
310      */
311     public static JobFrontendMessage inputFileClose(JobBackendRef theJobBackend,
312             int ffd) {
313         return new InputFileCloseMessage(theJobBackend, ffd);
314     }
315 
316     /**
317      * Construct a new "report comment" message.
318      *
319      * @param theJobBackend Job backend that is calling this method.
320      * @param rank Process rank.
321      * @param comment Comment string.
322      * @return a {@link edu.rit.pj.cluster.JobFrontendMessage} object.
323      */
324     public static JobFrontendMessage reportComment(JobBackendRef theJobBackend,
325             int rank,
326             String comment) {
327         return new ReportCommentMessage(theJobBackend, rank, comment);
328     }
329 
330     /**
331      * Invoke the method corresponding to this job frontend message on the given
332      * Job Frontend object. The method arguments come from the fields of this
333      * job frontend message object.
334      *
335      * @param theJobFrontend Job Frontend on which to invoke the method.
336      * @param theJobScheduler Job Scheduler that is calling the method.
337      * @exception IOException Thrown if an I/O error occurred.
338      * @throws java.io.IOException if any.
339      */
340     public void invoke(JobFrontendRef theJobFrontend,
341             JobSchedulerRef theJobScheduler)
342             throws IOException {
343         throw new UnsupportedOperationException();
344     }
345 
346     /**
347      * Invoke the method corresponding to this job frontend message on the given
348      * Job Frontend object. The method arguments come from the fields of this
349      * job frontend message object.
350      *
351      * @param theJobFrontend Job Frontend on which to invoke the method.
352      * @param theJobBackend Job Backend that is calling the method.
353      * @exception IOException Thrown if an I/O error occurred.
354      * @throws java.io.IOException if any.
355      */
356     public void invoke(JobFrontendRef theJobFrontend,
357             JobBackendRef theJobBackend)
358             throws IOException {
359         throw new UnsupportedOperationException();
360     }
361 
362     /**
363      * {@inheritDoc}
364      *
365      * Write this job frontend message to the given object output stream.
366      * @exception IOException Thrown if an I/O error occurred.
367      */
368     public void writeExternal(ObjectOutput out)
369             throws IOException {
370     }
371 
372     /**
373      * {@inheritDoc}
374      *
375      * Read this job frontend message from the given object input stream.
376      * @exception IOException Thrown if an I/O error occurred.
377      * @exception ClassNotFoundException Thrown if a class needed to read this
378      * job backend message could not be found.
379      */
380     public void readExternal(ObjectInput in)
381             throws IOException, ClassNotFoundException {
382     }
383 
384 // Hidden subclasses.
385     /**
386      * Class AssignBackendMessage provides the Job Frontend "assign backend"
387      * message in the PJ cluster middleware.
388      *
389      * @author Alan Kaminsky
390      * @version 20-Jun-2008
391      */
392     private static class AssignBackendMessage
393             extends JobFrontendMessage {
394 
395         private static final long serialVersionUID = 4426930548078115598L;
396 
397         private String name;
398         private String host;
399         private String jvm;
400         private String classpath;
401         private String[] jvmflags;
402         private String shellCommand;
403         private int Nt;
404 
405         public AssignBackendMessage() {
406         }
407 
408         public AssignBackendMessage(JobSchedulerRef theJobScheduler,
409                 String name,
410                 String host,
411                 String jvm,
412                 String classpath,
413                 String[] jvmflags,
414                 String shellCommand,
415                 int Nt) {
416             super(Message.FROM_JOB_SCHEDULER);
417             this.name = name;
418             this.host = host;
419             this.jvm = jvm;
420             this.classpath = classpath;
421             this.jvmflags = jvmflags;
422             this.shellCommand = shellCommand;
423             this.Nt = Nt;
424         }
425 
426         public void invoke(JobFrontendRef theJobFrontend,
427                 JobSchedulerRef theJobScheduler)
428                 throws IOException {
429             theJobFrontend.assignBackend(theJobScheduler, name, host, jvm, classpath, jvmflags,
430                     shellCommand, Nt);
431         }
432 
433         public void writeExternal(ObjectOutput out)
434                 throws IOException {
435             out.writeUTF(name);
436             out.writeUTF(host);
437             out.writeUTF(jvm);
438             out.writeUTF(classpath);
439             int n = jvmflags.length;
440             out.writeInt(n);
441             for (int i = 0; i < n; ++i) {
442                 out.writeUTF(jvmflags[i]);
443             }
444             out.writeUTF(shellCommand);
445             out.writeInt(Nt);
446         }
447 
448         public void readExternal(ObjectInput in)
449                 throws IOException {
450             name = in.readUTF();
451             host = in.readUTF();
452             jvm = in.readUTF();
453             classpath = in.readUTF();
454             int n = in.readInt();
455             jvmflags = new String[n];
456             for (int i = 0; i < n; ++i) {
457                 jvmflags[i] = in.readUTF();
458             }
459             shellCommand = in.readUTF();
460             Nt = in.readInt();
461         }
462     }
463 
464     /**
465      * Class AssignJobNumberMessage provides the Job Frontend "assign job
466      * number" message in the PJ cluster middleware.
467      *
468      * @author Alan Kaminsky
469      * @version 12-Oct-2006
470      */
471     private static class AssignJobNumberMessage
472             extends JobFrontendMessage {
473 
474         private static final long serialVersionUID = -197099388467750972L;
475 
476         private int jobnum;
477         private String pjhost;
478 
479         public AssignJobNumberMessage() {
480         }
481 
482         public AssignJobNumberMessage(JobSchedulerRef theJobScheduler,
483                 int jobnum,
484                 String pjhost) {
485             super(Message.FROM_JOB_SCHEDULER);
486             this.jobnum = jobnum;
487             this.pjhost = pjhost;
488         }
489 
490         public void invoke(JobFrontendRef theJobFrontend,
491                 JobSchedulerRef theJobScheduler)
492                 throws IOException {
493             theJobFrontend.assignJobNumber(theJobScheduler, jobnum, pjhost);
494         }
495 
496         public void writeExternal(ObjectOutput out)
497                 throws IOException {
498             out.writeInt(jobnum);
499             out.writeUTF(pjhost);
500         }
501 
502         public void readExternal(ObjectInput in)
503                 throws IOException {
504             jobnum = in.readInt();
505             pjhost = in.readUTF();
506         }
507     }
508 
509     /**
510      * Class BackendFinishedMessage provides the Job Frontend "backend finished"
511      * message in the PJ cluster middleware.
512      *
513      * @author Alan Kaminsky
514      * @version 12-Oct-2006
515      */
516     private static class BackendFinishedMessage
517             extends JobFrontendMessage {
518 
519         private static final long serialVersionUID = 759872032212034107L;
520 
521         public BackendFinishedMessage() {
522         }
523 
524         public BackendFinishedMessage(JobBackendRef theJobBackend) {
525             super(Message.FROM_JOB_BACKEND);
526         }
527 
528         public void invoke(JobFrontendRef theJobFrontend,
529                 JobBackendRef theJobBackend)
530                 throws IOException {
531             theJobFrontend.backendFinished(theJobBackend);
532         }
533     }
534 
535     /**
536      * Class BackendReadyMessage provides the Job Frontend "backend ready"
537      * message in the PJ cluster middleware.
538      *
539      * @author Alan Kaminsky
540      * @version 20-Oct-2006
541      */
542     private static class BackendReadyMessage
543             extends JobFrontendMessage {
544 
545         private static final long serialVersionUID = -8872540352133660209L;
546 
547         private int rank;
548         private InetSocketAddress middlewareAddress;
549         private InetSocketAddress worldAddress;
550         private InetSocketAddress frontendAddress;
551 
552         public BackendReadyMessage() {
553         }
554 
555         public BackendReadyMessage(JobBackendRef theJobBackend,
556                 int rank,
557                 InetSocketAddress middlewareAddress,
558                 InetSocketAddress worldAddress,
559                 InetSocketAddress frontendAddress) {
560             super(Message.FROM_JOB_BACKEND);
561             this.rank = rank;
562             this.middlewareAddress = middlewareAddress;
563             this.worldAddress = worldAddress;
564             this.frontendAddress = frontendAddress;
565         }
566 
567         public void invoke(JobFrontendRef theJobFrontend,
568                 JobBackendRef theJobBackend)
569                 throws IOException {
570             theJobFrontend.backendReady(theJobBackend, rank, middlewareAddress,
571                     worldAddress, frontendAddress);
572         }
573 
574         public void writeExternal(ObjectOutput out)
575                 throws IOException {
576             out.writeInt(rank);
577             out.writeObject(middlewareAddress);
578             out.writeObject(worldAddress);
579             out.writeObject(frontendAddress);
580         }
581 
582         public void readExternal(ObjectInput in)
583                 throws IOException, ClassNotFoundException {
584             rank = in.readInt();
585             middlewareAddress = (InetSocketAddress) in.readObject();
586             worldAddress = (InetSocketAddress) in.readObject();
587             frontendAddress = (InetSocketAddress) in.readObject();
588         }
589     }
590 
591     /**
592      * Class CancelJobMessage provides the Job Frontend "cancel job" message in
593      * the PJ cluster middleware.
594      *
595      * @author Alan Kaminsky
596      * @version 12-Oct-2006
597      */
598     private static class CancelJobMessage
599             extends JobFrontendMessage {
600 
601         private static final long serialVersionUID = -2009079595329812496L;
602 
603         private String errmsg;
604 
605         public CancelJobMessage() {
606         }
607 
608         public CancelJobMessage(JobSchedulerRef theJobScheduler,
609                 String errmsg) {
610             super(Message.FROM_JOB_SCHEDULER);
611             this.errmsg = errmsg;
612         }
613 
614         public CancelJobMessage(JobBackendRef theJobBackend,
615                 String errmsg) {
616             super(Message.FROM_JOB_BACKEND);
617             this.errmsg = errmsg;
618         }
619 
620         public void invoke(JobFrontendRef theJobFrontend,
621                 JobSchedulerRef theJobScheduler)
622                 throws IOException {
623             theJobFrontend.cancelJob(theJobScheduler, errmsg);
624         }
625 
626         public void invoke(JobFrontendRef theJobFrontend,
627                 JobBackendRef theJobBackend)
628                 throws IOException {
629             theJobFrontend.cancelJob(theJobBackend, errmsg);
630         }
631 
632         public void writeExternal(ObjectOutput out)
633                 throws IOException {
634             out.writeUTF(errmsg);
635         }
636 
637         public void readExternal(ObjectInput in)
638                 throws IOException {
639             errmsg = in.readUTF();
640         }
641     }
642 
643     /**
644      * Class RenewLeaseMessage provides the Job Frontend "renew lease" message
645      * in the PJ cluster middleware.
646      *
647      * @author Alan Kaminsky
648      * @version 12-Oct-2006
649      */
650     private static class RenewLeaseMessage
651             extends JobFrontendMessage {
652 
653         private static final long serialVersionUID = -9030468939160436622L;
654 
655         public RenewLeaseMessage() {
656         }
657 
658         public RenewLeaseMessage(JobSchedulerRef theJobScheduler) {
659             super(Message.FROM_JOB_SCHEDULER);
660         }
661 
662         public RenewLeaseMessage(JobBackendRef theJobBackend) {
663             super(Message.FROM_JOB_BACKEND);
664         }
665 
666         public void invoke(JobFrontendRef theJobFrontend,
667                 JobSchedulerRef theJobScheduler)
668                 throws IOException {
669             theJobFrontend.renewLease(theJobScheduler);
670         }
671 
672         public void invoke(JobFrontendRef theJobFrontend,
673                 JobBackendRef theJobBackend)
674                 throws IOException {
675             theJobFrontend.renewLease(theJobBackend);
676         }
677     }
678 
679     /**
680      * Class RequestResourceMessage provides the Job Frontend "request resource"
681      * message in the PJ cluster middleware.
682      *
683      * @author Alan Kaminsky
684      * @version 26-Oct-2006
685      */
686     private static class RequestResourceMessage
687             extends JobFrontendMessage {
688 
689         private static final long serialVersionUID = 9184806604713339263L;
690 
691         private String resourceName;
692 
693         public RequestResourceMessage() {
694         }
695 
696         public RequestResourceMessage(JobBackendRef theJobBackend,
697                 String resourceName) {
698             super(Message.FROM_JOB_BACKEND);
699             this.resourceName = resourceName;
700         }
701 
702         public void invoke(JobFrontendRef theJobFrontend,
703                 JobBackendRef theJobBackend)
704                 throws IOException {
705             theJobFrontend.requestResource(theJobBackend, resourceName);
706         }
707 
708         public void writeExternal(ObjectOutput out)
709                 throws IOException {
710             out.writeUTF(resourceName);
711         }
712 
713         public void readExternal(ObjectInput in)
714                 throws IOException {
715             resourceName = in.readUTF();
716         }
717     }
718 
719     /**
720      * Class OutputFileOpenMessage provides the Job Frontend "output file open"
721      * message in the PJ cluster middleware.
722      *
723      * @author Alan Kaminsky
724      * @version 05-Nov-2006
725      */
726     private static class OutputFileOpenMessage
727             extends JobFrontendMessage {
728 
729         private static final long serialVersionUID = -4987754930269039852L;
730 
731         private int bfd;
732         private File file;
733         private boolean append;
734 
735         public OutputFileOpenMessage() {
736         }
737 
738         public OutputFileOpenMessage(JobBackendRef theJobBackend,
739                 int bfd,
740                 File file,
741                 boolean append) {
742             super(Message.FROM_JOB_BACKEND);
743             this.bfd = bfd;
744             this.file = file;
745             this.append = append;
746         }
747 
748         public void invoke(JobFrontendRef theJobFrontend,
749                 JobBackendRef theJobBackend)
750                 throws IOException {
751             theJobFrontend.outputFileOpen(theJobBackend, bfd, file, append);
752         }
753 
754         public void writeExternal(ObjectOutput out)
755                 throws IOException {
756             out.writeInt(bfd);
757             out.writeBoolean(append);
758             out.writeObject(file);
759         }
760 
761         public void readExternal(ObjectInput in)
762                 throws IOException, ClassNotFoundException {
763             bfd = in.readInt();
764             append = in.readBoolean();
765             file = (File) in.readObject();
766         }
767     }
768 
769     /**
770      * Class OutputFileWriteMessage provides the Job Frontend "output file
771      * write" message in the PJ cluster middleware.
772      *
773      * @author Alan Kaminsky
774      * @version 20-Nov-2006
775      */
776     private static class OutputFileWriteMessage
777             extends JobFrontendMessage {
778 
779         private static final long serialVersionUID = -4460426636308841602L;
780 
781         private int ffd;
782         private int len;
783 
784         public OutputFileWriteMessage() {
785         }
786 
787         public OutputFileWriteMessage(JobBackendRef theJobBackend,
788                 int ffd,
789                 int len) {
790             super(Message.FROM_JOB_BACKEND);
791             this.ffd = ffd;
792             this.len = len;
793         }
794 
795         public void invoke(JobFrontendRef theJobFrontend,
796                 JobBackendRef theJobBackend)
797                 throws IOException {
798             theJobFrontend.outputFileWrite(theJobBackend, ffd, null, 0, len);
799         }
800 
801         public void writeExternal(ObjectOutput out)
802                 throws IOException {
803             out.writeInt(ffd);
804             out.writeInt(len);
805         }
806 
807         public void readExternal(ObjectInput in)
808                 throws IOException {
809             ffd = in.readInt();
810             len = in.readInt();
811         }
812     }
813 
814     /**
815      * Class OutputFileFlushMessage provides the Job Frontend "output file
816      * flush" message in the PJ cluster middleware.
817      *
818      * @author Alan Kaminsky
819      * @version 05-Nov-2006
820      */
821     private static class OutputFileFlushMessage
822             extends JobFrontendMessage {
823 
824         private static final long serialVersionUID = 7074849708663078210L;
825 
826         private int ffd;
827 
828         public OutputFileFlushMessage() {
829         }
830 
831         public OutputFileFlushMessage(JobBackendRef theJobBackend,
832                 int ffd) {
833             super(Message.FROM_JOB_BACKEND);
834             this.ffd = ffd;
835         }
836 
837         public void invoke(JobFrontendRef theJobFrontend,
838                 JobBackendRef theJobBackend)
839                 throws IOException {
840             theJobFrontend.outputFileFlush(theJobBackend, ffd);
841         }
842 
843         public void writeExternal(ObjectOutput out)
844                 throws IOException {
845             out.writeInt(ffd);
846         }
847 
848         public void readExternal(ObjectInput in)
849                 throws IOException {
850             ffd = in.readInt();
851         }
852     }
853 
854     /**
855      * Class OutputFileCloseMessage provides the Job Frontend "output file
856      * close" message in the PJ cluster middleware.
857      *
858      * @author Alan Kaminsky
859      * @version 05-Nov-2006
860      */
861     private static class OutputFileCloseMessage
862             extends JobFrontendMessage {
863 
864         private static final long serialVersionUID = -5637017577338427153L;
865 
866         private int ffd;
867 
868         public OutputFileCloseMessage() {
869         }
870 
871         public OutputFileCloseMessage(JobBackendRef theJobBackend,
872                 int ffd) {
873             super(Message.FROM_JOB_BACKEND);
874             this.ffd = ffd;
875         }
876 
877         public void invoke(JobFrontendRef theJobFrontend,
878                 JobBackendRef theJobBackend)
879                 throws IOException {
880             theJobFrontend.outputFileClose(theJobBackend, ffd);
881         }
882 
883         public void writeExternal(ObjectOutput out)
884                 throws IOException {
885             out.writeInt(ffd);
886         }
887 
888         public void readExternal(ObjectInput in)
889                 throws IOException {
890             ffd = in.readInt();
891         }
892     }
893 
894     /**
895      * Class InputFileOpenMessage provides the Job Frontend "input file open"
896      * message in the PJ cluster middleware.
897      *
898      * @author Alan Kaminsky
899      * @version 05-Nov-2006
900      */
901     private static class InputFileOpenMessage
902             extends JobFrontendMessage {
903 
904         private static final long serialVersionUID = -791306998166025239L;
905 
906         private int bfd;
907         private File file;
908 
909         public InputFileOpenMessage() {
910         }
911 
912         public InputFileOpenMessage(JobBackendRef theJobBackend,
913                 int bfd,
914                 File file) {
915             super(Message.FROM_JOB_BACKEND);
916             this.bfd = bfd;
917             this.file = file;
918         }
919 
920         public void invoke(JobFrontendRef theJobFrontend,
921                 JobBackendRef theJobBackend)
922                 throws IOException {
923             theJobFrontend.inputFileOpen(theJobBackend, bfd, file);
924         }
925 
926         public void writeExternal(ObjectOutput out)
927                 throws IOException {
928             out.writeInt(bfd);
929             out.writeObject(file);
930         }
931 
932         public void readExternal(ObjectInput in)
933                 throws IOException, ClassNotFoundException {
934             bfd = in.readInt();
935             file = (File) in.readObject();
936         }
937     }
938 
939     /**
940      * Class InputFileReadMessage provides the Job Frontend "input file read"
941      * message in the PJ cluster middleware.
942      *
943      * @author Alan Kaminsky
944      * @version 05-Nov-2006
945      */
946     private static class InputFileReadMessage
947             extends JobFrontendMessage {
948 
949         private static final long serialVersionUID = 7727558874589005187L;
950 
951         private int ffd;
952         private int len;
953 
954         public InputFileReadMessage() {
955         }
956 
957         public InputFileReadMessage(JobBackendRef theJobBackend,
958                 int ffd,
959                 int len) {
960             super(Message.FROM_JOB_BACKEND);
961             this.ffd = ffd;
962             this.len = len;
963         }
964 
965         public void invoke(JobFrontendRef theJobFrontend,
966                 JobBackendRef theJobBackend)
967                 throws IOException {
968             theJobFrontend.inputFileRead(theJobBackend, ffd, len);
969         }
970 
971         public void writeExternal(ObjectOutput out)
972                 throws IOException {
973             out.writeInt(ffd);
974             out.writeInt(len);
975         }
976 
977         public void readExternal(ObjectInput in)
978                 throws IOException {
979             ffd = in.readInt();
980             len = in.readInt();
981         }
982     }
983 
984     /**
985      * Class InputFileSkipMessage provides the Job Frontend "input file skip"
986      * message in the PJ cluster middleware.
987      *
988      * @author Alan Kaminsky
989      * @version 05-Nov-2006
990      */
991     private static class InputFileSkipMessage
992             extends JobFrontendMessage {
993 
994         private static final long serialVersionUID = 7867427744015166954L;
995 
996         private int ffd;
997         private long len;
998 
999         public InputFileSkipMessage() {
1000         }
1001 
1002         public InputFileSkipMessage(JobBackendRef theJobBackend,
1003                 int ffd,
1004                 long len) {
1005             super(Message.FROM_JOB_BACKEND);
1006             this.ffd = ffd;
1007             this.len = len;
1008         }
1009 
1010         public void invoke(JobFrontendRef theJobFrontend,
1011                 JobBackendRef theJobBackend)
1012                 throws IOException {
1013             theJobFrontend.inputFileSkip(theJobBackend, ffd, len);
1014         }
1015 
1016         public void writeExternal(ObjectOutput out)
1017                 throws IOException {
1018             out.writeInt(ffd);
1019             out.writeLong(len);
1020         }
1021 
1022         public void readExternal(ObjectInput in)
1023                 throws IOException {
1024             ffd = in.readInt();
1025             len = in.readLong();
1026         }
1027     }
1028 
1029     /**
1030      * Class InputFileCloseMessage provides the Job Frontend "input file close"
1031      * message in the PJ cluster middleware.
1032      *
1033      * @author Alan Kaminsky
1034      * @version 05-Nov-2006
1035      */
1036     private static class InputFileCloseMessage
1037             extends JobFrontendMessage {
1038 
1039         private static final long serialVersionUID = 1223549294646718409L;
1040 
1041         private int ffd;
1042 
1043         public InputFileCloseMessage() {
1044         }
1045 
1046         public InputFileCloseMessage(JobBackendRef theJobBackend,
1047                 int ffd) {
1048             super(Message.FROM_JOB_BACKEND);
1049             this.ffd = ffd;
1050         }
1051 
1052         public void invoke(JobFrontendRef theJobFrontend,
1053                 JobBackendRef theJobBackend)
1054                 throws IOException {
1055             theJobFrontend.inputFileClose(theJobBackend, ffd);
1056         }
1057 
1058         public void writeExternal(ObjectOutput out)
1059                 throws IOException {
1060             out.writeInt(ffd);
1061         }
1062 
1063         public void readExternal(ObjectInput in)
1064                 throws IOException {
1065             ffd = in.readInt();
1066         }
1067     }
1068 
1069     /**
1070      * Class ReportCommentMessage provides the Job Frontend "report comment"
1071      * message in the PJ cluster middleware.
1072      *
1073      * @author Alan Kaminsky
1074      * @version 24-Jan-2012
1075      */
1076     private static class ReportCommentMessage
1077             extends JobFrontendMessage {
1078 
1079         private static final long serialVersionUID = 1092254806177598252L;
1080 
1081         private int rank;
1082         private String comment;
1083 
1084         public ReportCommentMessage() {
1085         }
1086 
1087         public ReportCommentMessage(JobBackendRef theJobBackend,
1088                 int rank,
1089                 String comment) {
1090             super(Message.FROM_JOB_BACKEND);
1091             this.rank = rank;
1092             this.comment = comment == null ? "" : comment;
1093         }
1094 
1095         public void invoke(JobFrontendRef theJobFrontend,
1096                 JobBackendRef theJobBackend)
1097                 throws IOException {
1098             theJobFrontend.reportComment(theJobBackend, rank, comment);
1099         }
1100 
1101         public void writeExternal(ObjectOutput out)
1102                 throws IOException {
1103             out.writeInt(rank);
1104             out.writeUTF(comment);
1105         }
1106 
1107         public void readExternal(ObjectInput in)
1108                 throws IOException {
1109             rank = in.readInt();
1110             comment = in.readUTF();
1111         }
1112     }
1113 
1114 }