View Javadoc
1   //******************************************************************************
2   //
3   // File:    Runner.java
4   // Package: edu.rit.pj.job
5   // Unit:    Class edu.rit.pj.job.Runner
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.job;
41  
42  import java.io.File;
43  import java.io.IOException;
44  import java.io.PrintStream;
45  import java.util.Date;
46  import java.util.HashSet;
47  import java.util.Scanner;
48  
49  import edu.rit.mp.BooleanBuf;
50  import edu.rit.mp.buf.BooleanItemBuf;
51  import edu.rit.pj.Comm;
52  import edu.rit.pj.WorkerIteration;
53  import edu.rit.pj.WorkerRegion;
54  import edu.rit.pj.WorkerTeam;
55  import edu.rit.util.Instance;
56  
57  /**
58   * Class Runner is a parallel program that runs, in parallel, a group of
59   * {@linkplain Job}s created by a {@linkplain JobGenerator}. The job generator
60   * is specified on the command line as a constructor expression. An instance of
61   * the class specified in the constructor expression is constructed, with the
62   * constructor arguments specified in the constructor expression. For further
63   * information, see class {@linkplain edu.rit.util.Instance Instance}.
64   * <P>
65   * The Runner program is targeted at three use cases:
66   * <UL>
67   *
68   * <LI>
69   * <B>Sequential jobs on a cluster parallel computer.</B> Each job is a
70   * sequential (single-threaded) program. The Runner program is running on a
71   * cluster parallel computer with <I>N</I> nodes and one CPU per node. Run the
72   * Runner program as follows:
73   * <PRE>
74   *     java -Dpj.nn=<I>N</I> edu.rit.pj.job.Runner . . .
75   * </PRE> The Runner program runs with one process per node and one thread per
76   * process.
77   *
78   * <LI>
79   * <B>Sequential jobs on a hybrid parallel computer.</B> Each job is a
80   * sequential (single-threaded) program. The Runner program is running on a
81   * hybrid SMP cluster parallel computer with <I>N</I> nodes and <I>C</I> total
82   * CPUs. (For example, on a hybrid parallel computer with 10 nodes and 4 CPUs
83   * per node, <I>C</I> = 40.) Run the Runner program as follows:
84   * <PRE>
85   *     java -Dpj.nn=<I>N</I> -Dpj.np=<I>C</I> edu.rit.pj.job.Runner . . .
86   * </PRE> The Runner program runs with multiple processes per node and one
87   * thread per process.
88   *
89   * <LI>
90   * <B>SMP parallel jobs on a hybrid parallel computer.</B> Each job is an SMP
91   * parallel (multi-threaded) program. The Runner program is running on a hybrid
92   * SMP cluster parallel computer with <I>N</I> nodes and multiple CPUs per node.
93   * Run the Runner program as follows:
94   * <PRE>
95   *     java -Dpj.nn=<I>N</I> edu.rit.pj.job.Runner . . .
96   * </PRE> The Runner program runs with one process per node and multiple threads
97   * per process, typically as many threads as there are CPUs on the node.
98   * </UL>
99   * <P>
100  * All these processes form a <I>worker team.</I> The Runner program uses the
101  * job generator specified on the command line to create jobs and sends each job
102  * to a worker team process to be executed.
103  * <P>
104  * When the Runner program starts, it prints the job generator constructor
105  * expression on the standard output. Whenever a job starts or finishes, the
106  * Runner program prints a log message on the standard output consisting of the
107  * job's number and description.
108  * <P>
109  * <B>Checkpointing.</B> It is recommended to redirect the Runner program's
110  * standard output into a <I>checkpoint file.</I> If a failure occurs before the
111  * Runner program finishes running all the jobs, the checkpoint file contains a
112  * record of the job generator that was used as well as which jobs did and did
113  * not finish. To resume the Runner program where it left off, specify the
114  * checkpoint file name on the command line instead of a job generator
115  * constructor expression. The Runner program reads the checkpoint file to
116  * determine the job generator and the jobs that finished. The Runner program
117  * then generates and runs the jobs that did not finish.
118  * <P>
119  * Usage: java edu.rit.pj.job.Runner { <I>generator</I> | <I>file</I> }
120  * <BR><I>generator</I> = Job generator constructor expression
121  * <BR><I>file</I> = Checkpoint file name
122  *
123  * @author Alan Kaminsky
124  * @version 22-Oct-2010
125  */
126 public class Runner {
127 
128 // Prevent construction.
129     private Runner() {
130     }
131 
132 // Global variables.
133     private static PrintStream stdout = System.out;
134     private static PrintStream stderr = System.err;
135 
136     private static Comm world;
137     private static int rank;
138 
139     private static WorkerTeam team;
140 
141     private static String generatorExpression;
142     private static HashSet<Integer> omitted;
143     private static JobGenerator generator;
144 
145 // Main program.
146     /**
147      * Main program.
148      *
149      * @param args an array of {@link java.lang.String} objects.
150      * @throws java.lang.Exception if any.
151      */
152     public static void main(String[] args)
153             throws Exception {
154         if (args.length != 1) {
155             usage();
156         }
157 
158         // Initialize world communicator.
159         Comm.init(args);
160         world = Comm.world();
161         rank = world.rank();
162 
163         // Set up worker team.
164         team = new WorkerTeam();
165 
166         // Master process sets up job generator.
167         if (rank == team.masterRank()) {
168             omitted = new HashSet<Integer>();
169 
170             // Assume argument is a checkpoint file name and try to read it.
171             Scanner scanner = null;
172             try {
173                 scanner = new Scanner(new File(args[0]));
174             } catch (IOException ignored) {
175             }
176 
177             // Read checkpoint file.
178             if (scanner != null) {
179                 while (scanner.hasNextLine()) {
180                     Scanner linescanner = new Scanner(scanner.nextLine());
181                     String word;
182                     int jobnum;
183                     if (!linescanner.hasNext()) {
184                         continue;
185                     }
186                     word = linescanner.next();
187                     if (!word.equals("***")) {
188                         continue;
189                     }
190                     if (!linescanner.hasNext()) {
191                         continue;
192                     }
193                     word = linescanner.next();
194                     if (word.equals("Generator")) {
195                         if (!linescanner.hasNext()) {
196                             continue;
197                         }
198                         if (generatorExpression == null) {
199                             generatorExpression = linescanner.next();
200                         }
201                     } else if (word.equals("Job")) {
202                         if (!linescanner.hasNextInt()) {
203                             continue;
204                         }
205                         jobnum = linescanner.nextInt();
206                         if (!linescanner.hasNext()) {
207                             continue;
208                         }
209                         word = linescanner.next();
210                         if (word.equals("finished")) {
211                             omitted.add(jobnum);
212                         }
213                     }
214                 }
215                 scanner.close();
216             } // Assume argument is a job generator constructor expression.
217             else {
218                 generatorExpression = args[0];
219             }
220 
221             // Create job generator.
222             if (generatorExpression == null) {
223                 stderr.printf("Runner: No job generator in checkpoint file %s%n",
224                         args[0]);
225             } else {
226                 try {
227                     generator = (JobGenerator) Instance.newInstance(generatorExpression);
228                     stdout.printf("*** Generator %s%n", generatorExpression);
229                     generator.omit(omitted);
230                 } catch (Throwable exc) {
231                     stderr.printf("Runner: Could not create job generator %s%n",
232                             generatorExpression);
233                     exc.printStackTrace(stderr);
234                 }
235             }
236         }
237 
238         // Abort every process if job generator was not created.
239         BooleanItemBuf buf = BooleanBuf.buffer(generator != null);
240         world.broadcast(team.masterRank(), buf);
241         if (!buf.item) {
242             System.exit(1);
243         }
244 
245         // Generate and run jobs.
246         team.execute(new WorkerRegion() {
247             public void run() throws Exception {
248                 execute(generator, new WorkerIteration<Job>() {
249                     public void sendTaskInput(Job job, Comm comm, int wRank, int tag) {
250                         stdout.printf("*** Job %d started %s %s%n",
251                                 job.getJobNumber(),
252                                 new Date(),
253                                 job.getDescription());
254                     }
255 
256                     public void run(Job job) {
257                         job.run();
258                     }
259 
260                     public void receiveTaskOutput(Job job, Comm comm, int wRank, int tag) {
261                         stdout.printf("*** Job %d finished %s%n",
262                                 job.getJobNumber(),
263                                 new Date());
264                     }
265                 });
266             }
267         });
268 
269         if (rank == team.masterRank()) {
270             stdout.printf("*** All jobs finished%n");
271         }
272     }
273 
274 // Hidden operations.
275     /**
276      * Print a usage message and exit.
277      */
278     private static void usage() {
279         stderr.println("Usage: java edu.rit.pj.job.Runner {<generator>|<file>}");
280         stderr.println("<generator> = Job generator constructor expression");
281         stderr.println("<file> = Checkpoint file name");
282         System.exit(1);
283     }
284 
285 }