1 //******************************************************************************
2 //
3 // File: ParallelTeam.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.ParallelTeam
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40
41 //******************************************************************************
42 // File modified 10/8/2014 by Jacob Litman to enable garbage collection of
43 // ParallelTeamThreads.
44 //******************************************************************************
45 package edu.rit.pj;
46
47 import java.util.concurrent.ConcurrentHashMap;
48 import java.util.concurrent.Semaphore;
49 import java.util.concurrent.atomic.AtomicInteger;
50
51 /**
52 * Class ParallelTeam provides a team of threads for executing a {@linkplain
53 * ParallelRegion} in parallel.
54 * <P>
55 * To execute a parallel region, create a ParallelTeam object; create an
56 * instance of a concrete subclass of class {@linkplain ParallelRegion}; and
57 * pass this instance to the parallel team's <code>execute()</code> method. For
58 * further information, see class {@linkplain ParallelRegion}.
59 *
60 * @author Alan Kaminsky
61 * @version 19-May-2008
62 */
63 public class ParallelTeam {
64
65 // Hidden data members.
66 // Number of threads.
67 int K;
68
69 // Array of threads in the team.
70 ParallelTeamThread[] myThread;
71
72 // Parallel region being executed, or null if none is being executed.
73 ParallelRegion myRegion;
74
75 // Semaphore for synchronizing threads at the end of a parallel region.
76 Semaphore myRegionEndSemaphore = new Semaphore(0);
77
78 // Exception map for parallel region, or null if none is being executed.
79 ConcurrentHashMap<Integer, Throwable> myExceptionMap;
80
81 // Team barrier flag. Used by the ParallelRegion.barrier() method.
82 volatile int myBarrierFlag;
83
84 // Parallel construct counter. Counts how many parallel constructs have been
85 // encountered.
86 AtomicInteger myConstructCount = new AtomicInteger(0);
87
88 // Set false if the ParallelTeam is shut down.
89 boolean isActive = true;
90
91 // Exported constructors.
92 /**
93 * Construct a new parallel team with the default number of threads. If the
94 * <code>"pj.nt"</code> Java property is specified, that property gives the
95 * default number of threads, which must be an integer greater than or equal
96 * to 1. If the <code>"pj.nt"</code> Java property is not specified, the default
97 * number of threads is the value returned by the
98 * <code>Runtime.availableProcessors()</code> method. You can specify the
99 * default number of threads on the Java command line like this:
100 * <PRE>
101 * java -Dpj.nt=4 . . .
102 * </PRE>
103 *
104 * @exception IllegalArgumentException (unchecked exception) Thrown if the
105 * <code>"pj.nt"</code> property value is not an integer greater than or equal
106 * to 1.
107 */
108 public ParallelTeam() {
109 this(getDefaultThreadCount());
110 }
111
112 /**
113 * Construct a new parallel team with the given number of threads.
114 *
115 * @param K Number of threads.
116 * @exception IllegalArgumentException (unchecked exception) Thrown if
117 * <I>K</I> is less than 1.
118 */
119 public ParallelTeam(int K) {
120 if (K < 1) {
121 throw new IllegalArgumentException("ParallelTeam(): K must be >= 1");
122 }
123 this.K = K;
124
125 myThread = new ParallelTeamThread[K];
126 myThread[0] = new ParallelTeamThread_0(this, 0);
127 for (int i = 1; i < K; ++i) {
128 myThread[i] = new ParallelTeamThread(this, i);
129 }
130 }
131
132 // Exported operations.
133 /**
134 * Execute the given parallel region.
135 *
136 * @param theRegion Parallel region.
137 * @exception NullPointerException (unchecked exception) Thrown if
138 * <code>theRegion</code> is null.
139 * @exception IllegalStateException (unchecked exception) Thrown if this
140 * parallel team is already executing a parallel region. Thrown if
141 * <code>theRegion</code> is already being executed by a parallel team.
142 * @exception Exception Exception thrown by the parallel region's
143 * <code>start()</code>,
144 * <code>run()</code>, or <code>finish()</code> methods.
145 * @throws java.lang.Exception if any.
146 */
147 public final void execute(ParallelRegion theRegion)
148 throws Exception {
149 // Verify preconditions.
150 if (theRegion == null) {
151 throw new NullPointerException("ParallelTeam.execute(): theRegion is null");
152 }
153 if (myRegion != null) {
154 throw new IllegalStateException("ParallelTeam.execute(): Already executing a parallel region");
155 }
156 if (theRegion.myTeam != null) {
157 throw new IllegalStateException("ParallelTeam.execute(): theRegion already being executed by a parallel team");
158 }
159 if (!isActive) {
160 throw new IllegalStateException("ParallelTeam.execute(): The team has been shut down.");
161 }
162
163 // Record parallel region.
164 myRegion = theRegion;
165 myExceptionMap = new ConcurrentHashMap<>(K, 0.75f, K);
166 theRegion.myTeam = this;
167
168 try {
169 // Perform the parallel region's start() method. Any exception
170 // aborts the execute() method.
171 myRegion.start();
172
173 // Release the team threads to perform the parallel region's run()
174 // method.
175 for (ParallelTeamThread thread : myThread) {
176 thread.myRegionBeginSemaphore.release();
177 }
178
179 // Wait until all team threads have returned from the parallel
180 // region's run() method.
181 myRegionEndSemaphore.acquireUninterruptibly(K);
182
183 // Propagate any exceptions thrown by the run() method.
184 if (myExceptionMap.isEmpty()) {
185 } else if (myExceptionMap.size() == 1) {
186 rethrow(myExceptionMap.values().iterator().next());
187 } else {
188 throw new MultipleParallelException("ParallelTeam.execute(): Multiple threads threw exceptions",
189 myExceptionMap);
190 }
191
192 // Perform the parallel region's finish() method. Any exception
193 // aborts the execute() method.
194 myRegion.finish();
195 } finally {
196 // Clean up.
197 myRegion.myTeam = null;
198 myExceptionMap = null;
199 myRegion = null;
200 }
201 }
202
203 /**
204 * Determine if this parallel team is executing a parallel region.
205 *
206 * @return True if this parallel team is executing a parallel region, false
207 * otherwise.
208 */
209 public final boolean isExecutingInParallel() {
210 return myRegion != null;
211 }
212
213 /**
214 * Returns the parallel region of code that this parallel team is executing.
215 *
216 * @return Parallel region.
217 * @exception IllegalStateException (unchecked exception) Thrown if this
218 * parallel team is not executing a parallel region.
219 */
220 public final ParallelRegion region() {
221 if (myRegion == null) {
222 throw new IllegalStateException("ParallelTeam.region(): Not executing a parallel region");
223 }
224 return myRegion;
225 }
226
227 /**
228 * Determine the number of threads in this parallel team.
229 *
230 * @return Number of threads in the team.
231 */
232 public final int getThreadCount() {
233 return K;
234 }
235
236 /**
237 * Determine the default number of threads for a parallel team. If the
238 * <code>"pj.nt"</code> Java property is specified, that property gives the
239 * default number of threads, which must be an integer greater than or equal
240 * to 1. If the <code>"pj.nt"</code> Java property is not specified, the default
241 * number of threads is the value returned by the
242 * <code>Runtime.availableProcessors()</code> method. You can specify the
243 * default number of threads on the Java command line like this:
244 * <PRE>
245 * java -Dpj.nt=4 . . .
246 * </PRE>
247 *
248 * @return Default number of threads for a parallel team.
249 * @exception IllegalArgumentException (unchecked exception) Thrown if the
250 * <code>"pj.nt"</code> property value is not an integer greater than or equal
251 * to 1.
252 */
253 public static int getDefaultThreadCount() {
254 int k = PJProperties.getPjNt();
255 if (k == 0) {
256 k = Runtime.getRuntime().availableProcessors();
257 }
258 return k;
259 }
260
261 // Hidden operations.
262 /**
263 * Do the thread-0 portion of a barrier with no barrier action. This method
264 * is called by thread 0 of the parallel team.
265 */
266 void barrier() {
267 // Get the new team barrier flag.
268 int newBarrierFlag = myBarrierFlag ^ 1;
269
270 // Wait until each team thread 1 .. K-1 has switched to the new
271 // barrier flag.
272 for (int i = 1; i < K; ++i) {
273 ParallelTeamThread thread_i = myThread[i];
274 if (thread_i.myBarrierFlag != newBarrierFlag) {
275 Spinner spinner = new Spinner();
276 while (thread_i.myBarrierFlag != newBarrierFlag) {
277 spinner.spin();
278 }
279 }
280 }
281
282 // Switch to the new team barrier flag.
283 myBarrierFlag = newBarrierFlag;
284 }
285
286 /**
287 * Do the thread-0 portion of a barrier with a barrier action. This method
288 * is called by thread 0 of the parallel team.
289 *
290 * @param action Barrier action.
291 *
292 * @exception Exception Thrown if the <code>action</code>'s <code>run()</code>
293 * method throws an exception.
294 */
295 void barrier(BarrierAction action)
296 throws Exception {
297 // Get the new team barrier flag.
298 int newBarrierFlag = myBarrierFlag ^ 1;
299
300 // Wait until each team thread 1 .. K-1 has switched to the new
301 // barrier flag.
302 for (int i = 1; i < K; ++i) {
303 ParallelTeamThread thread_i = myThread[i];
304 if (thread_i.myBarrierFlag != newBarrierFlag) {
305 Spinner spinner = new Spinner();
306 while (thread_i.myBarrierFlag != newBarrierFlag) {
307 spinner.spin();
308 }
309 }
310 }
311
312 try {
313 // Do the barrier action.
314 action.myTeam = this;
315 action.run();
316 } finally {
317 action.myTeam = null;
318
319 // Switch to the new team barrier flag.
320 myBarrierFlag = newBarrierFlag;
321 }
322 }
323
324 /**
325 * Re-throw the given object as a checked or unchecked exception. If the
326 * given object is null or is not throwable, do nothing.
327 */
328 static void rethrow(Object exc)
329 throws Exception {
330 if (exc instanceof RuntimeException) {
331 throw (RuntimeException) exc;
332 } else if (exc instanceof Exception) {
333 throw (Exception) exc;
334 } else if (exc instanceof Error) {
335 throw (Error) exc;
336 }
337 }
338
339 /**
340 * Kills the team's threads run() methods so that they are no longer GC roots.
341 * Useful if you are repetitively constructing ParallelTeam objects, although
342 * it is slightly more elegant just to keep the same ParallelTeam objects through
343 * the entire execution of a program.
344 *
345 * @throws java.lang.Exception if any.
346 */
347 public void shutdown() throws Exception {
348 if (isActive) {
349 this.execute(new KillRegion());
350 isActive = false;
351 }
352 }
353 }