View Javadoc
1   //******************************************************************************
2   //
3   // File:    WorkerTeam.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.WorkerTeam
6   //
7   // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.Semaphore;
44  
45  /**
46   * Class WorkerTeam provides a team of threads, distributed across the processes
47   * of a cluster parallel program, for executing a {@linkplain WorkerRegion} in
48   * parallel.
49   * <P>
50   * A worker team uses a communicator for message passing. The communicator is
51   * specified as a constructor argument; if not specified, the world communicator
52   * is used. Every process that is part of the communicator must create the
53   * worker team. In class WorkerTeam, there is one worker thread per process. (To
54   * get more than one worker thread per process, use class {@linkplain
55   * HybridTeam}.) Every worker thread in every process has a unique index, going
56   * from index 0 for the first thread in the first process to index
57   * <I>K</I>&minus;1 for the last thread in the last process, where <I>K</I> is
58   * the total number of worker threads in all the processes. In process rank 0,
59   * there is an additional master thread.
60   * <P>
61   * To execute a worker region, create a WorkerTeam object; create an instance of
62   * a concrete subclass of class {@linkplain WorkerRegion}; and pass this
63   * instance to the worker team's <code>execute()</code> method. For further
64   * information, see class {@linkplain WorkerRegion}.
65   *
66   * @author Alan Kaminsky
67   * @version 19-Jan-2010
68   */
69  public class WorkerTeam {
70  
71  // Hidden data members.
72      // Number of worker threads in this process.
73      int K;
74  
75      // Communicator for message passing, its size, this process's rank.
76      Comm comm;
77      int size;
78      int rank;
79  
80      // Number of worker threads in all processes.
81      int count;
82  
83      // Array of worker and master team threads in this process. There are K
84      // worker threads. There is an additional master thread in the last process
85      // of the communicator.
86      WorkerTeamThread[] myThread;
87  
88      // Worker region being executed, or null if none is being executed.
89      WorkerRegion myRegion;
90  
91      // Semaphore for synchronizing threads at the end of a worker region.
92      Semaphore myRegionEndSemaphore = new Semaphore(0);
93  
94      // Exception map for worker region, or null if none is being executed.
95      ConcurrentHashMap<Integer, Throwable> myExceptionMap;
96  
97  // Hidden constructors.
98      /**
99       * Construct a new, uninitialized worker team.
100      *
101      * @param flag To distinguish this constructor from the others.
102      */
103     WorkerTeam(boolean flag) {
104     }
105 
106 // Exported constructors.
107     /**
108      * Construct a new worker team with one thread per process and using the
109      * world communicator for message passing.
110      */
111     public WorkerTeam() {
112         this(Comm.world());
113     }
114 
115     /**
116      * Construct a new worker team with one thread per process and using the
117      * given communicator for message passing.
118      *
119      * @param comm Communicator to use for message passing.
120      * @exception NullPointerException (unchecked exception) Thrown if
121      * <code>comm</code> is null.
122      */
123     public WorkerTeam(Comm comm) {
124         if (comm == null) {
125             throw new NullPointerException("WorkerTeam(): comm is null");
126         }
127         initialize(/*K    */1,
128                 /*comm */ comm,
129                 /*size */ comm.size(),
130                 /*rank */ comm.rank(),
131                 /*count*/ comm.size(),
132                 /*wlb  */ comm.rank());
133     }
134 
135 // Hidden initializers.
136     /**
137      * Initialize a new worker team.
138      *
139      * @param K Number of worker threads in this process.
140      * @param comm Communicator to use for message passing.
141      * @param size Communicator's size.
142      * @param rank This process's rank in the communicator.
143      * @param count Number of worker threads in all processes.
144      * @param wlb First worker index in this process.
145      */
146     void initialize(int K,
147             Comm comm,
148             int size,
149             int rank,
150             int count,
151             int wlb) {
152         // Record parameters.
153         this.K = K;
154         this.comm = comm;
155         this.size = size;
156         this.rank = rank;
157         this.count = count;
158 
159         // Set up worker team threads. Additional master thread in process 0.
160         int WM = K + (rank == 0 ? 1 : 0);
161         myThread = new WorkerTeamThread[WM];
162         for (int i = 0; i < K; ++i) {
163             myThread[i] = new WorkerTeamThread(this, wlb + i);
164         }
165         if (WM > K) {
166             myThread[K] = new WorkerTeamThread(this, -1);
167         }
168     }
169 
170 // Exported operations.
171     /**
172      * Execute the given worker region.
173      *
174      * @param theRegion Worker region.
175      * @exception NullPointerException (unchecked exception) Thrown if
176      * <code>theRegion</code> is null.
177      * @exception IllegalStateException (unchecked exception) Thrown if this
178      * worker team is already executing a worker region. Thrown if
179      * <code>theRegion</code> is already being executed by a worker team.
180      * @exception Exception Exception thrown by the worker region's
181      * <code>start()</code>,
182      * <code>run()</code>, or <code>finish()</code> methods.
183      * @throws java.lang.Exception if any.
184      */
185     public final void execute(WorkerRegion theRegion)
186             throws Exception {
187         // Verify preconditions.
188         if (theRegion == null) {
189             throw new NullPointerException("WorkerTeam.execute(): theRegion is null");
190         }
191         if (myRegion != null) {
192             throw new IllegalStateException("WorkerTeam.execute(): Already executing a worker region");
193         }
194         if (theRegion.myTeam != null) {
195             throw new IllegalStateException("WorkerTeam.execute(): theRegion already being executed by a worker team");
196         }
197 
198         // Record worker region.
199         myRegion = theRegion;
200         myExceptionMap = new ConcurrentHashMap<Integer, Throwable>(K, 0.75f, K);
201         theRegion.myTeam = this;
202 
203         try {
204             // Perform the worker region's start() method. Any exception aborts
205             // the execute() method.
206             myRegion.start();
207 
208             // Release the team threads to perform the worker region's run()
209             // method.
210             for (WorkerTeamThread thread : myThread) {
211                 thread.myRegionBeginSemaphore.release();
212             }
213 
214             // Wait until all team threads have returned from the worker
215             // region's run() method.
216             myRegionEndSemaphore.acquireUninterruptibly(myThread.length);
217 
218             // Propagate any exceptions thrown by the run() method.
219             if (myExceptionMap.isEmpty()) {
220             } else if (myExceptionMap.size() == 1) {
221                 rethrow(myExceptionMap.values().iterator().next());
222             } else {
223                 throw new MultipleParallelException("WorkerTeam.execute(): Multiple threads threw exceptions",
224                         myExceptionMap);
225             }
226 
227             // Perform the worker region's finish() method. Any exception aborts
228             // the execute() method.
229             myRegion.finish();
230         } finally {
231             // Clean up.
232             myRegion.myTeam = null;
233             myExceptionMap = null;
234             myRegion = null;
235         }
236     }
237 
238     /**
239      * Determine if this worker team is executing a worker region.
240      *
241      * @return True if this worker team is executing a worker region, false
242      * otherwise.
243      */
244     public final boolean isExecutingInParallel() {
245         return myRegion != null;
246     }
247 
248     /**
249      * Returns the worker region of code that this worker team is executing.
250      *
251      * @return Worker region.
252      * @exception IllegalStateException (unchecked exception) Thrown if this
253      * worker team is not executing a worker region.
254      */
255     public final WorkerRegion region() {
256         if (myRegion == null) {
257             throw new IllegalStateException("WorkerTeam.region(): Not executing a worker region");
258         }
259         return myRegion;
260     }
261 
262     /**
263      * Determine the number of worker threads in this worker team in this
264      * process. This does not include the master thread if any.
265      *
266      * @return Number of worker threads in this process.
267      */
268     public final int getThreadCount() {
269         return K;
270     }
271 
272     /**
273      * Determine the total number of worker threads in this worker team in all
274      * processes. This does not include the master thread.
275      *
276      * @return Number of worker threads in all processes.
277      */
278     public final int getTotalThreadCount() {
279         return count;
280     }
281 
282     /**
283      * Determine the rank of the process that contains the master thread. At
284      * present, this is always rank 0.
285      *
286      * @return Master process rank.
287      */
288     public int masterRank() {
289         return 0;
290     }
291 
292     /**
293      * Determine the rank of the process that contains the worker thread with
294      * the given index.
295      *
296      * @param w Worker index.
297      * @return Worker process rank.
298      * @exception IllegalArgumentException (unchecked exception) Thrown if
299      * <code>w</code> is not in the range 0 ..
300      * <code>getTotalThreadCount()</code>&minus;1.
301      */
302     public int workerRank(int w) {
303         if (0 > w || w >= count) {
304             throw new IllegalArgumentException("WorkerTeam.workerRank(): w (= " + w + ") illegal");
305         }
306         return w;
307     }
308 
309 // Hidden operations.
310     /**
311      * Re-throw the given object as a checked or unchecked exception. If the
312      * given object is null or is not throwable, do nothing.
313      */
314     static void rethrow(Object exc)
315             throws Exception {
316         if (exc instanceof RuntimeException) {
317             throw (RuntimeException) exc;
318         } else if (exc instanceof Exception) {
319             throw (Exception) exc;
320         } else if (exc instanceof Error) {
321             throw (Error) exc;
322         }
323     }
324 
325 }