View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerIntegerStrideForLoop.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerIntegerStrideForLoop
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.io.IOException;
43  
44  import edu.rit.mp.ObjectBuf;
45  import edu.rit.mp.buf.ObjectItemBuf;
46  import edu.rit.util.Range;
47  
48  /**
49   * Class WorkerIntegerStrideForLoop is the abstract base class for one variation
50   * of a worker for loop that is executed inside a {@linkplain WorkerRegion}. The
51   * loop index data type is <code>int</code>. The loop stride is explicitly
52   * specified.
53   * <P>
54   * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
55   * create an instance of a concrete subclass of class
56   * WorkerIntegerStrideForLoop; and pass this instance to the worker region's
 * <code>execute()</code> method. Either every worker team thread must call the
 * worker region's <code>execute()</code> method with identical arguments, or no
 * thread may call the <code>execute()</code> method at all. You can do all this
 * using an anonymous inner class; for example:
61   * <PRE>
62   *     new WorkerRegion()
63   *         {
64   *         . . .
65   *         public void run()
66   *             {
67   *             . . .
68   *             execute (0, 98, 2, new WorkerIntegerStrideForLoop()
69   *                 {
70   *                 // Thread local variable declarations
71   *                 . . .
72   *                 public void start()
73   *                     {
74   *                     // Per-thread pre-loop initialization code
75   *                     . . .
76   *                     }
77   *                 public void run (int first, int last, int stride)
78   *                     {
79   *                     // Loop code
80   *                     . . .
81   *                     }
82   *                 public void finish()
83   *                     {
84   *                     // Per-thread post-loop finalization code
85   *                     . . .
86   *                     }
87   *                 });
88   *             }
89   *         . . .
90   *         }
91   * </PRE>
92   * <P>
93   * In each process of a cluster parallel program, the worker team has one or
94   * more worker threads. Every worker thread in every process has a unique worker
95   * tag, going from tag 0 for the first worker thread in the first process to tag
96   * <I>K</I>&minus;1 for the last worker thread in the last process, where
97   * <I>K</I> is the total number of worker threads in all the processes. In
98   * addition, in one process there is a master thread. The worker and master
99   * threads all call the worker region's <code>execute()</code> method to execute the
100  * worker for loop. However, the worker and master threads differ in their
101  * actions.
102  * <P>
103  * The master thread does the following. The master obtains the worker for
104  * loop's schedule as returned by the <code>schedule()</code> method. The range of
105  * loop indexes is divided into "chunks" and the chunks are apportioned among
106  * the workers in accordance with the schedule. The master repeatedly sends
107  * "tasks" to the workers and receives "responses" from the workers. To send a
108  * task to a particular worker, the master (1) sends a message containing the
109  * chunk index range to the worker's process; and (2) calls the worker for
110  * loop's <code>sendTaskInput()</code> method. This method's default implementation
111  * does nothing, but it can be overridden to send additional task input data to
112  * the worker. To receive a response from a particular worker, the master (1)
113  * receives a message containing the chunk index range from the worker's
114  * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
115  * method. This method's default implementation does nothing, but it can be
116  * overridden to receive additional task output data from the worker. Once all
117  * tasks have been sent to the workers and all responses have been received from
118  * the workers, the master returns from the worker region's <code>execute()</code>
119  * method.
120  * <P>
121  * Each worker thread does the following. The worker calls the worker for loop's
122  * <code>start()</code> method once before beginning any loop iterations. The worker
123  * repeatedly receives tasks from the master and sends responses to the master.
124  * To receive a task from the master, the worker (1) receives a message
125  * containing the chunk index range from the master's process; and (2) calls the
126  * worker for loop's <code>receiveTaskInput()</code> method. This method's default
127  * implementation does nothing, but it can be overridden to receive additional
128  * task input data from the master. The worker now calls the worker for loop's
 * <code>run()</code> method, passing in the chunk index range's lower bound,
 * upper bound, and stride. When the <code>run()</code> method returns, the
 * worker sends the response to the master. To send the response, the worker
 * (1) sends a message
132  * containing the chunk index range to the master's process; and (2) calls the
133  * worker for loop's <code>sendTaskOutput()</code> method. This method's default
134  * implementation does nothing, but it can be overridden to send additional task
135  * output data to the master. Once all tasks have been received from the master
136  * and all responses have been sent to the master, the worker calls the worker
137  * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
138  * threads, the workers do <I>not</I> synchronize with each other at a barrier
139  * at this point.) The worker then returns from the worker region's
140  * <code>execute()</code> method.
141  * <P>
142  * If the worker for loop has a fixed schedule (in which there is exactly one
143  * chunk with a predetermined index range for each worker), then the messages
144  * containing the chunk index range are omitted, and each worker gets its chunk
145  * index range directly from the fixed schedule. However, the task input data
146  * (if any) and task output data (if any) are still sent and received.
147  * <P>
148  * Each message described above is sent with a message tag equal to
149  * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
150  * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
151  * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
152  * message tags fall in the range <I>T</I> .. <I>K</I>&minus;1+<I>T</I>, where
153  * <I>K</I> is the total number of workers in all the processes. The program
154  * should not use message tags in this range except to send and receive the
155  * messages described above.
156  * <P>
157  * Note that each worker team thread actually creates its own instance of the
158  * worker for loop class and passes that instance to the worker region's
159  * <code>execute()</code> method. Thus, any fields declared in the worker for loop
160  * class will <I>not</I> be shared by all the workers, but instead will be
161  * private to each worker.
162  * <P>
163  * The <code>start()</code> method is intended for performing per-thread
164  * initialization before starting the loop iterations. If no such initialization
165  * is needed, omit the <code>start()</code> method.
166  * <P>
167  * The <code>run()</code> method contains the code for the loop. The first and last
168  * indexes for a chunk of loop iterations are passed in as arguments. The loop
169  * stride, which is always positive, is also explicitly specified as an
170  * argument. The worker for loop's <code>run()</code> method must be coded this way:
171  * <PRE>
172  *     public void run (int first, int last, int stride)
173  *         {
174  *         for (int i = first; i &lt;= last; i += stride)
175  *             {
176  *             // Loop body code
177  *             . . .
178  *             }
179  *         }
180  * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
181  * inclusive and increasing by <code>stride</code> on each iteration.
182  * <P>
183  * The <code>finish()</code> method is intended for performing per-thread
184  * finalization after finishing the loop iterations. If no such finalization is
185  * needed, omit the <code>finish()</code> method.
186  * <P>
187  * If the worker for loop's <code>start()</code>, <code>run()</code>, or
188  * <code>finish()</code> method throws an exception in one of the worker threads,
189  * then that worker thread executes no further code in the loop, and the worker
190  * region's <code>execute()</code> method throws that same exception in that thread.
191  * However, the other worker threads in the worker team continue to execute.
192  *
193  * @author Alan Kaminsky
194  * @version 27-Jan-2010
195  */
196 public abstract class WorkerIntegerStrideForLoop
197         extends WorkerForLoop {
198 
199 // Exported constructors.
200     /**
201      * Construct a new worker for loop.
202      */
203     public WorkerIntegerStrideForLoop() {
204         super();
205     }
206 
207 // Exported operations.
208     /**
209      * Determine this worker for loop's schedule. Called by the master and
210      * worker threads. The schedule determines how the loop iterations are
211      * apportioned among the worker team threads. For further information, see
212      * class {@linkplain IntegerSchedule}.
213      * <P>
214      * The <code>schedule()</code> method may be overridden in a subclass to return
215      * the desired schedule. If not overridden, the default is a runtime
216      * schedule (see {@link edu.rit.pj.IntegerSchedule#runtime()}).
217      *
218      * @return Schedule for this worker for loop.
219      */
220     public IntegerSchedule schedule() {
221         return IntegerSchedule.runtime();
222     }
223 
224     /**
225      * Perform per-thread initialization actions before starting the loop
226      * iterations. Called by a worker thread.
227      * <P>
228      * The <code>start()</code> method may be overridden in a subclass. If not
229      * overridden, the <code>start()</code> method does nothing.
230      *
231      * @exception Exception The <code>start()</code> method may throw any exception.
232      * @throws java.lang.Exception if any.
233      */
234     public void start()
235             throws Exception {
236     }
237 
238     /**
239      * Send additional input data associated with a task. Called by the master
240      * thread. The task is denoted by the given chunk of loop iterations. The
241      * input data must be sent using the given communicator, to the given worker
242      * process rank, with the given message tag.
243      * <P>
244      * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
245      * not overridden, the <code>sendTaskInput()</code> method does nothing.
246      *
247      * @param range Chunk of loop iterations.
248      * @param comm Communicator.
249      * @param wRank Worker process rank.
250      * @param tag Message tag.
251      * @exception IOException Thrown if an I/O error occurred.
252      * @throws java.io.IOException if any.
253      */
254     public void sendTaskInput(Range range,
255             Comm comm,
256             int wRank,
257             int tag)
258             throws IOException {
259     }
260 
261     /**
262      * Receive additional input data associated with a task. Called by a worker
263      * thread. The task is denoted by the given chunk of loop iterations. The
264      * input data must be received using the given communicator, from the given
265      * master process rank, with the given message tag.
266      * <P>
267      * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
268      * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
269      *
270      * @param range Chunk of loop iterations.
271      * @param comm Communicator.
272      * @param mRank Master process rank.
273      * @param tag Message tag.
274      * @exception IOException Thrown if an I/O error occurred.
275      * @throws java.io.IOException if any.
276      */
277     public void receiveTaskInput(Range range,
278             Comm comm,
279             int mRank,
280             int tag)
281             throws IOException {
282     }
283 
284     /**
285      * Execute one chunk of iterations of this worker for loop. Called by a
286      * worker thread. The <code>run()</code> method must perform the loop body for
287      * indexes <code>first</code> through <code>last</code> inclusive, increasing the
288      * loop index by <code>stride</code> after each iteration.
289      * <P>
290      * The <code>run()</code> method must be overridden in a subclass.
291      *
292      * @param first First loop index.
293      * @param last Last loop index.
294      * @param stride Loop index stride, always positive.
295      * @exception Exception The <code>run()</code> method may throw any exception.
296      * @throws java.lang.Exception if any.
297      */
298     public abstract void run(int first,
299             int last,
300             int stride)
301             throws Exception;
302 
303     /**
304      * Send additional output data associated with a task. Called by a worker
305      * thread. The task is denoted by the given chunk of loop iterations. The
306      * output data must be sent using the given communicator, to the given
307      * master process rank, with the given message tag.
308      * <P>
309      * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
310      * not overridden, the <code>sendTaskOutput()</code> method does nothing.
311      *
312      * @param range Chunk of loop iterations.
313      * @param comm Communicator.
314      * @param mRank Master process rank.
315      * @param tag Message tag.
316      * @exception IOException Thrown if an I/O error occurred.
317      * @throws java.io.IOException if any.
318      */
319     public void sendTaskOutput(Range range,
320             Comm comm,
321             int mRank,
322             int tag)
323             throws IOException {
324     }
325 
326     /**
327      * Receive additional output data associated with a task. Called by the
328      * master thread. The task is denoted by the given chunk of loop iterations.
329      * The output data must be received using the given communicator, from the
330      * given worker process rank, with the given message tag.
331      * <P>
332      * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
333      * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
334      *
335      * @param range Chunk of loop iterations.
336      * @param comm Communicator.
337      * @param wRank Worker process rank.
338      * @param tag Message tag.
339      * @exception IOException Thrown if an I/O error occurred.
340      * @throws java.io.IOException if any.
341      */
342     public void receiveTaskOutput(Range range,
343             Comm comm,
344             int wRank,
345             int tag)
346             throws IOException {
347     }
348 
349     /**
350      * Perform per-thread finalization actions after finishing the loop
351      * iterations. Called by a worker thread.
352      * <P>
353      * The <code>finish()</code> method may be overridden in a subclass. If not
354      * overridden, the <code>finish()</code> method does nothing.
355      *
356      * @exception Exception The <code>finish()</code> method may throw any
357      * exception.
358      * @throws java.lang.Exception if any.
359      */
360     public void finish()
361             throws Exception {
362     }
363 
364     /**
365      * Returns the tag offset for this worker for loop. Each message between the
366      * master and worker threads is sent with a message tag equal to
367      * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
368      * tag offset.
369      * <P>
370      * The <code>tagOffset()</code> method may be overridden in a subclass. If not
371      * overridden, the <code>tagOffset()</code> returns a default tag offset of
372      * <code>Integer.MIN_VALUE</code>.
373      *
374      * @return Tag offset.
375      */
376     public int tagOffset() {
377         return Integer.MIN_VALUE;
378     }
379 
380 // Hidden operations.
381     /**
382      * Execute this worker for loop in the master thread.
383      *
384      * @param range Loop index range.
385      *
386      * @exception IOException Thrown if an I/O error occurred.
387      */
388     void masterExecute(Range range)
389             throws IOException {
390         IntegerSchedule sch = schedule();
391         if (sch.isFixedSchedule()) {
392             masterExecuteFixed(range, sch);
393         } else {
394             masterExecuteNonFixed(range, sch);
395         }
396     }
397 
398     /**
399      * Execute this worker for loop in the master thread with a fixed schedule.
400      *
401      * @param range Loop index range.
402      * @param sch Schedule.
403      *
404      * @exception IOException Thrown if an I/O error occurred.
405      */
406     void masterExecuteFixed(Range range,
407             IntegerSchedule sch)
408             throws IOException {
409         int count = myTeam.count;
410         Comm comm = myTeam.comm;
411 
412         // Send additional task input to each worker.
413         sch.start(count, range);
414         for (int w = 0; w < count; ++w) {
415             Range chunk = sch.next(w);
416             if (chunk != null) {
417                 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
418             }
419         }
420 
421         // Receive additional task output from each worker.
422         sch.start(count, range);
423         for (int w = 0; w < count; ++w) {
424             Range chunk = sch.next(w);
425             if (chunk != null) {
426                 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
427             }
428         }
429     }
430 
431     /**
432      * Execute this worker for loop in the master thread with a non-fixed
433      * schedule.
434      *
435      * @param range Loop index range.
436      * @param sch Schedule.
437      *
438      * @exception IOException Thrown if an I/O error occurred.
439      */
440     void masterExecuteNonFixed(Range range,
441             IntegerSchedule sch)
442             throws IOException {
443         int count = myTeam.count;
444         sch.start(count, range);
445         int remaining = count;
446         ObjectItemBuf<Range> buf = ObjectBuf.buffer();
447         Range tagRange = new Range(tagFor(0), tagFor(count - 1));
448         Comm comm = myTeam.comm;
449 
450         // Send initial task to each worker.
451         for (int w = 0; w < count; ++w) {
452             Range chunk = sch.next(w);
453             buf.item = chunk;
454             buf.reset();
455             int r = myTeam.workerRank(w);
456             int tag = tagFor(w);
457             comm.send(r, tag, buf);
458             if (chunk == null) {
459                 --remaining;
460             } else {
461                 sendTaskInput(chunk, comm, r, tag);
462             }
463         }
464 
465         // Repeatedly receive a response from a worker and send next task to
466         // that worker.
467         while (remaining > 0) {
468             CommStatus status = comm.receive(null, tagRange, buf);
469             Range chunk = buf.item;
470             int r = status.fromRank;
471             int tag = status.tag;
472             int w = workerFor(tag);
473             receiveTaskOutput(chunk, comm, r, tag);
474             chunk = sch.next(w);
475             buf.item = chunk;
476             buf.reset();
477             comm.send(r, tag, buf);
478             if (chunk == null) {
479                 --remaining;
480             } else {
481                 sendTaskInput(chunk, comm, r, tag);
482             }
483         }
484     }
485 
486     /**
487      * Execute this worker for loop in a worker thread.
488      *
489      * @param tag Worker tag.
490      * @param range Loop index range.
491      *
492      * @exception Exception This method may throw any exception.
493      */
494     void workerExecute(int tag,
495             Range range)
496             throws Exception {
497         IntegerSchedule sch = schedule();
498         if (sch.isFixedSchedule()) {
499             sch.start(myTeam.count, range);
500             workerExecuteFixed(sch.next(tag), tag);
501         } else {
502             workerExecuteNonFixed(tag);
503         }
504     }
505 
506     /**
507      * Execute this worker for loop in a worker thread using a fixed schedule.
508      *
509      * @param range Chunk of loop iterations.
510      * @param w Worker index.
511      *
512      * @exception Exception This method may throw any exception.
513      */
514     void workerExecuteFixed(Range range,
515             int w)
516             throws Exception {
517         start();
518         if (range != null) {
519             Comm comm = myTeam.comm;
520             int r = myTeam.masterRank();
521             int tag = tagFor(w);
522             receiveTaskInput(range, comm, r, tag);
523             run(range.lb(), range.ub(), range.stride());
524             sendTaskOutput(range, comm, r, tag);
525         }
526         finish();
527     }
528 
529     /**
530      * Execute this worker for loop in a worker thread using a non-fixed
531      * schedule.
532      *
533      * @param w Worker index.
534      *
535      * @exception Exception This method may throw any exception.
536      */
537     void workerExecuteNonFixed(int w)
538             throws Exception {
539         Comm comm = myTeam.comm;
540         int r = myTeam.masterRank();
541         int tag = tagFor(w);
542         start();
543         ObjectItemBuf<Range> buf = ObjectBuf.buffer();
544         for (;;) {
545             comm.receive(r, tag, buf);
546             Range range = buf.item;
547             if (range == null) {
548                 break;
549             }
550             receiveTaskInput(range, comm, r, tag);
551             run(range.lb(), range.ub(), range.stride());
552 
553             // The next two statements constitute a critical section; other
554             // workers in this team must not send messages in between these two
555             // messages, or the master can deadlock.
556             synchronized (myTeam) {
557                 comm.send(r, tag, buf);
558                 sendTaskOutput(range, comm, r, tag);
559             }
560         }
561         finish();
562     }
563 
564     /**
565      * Returns the message tag for the given worker index.
566      *
567      * @param w Worker index.
568      *
569      * @return Message tag.
570      */
571     private int tagFor(int w) {
572         return w + tagOffset();
573     }
574 
575     /**
576      * Returns the worker index for the given message tag.
577      *
578      * @param tag Message tag.
579      *
580      * @return Worker index.
581      */
582     private int workerFor(int tag) {
583         return tag - tagOffset();
584     }
585 
586 }