View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerLongForLoop.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerLongForLoop
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.io.IOException;
43  
44  import edu.rit.mp.ObjectBuf;
45  import edu.rit.mp.buf.ObjectItemBuf;
46  import edu.rit.util.LongRange;
47  import edu.rit.util.Range;
48  
49  /**
50   * Class WorkerLongForLoop is the abstract base class for one variation of a
51   * worker for loop that is executed inside a {@linkplain WorkerRegion}. The loop
52   * index data type is <code>long</code>. The loop stride is implicit (+1).
53   * <P>
54   * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
55   * create an instance of a concrete subclass of class WorkerLongForLoop; and
56   * pass this instance to the worker region's <code>execute()</code> method. Either
57   * every worker team thread must call the worker region's <code>execute()</code>
58   * method with identical arguments, or no thread may call the
59   * <code>execute()</code> method. You can do all this using an anonymous inner
60   * class; for example:
61   * <PRE>
62   *     new WorkerRegion()
63   *         {
64   *         . . .
65   *         public void run()
66   *             {
67   *             . . .
68   *             execute (0L, 99L, new WorkerLongForLoop()
69   *                 {
70   *                 // Thread local variable declarations
71   *                 . . .
72   *                 public void start()
73   *                     {
74   *                     // Per-thread pre-loop initialization code
75   *                     . . .
76   *                     }
77   *                 public void run (long first, long last)
78   *                     {
79   *                     // Loop code
80   *                     . . .
81   *                     }
82   *                 public void finish()
83   *                     {
84   *                     // Per-thread post-loop finalization code
85   *                     . . .
86   *                     }
87   *                 });
88   *             }
89   *         . . .
90   *         }
91   * </PRE>
92   * <P>
93   * In each process of a cluster parallel program, the worker team has one or
94   * more worker threads. Every worker thread in every process has a unique worker
95   * tag, going from tag 0 for the first worker thread in the first process to tag
96   * <I>K</I>&minus;1 for the last worker thread in the last process, where
97   * <I>K</I> is the total number of worker threads in all the processes. In
98   * addition, in one process there is a master thread. The worker and master
99   * threads all call the worker region's <code>execute()</code> method to execute the
100  * worker for loop. However, the worker and master threads differ in their
101  * actions.
102  * <P>
103  * The master thread does the following. The master obtains the worker for
104  * loop's schedule as returned by the <code>schedule()</code> method. The range of
105  * loop indexes is divided into "chunks" and the chunks are apportioned among
106  * the workers in accordance with the schedule. The master repeatedly sends
107  * "tasks" to the workers and receives "responses" from the workers. To send a
108  * task to a particular worker, the master (1) sends a message containing the
109  * chunk index range to the worker's process; and (2) calls the worker for
110  * loop's <code>sendTaskInput()</code> method. This method's default implementation
111  * does nothing, but it can be overridden to send additional task input data to
112  * the worker. To receive a response from a particular worker, the master (1)
113  * receives a message containing the chunk index range from the worker's
114  * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
115  * method. This method's default implementation does nothing, but it can be
116  * overridden to receive additional task output data from the worker. Once all
117  * tasks have been sent to the workers and all responses have been received from
118  * the workers, the master returns from the worker region's <code>execute()</code>
119  * method.
120  * <P>
121  * Each worker thread does the following. The worker calls the worker for loop's
122  * <code>start()</code> method once before beginning any loop iterations. The worker
123  * repeatedly receives tasks from the master and sends responses to the master.
124  * To receive a task from the master, the worker (1) receives a message
125  * containing the chunk index range from the master's process; and (2) calls the
126  * worker for loop's <code>receiveTaskInput()</code> method. This method's default
127  * implementation does nothing, but it can be overridden to receive additional
128  * task input data from the master. The worker now calls the worker for loop's
129  * <code>run()</code> method, passing in the chunk index range lower and upper
130  * bounds. When the <code>run()</code> method returns, the worker sends the response
131  * to the master. To send the response, the worker (1) sends a message
132  * containing the chunk index range to the master's process; and (2) calls the
133  * worker for loop's <code>sendTaskOutput()</code> method. This method's default
134  * implementation does nothing, but it can be overridden to send additional task
135  * output data to the master. Once all tasks have been received from the master
136  * and all responses have been sent to the master, the worker calls the worker
137  * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
138  * threads, the workers do <I>not</I> synchronize with each other at a barrier
139  * at this point.) The worker then returns from the worker region's
140  * <code>execute()</code> method.
141  * <P>
142  * If the worker for loop has a fixed schedule (in which there is exactly one
143  * chunk with a predetermined index range for each worker), then the messages
144  * containing the chunk index range are omitted, and each worker gets its chunk
145  * index range directly from the fixed schedule. However, the task input data
146  * (if any) and task output data (if any) are still sent and received.
147  * <P>
148  * Each message described above is sent with a message tag equal to
149  * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
150  * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
151  * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
152  * message tags fall in the range <I>T</I> .. <I>K</I>&minus;1+<I>T</I>, where
153  * <I>K</I> is the total number of workers in all the processes. The program
154  * should not use message tags in this range except to send and receive the
155  * messages described above.
156  * <P>
157  * Note that each worker team thread actually creates its own instance of the
158  * worker for loop class and passes that instance to the worker region's
159  * <code>execute()</code> method. Thus, any fields declared in the worker for loop
160  * class will <I>not</I> be shared by all the workers, but instead will be
161  * private to each worker.
162  * <P>
163  * The <code>start()</code> method is intended for performing per-thread
164  * initialization before starting the loop iterations. If no such initialization
165  * is needed, omit the <code>start()</code> method.
166  * <P>
167  * The <code>run()</code> method contains the code for the loop. The first and last
168  * indexes for a chunk of loop iterations are passed in as arguments. The loop
169  * stride is implicit (+1). The worker for loop's <code>run()</code> method must be
170  * coded this way:
171  * <PRE>
172  *     public void run (long first, long last)
173  *         {
174  *         for (long i = first; i &lt;= last; ++ i)
175  *             {
176  *             // Loop body code
177  *             . . .
178  *             }
179  *         }
180  * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
181  * inclusive and increasing by +1 on each iteration.
182  * <P>
183  * The <code>finish()</code> method is intended for performing per-thread
184  * finalization after finishing the loop iterations. If no such finalization is
185  * needed, omit the <code>finish()</code> method.
186  * <P>
187  * If the worker for loop's <code>start()</code>, <code>run()</code>, or
188  * <code>finish()</code> method throws an exception in one of the worker threads,
189  * then that worker thread executes no further code in the loop, and the worker
190  * region's <code>execute()</code> method throws that same exception in that thread.
191  * However, the other worker threads in the worker team continue to execute.
192  *
193  * @author Alan Kaminsky
194  * @version 27-Jan-2010
195  */
196 public abstract class WorkerLongForLoop
197         extends WorkerForLoop {
198 
199 // Exported constructors.
200     /**
201      * Construct a new worker for loop.
202      */
203     public WorkerLongForLoop() {
204         super();
205     }
206 
207 // Exported operations.
208     /**
209      * Determine this worker for loop's schedule. Called by the master and
210      * worker threads. The schedule determines how the loop iterations are
211      * apportioned among the worker team threads. For further information, see
212      * class {@linkplain LongSchedule}.
213      * <P>
214      * The <code>schedule()</code> method may be overridden in a subclass to return
215      * the desired schedule. If not overridden, the default is a runtime
216      * schedule (see {@link edu.rit.pj.LongSchedule#runtime()}).
217      *
218      * @return Schedule for this worker for loop.
219      */
220     public LongSchedule schedule() {
221         return LongSchedule.runtime();
222     }
223 
224     /**
225      * Perform per-thread initialization actions before starting the loop
226      * iterations. Called by a worker thread.
227      * <P>
228      * The <code>start()</code> method may be overridden in a subclass. If not
229      * overridden, the <code>start()</code> method does nothing.
230      *
231      * @exception Exception The <code>start()</code> method may throw any exception.
232      * @throws java.lang.Exception if any.
233      */
234     public void start()
235             throws Exception {
236     }
237 
238     /**
239      * Send additional input data associated with a task. Called by the master
240      * thread. The task is denoted by the given chunk of loop iterations. The
241      * input data must be sent using the given communicator, to the given worker
242      * process rank, with the given message tag.
243      * <P>
244      * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
245      * not overridden, the <code>sendTaskInput()</code> method does nothing.
246      *
247      * @param range Chunk of loop iterations.
248      * @param comm Communicator.
249      * @param wRank Worker process rank.
250      * @param tag Message tag.
251      * @exception IOException Thrown if an I/O error occurred.
252      * @throws java.io.IOException if any.
253      */
254     public void sendTaskInput(LongRange range,
255             Comm comm,
256             int wRank,
257             int tag)
258             throws IOException {
259     }
260 
261     /**
262      * Receive additional input data associated with a task. Called by a worker
263      * thread. The task is denoted by the given chunk of loop iterations. The
264      * input data must be received using the given communicator, from the given
265      * master process rank, with the given message tag.
266      * <P>
267      * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
268      * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
269      *
270      * @param range Chunk of loop iterations.
271      * @param comm Communicator.
272      * @param mRank Master process rank.
273      * @param tag Message tag.
274      * @exception IOException Thrown if an I/O error occurred.
275      * @throws java.io.IOException if any.
276      */
277     public void receiveTaskInput(LongRange range,
278             Comm comm,
279             int mRank,
280             int tag)
281             throws IOException {
282     }
283 
284     /**
285      * Execute one chunk of iterations of this worker for loop. Called by a
286      * worker thread. The <code>run()</code> method must perform the loop body for
287      * indexes <code>first</code> through <code>last</code> inclusive, increasing the
288      * loop index by +1 after each iteration.
289      * <P>
290      * The <code>run()</code> method must be overridden in a subclass.
291      *
292      * @param first First loop index.
293      * @param last Last loop index.
294      * @exception Exception The <code>run()</code> method may throw any exception.
295      * @throws java.lang.Exception if any.
296      */
297     public abstract void run(long first,
298             long last)
299             throws Exception;
300 
301     /**
302      * Send additional output data associated with a task. Called by a worker
303      * thread. The task is denoted by the given chunk of loop iterations. The
304      * output data must be sent using the given communicator, to the given
305      * master process rank, with the given message tag.
306      * <P>
307      * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
308      * not overridden, the <code>sendTaskOutput()</code> method does nothing.
309      *
310      * @param range Chunk of loop iterations.
311      * @param comm Communicator.
312      * @param mRank Master process rank.
313      * @param tag Message tag.
314      * @exception IOException Thrown if an I/O error occurred.
315      * @throws java.io.IOException if any.
316      */
317     public void sendTaskOutput(LongRange range,
318             Comm comm,
319             int mRank,
320             int tag)
321             throws IOException {
322     }
323 
324     /**
325      * Receive additional output data associated with a task. Called by the
326      * master thread. The task is denoted by the given chunk of loop iterations.
327      * The output data must be received using the given communicator, from the
328      * given worker process rank, with the given message tag.
329      * <P>
330      * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
331      * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
332      *
333      * @param range Chunk of loop iterations.
334      * @param comm Communicator.
335      * @param wRank Worker process rank.
336      * @param tag Message tag.
337      * @exception IOException Thrown if an I/O error occurred.
338      * @throws java.io.IOException if any.
339      */
340     public void receiveTaskOutput(LongRange range,
341             Comm comm,
342             int wRank,
343             int tag)
344             throws IOException {
345     }
346 
347     /**
348      * Perform per-thread finalization actions after finishing the loop
349      * iterations. Called by a worker thread.
350      * <P>
351      * The <code>finish()</code> method may be overridden in a subclass. If not
352      * overridden, the <code>finish()</code> method does nothing.
353      *
354      * @exception Exception The <code>finish()</code> method may throw any
355      * exception.
356      * @throws java.lang.Exception if any.
357      */
358     public void finish()
359             throws Exception {
360     }
361 
362     /**
363      * Returns the tag offset for this worker for loop. Each message between the
364      * master and worker threads is sent with a message tag equal to
365      * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
366      * tag offset.
367      * <P>
368      * The <code>tagOffset()</code> method may be overridden in a subclass. If not
369      * overridden, the <code>tagOffset()</code> returns a default tag offset of
370      * <code>Integer.MIN_VALUE</code>.
371      *
372      * @return Tag offset.
373      */
374     public int tagOffset() {
375         return Integer.MIN_VALUE;
376     }
377 
378 // Hidden operations.
379     /**
380      * Execute this worker for loop in the master thread.
381      *
382      * @param range Loop index range.
383      *
384      * @exception IOException Thrown if an I/O error occurred.
385      */
386     void masterExecute(LongRange range)
387             throws IOException {
388         LongSchedule sch = schedule();
389         if (sch.isFixedSchedule()) {
390             masterExecuteFixed(range, sch);
391         } else {
392             masterExecuteNonFixed(range, sch);
393         }
394     }
395 
396     /**
397      * Execute this worker for loop in the master thread with a fixed schedule.
398      *
399      * @param range Loop index range.
400      * @param sch Schedule.
401      *
402      * @exception IOException Thrown if an I/O error occurred.
403      */
404     void masterExecuteFixed(LongRange range,
405             LongSchedule sch)
406             throws IOException {
407         int count = myTeam.count;
408         Comm comm = myTeam.comm;
409 
410         // Send additional task input to each worker.
411         sch.start(count, range);
412         for (int w = 0; w < count; ++w) {
413             LongRange chunk = sch.next(w);
414             if (chunk != null) {
415                 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
416             }
417         }
418 
419         // Receive additional task output from each worker.
420         sch.start(count, range);
421         for (int w = 0; w < count; ++w) {
422             LongRange chunk = sch.next(w);
423             if (chunk != null) {
424                 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
425             }
426         }
427     }
428 
429     /**
430      * Execute this worker for loop in the master thread with a non-fixed
431      * schedule.
432      *
433      * @param range Loop index range.
434      * @param sch Schedule.
435      *
436      * @exception IOException Thrown if an I/O error occurred.
437      */
438     void masterExecuteNonFixed(LongRange range,
439             LongSchedule sch)
440             throws IOException {
441         int count = myTeam.count;
442         sch.start(count, range);
443         int remaining = count;
444         ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
445         Range tagRange = new Range(tagFor(0), tagFor(count - 1));
446         Comm comm = myTeam.comm;
447 
448         // Send initial task to each worker.
449         for (int w = 0; w < count; ++w) {
450             LongRange chunk = sch.next(w);
451             buf.item = chunk;
452             buf.reset();
453             int r = myTeam.workerRank(w);
454             int tag = tagFor(w);
455             comm.send(r, tag, buf);
456             if (chunk == null) {
457                 --remaining;
458             } else {
459                 sendTaskInput(chunk, comm, r, tag);
460             }
461         }
462 
463         // Repeatedly receive a response from a worker and send next task to
464         // that worker.
465         while (remaining > 0) {
466             CommStatus status = comm.receive(null, tagRange, buf);
467             LongRange chunk = buf.item;
468             int r = status.fromRank;
469             int tag = status.tag;
470             int w = workerFor(tag);
471             receiveTaskOutput(chunk, comm, r, tag);
472             chunk = sch.next(w);
473             buf.item = chunk;
474             buf.reset();
475             comm.send(r, tag, buf);
476             if (chunk == null) {
477                 --remaining;
478             } else {
479                 sendTaskInput(chunk, comm, r, tag);
480             }
481         }
482     }
483 
484     /**
485      * Execute this worker for loop in a worker thread.
486      *
487      * @param w Worker index.
488      * @param range Loop index range.
489      *
490      * @exception Exception This method may throw any exception.
491      */
492     void workerExecute(int w,
493             LongRange range)
494             throws Exception {
495         LongSchedule sch = schedule();
496         if (sch.isFixedSchedule()) {
497             sch.start(myTeam.count, range);
498             workerExecuteFixed(sch.next(w), w);
499         } else {
500             workerExecuteNonFixed(w);
501         }
502     }
503 
504     /**
505      * Execute this worker for loop in a worker thread using a fixed schedule.
506      *
507      * @param range Chunk of loop iterations.
508      * @param w Worker index.
509      *
510      * @exception Exception This method may throw any exception.
511      */
512     void workerExecuteFixed(LongRange range,
513             int w)
514             throws Exception {
515         start();
516         if (range != null) {
517             Comm comm = myTeam.comm;
518             int r = myTeam.masterRank();
519             int tag = tagFor(w);
520             receiveTaskInput(range, comm, r, tag);
521             run(range.lb(), range.ub());
522             sendTaskOutput(range, comm, r, tag);
523         }
524         finish();
525     }
526 
527     /**
528      * Execute this worker for loop in a worker thread using a non-fixed
529      * schedule.
530      *
531      * @param w Worker index.
532      *
533      * @exception Exception This method may throw any exception.
534      */
535     void workerExecuteNonFixed(int w)
536             throws Exception {
537         Comm comm = myTeam.comm;
538         int r = myTeam.masterRank();
539         int tag = tagFor(w);
540         start();
541         ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
542         for (;;) {
543             comm.receive(r, tag, buf);
544             LongRange range = buf.item;
545             if (range == null) {
546                 break;
547             }
548             receiveTaskInput(range, comm, r, tag);
549             run(range.lb(), range.ub());
550 
551             // The next two statements constitute a critical section; other
552             // workers in this team must not send messages in between these two
553             // messages, or the master can deadlock.
554             synchronized (myTeam) {
555                 comm.send(r, tag, buf);
556                 sendTaskOutput(range, comm, r, tag);
557             }
558         }
559         finish();
560     }
561 
562     /**
563      * Returns the message tag for the given worker index.
564      *
565      * @param w Worker index.
566      *
567      * @return Message tag.
568      */
569     private int tagFor(int w) {
570         return w + tagOffset();
571     }
572 
573     /**
574      * Returns the worker index for the given message tag.
575      *
576      * @param tag Message tag.
577      *
578      * @return Worker index.
579      */
580     private int workerFor(int tag) {
581         return tag - tagOffset();
582     }
583 
584 }