1 //******************************************************************************
2 //
3 // File: WorkerLongForLoop.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerLongForLoop
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.io.IOException;
43
44 import edu.rit.mp.ObjectBuf;
45 import edu.rit.mp.buf.ObjectItemBuf;
46 import edu.rit.util.LongRange;
47 import edu.rit.util.Range;
48
49 /**
50 * Class WorkerLongForLoop is the abstract base class for one variation of a
51 * worker for loop that is executed inside a {@linkplain WorkerRegion}. The loop
52 * index data type is <code>long</code>. The loop stride is implicit (+1).
53 * <P>
54 * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
55 * create an instance of a concrete subclass of class WorkerLongForLoop; and
56 * pass this instance to the worker region's <code>execute()</code> method. Either
57 * every worker team thread must call the worker region's <code>execute()</code>
58 * method with identical arguments, or every thread must not call the
59 * <code>execute()</code> method. You can do all this using an anonymous inner
60 * class; for example:
 * <PRE>
 *     new WorkerRegion()
 *     {
 *         . . .
 *         public void run()
 *         {
 *             . . .
 *             execute (0L, 99L, new WorkerLongForLoop()
 *             {
 *                 // Thread local variable declarations
 *                 . . .
 *                 public void start()
 *                 {
 *                     // Per-thread pre-loop initialization code
 *                     . . .
 *                 }
 *                 public void run (long first, long last)
 *                 {
 *                     // Loop code
 *                     . . .
 *                 }
 *                 public void finish()
 *                 {
 *                     // Per-thread post-loop finalization code
 *                     . . .
 *                 }
 *             });
 *         }
 *         . . .
 *     }
 * </PRE>
92 * <P>
93 * In each process of a cluster parallel program, the worker team has one or
94 * more worker threads. Every worker thread in every process has a unique worker
95 * tag, going from tag 0 for the first worker thread in the first process to tag
96 * <I>K</I>−1 for the last worker thread in the last process, where
97 * <I>K</I> is the total number of worker threads in all the processes. In
98 * addition, in one process there is a master thread. The worker and master
99 * threads all call the worker region's <code>execute()</code> method to execute the
100 * worker for loop. However, the worker and master threads differ in their
101 * actions.
102 * <P>
103 * The master thread does the following. The master obtains the worker for
104 * loop's schedule as returned by the <code>schedule()</code> method. The range of
105 * loop indexes is divided into "chunks" and the chunks are apportioned among
106 * the workers in accordance with the schedule. The master repeatedly sends
107 * "tasks" to the workers and receives "responses" from the workers. To send a
108 * task to a particular worker, the master (1) sends a message containing the
109 * chunk index range to the worker's process; and (2) calls the worker for
110 * loop's <code>sendTaskInput()</code> method. This method's default implementation
111 * does nothing, but it can be overridden to send additional task input data to
112 * the worker. To receive a response from a particular worker, the master (1)
113 * receives a message containing the chunk index range from the worker's
114 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
115 * method. This method's default implementation does nothing, but it can be
116 * overridden to receive additional task output data from the worker. Once all
117 * tasks have been sent to the workers and all responses have been received from
118 * the workers, the master returns from the worker region's <code>execute()</code>
119 * method.
120 * <P>
121 * Each worker thread does the following. The worker calls the worker for loop's
122 * <code>start()</code> method once before beginning any loop iterations. The worker
123 * repeatedly receives tasks from the master and sends responses to the master.
124 * To receive a task from the master, the worker (1) receives a message
125 * containing the chunk index range from the master's process; and (2) calls the
126 * worker for loop's <code>receiveTaskInput()</code> method. This method's default
127 * implementation does nothing, but it can be overridden to receive additional
128 * task input data from the master. The worker now calls the worker for loop's
129 * <code>run()</code> method, passing in the chunk index range lower and upper
130 * bounds. When the <code>run()</code> method returns, the worker sends the response
131 * to the master. To send the response, the worker (1) sends a message
132 * containing the chunk index range to the master's process; and (2) calls the
133 * worker for loop's <code>sendTaskOutput()</code> method. This method's default
134 * implementation does nothing, but it can be overridden to send additional task
135 * output data to the master. Once all tasks have been received from the master
136 * and all responses have been sent to the master, the worker calls the worker
137 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
138 * threads, the workers do <I>not</I> synchronize with each other at a barrier
139 * at this point.) The worker then returns from the worker region's
140 * <code>execute()</code> method.
141 * <P>
142 * If the worker for loop has a fixed schedule (in which there is exactly one
143 * chunk with a predetermined index range for each worker), then the messages
144 * containing the chunk index range are omitted, and each worker gets its chunk
145 * index range directly from the fixed schedule. However, the task input data
146 * (if any) and task output data (if any) are still sent and received.
147 * <P>
148 * Each message described above is sent with a message tag equal to
149 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
150 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
151 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
152 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where
153 * <I>K</I> is the total number of workers in all the processes. The program
154 * should not use message tags in this range except to send and receive the
155 * messages described above.
156 * <P>
157 * Note that each worker team thread actually creates its own instance of the
158 * worker for loop class and passes that instance to the worker region's
159 * <code>execute()</code> method. Thus, any fields declared in the worker for loop
160 * class will <I>not</I> be shared by all the workers, but instead will be
161 * private to each worker.
162 * <P>
163 * The <code>start()</code> method is intended for performing per-thread
164 * initialization before starting the loop iterations. If no such initialization
165 * is needed, omit the <code>start()</code> method.
166 * <P>
167 * The <code>run()</code> method contains the code for the loop. The first and last
168 * indexes for a chunk of loop iterations are passed in as arguments. The loop
169 * stride is implicit (+1). The worker for loop's <code>run()</code> method must be
170 * coded this way:
 * <PRE>
 *     public void run (long first, long last)
 *     {
 *         for (long i = first; i &lt;= last; ++ i)
 *         {
 *             // Loop body code
 *             . . .
 *         }
 *     }
 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
 * inclusive and increasing by +1 on each iteration.
182 * <P>
183 * The <code>finish()</code> method is intended for performing per-thread
184 * finalization after finishing the loop iterations. If no such finalization is
185 * needed, omit the <code>finish()</code> method.
186 * <P>
187 * If the worker for loop's <code>start()</code>, <code>run()</code>, or
188 * <code>finish()</code> method throws an exception in one of the worker threads,
189 * then that worker thread executes no further code in the loop, and the worker
190 * region's <code>execute()</code> method throws that same exception in that thread.
191 * However, the other worker threads in the worker team continue to execute.
192 *
193 * @author Alan Kaminsky
194 * @version 27-Jan-2010
195 */
196 public abstract class WorkerLongForLoop
197 extends WorkerForLoop {
198
199 // Exported constructors.
200 /**
201 * Construct a new worker for loop.
202 */
203 public WorkerLongForLoop() {
204 super();
205 }
206
207 // Exported operations.
208 /**
209 * Determine this worker for loop's schedule. Called by the master and
210 * worker threads. The schedule determines how the loop iterations are
211 * apportioned among the worker team threads. For further information, see
212 * class {@linkplain LongSchedule}.
213 * <P>
214 * The <code>schedule()</code> method may be overridden in a subclass to return
215 * the desired schedule. If not overridden, the default is a runtime
216 * schedule (see {@link edu.rit.pj.LongSchedule#runtime()}).
217 *
218 * @return Schedule for this worker for loop.
219 */
220 public LongSchedule schedule() {
221 return LongSchedule.runtime();
222 }
223
224 /**
225 * Perform per-thread initialization actions before starting the loop
226 * iterations. Called by a worker thread.
227 * <P>
228 * The <code>start()</code> method may be overridden in a subclass. If not
229 * overridden, the <code>start()</code> method does nothing.
230 *
231 * @exception Exception The <code>start()</code> method may throw any exception.
232 * @throws java.lang.Exception if any.
233 */
234 public void start()
235 throws Exception {
236 }
237
238 /**
239 * Send additional input data associated with a task. Called by the master
240 * thread. The task is denoted by the given chunk of loop iterations. The
241 * input data must be sent using the given communicator, to the given worker
242 * process rank, with the given message tag.
243 * <P>
244 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
245 * not overridden, the <code>sendTaskInput()</code> method does nothing.
246 *
247 * @param range Chunk of loop iterations.
248 * @param comm Communicator.
249 * @param wRank Worker process rank.
250 * @param tag Message tag.
251 * @exception IOException Thrown if an I/O error occurred.
252 * @throws java.io.IOException if any.
253 */
254 public void sendTaskInput(LongRange range,
255 Comm comm,
256 int wRank,
257 int tag)
258 throws IOException {
259 }
260
261 /**
262 * Receive additional input data associated with a task. Called by a worker
263 * thread. The task is denoted by the given chunk of loop iterations. The
264 * input data must be received using the given communicator, from the given
265 * master process rank, with the given message tag.
266 * <P>
267 * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
268 * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
269 *
270 * @param range Chunk of loop iterations.
271 * @param comm Communicator.
272 * @param mRank Master process rank.
273 * @param tag Message tag.
274 * @exception IOException Thrown if an I/O error occurred.
275 * @throws java.io.IOException if any.
276 */
277 public void receiveTaskInput(LongRange range,
278 Comm comm,
279 int mRank,
280 int tag)
281 throws IOException {
282 }
283
284 /**
285 * Execute one chunk of iterations of this worker for loop. Called by a
286 * worker thread. The <code>run()</code> method must perform the loop body for
287 * indexes <code>first</code> through <code>last</code> inclusive, increasing the
288 * loop index by +1 after each iteration.
289 * <P>
290 * The <code>run()</code> method must be overridden in a subclass.
291 *
292 * @param first First loop index.
293 * @param last Last loop index.
294 * @exception Exception The <code>run()</code> method may throw any exception.
295 * @throws java.lang.Exception if any.
296 */
297 public abstract void run(long first,
298 long last)
299 throws Exception;
300
301 /**
302 * Send additional output data associated with a task. Called by a worker
303 * thread. The task is denoted by the given chunk of loop iterations. The
304 * output data must be sent using the given communicator, to the given
305 * master process rank, with the given message tag.
306 * <P>
307 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
308 * not overridden, the <code>sendTaskOutput()</code> method does nothing.
309 *
310 * @param range Chunk of loop iterations.
311 * @param comm Communicator.
312 * @param mRank Master process rank.
313 * @param tag Message tag.
314 * @exception IOException Thrown if an I/O error occurred.
315 * @throws java.io.IOException if any.
316 */
317 public void sendTaskOutput(LongRange range,
318 Comm comm,
319 int mRank,
320 int tag)
321 throws IOException {
322 }
323
324 /**
325 * Receive additional output data associated with a task. Called by the
326 * master thread. The task is denoted by the given chunk of loop iterations.
327 * The output data must be received using the given communicator, from the
328 * given worker process rank, with the given message tag.
329 * <P>
330 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
331 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
332 *
333 * @param range Chunk of loop iterations.
334 * @param comm Communicator.
335 * @param wRank Worker process rank.
336 * @param tag Message tag.
337 * @exception IOException Thrown if an I/O error occurred.
338 * @throws java.io.IOException if any.
339 */
340 public void receiveTaskOutput(LongRange range,
341 Comm comm,
342 int wRank,
343 int tag)
344 throws IOException {
345 }
346
347 /**
348 * Perform per-thread finalization actions after finishing the loop
349 * iterations. Called by a worker thread.
350 * <P>
351 * The <code>finish()</code> method may be overridden in a subclass. If not
352 * overridden, the <code>finish()</code> method does nothing.
353 *
354 * @exception Exception The <code>finish()</code> method may throw any
355 * exception.
356 * @throws java.lang.Exception if any.
357 */
358 public void finish()
359 throws Exception {
360 }
361
362 /**
363 * Returns the tag offset for this worker for loop. Each message between the
364 * master and worker threads is sent with a message tag equal to
365 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
366 * tag offset.
367 * <P>
368 * The <code>tagOffset()</code> method may be overridden in a subclass. If not
369 * overridden, the <code>tagOffset()</code> returns a default tag offset of
370 * <code>Integer.MIN_VALUE</code>.
371 *
372 * @return Tag offset.
373 */
374 public int tagOffset() {
375 return Integer.MIN_VALUE;
376 }
377
378 // Hidden operations.
379 /**
380 * Execute this worker for loop in the master thread.
381 *
382 * @param range Loop index range.
383 *
384 * @exception IOException Thrown if an I/O error occurred.
385 */
386 void masterExecute(LongRange range)
387 throws IOException {
388 LongSchedule sch = schedule();
389 if (sch.isFixedSchedule()) {
390 masterExecuteFixed(range, sch);
391 } else {
392 masterExecuteNonFixed(range, sch);
393 }
394 }
395
396 /**
397 * Execute this worker for loop in the master thread with a fixed schedule.
398 *
399 * @param range Loop index range.
400 * @param sch Schedule.
401 *
402 * @exception IOException Thrown if an I/O error occurred.
403 */
404 void masterExecuteFixed(LongRange range,
405 LongSchedule sch)
406 throws IOException {
407 int count = myTeam.count;
408 Comm comm = myTeam.comm;
409
410 // Send additional task input to each worker.
411 sch.start(count, range);
412 for (int w = 0; w < count; ++w) {
413 LongRange chunk = sch.next(w);
414 if (chunk != null) {
415 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
416 }
417 }
418
419 // Receive additional task output from each worker.
420 sch.start(count, range);
421 for (int w = 0; w < count; ++w) {
422 LongRange chunk = sch.next(w);
423 if (chunk != null) {
424 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
425 }
426 }
427 }
428
429 /**
430 * Execute this worker for loop in the master thread with a non-fixed
431 * schedule.
432 *
433 * @param range Loop index range.
434 * @param sch Schedule.
435 *
436 * @exception IOException Thrown if an I/O error occurred.
437 */
438 void masterExecuteNonFixed(LongRange range,
439 LongSchedule sch)
440 throws IOException {
441 int count = myTeam.count;
442 sch.start(count, range);
443 int remaining = count;
444 ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
445 Range tagRange = new Range(tagFor(0), tagFor(count - 1));
446 Comm comm = myTeam.comm;
447
448 // Send initial task to each worker.
449 for (int w = 0; w < count; ++w) {
450 LongRange chunk = sch.next(w);
451 buf.item = chunk;
452 buf.reset();
453 int r = myTeam.workerRank(w);
454 int tag = tagFor(w);
455 comm.send(r, tag, buf);
456 if (chunk == null) {
457 --remaining;
458 } else {
459 sendTaskInput(chunk, comm, r, tag);
460 }
461 }
462
463 // Repeatedly receive a response from a worker and send next task to
464 // that worker.
465 while (remaining > 0) {
466 CommStatus status = comm.receive(null, tagRange, buf);
467 LongRange chunk = buf.item;
468 int r = status.fromRank;
469 int tag = status.tag;
470 int w = workerFor(tag);
471 receiveTaskOutput(chunk, comm, r, tag);
472 chunk = sch.next(w);
473 buf.item = chunk;
474 buf.reset();
475 comm.send(r, tag, buf);
476 if (chunk == null) {
477 --remaining;
478 } else {
479 sendTaskInput(chunk, comm, r, tag);
480 }
481 }
482 }
483
484 /**
485 * Execute this worker for loop in a worker thread.
486 *
487 * @param w Worker index.
488 * @param range Loop index range.
489 *
490 * @exception Exception This method may throw any exception.
491 */
492 void workerExecute(int w,
493 LongRange range)
494 throws Exception {
495 LongSchedule sch = schedule();
496 if (sch.isFixedSchedule()) {
497 sch.start(myTeam.count, range);
498 workerExecuteFixed(sch.next(w), w);
499 } else {
500 workerExecuteNonFixed(w);
501 }
502 }
503
504 /**
505 * Execute this worker for loop in a worker thread using a fixed schedule.
506 *
507 * @param range Chunk of loop iterations.
508 * @param w Worker index.
509 *
510 * @exception Exception This method may throw any exception.
511 */
512 void workerExecuteFixed(LongRange range,
513 int w)
514 throws Exception {
515 start();
516 if (range != null) {
517 Comm comm = myTeam.comm;
518 int r = myTeam.masterRank();
519 int tag = tagFor(w);
520 receiveTaskInput(range, comm, r, tag);
521 run(range.lb(), range.ub());
522 sendTaskOutput(range, comm, r, tag);
523 }
524 finish();
525 }
526
527 /**
528 * Execute this worker for loop in a worker thread using a non-fixed
529 * schedule.
530 *
531 * @param w Worker index.
532 *
533 * @exception Exception This method may throw any exception.
534 */
535 void workerExecuteNonFixed(int w)
536 throws Exception {
537 Comm comm = myTeam.comm;
538 int r = myTeam.masterRank();
539 int tag = tagFor(w);
540 start();
541 ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
542 for (;;) {
543 comm.receive(r, tag, buf);
544 LongRange range = buf.item;
545 if (range == null) {
546 break;
547 }
548 receiveTaskInput(range, comm, r, tag);
549 run(range.lb(), range.ub());
550
551 // The next two statements constitute a critical section; other
552 // workers in this team must not send messages in between these two
553 // messages, or the master can deadlock.
554 synchronized (myTeam) {
555 comm.send(r, tag, buf);
556 sendTaskOutput(range, comm, r, tag);
557 }
558 }
559 finish();
560 }
561
562 /**
563 * Returns the message tag for the given worker index.
564 *
565 * @param w Worker index.
566 *
567 * @return Message tag.
568 */
569 private int tagFor(int w) {
570 return w + tagOffset();
571 }
572
573 /**
574 * Returns the worker index for the given message tag.
575 *
576 * @param tag Message tag.
577 *
578 * @return Worker index.
579 */
580 private int workerFor(int tag) {
581 return tag - tagOffset();
582 }
583
584 }