1 //******************************************************************************
2 //
3 // File: WorkerTeam.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.WorkerTeam
6 //
7 // This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.Semaphore;
44
45 /**
46 * Class WorkerTeam provides a team of threads, distributed across the processes
47 * of a cluster parallel program, for executing a {@linkplain WorkerRegion} in
48 * parallel.
49 * <P>
50 * A worker team uses a communicator for message passing. The communicator is
51 * specified as a constructor argument; if not specified, the world communicator
52 * is used. Every process that is part of the communicator must create the
53 * worker team. In class WorkerTeam, there is one worker thread per process. (To
54 * get more than one worker thread per process, use class {@linkplain
55 * HybridTeam}.) Every worker thread in every process has a unique index, going
56 * from index 0 for the first thread in the first process to index
57 * <I>K</I>−1 for the last thread in the last process, where <I>K</I> is
58 * the total number of worker threads in all the processes. In process rank 0,
59 * there is an additional master thread.
60 * <P>
61 * To execute a worker region, create a WorkerTeam object; create an instance of
62 * a concrete subclass of class {@linkplain WorkerRegion}; and pass this
63 * instance to the worker team's <code>execute()</code> method. For further
64 * information, see class {@linkplain WorkerRegion}.
65 *
66 * @author Alan Kaminsky
67 * @version 19-Jan-2010
68 */
69 public class WorkerTeam {
70
71 // Hidden data members.
72 // Number of worker threads in this process.
73 int K;
74
75 // Communicator for message passing, its size, this process's rank.
76 Comm comm;
77 int size;
78 int rank;
79
80 // Number of worker threads in all processes.
81 int count;
82
83 // Array of worker and master team threads in this process. There are K
84 // worker threads. There is an additional master thread in the last process
85 // of the communicator.
86 WorkerTeamThread[] myThread;
87
88 // Worker region being executed, or null if none is being executed.
89 WorkerRegion myRegion;
90
91 // Semaphore for synchronizing threads at the end of a worker region.
92 Semaphore myRegionEndSemaphore = new Semaphore(0);
93
94 // Exception map for worker region, or null if none is being executed.
95 ConcurrentHashMap<Integer, Throwable> myExceptionMap;
96
97 // Hidden constructors.
98 /**
99 * Construct a new, uninitialized worker team.
100 *
101 * @param flag To distinguish this constructor from the others.
102 */
103 WorkerTeam(boolean flag) {
104 }
105
106 // Exported constructors.
107 /**
108 * Construct a new worker team with one thread per process and using the
109 * world communicator for message passing.
110 */
111 public WorkerTeam() {
112 this(Comm.world());
113 }
114
115 /**
116 * Construct a new worker team with one thread per process and using the
117 * given communicator for message passing.
118 *
119 * @param comm Communicator to use for message passing.
120 * @exception NullPointerException (unchecked exception) Thrown if
121 * <code>comm</code> is null.
122 */
123 public WorkerTeam(Comm comm) {
124 if (comm == null) {
125 throw new NullPointerException("WorkerTeam(): comm is null");
126 }
127 initialize(/*K */1,
128 /*comm */ comm,
129 /*size */ comm.size(),
130 /*rank */ comm.rank(),
131 /*count*/ comm.size(),
132 /*wlb */ comm.rank());
133 }
134
135 // Hidden initializers.
136 /**
137 * Initialize a new worker team.
138 *
139 * @param K Number of worker threads in this process.
140 * @param comm Communicator to use for message passing.
141 * @param size Communicator's size.
142 * @param rank This process's rank in the communicator.
143 * @param count Number of worker threads in all processes.
144 * @param wlb First worker index in this process.
145 */
146 void initialize(int K,
147 Comm comm,
148 int size,
149 int rank,
150 int count,
151 int wlb) {
152 // Record parameters.
153 this.K = K;
154 this.comm = comm;
155 this.size = size;
156 this.rank = rank;
157 this.count = count;
158
159 // Set up worker team threads. Additional master thread in process 0.
160 int WM = K + (rank == 0 ? 1 : 0);
161 myThread = new WorkerTeamThread[WM];
162 for (int i = 0; i < K; ++i) {
163 myThread[i] = new WorkerTeamThread(this, wlb + i);
164 }
165 if (WM > K) {
166 myThread[K] = new WorkerTeamThread(this, -1);
167 }
168 }
169
170 // Exported operations.
171 /**
172 * Execute the given worker region.
173 *
174 * @param theRegion Worker region.
175 * @exception NullPointerException (unchecked exception) Thrown if
176 * <code>theRegion</code> is null.
177 * @exception IllegalStateException (unchecked exception) Thrown if this
178 * worker team is already executing a worker region. Thrown if
179 * <code>theRegion</code> is already being executed by a worker team.
180 * @exception Exception Exception thrown by the worker region's
181 * <code>start()</code>,
182 * <code>run()</code>, or <code>finish()</code> methods.
183 * @throws java.lang.Exception if any.
184 */
185 public final void execute(WorkerRegion theRegion)
186 throws Exception {
187 // Verify preconditions.
188 if (theRegion == null) {
189 throw new NullPointerException("WorkerTeam.execute(): theRegion is null");
190 }
191 if (myRegion != null) {
192 throw new IllegalStateException("WorkerTeam.execute(): Already executing a worker region");
193 }
194 if (theRegion.myTeam != null) {
195 throw new IllegalStateException("WorkerTeam.execute(): theRegion already being executed by a worker team");
196 }
197
198 // Record worker region.
199 myRegion = theRegion;
200 myExceptionMap = new ConcurrentHashMap<Integer, Throwable>(K, 0.75f, K);
201 theRegion.myTeam = this;
202
203 try {
204 // Perform the worker region's start() method. Any exception aborts
205 // the execute() method.
206 myRegion.start();
207
208 // Release the team threads to perform the worker region's run()
209 // method.
210 for (WorkerTeamThread thread : myThread) {
211 thread.myRegionBeginSemaphore.release();
212 }
213
214 // Wait until all team threads have returned from the worker
215 // region's run() method.
216 myRegionEndSemaphore.acquireUninterruptibly(myThread.length);
217
218 // Propagate any exceptions thrown by the run() method.
219 if (myExceptionMap.isEmpty()) {
220 } else if (myExceptionMap.size() == 1) {
221 rethrow(myExceptionMap.values().iterator().next());
222 } else {
223 throw new MultipleParallelException("WorkerTeam.execute(): Multiple threads threw exceptions",
224 myExceptionMap);
225 }
226
227 // Perform the worker region's finish() method. Any exception aborts
228 // the execute() method.
229 myRegion.finish();
230 } finally {
231 // Clean up.
232 myRegion.myTeam = null;
233 myExceptionMap = null;
234 myRegion = null;
235 }
236 }
237
238 /**
239 * Determine if this worker team is executing a worker region.
240 *
241 * @return True if this worker team is executing a worker region, false
242 * otherwise.
243 */
244 public final boolean isExecutingInParallel() {
245 return myRegion != null;
246 }
247
248 /**
249 * Returns the worker region of code that this worker team is executing.
250 *
251 * @return Worker region.
252 * @exception IllegalStateException (unchecked exception) Thrown if this
253 * worker team is not executing a worker region.
254 */
255 public final WorkerRegion region() {
256 if (myRegion == null) {
257 throw new IllegalStateException("WorkerTeam.region(): Not executing a worker region");
258 }
259 return myRegion;
260 }
261
262 /**
263 * Determine the number of worker threads in this worker team in this
264 * process. This does not include the master thread if any.
265 *
266 * @return Number of worker threads in this process.
267 */
268 public final int getThreadCount() {
269 return K;
270 }
271
272 /**
273 * Determine the total number of worker threads in this worker team in all
274 * processes. This does not include the master thread.
275 *
276 * @return Number of worker threads in all processes.
277 */
278 public final int getTotalThreadCount() {
279 return count;
280 }
281
282 /**
283 * Determine the rank of the process that contains the master thread. At
284 * present, this is always rank 0.
285 *
286 * @return Master process rank.
287 */
288 public int masterRank() {
289 return 0;
290 }
291
292 /**
293 * Determine the rank of the process that contains the worker thread with
294 * the given index.
295 *
296 * @param w Worker index.
297 * @return Worker process rank.
298 * @exception IllegalArgumentException (unchecked exception) Thrown if
299 * <code>w</code> is not in the range 0 ..
300 * <code>getTotalThreadCount()</code>−1.
301 */
302 public int workerRank(int w) {
303 if (0 > w || w >= count) {
304 throw new IllegalArgumentException("WorkerTeam.workerRank(): w (= " + w + ") illegal");
305 }
306 return w;
307 }
308
309 // Hidden operations.
310 /**
311 * Re-throw the given object as a checked or unchecked exception. If the
312 * given object is null or is not throwable, do nothing.
313 */
314 static void rethrow(Object exc)
315 throws Exception {
316 if (exc instanceof RuntimeException) {
317 throw (RuntimeException) exc;
318 } else if (exc instanceof Exception) {
319 throw (Exception) exc;
320 } else if (exc instanceof Error) {
321 throw (Error) exc;
322 }
323 }
324
325 }