View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerRegion.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerRegion
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.util.Iterator;
43  
44  import edu.rit.util.LongRange;
45  import edu.rit.util.Range;
46  
47  /**
48   * Class WorkerRegion is the abstract base class for a worker region that is
49   * executed by a {@linkplain WorkerTeam} of threads distributed across the
50   * processes of a cluster parallel program.
51   * <P>
52   * To execute a worker region, create a {@linkplain WorkerTeam} object; create
53   * an instance of a concrete subclass of class WorkerRegion; and pass this
54   * instance to the worker team's <code>execute()</code> method. You can do all this
55   * using an anonymous inner class; for example:
56   * <PRE>
57   *     new WorkerTeam().execute (new WorkerRegion()
58   *         {
59   *         public void start()
60   *             {
61   *             // Initialization code
62   *             . . .
63   *             }
64   *         public void run()
65   *             {
66   *             // Parallel code
67   *             . . .
68   *             }
69   *         public void finish()
70   *             {
71   *             // Finalization code
72   *             . . .
73   *             }
74   *         });
75   * </PRE>
76   * <P>
77   * The worker team's <code>execute()</code> method does the following. In each
78   * process, the worker team has a certain number of <B>worker threads</B>
79   * <I>K</I>, where <I>K</I> was specified when the worker team was constructed.
80   * In the highest-ranked process of the communicator, there is a <B>master
81   * thread</B> in addition to the worker threads. The <B>main thread</B> is the
82   * thread calling the worker team's <code>execute()</code> method. The main thread
83   * calls the worker region's <code>start()</code> method. When the <code>start()</code>
84   * method returns, all the worker threads, plus the master thread if any, call
85   * the worker region's <code>run()</code> method concurrently. When all the team
86   * threads have returned from the <code>run()</code> method, the main thread calls
87   * the worker region's <code>finish()</code> method. When the <code>finish()</code>
88   * method returns, the main thread returns from the worker team's
89   * <code>execute()</code> method.
90   * <P>
91   * The chief purpose of a worker team is to execute a work-sharing parallel loop
92   * in a cluster parallel program, partitioning the loop iterations among the
93   * worker threads in all the processes. The worker team uses the
94   * <B>master-worker pattern</B> to partition the iterations. The master thread
95   * partitions the loop iterations and sends tasks to the worker threads; the
96   * worker threads send results back to the master thread. The worker team uses a
97   * certain <B>communicator</B> to do this message passing. The communicator was
98   * specified when the worker team was constructed. For further information, see
99   * class {@linkplain WorkerIntegerForLoop} and {@linkplain WorkerLongForLoop}.
100  * <P>
101  * Within each process, variables to be shared by all threads in the team may be
102  * declared as fields of the WorkerRegion subclass. (Variables cannot be shared
103  * between processes.) The <code>start()</code> method is intended for performing
104  * initialization in a single thread before parallel execution begins. If no
105  * such initialization is needed, omit the <code>start()</code> method. The
106  * <code>run()</code> method contains code to be executed in parallel by all threads
107  * in the team. Variables that are private to each thread may be declared inside
108  * the <code>run()</code> method. The <code>finish()</code> method is intended for
109  * performing finalization in a single thread after parallel execution ends. If
110  * no such finalization is needed, omit the <code>finish()</code> method.
111  * <P>
112  * If the worker region's <code>start()</code> method throws an exception, the
113  * worker team's <code>execute()</code> method throws that same exception, and the
114  * <code>run()</code> method is not called.
115  * <P>
116  * If the worker region's <code>run()</code> method throws an exception in one of
117  * the team threads, the exception's stack trace is printed on the standard
118  * error, the worker team waits until all the other team threads have returned
119  * from the <code>run()</code> method, then the worker team's <code>execute()</code>
120  * method throws that same exception, and the worker region's <code>finish()</code>
121  * method is not called. If the worker region's <code>run()</code> method throws an
122  * exception in more than one of the team threads, each exception's stack trace
123  * is printed on the standard error, the worker team waits until all the other
124  * team threads have returned from the <code>run()</code> method, then the worker
125  * team's <code>execute()</code> method throws a {@linkplain
126  * MultipleParallelException} wrapping all the thrown exceptions, and the worker
127  * region's <code>finish()</code> method is not called.
128  * <P>
129  * If the worker region's <code>finish()</code> method throws an exception, the
130  * worker team's <code>execute()</code> method throws that same exception.
131  *
132  * @author Alan Kaminsky
133  * @version 07-Oct-2010
134  */
135 public abstract class WorkerRegion
136         extends WorkerConstruct {
137 
138 // Exported constructors.
139     /**
140      * Construct a new worker region.
141      */
142     public WorkerRegion() {
143         super();
144     }
145 
146 // Exported operations.
147     /**
148      * Perform initialization actions before parallel execution begins. Only one
149      * thread in each process calls the <code>start()</code> method.
150      * <P>
151      * The <code>start()</code> method may be overridden in a subclass. If not
152      * overridden, the <code>start()</code> method does nothing.
153      *
154      * @exception Exception The <code>start()</code> method may throw any exception.
155      * @throws java.lang.Exception if any.
156      */
157     public void start()
158             throws Exception {
159     }
160 
161     /**
162      * Execute parallel code. All threads of the worker team in each process
163      * call the <code>run()</code> method concurrently.
164      * <P>
165      * The <code>run()</code> method must be implemented in a subclass.
166      *
167      * @exception Exception The <code>run()</code> method may throw any exception.
168      * @throws java.lang.Exception if any.
169      */
170     public abstract void run()
171             throws Exception;
172 
173     /**
174      * Perform finalization actions after parallel execution ends. Only one
175      * thread in each process calls the <code>finish()</code> method.
176      * <P>
177      * The <code>finish()</code> method may be overridden in a subclass. If not
178      * overridden, the <code>finish()</code> method does nothing.
179      *
180      * @exception Exception The <code>finish()</code> method may throw any
181      * exception.
182      * @throws java.lang.Exception if any.
183      */
184     public void finish()
185             throws Exception {
186     }
187 
188     /**
189      * Execute a worker for loop within this worker region. For further
190      * information, see class {@linkplain WorkerIntegerForLoop}. The loop index
191      * goes from <code>first</code> (inclusive) to <code>last</code> (inclusive) in
192      * steps of +1. If <code>first</code> is greater than <code>last</code>, then no
193      * loop iterations are performed.
194      * <P>
195      * <I>Note:</I> Either all threads in the worker team must call the
196      * <code>execute()</code> method with identical arguments, or none of the
197      * threads must call the <code>execute()</code> method.
198      *
199      * @param first First loop index.
200      * @param last Last loop index.
201      * @param theLoop Worker for loop.
202      * @exception NullPointerException (unchecked exception) Thrown if
203      * <code>theLoop</code> is null.
204      * @exception IllegalStateException (unchecked exception) Thrown if no
205      * worker team is executing this worker region.
206      * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
207      * an exception.
208      * @throws java.lang.Exception if any.
209      */
210     public final void execute(int first,
211             int last,
212             WorkerIntegerForLoop theLoop)
213             throws Exception {
214         // Verify preconditions.
215         if (theLoop == null) {
216             throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
217         }
218         if (myTeam == null) {
219             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
220         }
221 
222         try {
223             // Record parallel team.
224             theLoop.myTeam = this.myTeam;
225 
226             // Get current parallel team thread.
227             WorkerTeamThread currentThread = getCurrentThread();
228             int w = currentThread.myIndex;
229 
230             // Do master or worker thread processing.
231             Range range = new Range(first, last);
232             if (w == -1) {
233                 theLoop.masterExecute(range);
234             } else {
235                 theLoop.workerExecute(w, range);
236             }
237         } finally {
238             // Forget parallel team.
239             theLoop.myTeam = null;
240         }
241     }
242 
243     /**
244      * Execute a worker for loop within this worker region. For further
245      * information, see class {@linkplain WorkerIntegerStrideForLoop}. The loop
246      * index goes from <code>first</code> (inclusive) to <code>last</code> (inclusive)
247      * in steps of <code>stride</code>. The stride must be positive. If
248      * <code>first</code> is greater than <code>last</code>, then no loop iterations are
249      * performed.
250      * <P>
251      * <I>Note:</I> Either all threads in the worker team must call the
252      * <code>execute()</code> method with identical arguments, or none of the
253      * threads must call the <code>execute()</code> method.
254      *
255      * @param first First loop index.
256      * @param last Last loop index.
257      * @param stride Loop index stride, &gt;= 1.
258      * @param theLoop Worker for loop.
259      * @exception IllegalArgumentException (unchecked exception) Thrown if
260      * <code>stride</code> &lt; 1.
261      * @exception NullPointerException (unchecked exception) Thrown if
262      * <code>theLoop</code> is null.
263      * @exception IllegalStateException (unchecked exception) Thrown if no
264      * worker team is executing this worker region.
265      * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
266      * an exception.
267      * @throws java.lang.Exception if any.
268      */
269     public final void execute(int first,
270             int last,
271             int stride,
272             WorkerIntegerStrideForLoop theLoop)
273             throws Exception {
274         // Verify preconditions.
275         if (stride <= 0) {
276             throw new IllegalArgumentException("WorkerRegion.execute(): Stride = " + stride + " illegal");
277         }
278         if (theLoop == null) {
279             throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
280         }
281         if (myTeam == null) {
282             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
283         }
284 
285         try {
286             // Record parallel team.
287             theLoop.myTeam = this.myTeam;
288 
289             // Get current parallel team thread.
290             WorkerTeamThread currentThread = getCurrentThread();
291             int w = currentThread.myIndex;
292 
293             // Do master or worker thread processing.
294             Range range = new Range(first, last, stride);
295             if (w == -1) {
296                 theLoop.masterExecute(range);
297             } else {
298                 theLoop.workerExecute(w, range);
299             }
300         } finally {
301             // Forget parallel team.
302             theLoop.myTeam = null;
303         }
304     }
305 
306     /**
307      * Execute a worker for loop within this worker region. For further
308      * information, see class {@linkplain WorkerLongForLoop}. The loop index
309      * goes from <code>first</code> (inclusive) to <code>last</code> (inclusive) in
310      * steps of +1. If <code>first</code> is greater than <code>last</code>, then no
311      * loop iterations are performed.
312      * <P>
313      * <I>Note:</I> Either all threads in the worker team must call the
314      * <code>execute()</code> method with identical arguments, or none of the
315      * threads must call the <code>execute()</code> method.
316      *
317      * @param first First loop index.
318      * @param last Last loop index.
319      * @param theLoop Worker for loop.
320      * @exception NullPointerException (unchecked exception) Thrown if
321      * <code>theLoop</code> is null.
322      * @exception IllegalStateException (unchecked exception) Thrown if no
323      * worker team is executing this worker region.
324      * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
325      * an exception.
326      * @throws java.lang.Exception if any.
327      */
328     public final void execute(long first,
329             long last,
330             WorkerLongForLoop theLoop)
331             throws Exception {
332         // Verify preconditions.
333         if (theLoop == null) {
334             throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
335         }
336         if (myTeam == null) {
337             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
338         }
339 
340         try {
341             // Record parallel team.
342             theLoop.myTeam = this.myTeam;
343 
344             // Get current parallel team thread.
345             WorkerTeamThread currentThread = getCurrentThread();
346             int w = currentThread.myIndex;
347 
348             // Do master or worker thread processing.
349             LongRange range = new LongRange(first, last);
350             if (w == -1) {
351                 theLoop.masterExecute(range);
352             } else {
353                 theLoop.workerExecute(w, range);
354             }
355         } finally {
356             // Forget parallel team.
357             theLoop.myTeam = null;
358         }
359     }
360 
361     /**
362      * Execute a worker for loop within this worker region. For further
363      * information, see class {@linkplain WorkerLongStrideForLoop}. The loop
364      * index goes from <code>first</code> (inclusive) to <code>last</code> (inclusive)
365      * in steps of <code>stride</code>. The stride must be positive. If
366      * <code>first</code> is greater than <code>last</code>, then no loop iterations are
367      * performed.
368      * <P>
369      * <I>Note:</I> Either all threads in the worker team must call the
370      * <code>execute()</code> method with identical arguments, or none of the
371      * threads must call the <code>execute()</code> method.
372      *
373      * @param first First loop index.
374      * @param last Last loop index.
375      * @param stride Loop index stride, &gt;= 1.
376      * @param theLoop Worker for loop.
377      * @exception IllegalArgumentException (unchecked exception) Thrown if
378      * <code>stride</code> &lt; 1.
379      * @exception NullPointerException (unchecked exception) Thrown if
380      * <code>theLoop</code> is null.
381      * @exception IllegalStateException (unchecked exception) Thrown if no
382      * worker team is executing this worker region.
383      * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
384      * an exception.
385      * @throws java.lang.Exception if any.
386      */
387     public final void execute(long first,
388             long last,
389             long stride,
390             WorkerLongStrideForLoop theLoop)
391             throws Exception {
392         // Verify preconditions.
393         if (stride <= 0) {
394             throw new IllegalArgumentException("WorkerRegion.execute(): Stride = " + stride + " illegal");
395         }
396         if (theLoop == null) {
397             throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
398         }
399         if (myTeam == null) {
400             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
401         }
402 
403         try {
404             // Record parallel team.
405             theLoop.myTeam = this.myTeam;
406 
407             // Get current parallel team thread.
408             WorkerTeamThread currentThread = getCurrentThread();
409             int w = currentThread.myIndex;
410 
411             // Do master or worker thread processing.
412             LongRange range = new LongRange(first, last, stride);
413             if (w == -1) {
414                 theLoop.masterExecute(range);
415             } else {
416                 theLoop.workerExecute(w, range);
417             }
418         } finally {
419             // Forget parallel team.
420             theLoop.myTeam = null;
421         }
422     }
423 
424     /**
425      * Execute a worker iteration within this worker region. For further
426      * information, see class {@linkplain WorkerIteration}. The items processed
427      * by the iteration are the elements of the given array. The iteration order
428      * is from index 0 upwards.
429      * <P>
430      * <I>Note:</I> Either all threads in the worker team must call the
431      * <code>execute()</code> method with identical arguments, or none of the
432      * threads must call the <code>execute()</code> method.
433      *
434      * @param <T> Data type of the items iterated over.
435      * @param theArray Array containing the items.
436      * @param theIteration Worker iteration.
437      * @exception NullPointerException (unchecked exception) Thrown if this is
438      * the master process and
439      * <code>theArray</code> is null. Thrown if <code>theIteration</code> is null.
440      * @exception IllegalStateException (unchecked exception) Thrown if no
441      * worker team is executing this worker region.
442      * @exception Exception Thrown if one of <code>theIteration</code>'s methods
443      * throws an exception.
444      * @throws java.lang.Exception if any.
445      */
446     public final <T> void execute(T[] theArray,
447             WorkerIteration<T> theIteration)
448             throws Exception {
449         // Verify preconditions.
450         if (myTeam == null) {
451             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
452         }
453         if (myTeam.rank == myTeam.masterRank() && theArray == null) {
454             throw new NullPointerException("WorkerRegion.execute(): Array is null");
455         }
456         if (theIteration == null) {
457             throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
458         }
459 
460         try {
461             // Record parallel team.
462             theIteration.myTeam = this.myTeam;
463 
464             // Get current parallel team thread.
465             WorkerTeamThread currentThread = getCurrentThread();
466             int w = currentThread.myIndex;
467 
468             // Do master or worker thread processing.
469             if (w == -1) {
470                 theIteration.masterExecute(new ArrayItemGenerator<T>(theArray));
471             } else {
472                 theIteration.workerExecute(w);
473             }
474         } finally {
475             // Forget parallel team.
476             theIteration.myTeam = null;
477         }
478     }
479 
480     /**
481      * Execute a worker iteration within this worker region. For further
482      * information, see class {@linkplain WorkerIteration}. The items processed
483      * by the iteration are the items returned by the given iterator. The
484      * iteration order is that of the given iterator.
485      * <P>
486      * <I>Note:</I> Either all threads in the worker team must call the
487      * <code>execute()</code> method with identical arguments, or none of the
488      * threads must call the <code>execute()</code> method.
489      *
490      * @param <T> Data type of the items iterated over.
491      * @param theIterator Iterator over the items.
492      * @param theIteration Worker iteration.
493      * @exception NullPointerException (unchecked exception) Thrown if this is
494      * the master process and
495      * <code>theIterator</code> is null. Thrown if <code>theIteration</code> is null.
496      * @exception IllegalStateException (unchecked exception) Thrown if no
497      * worker team is executing this worker region.
498      * @exception Exception Thrown if one of <code>theIteration</code>'s methods
499      * throws an exception.
500      * @throws java.lang.Exception if any.
501      */
502     public final <T> void execute(Iterator<T> theIterator,
503             WorkerIteration<T> theIteration)
504             throws Exception {
505         // Verify preconditions.
506         if (myTeam == null) {
507             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
508         }
509         if (myTeam.rank == myTeam.masterRank() && theIterator == null) {
510             throw new NullPointerException("WorkerRegion.execute(): Iterator is null");
511         }
512         if (theIteration == null) {
513             throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
514         }
515 
516         try {
517             // Record parallel team.
518             theIteration.myTeam = this.myTeam;
519 
520             // Get current parallel team thread.
521             WorkerTeamThread currentThread = getCurrentThread();
522             int w = currentThread.myIndex;
523 
524             // Do master or worker thread processing.
525             if (w == -1) {
526                 theIteration.masterExecute(new IteratorItemGenerator<T>(theIterator));
527             } else {
528                 theIteration.workerExecute(w);
529             }
530         } finally {
531             // Forget parallel team.
532             theIteration.myTeam = null;
533         }
534     }
535 
536     /**
537      * Execute a worker iteration within this worker region. For further
538      * information, see class {@linkplain WorkerIteration}. The items processed
539      * by the iteration are the items contained in the given iterable
540      * collection. The iteration order is that of the given iterable
541      * collection's iterator.
542      * <P>
543      * <I>Note:</I> Either all threads in the worker team must call the
544      * <code>execute()</code> method with identical arguments, or none of the
545      * threads must call the <code>execute()</code> method.
546      *
547      * @param <T> Data type of the items iterated over.
548      * @param theIterable Iterable collection containing the items.
549      * @param theIteration Worker iteration.
550      * @exception NullPointerException (unchecked exception) Thrown if this is
551      * the master process and
552      * <code>theIterable</code> is null. Thrown if <code>theIteration</code> is null.
553      * @exception IllegalStateException (unchecked exception) Thrown if no
554      * worker team is executing this worker region.
555      * @exception Exception Thrown if one of <code>theIteration</code>'s methods
556      * throws an exception.
557      * @throws java.lang.Exception if any.
558      */
559     public final <T> void execute(Iterable<T> theIterable,
560             WorkerIteration<T> theIteration)
561             throws Exception {
562         // Verify preconditions.
563         if (myTeam == null) {
564             throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
565         }
566         if (myTeam.rank == myTeam.masterRank() && theIterable == null) {
567             throw new NullPointerException("WorkerRegion.execute(): Iterable collection is null");
568         }
569         if (theIteration == null) {
570             throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
571         }
572 
573         try {
574             // Record parallel team.
575             theIteration.myTeam = this.myTeam;
576 
577             // Get current parallel team thread.
578             WorkerTeamThread currentThread = getCurrentThread();
579             int w = currentThread.myIndex;
580 
581             // Do master or worker thread processing.
582             if (w == -1) {
583                 theIteration.masterExecute(new IteratorItemGenerator<T>(theIterable.iterator()));
584             } else {
585                 theIteration.workerExecute(w);
586             }
587         } finally {
588             // Forget parallel team.
589             theIteration.myTeam = null;
590         }
591     }
592 
593 }