1 //******************************************************************************
2 //
3 // File: WorkerIntegerStrideForLoop.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerIntegerStrideForLoop
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.io.IOException;
43
44 import edu.rit.mp.ObjectBuf;
45 import edu.rit.mp.buf.ObjectItemBuf;
46 import edu.rit.util.Range;
47
48 /**
49 * Class WorkerIntegerStrideForLoop is the abstract base class for one variation
50 * of a worker for loop that is executed inside a {@linkplain WorkerRegion}. The
51 * loop index data type is <code>int</code>. The loop stride is explicitly
52 * specified.
53 * <P>
54 * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
55 * create an instance of a concrete subclass of class
56 * WorkerIntegerStrideForLoop; and pass this instance to the worker region's
57 * <code>execute()</code> method. Either every worker team thread must call the
58 * worker region's <code>execute()</code> method with identical arguments, or every
59 * thread must not call the <code>execute()</code> method. You can do all this using
60 * an anonymous inner class; for example:
61 * <PRE>
62 * new WorkerRegion()
63 * {
64 * . . .
65 * public void run()
66 * {
67 * . . .
68 * execute (0, 98, 2, new WorkerIntegerStrideForLoop()
69 * {
70 * // Thread local variable declarations
71 * . . .
72 * public void start()
73 * {
74 * // Per-thread pre-loop initialization code
75 * . . .
76 * }
77 * public void run (int first, int last, int stride)
78 * {
79 * // Loop code
80 * . . .
81 * }
82 * public void finish()
83 * {
84 * // Per-thread post-loop finalization code
85 * . . .
86 * }
87 * });
88 * }
89 * . . .
90 * }
91 * </PRE>
92 * <P>
93 * In each process of a cluster parallel program, the worker team has one or
94 * more worker threads. Every worker thread in every process has a unique worker
95 * tag, going from tag 0 for the first worker thread in the first process to tag
96 * <I>K</I>−1 for the last worker thread in the last process, where
97 * <I>K</I> is the total number of worker threads in all the processes. In
98 * addition, in one process there is a master thread. The worker and master
99 * threads all call the worker region's <code>execute()</code> method to execute the
100 * worker for loop. However, the worker and master threads differ in their
101 * actions.
102 * <P>
103 * The master thread does the following. The master obtains the worker for
104 * loop's schedule as returned by the <code>schedule()</code> method. The range of
105 * loop indexes is divided into "chunks" and the chunks are apportioned among
106 * the workers in accordance with the schedule. The master repeatedly sends
107 * "tasks" to the workers and receives "responses" from the workers. To send a
108 * task to a particular worker, the master (1) sends a message containing the
109 * chunk index range to the worker's process; and (2) calls the worker for
110 * loop's <code>sendTaskInput()</code> method. This method's default implementation
111 * does nothing, but it can be overridden to send additional task input data to
112 * the worker. To receive a response from a particular worker, the master (1)
113 * receives a message containing the chunk index range from the worker's
114 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
115 * method. This method's default implementation does nothing, but it can be
116 * overridden to receive additional task output data from the worker. Once all
117 * tasks have been sent to the workers and all responses have been received from
118 * the workers, the master returns from the worker region's <code>execute()</code>
119 * method.
120 * <P>
121 * Each worker thread does the following. The worker calls the worker for loop's
122 * <code>start()</code> method once before beginning any loop iterations. The worker
123 * repeatedly receives tasks from the master and sends responses to the master.
124 * To receive a task from the master, the worker (1) receives a message
125 * containing the chunk index range from the master's process; and (2) calls the
126 * worker for loop's <code>receiveTaskInput()</code> method. This method's default
127 * implementation does nothing, but it can be overridden to receive additional
128 * task input data from the master. The worker now calls the worker for loop's
* <code>run()</code> method, passing in the chunk index range's lower bound,
* upper bound, and stride. When the <code>run()</code> method returns, the worker sends the response
131 * to the master. To send the response, the worker (1) sends a message
132 * containing the chunk index range to the master's process; and (2) calls the
133 * worker for loop's <code>sendTaskOutput()</code> method. This method's default
134 * implementation does nothing, but it can be overridden to send additional task
135 * output data to the master. Once all tasks have been received from the master
136 * and all responses have been sent to the master, the worker calls the worker
137 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
138 * threads, the workers do <I>not</I> synchronize with each other at a barrier
139 * at this point.) The worker then returns from the worker region's
140 * <code>execute()</code> method.
141 * <P>
142 * If the worker for loop has a fixed schedule (in which there is exactly one
143 * chunk with a predetermined index range for each worker), then the messages
144 * containing the chunk index range are omitted, and each worker gets its chunk
145 * index range directly from the fixed schedule. However, the task input data
146 * (if any) and task output data (if any) are still sent and received.
147 * <P>
148 * Each message described above is sent with a message tag equal to
149 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
150 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
151 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
152 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where
153 * <I>K</I> is the total number of workers in all the processes. The program
154 * should not use message tags in this range except to send and receive the
155 * messages described above.
156 * <P>
157 * Note that each worker team thread actually creates its own instance of the
158 * worker for loop class and passes that instance to the worker region's
159 * <code>execute()</code> method. Thus, any fields declared in the worker for loop
160 * class will <I>not</I> be shared by all the workers, but instead will be
161 * private to each worker.
162 * <P>
163 * The <code>start()</code> method is intended for performing per-thread
164 * initialization before starting the loop iterations. If no such initialization
165 * is needed, omit the <code>start()</code> method.
166 * <P>
167 * The <code>run()</code> method contains the code for the loop. The first and last
168 * indexes for a chunk of loop iterations are passed in as arguments. The loop
169 * stride, which is always positive, is also explicitly specified as an
170 * argument. The worker for loop's <code>run()</code> method must be coded this way:
171 * <PRE>
172 * public void run (int first, int last, int stride)
173 * {
* for (int i = first; i &lt;= last; i += stride)
175 * {
176 * // Loop body code
177 * . . .
178 * }
179 * }
180 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
181 * inclusive and increasing by <code>stride</code> on each iteration.
182 * <P>
183 * The <code>finish()</code> method is intended for performing per-thread
184 * finalization after finishing the loop iterations. If no such finalization is
185 * needed, omit the <code>finish()</code> method.
186 * <P>
187 * If the worker for loop's <code>start()</code>, <code>run()</code>, or
188 * <code>finish()</code> method throws an exception in one of the worker threads,
189 * then that worker thread executes no further code in the loop, and the worker
190 * region's <code>execute()</code> method throws that same exception in that thread.
191 * However, the other worker threads in the worker team continue to execute.
192 *
193 * @author Alan Kaminsky
194 * @version 27-Jan-2010
195 */
196 public abstract class WorkerIntegerStrideForLoop
197 extends WorkerForLoop {
198
199 // Exported constructors.
200 /**
201 * Construct a new worker for loop.
202 */
203 public WorkerIntegerStrideForLoop() {
204 super();
205 }
206
207 // Exported operations.
208 /**
209 * Determine this worker for loop's schedule. Called by the master and
210 * worker threads. The schedule determines how the loop iterations are
211 * apportioned among the worker team threads. For further information, see
212 * class {@linkplain IntegerSchedule}.
213 * <P>
214 * The <code>schedule()</code> method may be overridden in a subclass to return
215 * the desired schedule. If not overridden, the default is a runtime
216 * schedule (see {@link edu.rit.pj.IntegerSchedule#runtime()}).
217 *
218 * @return Schedule for this worker for loop.
219 */
220 public IntegerSchedule schedule() {
221 return IntegerSchedule.runtime();
222 }
223
224 /**
225 * Perform per-thread initialization actions before starting the loop
226 * iterations. Called by a worker thread.
227 * <P>
228 * The <code>start()</code> method may be overridden in a subclass. If not
229 * overridden, the <code>start()</code> method does nothing.
230 *
231 * @exception Exception The <code>start()</code> method may throw any exception.
232 * @throws java.lang.Exception if any.
233 */
234 public void start()
235 throws Exception {
236 }
237
238 /**
239 * Send additional input data associated with a task. Called by the master
240 * thread. The task is denoted by the given chunk of loop iterations. The
241 * input data must be sent using the given communicator, to the given worker
242 * process rank, with the given message tag.
243 * <P>
244 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
245 * not overridden, the <code>sendTaskInput()</code> method does nothing.
246 *
247 * @param range Chunk of loop iterations.
248 * @param comm Communicator.
249 * @param wRank Worker process rank.
250 * @param tag Message tag.
251 * @exception IOException Thrown if an I/O error occurred.
252 * @throws java.io.IOException if any.
253 */
254 public void sendTaskInput(Range range,
255 Comm comm,
256 int wRank,
257 int tag)
258 throws IOException {
259 }
260
261 /**
262 * Receive additional input data associated with a task. Called by a worker
263 * thread. The task is denoted by the given chunk of loop iterations. The
264 * input data must be received using the given communicator, from the given
265 * master process rank, with the given message tag.
266 * <P>
267 * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
268 * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
269 *
270 * @param range Chunk of loop iterations.
271 * @param comm Communicator.
272 * @param mRank Master process rank.
273 * @param tag Message tag.
274 * @exception IOException Thrown if an I/O error occurred.
275 * @throws java.io.IOException if any.
276 */
277 public void receiveTaskInput(Range range,
278 Comm comm,
279 int mRank,
280 int tag)
281 throws IOException {
282 }
283
284 /**
285 * Execute one chunk of iterations of this worker for loop. Called by a
286 * worker thread. The <code>run()</code> method must perform the loop body for
287 * indexes <code>first</code> through <code>last</code> inclusive, increasing the
288 * loop index by <code>stride</code> after each iteration.
289 * <P>
290 * The <code>run()</code> method must be overridden in a subclass.
291 *
292 * @param first First loop index.
293 * @param last Last loop index.
294 * @param stride Loop index stride, always positive.
295 * @exception Exception The <code>run()</code> method may throw any exception.
296 * @throws java.lang.Exception if any.
297 */
298 public abstract void run(int first,
299 int last,
300 int stride)
301 throws Exception;
302
303 /**
304 * Send additional output data associated with a task. Called by a worker
305 * thread. The task is denoted by the given chunk of loop iterations. The
306 * output data must be sent using the given communicator, to the given
307 * master process rank, with the given message tag.
308 * <P>
309 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
310 * not overridden, the <code>sendTaskOutput()</code> method does nothing.
311 *
312 * @param range Chunk of loop iterations.
313 * @param comm Communicator.
314 * @param mRank Master process rank.
315 * @param tag Message tag.
316 * @exception IOException Thrown if an I/O error occurred.
317 * @throws java.io.IOException if any.
318 */
319 public void sendTaskOutput(Range range,
320 Comm comm,
321 int mRank,
322 int tag)
323 throws IOException {
324 }
325
326 /**
327 * Receive additional output data associated with a task. Called by the
328 * master thread. The task is denoted by the given chunk of loop iterations.
329 * The output data must be received using the given communicator, from the
330 * given worker process rank, with the given message tag.
331 * <P>
332 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
333 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
334 *
335 * @param range Chunk of loop iterations.
336 * @param comm Communicator.
337 * @param wRank Worker process rank.
338 * @param tag Message tag.
339 * @exception IOException Thrown if an I/O error occurred.
340 * @throws java.io.IOException if any.
341 */
342 public void receiveTaskOutput(Range range,
343 Comm comm,
344 int wRank,
345 int tag)
346 throws IOException {
347 }
348
349 /**
350 * Perform per-thread finalization actions after finishing the loop
351 * iterations. Called by a worker thread.
352 * <P>
353 * The <code>finish()</code> method may be overridden in a subclass. If not
354 * overridden, the <code>finish()</code> method does nothing.
355 *
356 * @exception Exception The <code>finish()</code> method may throw any
357 * exception.
358 * @throws java.lang.Exception if any.
359 */
360 public void finish()
361 throws Exception {
362 }
363
364 /**
365 * Returns the tag offset for this worker for loop. Each message between the
366 * master and worker threads is sent with a message tag equal to
367 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
368 * tag offset.
369 * <P>
370 * The <code>tagOffset()</code> method may be overridden in a subclass. If not
371 * overridden, the <code>tagOffset()</code> returns a default tag offset of
372 * <code>Integer.MIN_VALUE</code>.
373 *
374 * @return Tag offset.
375 */
376 public int tagOffset() {
377 return Integer.MIN_VALUE;
378 }
379
380 // Hidden operations.
381 /**
382 * Execute this worker for loop in the master thread.
383 *
384 * @param range Loop index range.
385 *
386 * @exception IOException Thrown if an I/O error occurred.
387 */
388 void masterExecute(Range range)
389 throws IOException {
390 IntegerSchedule sch = schedule();
391 if (sch.isFixedSchedule()) {
392 masterExecuteFixed(range, sch);
393 } else {
394 masterExecuteNonFixed(range, sch);
395 }
396 }
397
398 /**
399 * Execute this worker for loop in the master thread with a fixed schedule.
400 *
401 * @param range Loop index range.
402 * @param sch Schedule.
403 *
404 * @exception IOException Thrown if an I/O error occurred.
405 */
406 void masterExecuteFixed(Range range,
407 IntegerSchedule sch)
408 throws IOException {
409 int count = myTeam.count;
410 Comm comm = myTeam.comm;
411
412 // Send additional task input to each worker.
413 sch.start(count, range);
414 for (int w = 0; w < count; ++w) {
415 Range chunk = sch.next(w);
416 if (chunk != null) {
417 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
418 }
419 }
420
421 // Receive additional task output from each worker.
422 sch.start(count, range);
423 for (int w = 0; w < count; ++w) {
424 Range chunk = sch.next(w);
425 if (chunk != null) {
426 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
427 }
428 }
429 }
430
431 /**
432 * Execute this worker for loop in the master thread with a non-fixed
433 * schedule.
434 *
435 * @param range Loop index range.
436 * @param sch Schedule.
437 *
438 * @exception IOException Thrown if an I/O error occurred.
439 */
440 void masterExecuteNonFixed(Range range,
441 IntegerSchedule sch)
442 throws IOException {
443 int count = myTeam.count;
444 sch.start(count, range);
445 int remaining = count;
446 ObjectItemBuf<Range> buf = ObjectBuf.buffer();
447 Range tagRange = new Range(tagFor(0), tagFor(count - 1));
448 Comm comm = myTeam.comm;
449
450 // Send initial task to each worker.
451 for (int w = 0; w < count; ++w) {
452 Range chunk = sch.next(w);
453 buf.item = chunk;
454 buf.reset();
455 int r = myTeam.workerRank(w);
456 int tag = tagFor(w);
457 comm.send(r, tag, buf);
458 if (chunk == null) {
459 --remaining;
460 } else {
461 sendTaskInput(chunk, comm, r, tag);
462 }
463 }
464
465 // Repeatedly receive a response from a worker and send next task to
466 // that worker.
467 while (remaining > 0) {
468 CommStatus status = comm.receive(null, tagRange, buf);
469 Range chunk = buf.item;
470 int r = status.fromRank;
471 int tag = status.tag;
472 int w = workerFor(tag);
473 receiveTaskOutput(chunk, comm, r, tag);
474 chunk = sch.next(w);
475 buf.item = chunk;
476 buf.reset();
477 comm.send(r, tag, buf);
478 if (chunk == null) {
479 --remaining;
480 } else {
481 sendTaskInput(chunk, comm, r, tag);
482 }
483 }
484 }
485
486 /**
487 * Execute this worker for loop in a worker thread.
488 *
489 * @param tag Worker tag.
490 * @param range Loop index range.
491 *
492 * @exception Exception This method may throw any exception.
493 */
494 void workerExecute(int tag,
495 Range range)
496 throws Exception {
497 IntegerSchedule sch = schedule();
498 if (sch.isFixedSchedule()) {
499 sch.start(myTeam.count, range);
500 workerExecuteFixed(sch.next(tag), tag);
501 } else {
502 workerExecuteNonFixed(tag);
503 }
504 }
505
506 /**
507 * Execute this worker for loop in a worker thread using a fixed schedule.
508 *
509 * @param range Chunk of loop iterations.
510 * @param w Worker index.
511 *
512 * @exception Exception This method may throw any exception.
513 */
514 void workerExecuteFixed(Range range,
515 int w)
516 throws Exception {
517 start();
518 if (range != null) {
519 Comm comm = myTeam.comm;
520 int r = myTeam.masterRank();
521 int tag = tagFor(w);
522 receiveTaskInput(range, comm, r, tag);
523 run(range.lb(), range.ub(), range.stride());
524 sendTaskOutput(range, comm, r, tag);
525 }
526 finish();
527 }
528
529 /**
530 * Execute this worker for loop in a worker thread using a non-fixed
531 * schedule.
532 *
533 * @param w Worker index.
534 *
535 * @exception Exception This method may throw any exception.
536 */
537 void workerExecuteNonFixed(int w)
538 throws Exception {
539 Comm comm = myTeam.comm;
540 int r = myTeam.masterRank();
541 int tag = tagFor(w);
542 start();
543 ObjectItemBuf<Range> buf = ObjectBuf.buffer();
544 for (;;) {
545 comm.receive(r, tag, buf);
546 Range range = buf.item;
547 if (range == null) {
548 break;
549 }
550 receiveTaskInput(range, comm, r, tag);
551 run(range.lb(), range.ub(), range.stride());
552
553 // The next two statements constitute a critical section; other
554 // workers in this team must not send messages in between these two
555 // messages, or the master can deadlock.
556 synchronized (myTeam) {
557 comm.send(r, tag, buf);
558 sendTaskOutput(range, comm, r, tag);
559 }
560 }
561 finish();
562 }
563
564 /**
565 * Returns the message tag for the given worker index.
566 *
567 * @param w Worker index.
568 *
569 * @return Message tag.
570 */
571 private int tagFor(int w) {
572 return w + tagOffset();
573 }
574
575 /**
576 * Returns the worker index for the given message tag.
577 *
578 * @param tag Message tag.
579 *
580 * @return Worker index.
581 */
582 private int workerFor(int tag) {
583 return tag - tagOffset();
584 }
585
586 }