1 //******************************************************************************
2 //
3 // File: WorkerRegion.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerRegion
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.util.Iterator;
43
44 import edu.rit.util.LongRange;
45 import edu.rit.util.Range;
46
47 /**
48 * Class WorkerRegion is the abstract base class for a worker region that is
49 * executed by a {@linkplain WorkerTeam} of threads distributed across the
50 * processes of a cluster parallel program.
51 * <P>
52 * To execute a worker region, create a {@linkplain WorkerTeam} object; create
53 * an instance of a concrete subclass of class WorkerRegion; and pass this
54 * instance to the worker team's <code>execute()</code> method. You can do all this
55 * using an anonymous inner class; for example:
56 * <PRE>
57 * new WorkerTeam().execute (new WorkerRegion()
58 * {
59 * public void start()
60 * {
61 * // Initialization code
62 * . . .
63 * }
64 * public void run()
65 * {
66 * // Parallel code
67 * . . .
68 * }
69 * public void finish()
70 * {
71 * // Finalization code
72 * . . .
73 * }
74 * });
75 * </PRE>
76 * <P>
77 * The worker team's <code>execute()</code> method does the following. In each
78 * process, the worker team has a certain number of <B>worker threads</B>
79 * <I>K</I>, where <I>K</I> was specified when the worker team was constructed.
80 * In the highest-ranked process of the communicator, there is a <B>master
81 * thread</B> in addition to the worker threads. The <B>main thread</B> is the
82 * thread calling the worker team's <code>execute()</code> method. The main thread
83 * calls the worker region's <code>start()</code> method. When the <code>start()</code>
84 * method returns, all the worker threads, plus the master thread if any, call
85 * the worker region's <code>run()</code> method concurrently. When all the team
86 * threads have returned from the <code>run()</code> method, the main thread calls
87 * the worker region's <code>finish()</code> method. When the <code>finish()</code>
88 * method returns, the main thread returns from the worker team's
89 * <code>execute()</code> method.
90 * <P>
91 * The chief purpose of a worker team is to execute a work-sharing parallel loop
92 * in a cluster parallel program, partitioning the loop iterations among the
93 * worker threads in all the processes. The worker team uses the
94 * <B>master-worker pattern</B> to partition the iterations. The master thread
95 * partitions the loop iterations and sends tasks to the worker threads; the
96 * worker threads send results back to the master thread. The worker team uses a
97 * certain <B>communicator</B> to do this message passing. The communicator was
98 * specified when the worker team was constructed. For further information, see
99 * class {@linkplain WorkerIntegerForLoop} and {@linkplain WorkerLongForLoop}.
100 * <P>
101 * Within each process, variables to be shared by all threads in the team may be
102 * declared as fields of the WorkerRegion subclass. (Variables cannot be shared
103 * between processes.) The <code>start()</code> method is intended for performing
104 * initialization in a single thread before parallel execution begins. If no
105 * such initialization is needed, omit the <code>start()</code> method. The
106 * <code>run()</code> method contains code to be executed in parallel by all threads
107 * in the team. Variables that are private to each thread may be declared inside
108 * the <code>run()</code> method. The <code>finish()</code> method is intended for
109 * performing finalization in a single thread after parallel execution ends. If
110 * no such finalization is needed, omit the <code>finish()</code> method.
111 * <P>
112 * If the worker region's <code>start()</code> method throws an exception, the
113 * worker team's <code>execute()</code> method throws that same exception, and the
114 * <code>run()</code> method is not called.
115 * <P>
116 * If the worker region's <code>run()</code> method throws an exception in one of
117 * the team threads, the exception's stack trace is printed on the standard
118 * error, the worker team waits until all the other team threads have returned
119 * from the <code>run()</code> method, then the worker team's <code>execute()</code>
120 * method throws that same exception, and the worker region's <code>finish()</code>
121 * method is not called. If the worker region's <code>run()</code> method throws an
122 * exception in more than one of the team threads, each exception's stack trace
123 * is printed on the standard error, the worker team waits until all the other
124 * team threads have returned from the <code>run()</code> method, then the worker
125 * team's <code>execute()</code> method throws a {@linkplain
126 * MultipleParallelException} wrapping all the thrown exceptions, and the worker
127 * region's <code>finish()</code> method is not called.
128 * <P>
129 * If the worker region's <code>finish()</code> method throws an exception, the
130 * worker team's <code>execute()</code> method throws that same exception.
131 *
132 * @author Alan Kaminsky
133 * @version 07-Oct-2010
134 */
135 public abstract class WorkerRegion
136 extends WorkerConstruct {
137
138 // Exported constructors.
139 /**
140 * Construct a new worker region.
141 */
142 public WorkerRegion() {
143 super();
144 }
145
146 // Exported operations.
147 /**
148 * Perform initialization actions before parallel execution begins. Only one
149 * thread in each process calls the <code>start()</code> method.
150 * <P>
151 * The <code>start()</code> method may be overridden in a subclass. If not
152 * overridden, the <code>start()</code> method does nothing.
153 *
154 * @exception Exception The <code>start()</code> method may throw any exception.
155 * @throws java.lang.Exception if any.
156 */
157 public void start()
158 throws Exception {
159 }
160
161 /**
162 * Execute parallel code. All threads of the worker team in each process
163 * call the <code>run()</code> method concurrently.
164 * <P>
165 * The <code>run()</code> method must be implemented in a subclass.
166 *
167 * @exception Exception The <code>run()</code> method may throw any exception.
168 * @throws java.lang.Exception if any.
169 */
170 public abstract void run()
171 throws Exception;
172
173 /**
174 * Perform finalization actions after parallel execution ends. Only one
175 * thread in each process calls the <code>finish()</code> method.
176 * <P>
177 * The <code>finish()</code> method may be overridden in a subclass. If not
178 * overridden, the <code>finish()</code> method does nothing.
179 *
180 * @exception Exception The <code>finish()</code> method may throw any
181 * exception.
182 * @throws java.lang.Exception if any.
183 */
184 public void finish()
185 throws Exception {
186 }
187
188 /**
189 * Execute a worker for loop within this worker region. For further
190 * information, see class {@linkplain WorkerIntegerForLoop}. The loop index
191 * goes from <code>first</code> (inclusive) to <code>last</code> (inclusive) in
192 * steps of +1. If <code>first</code> is greater than <code>last</code>, then no
193 * loop iterations are performed.
194 * <P>
195 * <I>Note:</I> Either all threads in the worker team must call the
196 * <code>execute()</code> method with identical arguments, or none of the
197 * threads must call the <code>execute()</code> method.
198 *
199 * @param first First loop index.
200 * @param last Last loop index.
201 * @param theLoop Worker for loop.
202 * @exception NullPointerException (unchecked exception) Thrown if
203 * <code>theLoop</code> is null.
204 * @exception IllegalStateException (unchecked exception) Thrown if no
205 * worker team is executing this worker region.
206 * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
207 * an exception.
208 * @throws java.lang.Exception if any.
209 */
210 public final void execute(int first,
211 int last,
212 WorkerIntegerForLoop theLoop)
213 throws Exception {
214 // Verify preconditions.
215 if (theLoop == null) {
216 throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
217 }
218 if (myTeam == null) {
219 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
220 }
221
222 try {
223 // Record parallel team.
224 theLoop.myTeam = this.myTeam;
225
226 // Get current parallel team thread.
227 WorkerTeamThread currentThread = getCurrentThread();
228 int w = currentThread.myIndex;
229
230 // Do master or worker thread processing.
231 Range range = new Range(first, last);
232 if (w == -1) {
233 theLoop.masterExecute(range);
234 } else {
235 theLoop.workerExecute(w, range);
236 }
237 } finally {
238 // Forget parallel team.
239 theLoop.myTeam = null;
240 }
241 }
242
243 /**
244 * Execute a worker for loop within this worker region. For further
245 * information, see class {@linkplain WorkerIntegerStrideForLoop}. The loop
246 * index goes from <code>first</code> (inclusive) to <code>last</code> (inclusive)
247 * in steps of <code>stride</code>. The stride must be positive. If
248 * <code>first</code> is greater than <code>last</code>, then no loop iterations are
249 * performed.
250 * <P>
251 * <I>Note:</I> Either all threads in the worker team must call the
252 * <code>execute()</code> method with identical arguments, or none of the
253 * threads must call the <code>execute()</code> method.
254 *
255 * @param first First loop index.
256 * @param last Last loop index.
257 * @param stride Loop index stride, >= 1.
258 * @param theLoop Worker for loop.
259 * @exception IllegalArgumentException (unchecked exception) Thrown if
260 * <code>stride</code> < 1.
261 * @exception NullPointerException (unchecked exception) Thrown if
262 * <code>theLoop</code> is null.
263 * @exception IllegalStateException (unchecked exception) Thrown if no
264 * worker team is executing this worker region.
265 * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
266 * an exception.
267 * @throws java.lang.Exception if any.
268 */
269 public final void execute(int first,
270 int last,
271 int stride,
272 WorkerIntegerStrideForLoop theLoop)
273 throws Exception {
274 // Verify preconditions.
275 if (stride <= 0) {
276 throw new IllegalArgumentException("WorkerRegion.execute(): Stride = " + stride + " illegal");
277 }
278 if (theLoop == null) {
279 throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
280 }
281 if (myTeam == null) {
282 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
283 }
284
285 try {
286 // Record parallel team.
287 theLoop.myTeam = this.myTeam;
288
289 // Get current parallel team thread.
290 WorkerTeamThread currentThread = getCurrentThread();
291 int w = currentThread.myIndex;
292
293 // Do master or worker thread processing.
294 Range range = new Range(first, last, stride);
295 if (w == -1) {
296 theLoop.masterExecute(range);
297 } else {
298 theLoop.workerExecute(w, range);
299 }
300 } finally {
301 // Forget parallel team.
302 theLoop.myTeam = null;
303 }
304 }
305
306 /**
307 * Execute a worker for loop within this worker region. For further
308 * information, see class {@linkplain WorkerLongForLoop}. The loop index
309 * goes from <code>first</code> (inclusive) to <code>last</code> (inclusive) in
310 * steps of +1. If <code>first</code> is greater than <code>last</code>, then no
311 * loop iterations are performed.
312 * <P>
313 * <I>Note:</I> Either all threads in the worker team must call the
314 * <code>execute()</code> method with identical arguments, or none of the
315 * threads must call the <code>execute()</code> method.
316 *
317 * @param first First loop index.
318 * @param last Last loop index.
319 * @param theLoop Worker for loop.
320 * @exception NullPointerException (unchecked exception) Thrown if
321 * <code>theLoop</code> is null.
322 * @exception IllegalStateException (unchecked exception) Thrown if no
323 * worker team is executing this worker region.
324 * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
325 * an exception.
326 * @throws java.lang.Exception if any.
327 */
328 public final void execute(long first,
329 long last,
330 WorkerLongForLoop theLoop)
331 throws Exception {
332 // Verify preconditions.
333 if (theLoop == null) {
334 throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
335 }
336 if (myTeam == null) {
337 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
338 }
339
340 try {
341 // Record parallel team.
342 theLoop.myTeam = this.myTeam;
343
344 // Get current parallel team thread.
345 WorkerTeamThread currentThread = getCurrentThread();
346 int w = currentThread.myIndex;
347
348 // Do master or worker thread processing.
349 LongRange range = new LongRange(first, last);
350 if (w == -1) {
351 theLoop.masterExecute(range);
352 } else {
353 theLoop.workerExecute(w, range);
354 }
355 } finally {
356 // Forget parallel team.
357 theLoop.myTeam = null;
358 }
359 }
360
361 /**
362 * Execute a worker for loop within this worker region. For further
363 * information, see class {@linkplain WorkerLongStrideForLoop}. The loop
364 * index goes from <code>first</code> (inclusive) to <code>last</code> (inclusive)
365 * in steps of <code>stride</code>. The stride must be positive. If
366 * <code>first</code> is greater than <code>last</code>, then no loop iterations are
367 * performed.
368 * <P>
369 * <I>Note:</I> Either all threads in the worker team must call the
370 * <code>execute()</code> method with identical arguments, or none of the
371 * threads must call the <code>execute()</code> method.
372 *
373 * @param first First loop index.
374 * @param last Last loop index.
375 * @param stride Loop index stride, >= 1.
376 * @param theLoop Worker for loop.
377 * @exception IllegalArgumentException (unchecked exception) Thrown if
378 * <code>stride</code> < 1.
379 * @exception NullPointerException (unchecked exception) Thrown if
380 * <code>theLoop</code> is null.
381 * @exception IllegalStateException (unchecked exception) Thrown if no
382 * worker team is executing this worker region.
383 * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
384 * an exception.
385 * @throws java.lang.Exception if any.
386 */
387 public final void execute(long first,
388 long last,
389 long stride,
390 WorkerLongStrideForLoop theLoop)
391 throws Exception {
392 // Verify preconditions.
393 if (stride <= 0) {
394 throw new IllegalArgumentException("WorkerRegion.execute(): Stride = " + stride + " illegal");
395 }
396 if (theLoop == null) {
397 throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
398 }
399 if (myTeam == null) {
400 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
401 }
402
403 try {
404 // Record parallel team.
405 theLoop.myTeam = this.myTeam;
406
407 // Get current parallel team thread.
408 WorkerTeamThread currentThread = getCurrentThread();
409 int w = currentThread.myIndex;
410
411 // Do master or worker thread processing.
412 LongRange range = new LongRange(first, last, stride);
413 if (w == -1) {
414 theLoop.masterExecute(range);
415 } else {
416 theLoop.workerExecute(w, range);
417 }
418 } finally {
419 // Forget parallel team.
420 theLoop.myTeam = null;
421 }
422 }
423
424 /**
425 * Execute a worker iteration within this worker region. For further
426 * information, see class {@linkplain WorkerIteration}. The items processed
427 * by the iteration are the elements of the given array. The iteration order
428 * is from index 0 upwards.
429 * <P>
430 * <I>Note:</I> Either all threads in the worker team must call the
431 * <code>execute()</code> method with identical arguments, or none of the
432 * threads must call the <code>execute()</code> method.
433 *
434 * @param <T> Data type of the items iterated over.
435 * @param theArray Array containing the items.
436 * @param theIteration Worker iteration.
437 * @exception NullPointerException (unchecked exception) Thrown if this is
438 * the master process and
439 * <code>theArray</code> is null. Thrown if <code>theIteration</code> is null.
440 * @exception IllegalStateException (unchecked exception) Thrown if no
441 * worker team is executing this worker region.
442 * @exception Exception Thrown if one of <code>theIteration</code>'s methods
443 * throws an exception.
444 * @throws java.lang.Exception if any.
445 */
446 public final <T> void execute(T[] theArray,
447 WorkerIteration<T> theIteration)
448 throws Exception {
449 // Verify preconditions.
450 if (myTeam == null) {
451 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
452 }
453 if (myTeam.rank == myTeam.masterRank() && theArray == null) {
454 throw new NullPointerException("WorkerRegion.execute(): Array is null");
455 }
456 if (theIteration == null) {
457 throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
458 }
459
460 try {
461 // Record parallel team.
462 theIteration.myTeam = this.myTeam;
463
464 // Get current parallel team thread.
465 WorkerTeamThread currentThread = getCurrentThread();
466 int w = currentThread.myIndex;
467
468 // Do master or worker thread processing.
469 if (w == -1) {
470 theIteration.masterExecute(new ArrayItemGenerator<T>(theArray));
471 } else {
472 theIteration.workerExecute(w);
473 }
474 } finally {
475 // Forget parallel team.
476 theIteration.myTeam = null;
477 }
478 }
479
480 /**
481 * Execute a worker iteration within this worker region. For further
482 * information, see class {@linkplain WorkerIteration}. The items processed
483 * by the iteration are the items returned by the given iterator. The
484 * iteration order is that of the given iterator.
485 * <P>
486 * <I>Note:</I> Either all threads in the worker team must call the
487 * <code>execute()</code> method with identical arguments, or none of the
488 * threads must call the <code>execute()</code> method.
489 *
490 * @param <T> Data type of the items iterated over.
491 * @param theIterator Iterator over the items.
492 * @param theIteration Worker iteration.
493 * @exception NullPointerException (unchecked exception) Thrown if this is
494 * the master process and
495 * <code>theIterator</code> is null. Thrown if <code>theIteration</code> is null.
496 * @exception IllegalStateException (unchecked exception) Thrown if no
497 * worker team is executing this worker region.
498 * @exception Exception Thrown if one of <code>theIteration</code>'s methods
499 * throws an exception.
500 * @throws java.lang.Exception if any.
501 */
502 public final <T> void execute(Iterator<T> theIterator,
503 WorkerIteration<T> theIteration)
504 throws Exception {
505 // Verify preconditions.
506 if (myTeam == null) {
507 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
508 }
509 if (myTeam.rank == myTeam.masterRank() && theIterator == null) {
510 throw new NullPointerException("WorkerRegion.execute(): Iterator is null");
511 }
512 if (theIteration == null) {
513 throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
514 }
515
516 try {
517 // Record parallel team.
518 theIteration.myTeam = this.myTeam;
519
520 // Get current parallel team thread.
521 WorkerTeamThread currentThread = getCurrentThread();
522 int w = currentThread.myIndex;
523
524 // Do master or worker thread processing.
525 if (w == -1) {
526 theIteration.masterExecute(new IteratorItemGenerator<T>(theIterator));
527 } else {
528 theIteration.workerExecute(w);
529 }
530 } finally {
531 // Forget parallel team.
532 theIteration.myTeam = null;
533 }
534 }
535
536 /**
537 * Execute a worker iteration within this worker region. For further
538 * information, see class {@linkplain WorkerIteration}. The items processed
539 * by the iteration are the items contained in the given iterable
540 * collection. The iteration order is that of the given iterable
541 * collection's iterator.
542 * <P>
543 * <I>Note:</I> Either all threads in the worker team must call the
544 * <code>execute()</code> method with identical arguments, or none of the
545 * threads must call the <code>execute()</code> method.
546 *
547 * @param <T> Data type of the items iterated over.
548 * @param theIterable Iterable collection containing the items.
549 * @param theIteration Worker iteration.
550 * @exception NullPointerException (unchecked exception) Thrown if this is
551 * the master process and
552 * <code>theIterable</code> is null. Thrown if <code>theIteration</code> is null.
553 * @exception IllegalStateException (unchecked exception) Thrown if no
554 * worker team is executing this worker region.
555 * @exception Exception Thrown if one of <code>theIteration</code>'s methods
556 * throws an exception.
557 * @throws java.lang.Exception if any.
558 */
559 public final <T> void execute(Iterable<T> theIterable,
560 WorkerIteration<T> theIteration)
561 throws Exception {
562 // Verify preconditions.
563 if (myTeam == null) {
564 throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
565 }
566 if (myTeam.rank == myTeam.masterRank() && theIterable == null) {
567 throw new NullPointerException("WorkerRegion.execute(): Iterable collection is null");
568 }
569 if (theIteration == null) {
570 throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
571 }
572
573 try {
574 // Record parallel team.
575 theIteration.myTeam = this.myTeam;
576
577 // Get current parallel team thread.
578 WorkerTeamThread currentThread = getCurrentThread();
579 int w = currentThread.myIndex;
580
581 // Do master or worker thread processing.
582 if (w == -1) {
583 theIteration.masterExecute(new IteratorItemGenerator<T>(theIterable.iterator()));
584 } else {
585 theIteration.workerExecute(w);
586 }
587 } finally {
588 // Forget parallel team.
589 theIteration.myTeam = null;
590 }
591 }
592
593 }