1 //******************************************************************************
2 //
3 // File: WorkerIteration.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerIteration
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.io.IOException;
43
44 import edu.rit.mp.ObjectBuf;
45 import edu.rit.mp.buf.ObjectItemBuf;
46 import edu.rit.util.Range;
47
48 /**
49 * Class WorkerIteration is the abstract base class for a worker iteration that
50 * is executed inside a {@linkplain WorkerRegion}. The worker iteration lets you
51 * iterate over a group of items, with a separate worker team thread processing
52 * each item. The generic type parameter T specifies the items' data type. The
53 * items can be the elements of an array, the items obtained from an {@linkplain
54 * java.util.Iterator Iterator}, or the items contained in an {@linkplain
55 * java.lang.Iterable Iterable} collection.
56 * <P>
57 * To execute a worker iteration, create a {@linkplain WorkerRegion} object;
58 * create an instance of a concrete subclass of class WorkerIteration; and pass
59 * this instance to the worker region's <code>execute()</code> method. Either every
60 * worker team thread must call the worker region's <code>execute()</code> method
61 * with identical arguments, or every thread must not call the
62 * <code>execute()</code> method. You can do all this using an anonymous inner
63 * class; for example:
64 * <PRE>
 *     new WorkerRegion()
 *         {
 *         ArrayList&lt;String&gt; list = new ArrayList&lt;String&gt;();
 *         . . .
 *         public void run()
 *             {
 *             . . .
 *             execute (list, new WorkerIteration&lt;String&gt;()
 *                 {
 *                 // Thread local variable declarations
 *                 . . .
 *                 public void start()
 *                     {
 *                     // Per-thread pre-loop initialization code
 *                     . . .
 *                     }
 *                 public void run (String item)
 *                     {
 *                     // Loop code
 *                     . . .
 *                     }
 *                 public void finish()
 *                     {
 *                     // Per-thread post-loop finalization code
 *                     . . .
 *                     }
 *                 });
 *             }
 *         . . .
 *         }
95 * </PRE>
96 * <P>
97 * In each process of a cluster parallel program, the worker team has one or
98 * more worker threads. Every worker thread in every process has a unique worker
99 * index, going from index 0 for the first worker thread in the first process to
100 * index <I>K</I>−1 for the last worker thread in the last process, where
101 * <I>K</I> is the total number of worker threads in all the processes. In
 * addition, in one process there is a master thread. The worker and master
 * threads all call the worker region's <code>execute()</code> method to execute the
 * worker iteration. However, the worker and master threads differ in their
 * actions.
106 * <P>
107 * The master thread does the following. The master sets up the source of items
108 * to be iterated over -- either an array's elements, an iterator's items, or an
109 * iterable collection's contents. The master repeatedly sends "tasks" to the
110 * workers and receives "responses" from the workers. To send a task to a
111 * particular worker, the master (1) sends a message containing the next item to
112 * the worker's process; and (2) calls the worker iteration's
113 * <code>sendTaskInput()</code> method. This method's default implementation does
114 * nothing, but it can be overridden to send additional task input data to the
115 * worker. To receive a response from a particular worker, the master (1)
116 * receives a message from the worker's process containing the item that was
117 * processed (whose state may have changed); and (2) calls the worker
118 * iteration's <code>receiveTaskOutput()</code> method. This method's default
119 * implementation does nothing, but it can be overridden to receive additional
120 * task output data from the worker. Once all tasks have been sent to the
121 * workers and all responses have been received from the workers, the master
122 * returns from the worker region's <code>execute()</code> method.
123 * <P>
124 * Each worker thread does the following. The worker calls the worker
125 * iteration's <code>start()</code> method once before processing any items. The
126 * worker repeatedly receives tasks from the master and sends responses to the
127 * master. To receive a task from the master, the worker (1) receives a message
128 * containing the next item from the master's process; and (2) calls the worker
129 * iteration's <code>receiveTaskInput()</code> method. This method's default
130 * implementation does nothing, but it can be overridden to receive additional
131 * task input data from the master. The worker now calls the worker iteration's
132 * <code>run()</code> method, passing in the item to be processed. When the
133 * <code>run()</code> method returns, the worker sends the response to the master.
134 * To send the response, the worker (1) sends a message to the master's process
135 * containing the item that was processed (whose state may have changed); and
136 * (2) calls the worker iteration's <code>sendTaskOutput()</code> method. This
137 * method's default implementation does nothing, but it can be overridden to
138 * send additional task output data to the master. Once all tasks have been
139 * received from the master and all responses have been sent to the master, the
140 * worker calls the worker iteration's <code>finish()</code> method. (Unlike a
141 * {@linkplain ParallelIteration}'s threads, the workers do <I>not</I>
142 * synchronize with each other at a barrier at this point.) The worker then
143 * returns from the worker region's <code>execute()</code> method.
144 * <P>
145 * Each message described above is sent with a message tag equal to
146 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
147 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
148 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
149 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where
150 * <I>K</I> is the total number of workers in all the processes. The program
151 * should not use message tags in this range except to send and receive the
152 * messages described above.
153 * <P>
154 * Note that each worker team thread actually creates its own instance of the
155 * worker iteration class and passes that instance to the worker region's
156 * <code>execute()</code> method. Thus, any fields declared in the worker iteration
157 * class will <I>not</I> be shared by all the workers, but instead will be
158 * private to each worker.
159 * <P>
160 * The <code>start()</code> method is intended for performing per-thread
161 * initialization before starting the loop iterations. If no such initialization
162 * is needed, omit the <code>start()</code> method.
163 * <P>
164 * The <code>run()</code> method contains the code for the loop body. It does
165 * whatever processing is needed on the one item passed in as an argument. Note
 * that, unlike a worker for loop (class {@linkplain WorkerForLoop}), a worker
 * iteration is not "chunked"; each worker team thread always processes just one
 * item at a time.
169 * <P>
170 * The <code>finish()</code> method is intended for performing per-thread
171 * finalization after finishing the loop iterations. If no such finalization is
172 * needed, omit the <code>finish()</code> method.
173 * <P>
174 * If the worker iteration's <code>start()</code>, <code>run()</code>, or
175 * <code>finish()</code> method throws an exception in one of the worker threads,
176 * then that worker thread executes no further code in the loop, and the worker
177 * region's <code>execute()</code> method throws that same exception in that thread.
178 * However, the other worker threads in the worker team continue to execute.
179 *
180 * @param <T> Data type of the items iterated over.
181 * @author Alan Kaminsky
182 * @version 07-Oct-2010
183 */
184 public abstract class WorkerIteration<T>
185 extends WorkerConstruct {
186
187 // Exported constructors.
188 /**
189 * Construct a new worker iteration.
190 */
191 public WorkerIteration() {
192 super();
193 }
194
195 // Exported operations.
196 /**
197 * Perform per-thread initialization actions before starting the loop
198 * iterations. Called by a worker thread.
199 * <P>
200 * The <code>start()</code> method may be overridden in a subclass. If not
201 * overridden, the <code>start()</code> method does nothing.
202 *
203 * @exception Exception The <code>start()</code> method may throw any exception.
204 * @throws java.lang.Exception if any.
205 */
206 public void start()
207 throws Exception {
208 }
209
210 /**
211 * Send additional input data associated with a task. Called by the master
212 * thread. The task is denoted by the given item to be processed. The input
213 * data must be sent using the given communicator, to the given worker
214 * process rank, with the given message tag.
215 * <P>
216 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
217 * not overridden, the <code>sendTaskInput()</code> method does nothing.
218 *
219 * @param item Item to be processed.
220 * @param comm Communicator.
221 * @param wRank Worker process rank.
222 * @param tag Message tag.
223 * @exception IOException Thrown if an I/O error occurred.
224 * @throws java.io.IOException if any.
225 */
226 public void sendTaskInput(T item,
227 Comm comm,
228 int wRank,
229 int tag)
230 throws IOException {
231 }
232
233 /**
234 * Receive input data associated with a task. Called by a worker thread. The
235 * task is denoted by the given item to be processed. The input data must be
236 * received using the given communicator, from the given master process
237 * rank, with the given message tag.
238 * <P>
239 * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
240 * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
241 *
242 * @param item Item to be processed.
243 * @param comm Communicator.
244 * @param mRank Master process rank.
245 * @param tag Message tag.
246 * @exception IOException Thrown if an I/O error occurred.
247 * @throws java.io.IOException if any.
248 */
249 public void receiveTaskInput(T item,
250 Comm comm,
251 int mRank,
252 int tag)
253 throws IOException {
254 }
255
256 /**
257 * Process one item in this worker iteration. The <code>run()</code> method must
258 * perform the loop body for the given item.
259 * <P>
260 * The <code>run()</code> method must be overridden in a subclass.
261 *
262 * @param item Item.
263 * @exception Exception The <code>run()</code> method may throw any exception.
264 * @throws java.lang.Exception if any.
265 */
266 public abstract void run(T item)
267 throws Exception;
268
269 /**
270 * Send additional output data associated with a task. Called by a worker
271 * thread. The task is denoted by the given item that was processed (whose
272 * state may have changed during processing). The output data must be sent
273 * using the given communicator, to the given master process rank, with the
274 * given message tag.
275 * <P>
276 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
277 * not overridden, the <code>sendTaskOutput()</code> method does nothing.
278 *
279 * @param item Item that was processed.
280 * @param comm Communicator.
281 * @param mRank Master process rank.
282 * @param tag Message tag.
283 * @exception IOException Thrown if an I/O error occurred.
284 * @throws java.io.IOException if any.
285 */
286 public void sendTaskOutput(T item,
287 Comm comm,
288 int mRank,
289 int tag)
290 throws IOException {
291 }
292
293 /**
294 * Receive additional output data associated with a task. Called by the
295 * master thread. The task is denoted by the given item that was processed
296 * (whose state may have changed during processing). The output data must be
297 * received using the given communicator, from the given worker process
298 * rank, with the given message tag.
299 * <P>
300 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
301 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
302 *
303 * @param item Item that was processed.
304 * @param comm Communicator.
305 * @param wRank Worker process rank.
306 * @param tag Message tag.
307 * @exception IOException Thrown if an I/O error occurred.
308 * @throws java.io.IOException if any.
309 */
310 public void receiveTaskOutput(T item,
311 Comm comm,
312 int wRank,
313 int tag)
314 throws IOException {
315 }
316
317 /**
318 * Perform per-thread finalization actions after finishing the loop
319 * iterations. Called by a worker thread.
320 * <P>
321 * The <code>finish()</code> method may be overridden in a subclass. If not
322 * overridden, the <code>finish()</code> method does nothing.
323 *
324 * @exception Exception The <code>finish()</code> method may throw any
325 * exception.
326 * @throws java.lang.Exception if any.
327 */
328 public void finish()
329 throws Exception {
330 }
331
332 /**
333 * Returns the tag offset for this worker for loop. Each message between the
334 * master and worker threads is sent with a message tag equal to
335 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
336 * tag offset.
337 * <P>
338 * The <code>tagOffset()</code> method may be overridden in a subclass. If not
339 * overridden, the <code>tagOffset()</code> returns a default tag offset of
340 * <code>Integer.MIN_VALUE</code>.
341 *
342 * @return Tag offset.
343 */
344 public int tagOffset() {
345 return Integer.MIN_VALUE;
346 }
347
348 // Hidden operations.
349 /**
350 * Execute this worker iteration in the master thread.
351 *
352 * @param generator Item generator.
353 *
354 * @exception IOException Thrown if an I/O error occurred.
355 */
356 void masterExecute(ItemGenerator<T> generator)
357 throws IOException {
358 int count = myTeam.count;
359 int remaining = count;
360 ObjectItemBuf<ItemHolder<T>> buf = ObjectBuf.buffer();
361 Range tagRange = new Range(tagFor(0), tagFor(count - 1));
362 Comm comm = myTeam.comm;
363
364 // Send initial task to each worker.
365 for (int w = 0; w < count; ++w) {
366 ItemHolder<T> holder = generator.nextItem();
367 buf.item = holder;
368 buf.reset();
369 int r = myTeam.workerRank(w);
370 int tag = tagFor(w);
371 comm.send(r, tag, buf);
372 if (holder == null) {
373 --remaining;
374 } else {
375 sendTaskInput(holder.myItem, comm, r, tag);
376 }
377 }
378
379 // Repeatedly receive a response from a worker and send next task to
380 // that worker.
381 while (remaining > 0) {
382 CommStatus status = comm.receive(null, tagRange, buf);
383 ItemHolder<T> holder = buf.item;
384 int r = status.fromRank;
385 int tag = status.tag;
386 int w = workerFor(tag);
387 receiveTaskOutput(holder.myItem, comm, r, tag);
388 holder = generator.nextItem();
389 buf.item = holder;
390 buf.reset();
391 comm.send(r, tag, buf);
392 if (holder == null) {
393 --remaining;
394 } else {
395 sendTaskInput(holder.myItem, comm, r, tag);
396 }
397 }
398 }
399
400 /**
401 * Execute this worker for loop in a worker thread.
402 *
403 * @param w Worker index.
404 *
405 * @exception Exception This method may throw any exception.
406 */
407 void workerExecute(int w)
408 throws Exception {
409 Comm comm = myTeam.comm;
410 int r = myTeam.masterRank();
411 int tag = tagFor(w);
412 start();
413 ObjectItemBuf<ItemHolder<T>> buf = ObjectBuf.buffer();
414 for (;;) {
415 comm.receive(r, tag, buf);
416 ItemHolder<T> holder = buf.item;
417 if (holder == null) {
418 break;
419 }
420 receiveTaskInput(holder.myItem, comm, r, tag);
421 run(holder.myItem);
422 buf.reset();
423
424 // The next two statements constitute a critical section; other
425 // workers in this team must not send messages in between these two
426 // messages, or the master can deadlock.
427 synchronized (myTeam) {
428 comm.send(r, tag, buf);
429 sendTaskOutput(holder.myItem, comm, r, tag);
430 }
431 }
432 finish();
433 }
434
435 /**
436 * Returns the message tag for the given worker index.
437 *
438 * @param w Worker index.
439 *
440 * @return Message tag.
441 */
442 private int tagFor(int w) {
443 return w + tagOffset();
444 }
445
446 /**
447 * Returns the worker index for the given message tag.
448 *
449 * @param tag Message tag.
450 *
451 * @return Worker index.
452 */
453 private int workerFor(int tag) {
454 return tag - tagOffset();
455 }
456
457 }