View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerIntegerForLoop.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerIntegerForLoop
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.io.IOException;
43  
44  import edu.rit.mp.ObjectBuf;
45  import edu.rit.mp.buf.ObjectItemBuf;
46  import edu.rit.util.Range;
47  
48  /**
49   * Class WorkerIntegerForLoop is the abstract base class for one variation of a
50   * worker for loop that is executed inside a {@linkplain WorkerRegion}. The loop
51   * index data type is <code>int</code>. The loop stride is implicit (+1).
52   * <P>
53   * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
54   * create an instance of a concrete subclass of class WorkerIntegerForLoop; and
55   * pass this instance to the worker region's <code>execute()</code> method. Either
56   * every worker team thread must call the worker region's <code>execute()</code>
57   * method with identical arguments, or every thread must not call the
58   * <code>execute()</code> method. You can do all this using an anonymous inner
59   * class; for example:
60   * <PRE>
61   *     new WorkerRegion()
62   *         {
63   *         . . .
64   *         public void run()
65   *             {
66   *             . . .
67   *             execute (0, 99, new WorkerIntegerForLoop()
68   *                 {
69   *                 // Thread local variable declarations
70   *                 . . .
71   *                 public void start()
72   *                     {
73   *                     // Per-thread pre-loop initialization code
74   *                     . . .
75   *                     }
76   *                 public void run (int first, int last)
77   *                     {
78   *                     // Loop code
79   *                     . . .
80   *                     }
81   *                 public void finish()
82   *                     {
83   *                     // Per-thread post-loop finalization code
84   *                     . . .
85   *                     }
86   *                 });
87   *             }
88   *         . . .
89   *         }
90   * </PRE>
91   * <P>
92   * In each process of a cluster parallel program, the worker team has one or
93   * more worker threads. Every worker thread in every process has a unique worker
94   * index, going from index 0 for the first worker thread in the first process to
95   * index <I>K</I>&minus;1 for the last worker thread in the last process, where
96   * <I>K</I> is the total number of worker threads in all the processes. In
97   * addition, in one process there is a master thread. The worker and master
98   * threads all call the worker region's <code>execute()</code> method to execute the
99   * worker for loop. However, the worker and master threads differ in their
100  * actions.
101  * <P>
102  * The master thread does the following. The master obtains the worker for
103  * loop's schedule as returned by the <code>schedule()</code> method. The range of
104  * loop indexes is divided into "chunks" and the chunks are apportioned among
105  * the workers in accordance with the schedule. The master repeatedly sends
106  * "tasks" to the workers and receives "responses" from the workers. To send a
107  * task to a particular worker, the master (1) sends a message containing the
108  * chunk index range to the worker's process; and (2) calls the worker for
109  * loop's <code>sendTaskInput()</code> method. This method's default implementation
110  * does nothing, but it can be overridden to send additional task input data to
111  * the worker. To receive a response from a particular worker, the master (1)
112  * receives a message containing the chunk index range from the worker's
113  * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
114  * method. This method's default implementation does nothing, but it can be
115  * overridden to receive additional task output data from the worker. Once all
116  * tasks have been sent to the workers and all responses have been received from
117  * the workers, the master returns from the worker region's <code>execute()</code>
118  * method.
119  * <P>
120  * Each worker thread does the following. The worker calls the worker for loop's
121  * <code>start()</code> method once before beginning any loop iterations. The worker
122  * repeatedly receives tasks from the master and sends responses to the master.
123  * To receive a task from the master, the worker (1) receives a message
124  * containing the chunk index range from the master's process; and (2) calls the
125  * worker for loop's <code>receiveTaskInput()</code> method. This method's default
126  * implementation does nothing, but it can be overridden to receive additional
127  * task input data from the master. The worker now calls the worker for loop's
128  * <code>run()</code> method, passing in the chunk index range lower and upper
129  * bounds. When the <code>run()</code> method returns, the worker sends the response
130  * to the master. To send the response, the worker (1) sends a message
131  * containing the chunk index range to the master's process; and (2) calls the
132  * worker for loop's <code>sendTaskOutput()</code> method. This method's default
133  * implementation does nothing, but it can be overridden to send additional task
134  * output data to the master. Once all tasks have been received from the master
135  * and all responses have been sent to the master, the worker calls the worker
136  * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
137  * threads, the workers do <I>not</I> synchronize with each other at a barrier
138  * at this point.) The worker then returns from the worker region's
139  * <code>execute()</code> method.
140  * <P>
141  * If the worker for loop has a fixed schedule (in which there is exactly one
142  * chunk with a predetermined index range for each worker), then the messages
143  * containing the chunk index range are omitted, and each worker gets its chunk
144  * index range directly from the fixed schedule. However, the task input data
145  * (if any) and task output data (if any) are still sent and received.
146  * <P>
147  * Each message described above is sent with a message tag equal to
148  * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
149  * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
150  * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
151  * message tags fall in the range <I>T</I> .. <I>K</I>&minus;1+<I>T</I>, where
152  * <I>K</I> is the total number of workers in all the processes. The program
153  * should not use message tags in this range except to send and receive the
154  * messages described above.
155  * <P>
156  * Note that each worker team thread actually creates its own instance of the
157  * worker for loop class and passes that instance to the worker region's
158  * <code>execute()</code> method. Thus, any fields declared in the worker for loop
159  * class will <I>not</I> be shared by all the workers, but instead will be
160  * private to each worker.
161  * <P>
162  * The <code>start()</code> method is intended for performing per-thread
163  * initialization before starting the loop iterations. If no such initialization
164  * is needed, omit the <code>start()</code> method.
165  * <P>
166  * The <code>run()</code> method contains the code for the loop. The first and last
167  * indexes for a chunk of loop iterations are passed in as arguments. The loop
168  * stride is implicit (+1). The worker for loop's <code>run()</code> method must be
169  * coded this way:
170  * <PRE>
171  *     public void run (int first, int last)
172  *         {
173  *         for (int i = first; i &lt;= last; ++ i)
174  *             {
175  *             // Loop body code
176  *             . . .
177  *             }
178  *         }
179  * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
180  * inclusive and increasing by +1 on each iteration.
181  * <P>
182  * The <code>finish()</code> method is intended for performing per-thread
183  * finalization after finishing the loop iterations. If no such finalization is
184  * needed, omit the <code>finish()</code> method.
185  * <P>
186  * If the worker for loop's <code>start()</code>, <code>run()</code>, or
187  * <code>finish()</code> method throws an exception in one of the worker threads,
188  * then that worker thread executes no further code in the loop, and the worker
189  * region's <code>execute()</code> method throws that same exception in that thread.
190  * However, the other worker threads in the worker team continue to execute.
191  *
192  * @author Alan Kaminsky
193  * @version 27-Jan-2010
194  */
195 public abstract class WorkerIntegerForLoop
196         extends WorkerForLoop {
197 
198 // Exported constructors.
199     /**
200      * Construct a new worker for loop.
201      */
202     public WorkerIntegerForLoop() {
203         super();
204     }
205 
206 // Exported operations.
207     /**
208      * Determine this worker for loop's schedule. Called by the master and
209      * worker threads. The schedule determines how the loop iterations are
210      * apportioned among the worker team threads. For further information, see
211      * class {@linkplain IntegerSchedule}.
212      * <P>
213      * The <code>schedule()</code> method may be overridden in a subclass to return
214      * the desired schedule. If not overridden, the default is a runtime
215      * schedule (see {@link edu.rit.pj.IntegerSchedule#runtime()}).
216      *
217      * @return Schedule for this worker for loop.
218      */
219     public IntegerSchedule schedule() {
220         return IntegerSchedule.runtime();
221     }
222 
223     /**
224      * Perform per-thread initialization actions before starting the loop
225      * iterations. Called by a worker thread.
226      * <P>
227      * The <code>start()</code> method may be overridden in a subclass. If not
228      * overridden, the <code>start()</code> method does nothing.
229      *
230      * @exception Exception The <code>start()</code> method may throw any exception.
231      * @throws java.lang.Exception if any.
232      */
233     public void start()
234             throws Exception {
235     }
236 
237     /**
238      * Send additional input data associated with a task. Called by the master
239      * thread. The task is denoted by the given chunk of loop iterations. The
240      * input data must be sent using the given communicator, to the given worker
241      * process rank, with the given message tag.
242      * <P>
243      * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
244      * not overridden, the <code>sendTaskInput()</code> method does nothing.
245      *
246      * @param range Chunk of loop iterations.
247      * @param comm Communicator.
248      * @param wRank Worker process rank.
249      * @param tag Message tag.
250      * @exception IOException Thrown if an I/O error occurred.
251      * @throws java.io.IOException if any.
252      */
253     public void sendTaskInput(Range range,
254             Comm comm,
255             int wRank,
256             int tag)
257             throws IOException {
258     }
259 
260     /**
261      * Receive additional input data associated with a task. Called by a worker
262      * thread. The task is denoted by the given chunk of loop iterations. The
263      * input data must be received using the given communicator, from the given
264      * master process rank, with the given message tag.
265      * <P>
266      * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
267      * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
268      *
269      * @param range Chunk of loop iterations.
270      * @param comm Communicator.
271      * @param mRank Master process rank.
272      * @param tag Message tag.
273      * @exception IOException Thrown if an I/O error occurred.
274      * @throws java.io.IOException if any.
275      */
276     public void receiveTaskInput(Range range,
277             Comm comm,
278             int mRank,
279             int tag)
280             throws IOException {
281     }
282 
283     /**
284      * Execute one chunk of iterations of this worker for loop. Called by a
285      * worker thread. The <code>run()</code> method must perform the loop body for
286      * indexes <code>first</code> through <code>last</code> inclusive, increasing the
287      * loop index by +1 after each iteration.
288      * <P>
289      * The <code>run()</code> method must be overridden in a subclass.
290      *
291      * @param first First loop index.
292      * @param last Last loop index.
293      * @exception Exception The <code>run()</code> method may throw any exception.
294      * @throws java.lang.Exception if any.
295      */
296     public abstract void run(int first,
297             int last)
298             throws Exception;
299 
300     /**
301      * Send additional output data associated with a task. Called by a worker
302      * thread. The task is denoted by the given chunk of loop iterations. The
303      * output data must be sent using the given communicator, to the given
304      * master process rank, with the given message tag.
305      * <P>
306      * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
307      * not overridden, the <code>sendTaskOutput()</code> method does nothing.
308      *
309      * @param range Chunk of loop iterations.
310      * @param comm Communicator.
311      * @param mRank Master process rank.
312      * @param tag Message tag.
313      * @exception IOException Thrown if an I/O error occurred.
314      * @throws java.io.IOException if any.
315      */
316     public void sendTaskOutput(Range range,
317             Comm comm,
318             int mRank,
319             int tag)
320             throws IOException {
321     }
322 
323     /**
324      * Receive additional output data associated with a task. Called by the
325      * master thread. The task is denoted by the given chunk of loop iterations.
326      * The output data must be received using the given communicator, from the
327      * given worker process rank, with the given message tag.
328      * <P>
329      * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
330      * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
331      *
332      * @param range Chunk of loop iterations.
333      * @param comm Communicator.
334      * @param wRank Worker process rank.
335      * @param tag Message tag.
336      * @exception IOException Thrown if an I/O error occurred.
337      * @throws java.io.IOException if any.
338      */
339     public void receiveTaskOutput(Range range,
340             Comm comm,
341             int wRank,
342             int tag)
343             throws IOException {
344     }
345 
346     /**
347      * Perform per-thread finalization actions after finishing the loop
348      * iterations. Called by a worker thread.
349      * <P>
350      * The <code>finish()</code> method may be overridden in a subclass. If not
351      * overridden, the <code>finish()</code> method does nothing.
352      *
353      * @exception Exception The <code>finish()</code> method may throw any
354      * exception.
355      * @throws java.lang.Exception if any.
356      */
357     public void finish()
358             throws Exception {
359     }
360 
361     /**
362      * Returns the tag offset for this worker for loop. Each message between the
363      * master and worker threads is sent with a message tag equal to
364      * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
365      * tag offset.
366      * <P>
367      * The <code>tagOffset()</code> method may be overridden in a subclass. If not
368      * overridden, the <code>tagOffset()</code> returns a default tag offset of
369      * <code>Integer.MIN_VALUE</code>.
370      *
371      * @return Tag offset.
372      */
373     public int tagOffset() {
374         return Integer.MIN_VALUE;
375     }
376 
377 // Hidden operations.
378     /**
379      * Execute this worker for loop in the master thread.
380      *
381      * @param range Loop index range.
382      *
383      * @exception IOException Thrown if an I/O error occurred.
384      */
385     void masterExecute(Range range)
386             throws IOException {
387         IntegerSchedule sch = schedule();
388         if (sch.isFixedSchedule()) {
389             masterExecuteFixed(range, sch);
390         } else {
391             masterExecuteNonFixed(range, sch);
392         }
393     }
394 
395     /**
396      * Execute this worker for loop in the master thread with a fixed schedule.
397      *
398      * @param range Loop index range.
399      * @param sch Schedule.
400      *
401      * @exception IOException Thrown if an I/O error occurred.
402      */
403     void masterExecuteFixed(Range range,
404             IntegerSchedule sch)
405             throws IOException {
406         int count = myTeam.count;
407         Comm comm = myTeam.comm;
408 
409         // Send additional task input to each worker.
410         sch.start(count, range);
411         for (int w = 0; w < count; ++w) {
412             Range chunk = sch.next(w);
413             if (chunk != null) {
414                 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
415             }
416         }
417 
418         // Receive additional task output from each worker.
419         sch.start(count, range);
420         for (int w = 0; w < count; ++w) {
421             Range chunk = sch.next(w);
422             if (chunk != null) {
423                 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
424             }
425         }
426     }
427 
428     /**
429      * Execute this worker for loop in the master thread with a non-fixed
430      * schedule.
431      *
432      * @param range Loop index range.
433      * @param sch Schedule.
434      *
435      * @exception IOException Thrown if an I/O error occurred.
436      */
437     void masterExecuteNonFixed(Range range,
438             IntegerSchedule sch)
439             throws IOException {
440         int count = myTeam.count;
441         sch.start(count, range);
442         int remaining = count;
443         ObjectItemBuf<Range> buf = ObjectBuf.buffer();
444         Range tagRange = new Range(tagFor(0), tagFor(count - 1));
445         Comm comm = myTeam.comm;
446 
447         // Send initial task to each worker.
448         for (int w = 0; w < count; ++w) {
449             Range chunk = sch.next(w);
450             buf.item = chunk;
451             buf.reset();
452             int r = myTeam.workerRank(w);
453             int tag = tagFor(w);
454             comm.send(r, tag, buf);
455             if (chunk == null) {
456                 --remaining;
457             } else {
458                 sendTaskInput(chunk, comm, r, tag);
459             }
460         }
461 
462         // Repeatedly receive a response from a worker and send next task to
463         // that worker.
464         while (remaining > 0) {
465             CommStatus status = comm.receive(null, tagRange, buf);
466             Range chunk = buf.item;
467             int r = status.fromRank;
468             int tag = status.tag;
469             int w = workerFor(tag);
470             receiveTaskOutput(chunk, comm, r, tag);
471             chunk = sch.next(w);
472             buf.item = chunk;
473             buf.reset();
474             comm.send(r, tag, buf);
475             if (chunk == null) {
476                 --remaining;
477             } else {
478                 sendTaskInput(chunk, comm, r, tag);
479             }
480         }
481     }
482 
483     /**
484      * Execute this worker for loop in a worker thread.
485      *
486      * @param w Worker index.
487      * @param range Loop index range.
488      *
489      * @exception Exception This method may throw any exception.
490      */
491     void workerExecute(int w,
492             Range range)
493             throws Exception {
494         IntegerSchedule sch = schedule();
495         if (sch.isFixedSchedule()) {
496             sch.start(myTeam.count, range);
497             workerExecuteFixed(sch.next(w), w);
498         } else {
499             workerExecuteNonFixed(w);
500         }
501     }
502 
503     /**
504      * Execute this worker for loop in a worker thread using a fixed schedule.
505      *
506      * @param range Chunk of loop iterations.
507      * @param w Worker index.
508      *
509      * @exception Exception This method may throw any exception.
510      */
511     void workerExecuteFixed(Range range,
512             int w)
513             throws Exception {
514         start();
515         if (range != null) {
516             Comm comm = myTeam.comm;
517             int r = myTeam.masterRank();
518             int tag = tagFor(w);
519             receiveTaskInput(range, comm, r, tag);
520             run(range.lb(), range.ub());
521             sendTaskOutput(range, comm, r, tag);
522         }
523         finish();
524     }
525 
526     /**
527      * Execute this worker for loop in a worker thread using a non-fixed
528      * schedule.
529      *
530      * @param w Worker index.
531      *
532      * @exception Exception This method may throw any exception.
533      */
534     void workerExecuteNonFixed(int w)
535             throws Exception {
536         Comm comm = myTeam.comm;
537         int r = myTeam.masterRank();
538         int tag = tagFor(w);
539         start();
540         ObjectItemBuf<Range> buf = ObjectBuf.buffer();
541         for (;;) {
542             comm.receive(r, tag, buf);
543             Range range = buf.item;
544             if (range == null) {
545                 break;
546             }
547             receiveTaskInput(range, comm, r, tag);
548             run(range.lb(), range.ub());
549 
550             // The next two statements constitute a critical section; other
551             // workers in this team must not send messages in between these two
552             // messages, or the master can deadlock.
553             synchronized (myTeam) {
554                 comm.send(r, tag, buf);
555                 sendTaskOutput(range, comm, r, tag);
556             }
557         }
558         finish();
559     }
560 
561     /**
562      * Returns the message tag for the given worker index.
563      *
564      * @param w Worker index.
565      *
566      * @return Message tag.
567      */
568     private int tagFor(int w) {
569         return w + tagOffset();
570     }
571 
572     /**
573      * Returns the worker index for the given message tag.
574      *
575      * @param tag Message tag.
576      *
577      * @return Worker index.
578      */
579     private int workerFor(int tag) {
580         return tag - tagOffset();
581     }
582 
583 }