1 //******************************************************************************
2 //
3 // File: WorkerLongStrideForLoop.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerLongStrideForLoop
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.io.IOException;
43
44 import edu.rit.mp.ObjectBuf;
45 import edu.rit.mp.buf.ObjectItemBuf;
46 import edu.rit.util.LongRange;
47 import edu.rit.util.Range;
48
49 /**
50 * Class WorkerLongStrideForLoop is the abstract base class for one variation of
51 * a worker for loop that is executed inside a {@linkplain WorkerRegion}. The
52 * loop index data type is <code>long</code>. The loop stride is explicitly
53 * specified.
54 * <P>
55 * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
56 * create an instance of a concrete subclass of class WorkerLongStrideForLoop;
57 * and pass this instance to the worker region's <code>execute()</code> method.
 * Either every worker team thread must call the worker region's
 * <code>execute()</code> method with identical arguments, or no thread may
 * call the <code>execute()</code> method. You can do all this using an anonymous
 * inner class; for example:
62 * <PRE>
63 * new WorkerRegion()
64 * {
65 * . . .
66 * public void run()
67 * {
68 * . . .
69 * execute (0L, 98L, 2L, new WorkerLongStrideForLoop()
70 * {
71 * // Thread local variable declarations
72 * . . .
73 * public void start()
74 * {
75 * // Per-thread pre-loop initialization code
76 * . . .
77 * }
78 * public void run (long first, long last, long stride)
79 * {
80 * // Loop code
81 * . . .
82 * }
83 * public void finish()
84 * {
85 * // Per-thread post-loop finalization code
86 * . . .
87 * }
88 * });
89 * }
90 * . . .
91 * }
92 * </PRE>
93 * <P>
94 * In each process of a cluster parallel program, the worker team has one or
95 * more worker threads. Every worker thread in every process has a unique worker
96 * tag, going from tag 0 for the first worker thread in the first process to tag
97 * <I>K</I>−1 for the last worker thread in the last process, where
98 * <I>K</I> is the total number of worker threads in all the processes. In
99 * addition, in one process there is a master thread. The worker and master
100 * threads all call the worker region's <code>execute()</code> method to execute the
101 * worker for loop. However, the worker and master threads differ in their
102 * actions.
103 * <P>
104 * The master thread does the following. The master obtains the worker for
105 * loop's schedule as returned by the <code>schedule()</code> method. The range of
106 * loop indexes is divided into "chunks" and the chunks are apportioned among
107 * the workers in accordance with the schedule. The master repeatedly sends
108 * "tasks" to the workers and receives "responses" from the workers. To send a
109 * task to a particular worker, the master (1) sends a message containing the
110 * chunk index range to the worker's process; and (2) calls the worker for
111 * loop's <code>sendTaskInput()</code> method. This method's default implementation
112 * does nothing, but it can be overridden to send additional task input data to
113 * the worker. To receive a response from a particular worker, the master (1)
114 * receives a message containing the chunk index range from the worker's
115 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
116 * method. This method's default implementation does nothing, but it can be
117 * overridden to receive additional task output data from the worker. Once all
118 * tasks have been sent to the workers and all responses have been received from
119 * the workers, the master returns from the worker region's <code>execute()</code>
120 * method.
121 * <P>
122 * Each worker thread does the following. The worker calls the worker for loop's
123 * <code>start()</code> method once before beginning any loop iterations. The worker
124 * repeatedly receives tasks from the master and sends responses to the master.
125 * To receive a task from the master, the worker (1) receives a message
126 * containing the chunk index range from the master's process; and (2) calls the
127 * worker for loop's <code>receiveTaskInput()</code> method. This method's default
128 * implementation does nothing, but it can be overridden to receive additional
 * task input data from the master. The worker now calls the worker for loop's
 * <code>run()</code> method, passing in the chunk index range lower bound,
 * upper bound, and stride. When the <code>run()</code> method returns, the worker sends the response
132 * to the master. To send the response, the worker (1) sends a message
133 * containing the chunk index range to the master's process; and (2) calls the
134 * worker for loop's <code>sendTaskOutput()</code> method. This method's default
135 * implementation does nothing, but it can be overridden to send additional task
136 * output data to the master. Once all tasks have been received from the master
137 * and all responses have been sent to the master, the worker calls the worker
138 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
139 * threads, the workers do <I>not</I> synchronize with each other at a barrier
140 * at this point.) The worker then returns from the worker region's
141 * <code>execute()</code> method.
142 * <P>
143 * If the worker for loop has a fixed schedule (in which there is exactly one
144 * chunk with a predetermined index range for each worker), then the messages
145 * containing the chunk index range are omitted, and each worker gets its chunk
146 * index range directly from the fixed schedule. However, the task input data
147 * (if any) and task output data (if any) are still sent and received.
148 * <P>
149 * Each message described above is sent with a message tag equal to
150 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
151 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
152 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
153 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where
154 * <I>K</I> is the total number of workers in all the processes. The program
155 * should not use message tags in this range except to send and receive the
156 * messages described above.
157 * <P>
158 * Note that each worker team thread actually creates its own instance of the
159 * worker for loop class and passes that instance to the worker region's
160 * <code>execute()</code> method. Thus, any fields declared in the worker for loop
161 * class will <I>not</I> be shared by all the workers, but instead will be
162 * private to each worker.
163 * <P>
164 * The <code>start()</code> method is intended for performing per-thread
165 * initialization before starting the loop iterations. If no such initialization
166 * is needed, omit the <code>start()</code> method.
167 * <P>
168 * The <code>run()</code> method contains the code for the loop. The first and last
169 * indexes for a chunk of loop iterations are passed in as arguments. The loop
170 * stride, which is always positive, is also explicitly specified as an
171 * argument. The worker for loop's <code>run()</code> method must be coded this way:
172 * <PRE>
173 * public void run (long first, long last, long stride)
174 * {
 * for (long i = first; i &lt;= last; i += stride)
176 * {
177 * // Loop body code
178 * . . .
179 * }
180 * }
181 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
182 * inclusive and increasing by <code>stride</code> on each iteration.
183 * <P>
184 * The <code>finish()</code> method is intended for performing per-thread
185 * finalization after finishing the loop iterations. If no such finalization is
186 * needed, omit the <code>finish()</code> method.
187 * <P>
188 * If the worker for loop's <code>start()</code>, <code>run()</code>, or
189 * <code>finish()</code> method throws an exception in one of the worker threads,
190 * then that worker thread executes no further code in the loop, and the worker
191 * region's <code>execute()</code> method throws that same exception in that thread.
192 * However, the other worker threads in the worker team continue to execute.
193 *
194 * @author Alan Kaminsky
195 * @version 27-Jan-2010
196 */
197 public abstract class WorkerLongStrideForLoop
198 extends WorkerForLoop {
199
200 // Exported constructors.
201 /**
202 * Construct a new worker for loop.
203 */
204 public WorkerLongStrideForLoop() {
205 super();
206 }
207
208 // Exported operations.
209 /**
210 * Determine this worker for loop's schedule. Called by the master and
211 * worker threads. The schedule determines how the loop iterations are
212 * apportioned among the worker team threads. For further information, see
213 * class {@linkplain LongSchedule}.
214 * <P>
215 * The <code>schedule()</code> method may be overridden in a subclass to return
216 * the desired schedule. If not overridden, the default is a runtime
217 * schedule (see {@link edu.rit.pj.LongSchedule#runtime()}).
218 *
219 * @return Schedule for this worker for loop.
220 */
221 public LongSchedule schedule() {
222 return LongSchedule.runtime();
223 }
224
225 /**
226 * Perform per-thread initialization actions before starting the loop
227 * iterations. Called by a worker thread.
228 * <P>
229 * The <code>start()</code> method may be overridden in a subclass. If not
230 * overridden, the <code>start()</code> method does nothing.
231 *
232 * @exception Exception The <code>start()</code> method may throw any exception.
233 * @throws java.lang.Exception if any.
234 */
235 public void start()
236 throws Exception {
237 }
238
239 /**
240 * Send additional input data associated with a task. Called by the master
241 * thread. The task is denoted by the given chunk of loop iterations. The
242 * input data must be sent using the given communicator, to the given worker
243 * process rank, with the given message tag.
244 * <P>
245 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
246 * not overridden, the <code>sendTaskInput()</code> method does nothing.
247 *
248 * @param range Chunk of loop iterations.
249 * @param comm Communicator.
250 * @param wRank Worker process rank.
251 * @param tag Message tag.
252 * @exception IOException Thrown if an I/O error occurred.
253 * @throws java.io.IOException if any.
254 */
255 public void sendTaskInput(LongRange range,
256 Comm comm,
257 int wRank,
258 int tag)
259 throws IOException {
260 }
261
262 /**
263 * Receive additional input data associated with a task. Called by a worker
264 * thread. The task is denoted by the given chunk of loop iterations. The
265 * input data must be received using the given communicator, from the given
266 * master process rank, with the given message tag.
267 * <P>
268 * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
269 * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
270 *
271 * @param range Chunk of loop iterations.
272 * @param comm Communicator.
273 * @param mRank Master process rank.
274 * @param tag Message tag.
275 * @exception IOException Thrown if an I/O error occurred.
276 * @throws java.io.IOException if any.
277 */
278 public void receiveTaskInput(LongRange range,
279 Comm comm,
280 int mRank,
281 int tag)
282 throws IOException {
283 }
284
285 /**
286 * Execute one chunk of iterations of this worker for loop. Called by a
287 * worker thread. The <code>run()</code> method must perform the loop body for
288 * indexes <code>first</code> through <code>last</code> inclusive, increasing the
289 * loop index by <code>stride</code> after each iteration.
290 * <P>
291 * The <code>run()</code> method must be overridden in a subclass.
292 *
293 * @param first First loop index.
294 * @param last Last loop index.
295 * @param stride Loop index stride, always positive.
296 * @exception Exception The <code>run()</code> method may throw any exception.
297 * @throws java.lang.Exception if any.
298 */
299 public abstract void run(long first,
300 long last,
301 long stride)
302 throws Exception;
303
304 /**
305 * Send additional output data associated with a task. Called by a worker
306 * thread. The task is denoted by the given chunk of loop iterations. The
307 * output data must be sent using the given communicator, to the given
308 * master process rank, with the given message tag.
309 * <P>
310 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
311 * not overridden, the <code>sendTaskOutput()</code> method does nothing.
312 *
313 * @param range Chunk of loop iterations.
314 * @param comm Communicator.
315 * @param mRank Master process rank.
316 * @param tag Message tag.
317 * @exception IOException Thrown if an I/O error occurred.
318 * @throws java.io.IOException if any.
319 */
320 public void sendTaskOutput(LongRange range,
321 Comm comm,
322 int mRank,
323 int tag)
324 throws IOException {
325 }
326
327 /**
328 * Receive additional output data associated with a task. Called by the
329 * master thread. The task is denoted by the given chunk of loop iterations.
330 * The output data must be received using the given communicator, from the
331 * given worker process rank, with the given message tag.
332 * <P>
333 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
334 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
335 *
336 * @param range Chunk of loop iterations.
337 * @param comm Communicator.
338 * @param wRank Worker process rank.
339 * @param tag Message tag.
340 * @exception IOException Thrown if an I/O error occurred.
341 * @throws java.io.IOException if any.
342 */
343 public void receiveTaskOutput(LongRange range,
344 Comm comm,
345 int wRank,
346 int tag)
347 throws IOException {
348 }
349
350 /**
351 * Perform per-thread finalization actions after finishing the loop
352 * iterations. Called by a worker thread.
353 * <P>
354 * The <code>finish()</code> method may be overridden in a subclass. If not
355 * overridden, the <code>finish()</code> method does nothing.
356 *
357 * @exception Exception The <code>finish()</code> method may throw any
358 * exception.
359 * @throws java.lang.Exception if any.
360 */
361 public void finish()
362 throws Exception {
363 }
364
365 /**
366 * Returns the tag offset for this worker for loop. Each message between the
367 * master and worker threads is sent with a message tag equal to
368 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
369 * tag offset.
370 * <P>
371 * The <code>tagOffset()</code> method may be overridden in a subclass. If not
372 * overridden, the <code>tagOffset()</code> returns a default tag offset of
373 * <code>Integer.MIN_VALUE</code>.
374 *
375 * @return Tag offset.
376 */
377 public int tagOffset() {
378 return Integer.MIN_VALUE;
379 }
380
381 // Hidden operations.
382 /**
383 * Execute this worker for loop in the master thread.
384 *
385 * @param range Loop index range.
386 *
387 * @exception IOException Thrown if an I/O error occurred.
388 */
389 void masterExecute(LongRange range)
390 throws IOException {
391 LongSchedule sch = schedule();
392 if (sch.isFixedSchedule()) {
393 masterExecuteFixed(range, sch);
394 } else {
395 masterExecuteNonFixed(range, sch);
396 }
397 }
398
399 /**
400 * Execute this worker for loop in the master thread with a fixed schedule.
401 *
402 * @param range Loop index range.
403 * @param sch Schedule.
404 *
405 * @exception IOException Thrown if an I/O error occurred.
406 */
407 void masterExecuteFixed(LongRange range,
408 LongSchedule sch)
409 throws IOException {
410 int count = myTeam.count;
411 Comm comm = myTeam.comm;
412
413 // Send additional task input to each worker.
414 sch.start(count, range);
415 for (int w = 0; w < count; ++w) {
416 LongRange chunk = sch.next(w);
417 if (chunk != null) {
418 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
419 }
420 }
421
422 // Receive additional task output from each worker.
423 sch.start(count, range);
424 for (int w = 0; w < count; ++w) {
425 LongRange chunk = sch.next(w);
426 if (chunk != null) {
427 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
428 }
429 }
430 }
431
432 /**
433 * Execute this worker for loop in the master thread with a non-fixed
434 * schedule.
435 *
436 * @param range Loop index range.
437 * @param sch Schedule.
438 *
439 * @exception IOException Thrown if an I/O error occurred.
440 */
441 void masterExecuteNonFixed(LongRange range,
442 LongSchedule sch)
443 throws IOException {
444 int count = myTeam.count;
445 sch.start(count, range);
446 int remaining = count;
447 ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
448 Range tagRange = new Range(tagFor(0), tagFor(count - 1));
449 Comm comm = myTeam.comm;
450
451 // Send initial task to each worker.
452 for (int w = 0; w < count; ++w) {
453 LongRange chunk = sch.next(w);
454 buf.item = chunk;
455 buf.reset();
456 int r = myTeam.workerRank(w);
457 int tag = tagFor(w);
458 comm.send(r, tag, buf);
459 if (chunk == null) {
460 --remaining;
461 } else {
462 sendTaskInput(chunk, comm, r, tag);
463 }
464 }
465
466 // Repeatedly receive a response from a worker and send next task to
467 // that worker.
468 while (remaining > 0) {
469 CommStatus status = comm.receive(null, tagRange, buf);
470 LongRange chunk = buf.item;
471 int r = status.fromRank;
472 int tag = status.tag;
473 int w = workerFor(tag);
474 receiveTaskOutput(chunk, comm, r, tag);
475 chunk = sch.next(w);
476 buf.item = chunk;
477 buf.reset();
478 comm.send(r, tag, buf);
479 if (chunk == null) {
480 --remaining;
481 } else {
482 sendTaskInput(chunk, comm, r, tag);
483 }
484 }
485 }
486
487 /**
488 * Execute this worker for loop in a worker thread.
489 *
490 * @param w Worker index.
491 * @param range Loop index range.
492 *
493 * @exception Exception This method may throw any exception.
494 */
495 void workerExecute(int w,
496 LongRange range)
497 throws Exception {
498 LongSchedule sch = schedule();
499 if (sch.isFixedSchedule()) {
500 sch.start(myTeam.count, range);
501 workerExecuteFixed(sch.next(w), w);
502 } else {
503 workerExecuteNonFixed(w);
504 }
505 }
506
507 /**
508 * Execute this worker for loop in a worker thread using a fixed schedule.
509 *
510 * @param range Chunk of loop iterations.
511 * @param w Worker index.
512 *
513 * @exception Exception This method may throw any exception.
514 */
515 void workerExecuteFixed(LongRange range,
516 int w)
517 throws Exception {
518 start();
519 if (range != null) {
520 Comm comm = myTeam.comm;
521 int r = myTeam.masterRank();
522 int tag = tagFor(w);
523 receiveTaskInput(range, comm, r, tag);
524 run(range.lb(), range.ub(), range.stride());
525 sendTaskOutput(range, comm, r, tag);
526 }
527 finish();
528 }
529
530 /**
531 * Execute this worker for loop in a worker thread using a non-fixed
532 * schedule.
533 *
534 * @param w Worker index.
535 *
536 * @exception Exception This method may throw any exception.
537 */
538 void workerExecuteNonFixed(int w)
539 throws Exception {
540 Comm comm = myTeam.comm;
541 int r = myTeam.masterRank();
542 int tag = tagFor(w);
543 start();
544 ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
545 for (;;) {
546 comm.receive(r, tag, buf);
547 LongRange range = buf.item;
548 if (range == null) {
549 break;
550 }
551 receiveTaskInput(range, comm, r, tag);
552 run(range.lb(), range.ub(), range.stride());
553
554 // The next two statements constitute a critical section; other
555 // workers in this team must not send messages in between these two
556 // messages, or the master can deadlock.
557 synchronized (myTeam) {
558 comm.send(r, tag, buf);
559 sendTaskOutput(range, comm, r, tag);
560 }
561 }
562 finish();
563 }
564
565 /**
566 * Returns the message tag for the given worker index.
567 *
568 * @param w Worker index.
569 *
570 * @return Message tag.
571 */
572 private int tagFor(int w) {
573 return w + tagOffset();
574 }
575
576 /**
577 * Returns the worker index for the given message tag.
578 *
579 * @param tag Message tag.
580 *
581 * @return Worker index.
582 */
583 private int workerFor(int tag) {
584 return tag - tagOffset();
585 }
586
587 }