1 //******************************************************************************
2 //
3 // File: WorkerIntegerForLoop.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerIntegerForLoop
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.io.IOException;
43
44 import edu.rit.mp.ObjectBuf;
45 import edu.rit.mp.buf.ObjectItemBuf;
46 import edu.rit.util.Range;
47
48 /**
49 * Class WorkerIntegerForLoop is the abstract base class for one variation of a
50 * worker for loop that is executed inside a {@linkplain WorkerRegion}. The loop
51 * index data type is <code>int</code>. The loop stride is implicit (+1).
52 * <P>
53 * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
54 * create an instance of a concrete subclass of class WorkerIntegerForLoop; and
55 * pass this instance to the worker region's <code>execute()</code> method. Either
56 * every worker team thread must call the worker region's <code>execute()</code>
57 * method with identical arguments, or every thread must not call the
58 * <code>execute()</code> method. You can do all this using an anonymous inner
59 * class; for example:
60 * <PRE>
61 * new WorkerRegion()
62 * {
63 * . . .
64 * public void run()
65 * {
66 * . . .
67 * execute (0, 99, new WorkerIntegerForLoop()
68 * {
69 * // Thread local variable declarations
70 * . . .
71 * public void start()
72 * {
73 * // Per-thread pre-loop initialization code
74 * . . .
75 * }
76 * public void run (int first, int last)
77 * {
78 * // Loop code
79 * . . .
80 * }
81 * public void finish()
82 * {
83 * // Per-thread post-loop finalization code
84 * . . .
85 * }
86 * });
87 * }
88 * . . .
89 * }
90 * </PRE>
91 * <P>
92 * In each process of a cluster parallel program, the worker team has one or
93 * more worker threads. Every worker thread in every process has a unique worker
94 * index, going from index 0 for the first worker thread in the first process to
95 * index <I>K</I>−1 for the last worker thread in the last process, where
96 * <I>K</I> is the total number of worker threads in all the processes. In
97 * addition, in one process there is a master thread. The worker and master
98 * threads all call the worker region's <code>execute()</code> method to execute the
99 * worker for loop. However, the worker and master threads differ in their
100 * actions.
101 * <P>
102 * The master thread does the following. The master obtains the worker for
103 * loop's schedule as returned by the <code>schedule()</code> method. The range of
104 * loop indexes is divided into "chunks" and the chunks are apportioned among
105 * the workers in accordance with the schedule. The master repeatedly sends
106 * "tasks" to the workers and receives "responses" from the workers. To send a
107 * task to a particular worker, the master (1) sends a message containing the
108 * chunk index range to the worker's process; and (2) calls the worker for
109 * loop's <code>sendTaskInput()</code> method. This method's default implementation
110 * does nothing, but it can be overridden to send additional task input data to
111 * the worker. To receive a response from a particular worker, the master (1)
112 * receives a message containing the chunk index range from the worker's
113 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
114 * method. This method's default implementation does nothing, but it can be
115 * overridden to receive additional task output data from the worker. Once all
116 * tasks have been sent to the workers and all responses have been received from
117 * the workers, the master returns from the worker region's <code>execute()</code>
118 * method.
119 * <P>
120 * Each worker thread does the following. The worker calls the worker for loop's
121 * <code>start()</code> method once before beginning any loop iterations. The worker
122 * repeatedly receives tasks from the master and sends responses to the master.
123 * To receive a task from the master, the worker (1) receives a message
124 * containing the chunk index range from the master's process; and (2) calls the
125 * worker for loop's <code>receiveTaskInput()</code> method. This method's default
126 * implementation does nothing, but it can be overridden to receive additional
127 * task input data from the master. The worker now calls the worker for loop's
128 * <code>run()</code> method, passing in the chunk index range lower and upper
129 * bounds. When the <code>run()</code> method returns, the worker sends the response
130 * to the master. To send the response, the worker (1) sends a message
131 * containing the chunk index range to the master's process; and (2) calls the
132 * worker for loop's <code>sendTaskOutput()</code> method. This method's default
133 * implementation does nothing, but it can be overridden to send additional task
134 * output data to the master. Once all tasks have been received from the master
135 * and all responses have been sent to the master, the worker calls the worker
136 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
137 * threads, the workers do <I>not</I> synchronize with each other at a barrier
138 * at this point.) The worker then returns from the worker region's
139 * <code>execute()</code> method.
140 * <P>
141 * If the worker for loop has a fixed schedule (in which there is exactly one
142 * chunk with a predetermined index range for each worker), then the messages
143 * containing the chunk index range are omitted, and each worker gets its chunk
144 * index range directly from the fixed schedule. However, the task input data
145 * (if any) and task output data (if any) are still sent and received.
146 * <P>
147 * Each message described above is sent with a message tag equal to
148 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
149 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
150 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
151 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where
152 * <I>K</I> is the total number of workers in all the processes. The program
153 * should not use message tags in this range except to send and receive the
154 * messages described above.
155 * <P>
156 * Note that each worker team thread actually creates its own instance of the
157 * worker for loop class and passes that instance to the worker region's
158 * <code>execute()</code> method. Thus, any fields declared in the worker for loop
159 * class will <I>not</I> be shared by all the workers, but instead will be
160 * private to each worker.
161 * <P>
162 * The <code>start()</code> method is intended for performing per-thread
163 * initialization before starting the loop iterations. If no such initialization
164 * is needed, omit the <code>start()</code> method.
165 * <P>
166 * The <code>run()</code> method contains the code for the loop. The first and last
167 * indexes for a chunk of loop iterations are passed in as arguments. The loop
168 * stride is implicit (+1). The worker for loop's <code>run()</code> method must be
169 * coded this way:
170 * <PRE>
171 * public void run (int first, int last)
172 * {
173 * for (int i = first; i &lt;= last; ++ i)
174 * {
175 * // Loop body code
176 * . . .
177 * }
178 * }
179 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
180 * inclusive and increasing by +1 on each iteration.
181 * <P>
182 * The <code>finish()</code> method is intended for performing per-thread
183 * finalization after finishing the loop iterations. If no such finalization is
184 * needed, omit the <code>finish()</code> method.
185 * <P>
186 * If the worker for loop's <code>start()</code>, <code>run()</code>, or
187 * <code>finish()</code> method throws an exception in one of the worker threads,
188 * then that worker thread executes no further code in the loop, and the worker
189 * region's <code>execute()</code> method throws that same exception in that thread.
190 * However, the other worker threads in the worker team continue to execute.
191 *
192 * @author Alan Kaminsky
193 * @version 27-Jan-2010
194 */
195 public abstract class WorkerIntegerForLoop
196 extends WorkerForLoop {
197
198 // Exported constructors.
199 /**
200 * Construct a new worker for loop.
201 */
202 public WorkerIntegerForLoop() {
203 super();
204 }
205
206 // Exported operations.
207 /**
208 * Determine this worker for loop's schedule. Called by the master and
209 * worker threads. The schedule determines how the loop iterations are
210 * apportioned among the worker team threads. For further information, see
211 * class {@linkplain IntegerSchedule}.
212 * <P>
213 * The <code>schedule()</code> method may be overridden in a subclass to return
214 * the desired schedule. If not overridden, the default is a runtime
215 * schedule (see {@link edu.rit.pj.IntegerSchedule#runtime()}).
216 *
217 * @return Schedule for this worker for loop.
218 */
219 public IntegerSchedule schedule() {
220 return IntegerSchedule.runtime();
221 }
222
223 /**
224 * Perform per-thread initialization actions before starting the loop
225 * iterations. Called by a worker thread.
226 * <P>
227 * The <code>start()</code> method may be overridden in a subclass. If not
228 * overridden, the <code>start()</code> method does nothing.
229 *
230 * @exception Exception The <code>start()</code> method may throw any exception.
231 * @throws java.lang.Exception if any.
232 */
233 public void start()
234 throws Exception {
235 }
236
237 /**
238 * Send additional input data associated with a task. Called by the master
239 * thread. The task is denoted by the given chunk of loop iterations. The
240 * input data must be sent using the given communicator, to the given worker
241 * process rank, with the given message tag.
242 * <P>
243 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
244 * not overridden, the <code>sendTaskInput()</code> method does nothing.
245 *
246 * @param range Chunk of loop iterations.
247 * @param comm Communicator.
248 * @param wRank Worker process rank.
249 * @param tag Message tag.
250 * @exception IOException Thrown if an I/O error occurred.
251 * @throws java.io.IOException if any.
252 */
253 public void sendTaskInput(Range range,
254 Comm comm,
255 int wRank,
256 int tag)
257 throws IOException {
258 }
259
260 /**
261 * Receive additional input data associated with a task. Called by a worker
262 * thread. The task is denoted by the given chunk of loop iterations. The
263 * input data must be received using the given communicator, from the given
264 * master process rank, with the given message tag.
265 * <P>
266 * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
267 * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
268 *
269 * @param range Chunk of loop iterations.
270 * @param comm Communicator.
271 * @param mRank Master process rank.
272 * @param tag Message tag.
273 * @exception IOException Thrown if an I/O error occurred.
274 * @throws java.io.IOException if any.
275 */
276 public void receiveTaskInput(Range range,
277 Comm comm,
278 int mRank,
279 int tag)
280 throws IOException {
281 }
282
283 /**
284 * Execute one chunk of iterations of this worker for loop. Called by a
285 * worker thread. The <code>run()</code> method must perform the loop body for
286 * indexes <code>first</code> through <code>last</code> inclusive, increasing the
287 * loop index by +1 after each iteration.
288 * <P>
289 * The <code>run()</code> method must be overridden in a subclass.
290 *
291 * @param first First loop index.
292 * @param last Last loop index.
293 * @exception Exception The <code>run()</code> method may throw any exception.
294 * @throws java.lang.Exception if any.
295 */
296 public abstract void run(int first,
297 int last)
298 throws Exception;
299
300 /**
301 * Send additional output data associated with a task. Called by a worker
302 * thread. The task is denoted by the given chunk of loop iterations. The
303 * output data must be sent using the given communicator, to the given
304 * master process rank, with the given message tag.
305 * <P>
306 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
307 * not overridden, the <code>sendTaskOutput()</code> method does nothing.
308 *
309 * @param range Chunk of loop iterations.
310 * @param comm Communicator.
311 * @param mRank Master process rank.
312 * @param tag Message tag.
313 * @exception IOException Thrown if an I/O error occurred.
314 * @throws java.io.IOException if any.
315 */
316 public void sendTaskOutput(Range range,
317 Comm comm,
318 int mRank,
319 int tag)
320 throws IOException {
321 }
322
323 /**
324 * Receive additional output data associated with a task. Called by the
325 * master thread. The task is denoted by the given chunk of loop iterations.
326 * The output data must be received using the given communicator, from the
327 * given worker process rank, with the given message tag.
328 * <P>
329 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
330 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
331 *
332 * @param range Chunk of loop iterations.
333 * @param comm Communicator.
334 * @param wRank Worker process rank.
335 * @param tag Message tag.
336 * @exception IOException Thrown if an I/O error occurred.
337 * @throws java.io.IOException if any.
338 */
339 public void receiveTaskOutput(Range range,
340 Comm comm,
341 int wRank,
342 int tag)
343 throws IOException {
344 }
345
346 /**
347 * Perform per-thread finalization actions after finishing the loop
348 * iterations. Called by a worker thread.
349 * <P>
350 * The <code>finish()</code> method may be overridden in a subclass. If not
351 * overridden, the <code>finish()</code> method does nothing.
352 *
353 * @exception Exception The <code>finish()</code> method may throw any
354 * exception.
355 * @throws java.lang.Exception if any.
356 */
357 public void finish()
358 throws Exception {
359 }
360
361 /**
362 * Returns the tag offset for this worker for loop. Each message between the
363 * master and worker threads is sent with a message tag equal to
364 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
365 * tag offset.
366 * <P>
367 * The <code>tagOffset()</code> method may be overridden in a subclass. If not
368 * overridden, the <code>tagOffset()</code> returns a default tag offset of
369 * <code>Integer.MIN_VALUE</code>.
370 *
371 * @return Tag offset.
372 */
373 public int tagOffset() {
374 return Integer.MIN_VALUE;
375 }
376
377 // Hidden operations.
378 /**
379 * Execute this worker for loop in the master thread.
380 *
381 * @param range Loop index range.
382 *
383 * @exception IOException Thrown if an I/O error occurred.
384 */
385 void masterExecute(Range range)
386 throws IOException {
387 IntegerSchedule sch = schedule();
388 if (sch.isFixedSchedule()) {
389 masterExecuteFixed(range, sch);
390 } else {
391 masterExecuteNonFixed(range, sch);
392 }
393 }
394
395 /**
396 * Execute this worker for loop in the master thread with a fixed schedule.
397 *
398 * @param range Loop index range.
399 * @param sch Schedule.
400 *
401 * @exception IOException Thrown if an I/O error occurred.
402 */
403 void masterExecuteFixed(Range range,
404 IntegerSchedule sch)
405 throws IOException {
406 int count = myTeam.count;
407 Comm comm = myTeam.comm;
408
409 // Send additional task input to each worker.
410 sch.start(count, range);
411 for (int w = 0; w < count; ++w) {
412 Range chunk = sch.next(w);
413 if (chunk != null) {
414 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
415 }
416 }
417
418 // Receive additional task output from each worker.
419 sch.start(count, range);
420 for (int w = 0; w < count; ++w) {
421 Range chunk = sch.next(w);
422 if (chunk != null) {
423 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
424 }
425 }
426 }
427
428 /**
429 * Execute this worker for loop in the master thread with a non-fixed
430 * schedule.
431 *
432 * @param range Loop index range.
433 * @param sch Schedule.
434 *
435 * @exception IOException Thrown if an I/O error occurred.
436 */
437 void masterExecuteNonFixed(Range range,
438 IntegerSchedule sch)
439 throws IOException {
440 int count = myTeam.count;
441 sch.start(count, range);
442 int remaining = count;
443 ObjectItemBuf<Range> buf = ObjectBuf.buffer();
444 Range tagRange = new Range(tagFor(0), tagFor(count - 1));
445 Comm comm = myTeam.comm;
446
447 // Send initial task to each worker.
448 for (int w = 0; w < count; ++w) {
449 Range chunk = sch.next(w);
450 buf.item = chunk;
451 buf.reset();
452 int r = myTeam.workerRank(w);
453 int tag = tagFor(w);
454 comm.send(r, tag, buf);
455 if (chunk == null) {
456 --remaining;
457 } else {
458 sendTaskInput(chunk, comm, r, tag);
459 }
460 }
461
462 // Repeatedly receive a response from a worker and send next task to
463 // that worker.
464 while (remaining > 0) {
465 CommStatus status = comm.receive(null, tagRange, buf);
466 Range chunk = buf.item;
467 int r = status.fromRank;
468 int tag = status.tag;
469 int w = workerFor(tag);
470 receiveTaskOutput(chunk, comm, r, tag);
471 chunk = sch.next(w);
472 buf.item = chunk;
473 buf.reset();
474 comm.send(r, tag, buf);
475 if (chunk == null) {
476 --remaining;
477 } else {
478 sendTaskInput(chunk, comm, r, tag);
479 }
480 }
481 }
482
483 /**
484 * Execute this worker for loop in a worker thread.
485 *
486 * @param w Worker index.
487 * @param range Loop index range.
488 *
489 * @exception Exception This method may throw any exception.
490 */
491 void workerExecute(int w,
492 Range range)
493 throws Exception {
494 IntegerSchedule sch = schedule();
495 if (sch.isFixedSchedule()) {
496 sch.start(myTeam.count, range);
497 workerExecuteFixed(sch.next(w), w);
498 } else {
499 workerExecuteNonFixed(w);
500 }
501 }
502
503 /**
504 * Execute this worker for loop in a worker thread using a fixed schedule.
505 *
506 * @param range Chunk of loop iterations.
507 * @param w Worker index.
508 *
509 * @exception Exception This method may throw any exception.
510 */
511 void workerExecuteFixed(Range range,
512 int w)
513 throws Exception {
514 start();
515 if (range != null) {
516 Comm comm = myTeam.comm;
517 int r = myTeam.masterRank();
518 int tag = tagFor(w);
519 receiveTaskInput(range, comm, r, tag);
520 run(range.lb(), range.ub());
521 sendTaskOutput(range, comm, r, tag);
522 }
523 finish();
524 }
525
526 /**
527 * Execute this worker for loop in a worker thread using a non-fixed
528 * schedule.
529 *
530 * @param w Worker index.
531 *
532 * @exception Exception This method may throw any exception.
533 */
534 void workerExecuteNonFixed(int w)
535 throws Exception {
536 Comm comm = myTeam.comm;
537 int r = myTeam.masterRank();
538 int tag = tagFor(w);
539 start();
540 ObjectItemBuf<Range> buf = ObjectBuf.buffer();
541 for (;;) {
542 comm.receive(r, tag, buf);
543 Range range = buf.item;
544 if (range == null) {
545 break;
546 }
547 receiveTaskInput(range, comm, r, tag);
548 run(range.lb(), range.ub());
549
550 // The next two statements constitute a critical section; other
551 // workers in this team must not send messages in between these two
552 // messages, or the master can deadlock.
553 synchronized (myTeam) {
554 comm.send(r, tag, buf);
555 sendTaskOutput(range, comm, r, tag);
556 }
557 }
558 finish();
559 }
560
561 /**
562 * Returns the message tag for the given worker index.
563 *
564 * @param w Worker index.
565 *
566 * @return Message tag.
567 */
568 private int tagFor(int w) {
569 return w + tagOffset();
570 }
571
572 /**
573 * Returns the worker index for the given message tag.
574 *
575 * @param tag Message tag.
576 *
577 * @return Worker index.
578 */
579 private int workerFor(int tag) {
580 return tag - tagOffset();
581 }
582
583 }