View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerIteration.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerIteration
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.io.IOException;
43  
44  import edu.rit.mp.ObjectBuf;
45  import edu.rit.mp.buf.ObjectItemBuf;
46  import edu.rit.util.Range;
47  
48  /**
49   * Class WorkerIteration is the abstract base class for a worker iteration that
50   * is executed inside a {@linkplain WorkerRegion}. The worker iteration lets you
51   * iterate over a group of items, with a separate worker team thread processing
52   * each item. The generic type parameter T specifies the items' data type. The
53   * items can be the elements of an array, the items obtained from an {@linkplain
54   * java.util.Iterator Iterator}, or the items contained in an {@linkplain
55   * java.lang.Iterable Iterable} collection.
56   * <P>
57   * To execute a worker iteration, create a {@linkplain WorkerRegion} object;
58   * create an instance of a concrete subclass of class WorkerIteration; and pass
59   * this instance to the worker region's <code>execute()</code> method. Either every
60   * worker team thread must call the worker region's <code>execute()</code> method
61   * with identical arguments, or no thread may call the
62   * <code>execute()</code> method. You can do all this using an anonymous inner
63   * class; for example:
64   * <PRE>
65   *     new WorkerRegion()
66   *         {
67   *         ArrayList&lt;String&gt; list = new ArrayList&lt;String&gt;();
68   *         . . .
69   *         public void run()
70   *             {
71   *             . . .
72   *             execute (list, new WorkerIteration&lt;String&gt;()
73   *                 {
74   *                 // Thread local variable declarations
75   *                 . . .
76   *                 public void start()
77   *                     {
78   *                     // Per-thread pre-loop initialization code
79   *                     . . .
80   *                     }
81   *                 public void run (String item)
82   *                     {
83   *                     // Loop code
84   *                     . . .
85   *                     }
86   *                 public void finish()
87   *                     {
88   *                     // Per-thread post-loop finalization code
89   *                     . . .
90   *                     }
91   *                 });
92   *             }
93   *         . . .
94   *         }
95   * </PRE>
96   * <P>
97   * In each process of a cluster parallel program, the worker team has one or
98   * more worker threads. Every worker thread in every process has a unique worker
99   * index, going from index 0 for the first worker thread in the first process to
100  * index <I>K</I>&minus;1 for the last worker thread in the last process, where
101  * <I>K</I> is the total number of worker threads in all the processes. In
102  * addition, in one process there is a master thread. The worker and master
103  * threads all call the worker region's <code>execute()</code> method to execute the
104  * worker iteration. However, the worker and master threads differ in their
105  * actions.
106  * <P>
107  * The master thread does the following. The master sets up the source of items
108  * to be iterated over -- either an array's elements, an iterator's items, or an
109  * iterable collection's contents. The master repeatedly sends "tasks" to the
110  * workers and receives "responses" from the workers. To send a task to a
111  * particular worker, the master (1) sends a message containing the next item to
112  * the worker's process; and (2) calls the worker iteration's
113  * <code>sendTaskInput()</code> method. This method's default implementation does
114  * nothing, but it can be overridden to send additional task input data to the
115  * worker. To receive a response from a particular worker, the master (1)
116  * receives a message from the worker's process containing the item that was
117  * processed (whose state may have changed); and (2) calls the worker
118  * iteration's <code>receiveTaskOutput()</code> method. This method's default
119  * implementation does nothing, but it can be overridden to receive additional
120  * task output data from the worker. Once all tasks have been sent to the
121  * workers and all responses have been received from the workers, the master
122  * returns from the worker region's <code>execute()</code> method.
123  * <P>
124  * Each worker thread does the following. The worker calls the worker
125  * iteration's <code>start()</code> method once before processing any items. The
126  * worker repeatedly receives tasks from the master and sends responses to the
127  * master. To receive a task from the master, the worker (1) receives a message
128  * containing the next item from the master's process; and (2) calls the worker
129  * iteration's <code>receiveTaskInput()</code> method. This method's default
130  * implementation does nothing, but it can be overridden to receive additional
131  * task input data from the master. The worker now calls the worker iteration's
132  * <code>run()</code> method, passing in the item to be processed. When the
133  * <code>run()</code> method returns, the worker sends the response to the master.
134  * To send the response, the worker (1) sends a message to the master's process
135  * containing the item that was processed (whose state may have changed); and
136  * (2) calls the worker iteration's <code>sendTaskOutput()</code> method. This
137  * method's default implementation does nothing, but it can be overridden to
138  * send additional task output data to the master. Once all tasks have been
139  * received from the master and all responses have been sent to the master, the
140  * worker calls the worker iteration's <code>finish()</code> method. (Unlike a
141  * {@linkplain ParallelIteration}'s threads, the workers do <I>not</I>
142  * synchronize with each other at a barrier at this point.) The worker then
143  * returns from the worker region's <code>execute()</code> method.
144  * <P>
145  * Each message described above is sent with a message tag equal to
146  * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
147  * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
148  * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
149  * message tags fall in the range <I>T</I> .. <I>K</I>&minus;1+<I>T</I>, where
150  * <I>K</I> is the total number of workers in all the processes. The program
151  * should not use message tags in this range except to send and receive the
152  * messages described above.
153  * <P>
154  * Note that each worker team thread actually creates its own instance of the
155  * worker iteration class and passes that instance to the worker region's
156  * <code>execute()</code> method. Thus, any fields declared in the worker iteration
157  * class will <I>not</I> be shared by all the workers, but instead will be
158  * private to each worker.
159  * <P>
160  * The <code>start()</code> method is intended for performing per-thread
161  * initialization before starting the loop iterations. If no such initialization
162  * is needed, omit the <code>start()</code> method.
163  * <P>
164  * The <code>run()</code> method contains the code for the loop body. It does
165  * whatever processing is needed on the one item passed in as an argument. Note
166  * that, unlike a worker for loop (class {@linkplain WorkerForLoop}), a worker
167  * iteration is not "chunked"; each worker team thread always processes just one
168  * item at a time.
169  * <P>
170  * The <code>finish()</code> method is intended for performing per-thread
171  * finalization after finishing the loop iterations. If no such finalization is
172  * needed, omit the <code>finish()</code> method.
173  * <P>
174  * If the worker iteration's <code>start()</code>, <code>run()</code>, or
175  * <code>finish()</code> method throws an exception in one of the worker threads,
176  * then that worker thread executes no further code in the loop, and the worker
177  * region's <code>execute()</code> method throws that same exception in that thread.
178  * However, the other worker threads in the worker team continue to execute.
179  *
180  * @param <T> Data type of the items iterated over.
181  * @author Alan Kaminsky
182  * @version 07-Oct-2010
183  */
184 public abstract class WorkerIteration<T>
185         extends WorkerConstruct {
186 
187 // Exported constructors.
188     /**
189      * Construct a new worker iteration.
190      */
191     public WorkerIteration() {
192         super();
193     }
194 
195 // Exported operations.
196     /**
197      * Perform per-thread initialization actions before starting the loop
198      * iterations. Called by a worker thread.
199      * <P>
200      * The <code>start()</code> method may be overridden in a subclass. If not
201      * overridden, the <code>start()</code> method does nothing.
202      *
203      * @exception Exception The <code>start()</code> method may throw any exception.
204      * @throws java.lang.Exception if any.
205      */
206     public void start()
207             throws Exception {
208     }
209 
210     /**
211      * Send additional input data associated with a task. Called by the master
212      * thread. The task is denoted by the given item to be processed. The input
213      * data must be sent using the given communicator, to the given worker
214      * process rank, with the given message tag.
215      * <P>
216      * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
217      * not overridden, the <code>sendTaskInput()</code> method does nothing.
218      *
219      * @param item Item to be processed.
220      * @param comm Communicator.
221      * @param wRank Worker process rank.
222      * @param tag Message tag.
223      * @exception IOException Thrown if an I/O error occurred.
224      * @throws java.io.IOException if any.
225      */
226     public void sendTaskInput(T item,
227             Comm comm,
228             int wRank,
229             int tag)
230             throws IOException {
231     }
232 
233     /**
234      * Receive input data associated with a task. Called by a worker thread. The
235      * task is denoted by the given item to be processed. The input data must be
236      * received using the given communicator, from the given master process
237      * rank, with the given message tag.
238      * <P>
239      * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
240      * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
241      *
242      * @param item Item to be processed.
243      * @param comm Communicator.
244      * @param mRank Master process rank.
245      * @param tag Message tag.
246      * @exception IOException Thrown if an I/O error occurred.
247      * @throws java.io.IOException if any.
248      */
249     public void receiveTaskInput(T item,
250             Comm comm,
251             int mRank,
252             int tag)
253             throws IOException {
254     }
255 
256     /**
257      * Process one item in this worker iteration. The <code>run()</code> method must
258      * perform the loop body for the given item.
259      * <P>
260      * The <code>run()</code> method must be overridden in a subclass.
261      *
262      * @param item Item.
263      * @exception Exception The <code>run()</code> method may throw any exception.
264      * @throws java.lang.Exception if any.
265      */
266     public abstract void run(T item)
267             throws Exception;
268 
269     /**
270      * Send additional output data associated with a task. Called by a worker
271      * thread. The task is denoted by the given item that was processed (whose
272      * state may have changed during processing). The output data must be sent
273      * using the given communicator, to the given master process rank, with the
274      * given message tag.
275      * <P>
276      * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
277      * not overridden, the <code>sendTaskOutput()</code> method does nothing.
278      *
279      * @param item Item that was processed.
280      * @param comm Communicator.
281      * @param mRank Master process rank.
282      * @param tag Message tag.
283      * @exception IOException Thrown if an I/O error occurred.
284      * @throws java.io.IOException if any.
285      */
286     public void sendTaskOutput(T item,
287             Comm comm,
288             int mRank,
289             int tag)
290             throws IOException {
291     }
292 
293     /**
294      * Receive additional output data associated with a task. Called by the
295      * master thread. The task is denoted by the given item that was processed
296      * (whose state may have changed during processing). The output data must be
297      * received using the given communicator, from the given worker process
298      * rank, with the given message tag.
299      * <P>
300      * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
301      * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
302      *
303      * @param item Item that was processed.
304      * @param comm Communicator.
305      * @param wRank Worker process rank.
306      * @param tag Message tag.
307      * @exception IOException Thrown if an I/O error occurred.
308      * @throws java.io.IOException if any.
309      */
310     public void receiveTaskOutput(T item,
311             Comm comm,
312             int wRank,
313             int tag)
314             throws IOException {
315     }
316 
317     /**
318      * Perform per-thread finalization actions after finishing the loop
319      * iterations. Called by a worker thread.
320      * <P>
321      * The <code>finish()</code> method may be overridden in a subclass. If not
322      * overridden, the <code>finish()</code> method does nothing.
323      *
324      * @exception Exception The <code>finish()</code> method may throw any
325      * exception.
326      * @throws java.lang.Exception if any.
327      */
328     public void finish()
329             throws Exception {
330     }
331 
332     /**
333      * Returns the tag offset for this worker for loop. Each message between the
334      * master and worker threads is sent with a message tag equal to
335      * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
336      * tag offset.
337      * <P>
338      * The <code>tagOffset()</code> method may be overridden in a subclass. If not
339      * overridden, the <code>tagOffset()</code> returns a default tag offset of
340      * <code>Integer.MIN_VALUE</code>.
341      *
342      * @return Tag offset.
343      */
344     public int tagOffset() {
345         return Integer.MIN_VALUE;
346     }
347 
348 // Hidden operations.
349     /**
350      * Execute this worker iteration in the master thread.
351      *
352      * @param generator Item generator.
353      *
354      * @exception IOException Thrown if an I/O error occurred.
355      */
356     void masterExecute(ItemGenerator<T> generator)
357             throws IOException {
358         int count = myTeam.count;
359         int remaining = count;
360         ObjectItemBuf<ItemHolder<T>> buf = ObjectBuf.buffer();
361         Range tagRange = new Range(tagFor(0), tagFor(count - 1));
362         Comm comm = myTeam.comm;
363 
364         // Send initial task to each worker.
365         for (int w = 0; w < count; ++w) {
366             ItemHolder<T> holder = generator.nextItem();
367             buf.item = holder;
368             buf.reset();
369             int r = myTeam.workerRank(w);
370             int tag = tagFor(w);
371             comm.send(r, tag, buf);
372             if (holder == null) {
373                 --remaining;
374             } else {
375                 sendTaskInput(holder.myItem, comm, r, tag);
376             }
377         }
378 
379         // Repeatedly receive a response from a worker and send next task to
380         // that worker.
381         while (remaining > 0) {
382             CommStatus status = comm.receive(null, tagRange, buf);
383             ItemHolder<T> holder = buf.item;
384             int r = status.fromRank;
385             int tag = status.tag;
386             int w = workerFor(tag);
387             receiveTaskOutput(holder.myItem, comm, r, tag);
388             holder = generator.nextItem();
389             buf.item = holder;
390             buf.reset();
391             comm.send(r, tag, buf);
392             if (holder == null) {
393                 --remaining;
394             } else {
395                 sendTaskInput(holder.myItem, comm, r, tag);
396             }
397         }
398     }
399 
400     /**
401      * Execute this worker for loop in a worker thread.
402      *
403      * @param w Worker index.
404      *
405      * @exception Exception This method may throw any exception.
406      */
407     void workerExecute(int w)
408             throws Exception {
409         Comm comm = myTeam.comm;
410         int r = myTeam.masterRank();
411         int tag = tagFor(w);
412         start();
413         ObjectItemBuf<ItemHolder<T>> buf = ObjectBuf.buffer();
414         for (;;) {
415             comm.receive(r, tag, buf);
416             ItemHolder<T> holder = buf.item;
417             if (holder == null) {
418                 break;
419             }
420             receiveTaskInput(holder.myItem, comm, r, tag);
421             run(holder.myItem);
422             buf.reset();
423 
424             // The next two statements constitute a critical section; other
425             // workers in this team must not send messages in between these two
426             // messages, or the master can deadlock.
427             synchronized (myTeam) {
428                 comm.send(r, tag, buf);
429                 sendTaskOutput(holder.myItem, comm, r, tag);
430             }
431         }
432         finish();
433     }
434 
435     /**
436      * Returns the message tag for the given worker index.
437      *
438      * @param w Worker index.
439      *
440      * @return Message tag.
441      */
442     private int tagFor(int w) {
443         return w + tagOffset();
444     }
445 
446     /**
447      * Returns the worker index for the given message tag.
448      *
449      * @param tag Message tag.
450      *
451      * @return Worker index.
452      */
453     private int workerFor(int tag) {
454         return tag - tagOffset();
455     }
456 
457 }