View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerLongStrideForLoop.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerLongStrideForLoop
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.io.IOException;
43  
44  import edu.rit.mp.ObjectBuf;
45  import edu.rit.mp.buf.ObjectItemBuf;
46  import edu.rit.util.LongRange;
47  import edu.rit.util.Range;
48  
49  /**
50   * Class WorkerLongStrideForLoop is the abstract base class for one variation of
51   * a worker for loop that is executed inside a {@linkplain WorkerRegion}. The
52   * loop index data type is <code>long</code>. The loop stride is explicitly
53   * specified.
54   * <P>
55   * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
56   * create an instance of a concrete subclass of class WorkerLongStrideForLoop;
57   * and pass this instance to the worker region's <code>execute()</code> method.
58   * Either every worker team thread must call the worker region's
59   * <code>execute()</code> method with identical arguments, or no thread may
60   * call the <code>execute()</code> method. You can do all this using an anonymous
61   * inner class; for example:
62   * <PRE>
63   *     new WorkerRegion()
64   *         {
65   *         . . .
66   *         public void run()
67   *             {
68   *             . . .
69   *             execute (0L, 98L, 2L, new WorkerLongStrideForLoop()
70   *                 {
71   *                 // Thread local variable declarations
72   *                 . . .
73   *                 public void start()
74   *                     {
75   *                     // Per-thread pre-loop initialization code
76   *                     . . .
77   *                     }
78   *                 public void run (long first, long last, long stride)
79   *                     {
80   *                     // Loop code
81   *                     . . .
82   *                     }
83   *                 public void finish()
84   *                     {
85   *                     // Per-thread post-loop finalization code
86   *                     . . .
87   *                     }
88   *                 });
89   *             }
90   *         . . .
91   *         }
92   * </PRE>
93   * <P>
94   * In each process of a cluster parallel program, the worker team has one or
95   * more worker threads. Every worker thread in every process has a unique worker
96   * tag, going from tag 0 for the first worker thread in the first process to tag
97   * <I>K</I>&minus;1 for the last worker thread in the last process, where
98   * <I>K</I> is the total number of worker threads in all the processes. In
99   * addition, in one process there is a master thread. The worker and master
100  * threads all call the worker region's <code>execute()</code> method to execute the
101  * worker for loop. However, the worker and master threads differ in their
102  * actions.
103  * <P>
104  * The master thread does the following. The master obtains the worker for
105  * loop's schedule as returned by the <code>schedule()</code> method. The range of
106  * loop indexes is divided into "chunks" and the chunks are apportioned among
107  * the workers in accordance with the schedule. The master repeatedly sends
108  * "tasks" to the workers and receives "responses" from the workers. To send a
109  * task to a particular worker, the master (1) sends a message containing the
110  * chunk index range to the worker's process; and (2) calls the worker for
111  * loop's <code>sendTaskInput()</code> method. This method's default implementation
112  * does nothing, but it can be overridden to send additional task input data to
113  * the worker. To receive a response from a particular worker, the master (1)
114  * receives a message containing the chunk index range from the worker's
115  * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
116  * method. This method's default implementation does nothing, but it can be
117  * overridden to receive additional task output data from the worker. Once all
118  * tasks have been sent to the workers and all responses have been received from
119  * the workers, the master returns from the worker region's <code>execute()</code>
120  * method.
121  * <P>
122  * Each worker thread does the following. The worker calls the worker for loop's
123  * <code>start()</code> method once before beginning any loop iterations. The worker
124  * repeatedly receives tasks from the master and sends responses to the master.
125  * To receive a task from the master, the worker (1) receives a message
126  * containing the chunk index range from the master's process; and (2) calls the
127  * worker for loop's <code>receiveTaskInput()</code> method. This method's default
128  * implementation does nothing, but it can be overridden to receive additional
129  * task input data from the master. The worker now calls the worker for loop's
130  * <code>run()</code> method, passing in the chunk index range's lower bound,
131  * upper bound, and stride. When the <code>run()</code> method returns, the worker sends the response
132  * to the master. To send the response, the worker (1) sends a message
133  * containing the chunk index range to the master's process; and (2) calls the
134  * worker for loop's <code>sendTaskOutput()</code> method. This method's default
135  * implementation does nothing, but it can be overridden to send additional task
136  * output data to the master. Once all tasks have been received from the master
137  * and all responses have been sent to the master, the worker calls the worker
138  * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
139  * threads, the workers do <I>not</I> synchronize with each other at a barrier
140  * at this point.) The worker then returns from the worker region's
141  * <code>execute()</code> method.
142  * <P>
143  * If the worker for loop has a fixed schedule (in which there is exactly one
144  * chunk with a predetermined index range for each worker), then the messages
145  * containing the chunk index range are omitted, and each worker gets its chunk
146  * index range directly from the fixed schedule. However, the task input data
147  * (if any) and task output data (if any) are still sent and received.
148  * <P>
149  * Each message described above is sent with a message tag equal to
150  * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
151  * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
152  * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
153  * message tags fall in the range <I>T</I> .. <I>K</I>&minus;1+<I>T</I>, where
154  * <I>K</I> is the total number of workers in all the processes. The program
155  * should not use message tags in this range except to send and receive the
156  * messages described above.
157  * <P>
158  * Note that each worker team thread actually creates its own instance of the
159  * worker for loop class and passes that instance to the worker region's
160  * <code>execute()</code> method. Thus, any fields declared in the worker for loop
161  * class will <I>not</I> be shared by all the workers, but instead will be
162  * private to each worker.
163  * <P>
164  * The <code>start()</code> method is intended for performing per-thread
165  * initialization before starting the loop iterations. If no such initialization
166  * is needed, omit the <code>start()</code> method.
167  * <P>
168  * The <code>run()</code> method contains the code for the loop. The first and last
169  * indexes for a chunk of loop iterations are passed in as arguments. The loop
170  * stride, which is always positive, is also explicitly specified as an
171  * argument. The worker for loop's <code>run()</code> method must be coded this way:
172  * <PRE>
173  *     public void run (long first, long last, long stride)
174  *         {
175  *         for (long i = first; i &lt;= last; i += stride)
176  *             {
177  *             // Loop body code
178  *             . . .
179  *             }
180  *         }
181  * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
182  * inclusive and increasing by <code>stride</code> on each iteration.
183  * <P>
184  * The <code>finish()</code> method is intended for performing per-thread
185  * finalization after finishing the loop iterations. If no such finalization is
186  * needed, omit the <code>finish()</code> method.
187  * <P>
188  * If the worker for loop's <code>start()</code>, <code>run()</code>, or
189  * <code>finish()</code> method throws an exception in one of the worker threads,
190  * then that worker thread executes no further code in the loop, and the worker
191  * region's <code>execute()</code> method throws that same exception in that thread.
192  * However, the other worker threads in the worker team continue to execute.
193  *
194  * @author Alan Kaminsky
195  * @version 27-Jan-2010
196  */
197 public abstract class WorkerLongStrideForLoop
198         extends WorkerForLoop {
199 
200 // Exported constructors.
201     /**
202      * Construct a new worker for loop.
203      */
204     public WorkerLongStrideForLoop() {
205         super();
206     }
207 
208 // Exported operations.
209     /**
210      * Determine this worker for loop's schedule. Called by the master and
211      * worker threads. The schedule determines how the loop iterations are
212      * apportioned among the worker team threads. For further information, see
213      * class {@linkplain LongSchedule}.
214      * <P>
215      * The <code>schedule()</code> method may be overridden in a subclass to return
216      * the desired schedule. If not overridden, the default is a runtime
217      * schedule (see {@link edu.rit.pj.LongSchedule#runtime()}).
218      *
219      * @return Schedule for this worker for loop.
220      */
221     public LongSchedule schedule() {
222         return LongSchedule.runtime();
223     }
224 
225     /**
226      * Perform per-thread initialization actions before starting the loop
227      * iterations. Called by a worker thread.
228      * <P>
229      * The <code>start()</code> method may be overridden in a subclass. If not
230      * overridden, the <code>start()</code> method does nothing.
231      *
232      * @exception Exception The <code>start()</code> method may throw any exception.
233      * @throws java.lang.Exception if any.
234      */
235     public void start()
236             throws Exception {
237     }
238 
239     /**
240      * Send additional input data associated with a task. Called by the master
241      * thread. The task is denoted by the given chunk of loop iterations. The
242      * input data must be sent using the given communicator, to the given worker
243      * process rank, with the given message tag.
244      * <P>
245      * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
246      * not overridden, the <code>sendTaskInput()</code> method does nothing.
247      *
248      * @param range Chunk of loop iterations.
249      * @param comm Communicator.
250      * @param wRank Worker process rank.
251      * @param tag Message tag.
252      * @exception IOException Thrown if an I/O error occurred.
253      * @throws java.io.IOException if any.
254      */
255     public void sendTaskInput(LongRange range,
256             Comm comm,
257             int wRank,
258             int tag)
259             throws IOException {
260     }
261 
262     /**
263      * Receive additional input data associated with a task. Called by a worker
264      * thread. The task is denoted by the given chunk of loop iterations. The
265      * input data must be received using the given communicator, from the given
266      * master process rank, with the given message tag.
267      * <P>
268      * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
269      * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
270      *
271      * @param range Chunk of loop iterations.
272      * @param comm Communicator.
273      * @param mRank Master process rank.
274      * @param tag Message tag.
275      * @exception IOException Thrown if an I/O error occurred.
276      * @throws java.io.IOException if any.
277      */
278     public void receiveTaskInput(LongRange range,
279             Comm comm,
280             int mRank,
281             int tag)
282             throws IOException {
283     }
284 
285     /**
286      * Execute one chunk of iterations of this worker for loop. Called by a
287      * worker thread. The <code>run()</code> method must perform the loop body for
288      * indexes <code>first</code> through <code>last</code> inclusive, increasing the
289      * loop index by <code>stride</code> after each iteration.
290      * <P>
291      * The <code>run()</code> method must be overridden in a subclass.
292      *
293      * @param first First loop index.
294      * @param last Last loop index.
295      * @param stride Loop index stride, always positive.
296      * @exception Exception The <code>run()</code> method may throw any exception.
297      * @throws java.lang.Exception if any.
298      */
299     public abstract void run(long first,
300             long last,
301             long stride)
302             throws Exception;
303 
304     /**
305      * Send additional output data associated with a task. Called by a worker
306      * thread. The task is denoted by the given chunk of loop iterations. The
307      * output data must be sent using the given communicator, to the given
308      * master process rank, with the given message tag.
309      * <P>
310      * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
311      * not overridden, the <code>sendTaskOutput()</code> method does nothing.
312      *
313      * @param range Chunk of loop iterations.
314      * @param comm Communicator.
315      * @param mRank Master process rank.
316      * @param tag Message tag.
317      * @exception IOException Thrown if an I/O error occurred.
318      * @throws java.io.IOException if any.
319      */
320     public void sendTaskOutput(LongRange range,
321             Comm comm,
322             int mRank,
323             int tag)
324             throws IOException {
325     }
326 
327     /**
328      * Receive additional output data associated with a task. Called by the
329      * master thread. The task is denoted by the given chunk of loop iterations.
330      * The output data must be received using the given communicator, from the
331      * given worker process rank, with the given message tag.
332      * <P>
333      * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
334      * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
335      *
336      * @param range Chunk of loop iterations.
337      * @param comm Communicator.
338      * @param wRank Worker process rank.
339      * @param tag Message tag.
340      * @exception IOException Thrown if an I/O error occurred.
341      * @throws java.io.IOException if any.
342      */
343     public void receiveTaskOutput(LongRange range,
344             Comm comm,
345             int wRank,
346             int tag)
347             throws IOException {
348     }
349 
350     /**
351      * Perform per-thread finalization actions after finishing the loop
352      * iterations. Called by a worker thread.
353      * <P>
354      * The <code>finish()</code> method may be overridden in a subclass. If not
355      * overridden, the <code>finish()</code> method does nothing.
356      *
357      * @exception Exception The <code>finish()</code> method may throw any
358      * exception.
359      * @throws java.lang.Exception if any.
360      */
361     public void finish()
362             throws Exception {
363     }
364 
365     /**
366      * Returns the tag offset for this worker for loop. Each message between the
367      * master and worker threads is sent with a message tag equal to
368      * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
369      * tag offset.
370      * <P>
371      * The <code>tagOffset()</code> method may be overridden in a subclass. If not
372      * overridden, the <code>tagOffset()</code> returns a default tag offset of
373      * <code>Integer.MIN_VALUE</code>.
374      *
375      * @return Tag offset.
376      */
377     public int tagOffset() {
378         return Integer.MIN_VALUE;
379     }
380 
381 // Hidden operations.
382     /**
383      * Execute this worker for loop in the master thread.
384      *
385      * @param range Loop index range.
386      *
387      * @exception IOException Thrown if an I/O error occurred.
388      */
389     void masterExecute(LongRange range)
390             throws IOException {
391         LongSchedule sch = schedule();
392         if (sch.isFixedSchedule()) {
393             masterExecuteFixed(range, sch);
394         } else {
395             masterExecuteNonFixed(range, sch);
396         }
397     }
398 
399     /**
400      * Execute this worker for loop in the master thread with a fixed schedule.
401      *
402      * @param range Loop index range.
403      * @param sch Schedule.
404      *
405      * @exception IOException Thrown if an I/O error occurred.
406      */
407     void masterExecuteFixed(LongRange range,
408             LongSchedule sch)
409             throws IOException {
410         int count = myTeam.count;
411         Comm comm = myTeam.comm;
412 
413         // Send additional task input to each worker.
414         sch.start(count, range);
415         for (int w = 0; w < count; ++w) {
416             LongRange chunk = sch.next(w);
417             if (chunk != null) {
418                 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
419             }
420         }
421 
422         // Receive additional task output from each worker.
423         sch.start(count, range);
424         for (int w = 0; w < count; ++w) {
425             LongRange chunk = sch.next(w);
426             if (chunk != null) {
427                 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
428             }
429         }
430     }
431 
432     /**
433      * Execute this worker for loop in the master thread with a non-fixed
434      * schedule.
435      *
436      * @param range Loop index range.
437      * @param sch Schedule.
438      *
439      * @exception IOException Thrown if an I/O error occurred.
440      */
441     void masterExecuteNonFixed(LongRange range,
442             LongSchedule sch)
443             throws IOException {
444         int count = myTeam.count;
445         sch.start(count, range);
446         int remaining = count;
447         ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
448         Range tagRange = new Range(tagFor(0), tagFor(count - 1));
449         Comm comm = myTeam.comm;
450 
451         // Send initial task to each worker.
452         for (int w = 0; w < count; ++w) {
453             LongRange chunk = sch.next(w);
454             buf.item = chunk;
455             buf.reset();
456             int r = myTeam.workerRank(w);
457             int tag = tagFor(w);
458             comm.send(r, tag, buf);
459             if (chunk == null) {
460                 --remaining;
461             } else {
462                 sendTaskInput(chunk, comm, r, tag);
463             }
464         }
465 
466         // Repeatedly receive a response from a worker and send next task to
467         // that worker.
468         while (remaining > 0) {
469             CommStatus status = comm.receive(null, tagRange, buf);
470             LongRange chunk = buf.item;
471             int r = status.fromRank;
472             int tag = status.tag;
473             int w = workerFor(tag);
474             receiveTaskOutput(chunk, comm, r, tag);
475             chunk = sch.next(w);
476             buf.item = chunk;
477             buf.reset();
478             comm.send(r, tag, buf);
479             if (chunk == null) {
480                 --remaining;
481             } else {
482                 sendTaskInput(chunk, comm, r, tag);
483             }
484         }
485     }
486 
487     /**
488      * Execute this worker for loop in a worker thread.
489      *
490      * @param w Worker index.
491      * @param range Loop index range.
492      *
493      * @exception Exception This method may throw any exception.
494      */
495     void workerExecute(int w,
496             LongRange range)
497             throws Exception {
498         LongSchedule sch = schedule();
499         if (sch.isFixedSchedule()) {
500             sch.start(myTeam.count, range);
501             workerExecuteFixed(sch.next(w), w);
502         } else {
503             workerExecuteNonFixed(w);
504         }
505     }
506 
507     /**
508      * Execute this worker for loop in a worker thread using a fixed schedule.
509      *
510      * @param range Chunk of loop iterations.
511      * @param w Worker index.
512      *
513      * @exception Exception This method may throw any exception.
514      */
515     void workerExecuteFixed(LongRange range,
516             int w)
517             throws Exception {
518         start();
519         if (range != null) {
520             Comm comm = myTeam.comm;
521             int r = myTeam.masterRank();
522             int tag = tagFor(w);
523             receiveTaskInput(range, comm, r, tag);
524             run(range.lb(), range.ub(), range.stride());
525             sendTaskOutput(range, comm, r, tag);
526         }
527         finish();
528     }
529 
530     /**
531      * Execute this worker for loop in a worker thread using a non-fixed
532      * schedule.
533      *
534      * @param w Worker index.
535      *
536      * @exception Exception This method may throw any exception.
537      */
538     void workerExecuteNonFixed(int w)
539             throws Exception {
540         Comm comm = myTeam.comm;
541         int r = myTeam.masterRank();
542         int tag = tagFor(w);
543         start();
544         ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
545         for (;;) {
546             comm.receive(r, tag, buf);
547             LongRange range = buf.item;
548             if (range == null) {
549                 break;
550             }
551             receiveTaskInput(range, comm, r, tag);
552             run(range.lb(), range.ub(), range.stride());
553 
554             // The next two statements constitute a critical section; other
555             // workers in this team must not send messages in between these two
556             // messages, or the master can deadlock.
557             synchronized (myTeam) {
558                 comm.send(r, tag, buf);
559                 sendTaskOutput(range, comm, r, tag);
560             }
561         }
562         finish();
563     }
564 
565     /**
566      * Returns the message tag for the given worker index.
567      *
568      * @param w Worker index.
569      *
570      * @return Message tag.
571      */
572     private int tagFor(int w) {
573         return w + tagOffset();
574     }
575 
576     /**
577      * Returns the worker index for the given message tag.
578      *
579      * @param tag Message tag.
580      *
581      * @return Worker index.
582      */
583     private int workerFor(int tag) {
584         return tag - tagOffset();
585     }
586 
587 }