View Javadoc
1   //******************************************************************************
2   //
3   // File:    ParallelTeam.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.ParallelTeam
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  
41  //******************************************************************************
42  // File modified 10/8/2014 by Jacob Litman to enable garbage collection of
43  // ParallelTeamThreads.
44  //******************************************************************************
45  package edu.rit.pj;
46  
47  import java.util.concurrent.ConcurrentHashMap;
48  import java.util.concurrent.Semaphore;
49  import java.util.concurrent.atomic.AtomicInteger;
50  
51  /**
52   * Class ParallelTeam provides a team of threads for executing a {@linkplain
53   * ParallelRegion} in parallel.
54   * <P>
55   * To execute a parallel region, create a ParallelTeam object; create an
56   * instance of a concrete subclass of class {@linkplain ParallelRegion}; and
57   * pass this instance to the parallel team's <code>execute()</code> method. For
58   * further information, see class {@linkplain ParallelRegion}.
59   *
60   * @author Alan Kaminsky
61   * @version 19-May-2008
62   */
63  public class ParallelTeam {
64  
65  // Hidden data members.
66      // Number of threads.
67      int K;
68  
69      // Array of threads in the team.
70      ParallelTeamThread[] myThread;
71  
72      // Parallel region being executed, or null if none is being executed.
73      ParallelRegion myRegion;
74  
75      // Semaphore for synchronizing threads at the end of a parallel region.
76      Semaphore myRegionEndSemaphore = new Semaphore(0);
77  
78      // Exception map for parallel region, or null if none is being executed.
79      ConcurrentHashMap<Integer, Throwable> myExceptionMap;
80  
81      // Team barrier flag. Used by the ParallelRegion.barrier() method.
82      volatile int myBarrierFlag;
83  
84      // Parallel construct counter. Counts how many parallel constructs have been
85      // encountered.
86      AtomicInteger myConstructCount = new AtomicInteger(0);
87      
88      // Set false if the ParallelTeam is shut down.
89      boolean isActive = true;
90  
91  // Exported constructors.
92      /**
93       * Construct a new parallel team with the default number of threads. If the
94       * <code>"pj.nt"</code> Java property is specified, that property gives the
95       * default number of threads, which must be an integer greater than or equal
96       * to 1. If the <code>"pj.nt"</code> Java property is not specified, the default
97       * number of threads is the value returned by the
98       * <code>Runtime.availableProcessors()</code> method. You can specify the
99       * default number of threads on the Java command line like this:
100      * <PRE>
101      *     java -Dpj.nt=4 . . .
102      * </PRE>
103      *
104      * @exception IllegalArgumentException (unchecked exception) Thrown if the
105      * <code>"pj.nt"</code> property value is not an integer greater than or equal
106      * to 1.
107      */
108     public ParallelTeam() {
109         this(getDefaultThreadCount());
110     }
111 
112     /**
113      * Construct a new parallel team with the given number of threads.
114      *
115      * @param K Number of threads.
116      * @exception IllegalArgumentException (unchecked exception) Thrown if
117      * <I>K</I> is less than 1.
118      */
119     public ParallelTeam(int K) {
120         if (K < 1) {
121             throw new IllegalArgumentException("ParallelTeam(): K must be >= 1");
122         }
123         this.K = K;
124 
125         myThread = new ParallelTeamThread[K];
126         myThread[0] = new ParallelTeamThread_0(this, 0);
127         for (int i = 1; i < K; ++i) {
128             myThread[i] = new ParallelTeamThread(this, i);
129         }
130     }
131 
132 // Exported operations.
133     /**
134      * Execute the given parallel region.
135      *
136      * @param theRegion Parallel region.
137      * @exception NullPointerException (unchecked exception) Thrown if
138      * <code>theRegion</code> is null.
139      * @exception IllegalStateException (unchecked exception) Thrown if this
140      * parallel team is already executing a parallel region. Thrown if
141      * <code>theRegion</code> is already being executed by a parallel team.
142      * @exception Exception Exception thrown by the parallel region's
143      * <code>start()</code>,
144      * <code>run()</code>, or <code>finish()</code> methods.
145      * @throws java.lang.Exception if any.
146      */
147     public final void execute(ParallelRegion theRegion)
148             throws Exception {
149         // Verify preconditions.
150         if (theRegion == null) {
151             throw new NullPointerException("ParallelTeam.execute(): theRegion is null");
152         }
153         if (myRegion != null) {
154             throw new IllegalStateException("ParallelTeam.execute(): Already executing a parallel region");
155         }
156         if (theRegion.myTeam != null) {
157             throw new IllegalStateException("ParallelTeam.execute(): theRegion already being executed by a parallel team");
158         }
159         if (!isActive) {
160             throw new IllegalStateException("ParallelTeam.execute(): The team has been shut down.");
161         }
162 
163         // Record parallel region.
164         myRegion = theRegion;
165         myExceptionMap = new ConcurrentHashMap<Integer, Throwable>(K, 0.75f, K);
166         theRegion.myTeam = this;
167 
168         try {
169             // Perform the parallel region's start() method. Any exception
170             // aborts the execute() method.
171             myRegion.start();
172 
173             // Release the team threads to perform the parallel region's run()
174             // method.
175             for (ParallelTeamThread thread : myThread) {
176                 thread.myRegionBeginSemaphore.release();
177             }
178 
179             // Wait until all team threads have returned from the parallel
180             // region's run() method.
181             myRegionEndSemaphore.acquireUninterruptibly(K);
182 
183             // Propagate any exceptions thrown by the run() method.
184             if (myExceptionMap.isEmpty()) {
185             } else if (myExceptionMap.size() == 1) {
186                 rethrow(myExceptionMap.values().iterator().next());
187             } else {
188                 throw new MultipleParallelException("ParallelTeam.execute(): Multiple threads threw exceptions",
189                         myExceptionMap);
190             }
191 
192             // Perform the parallel region's finish() method. Any exception
193             // aborts the execute() method.
194             myRegion.finish();
195         } finally {
196             // Clean up.
197             myRegion.myTeam = null;
198             myExceptionMap = null;
199             myRegion = null;
200         }
201     }
202 
203     /**
204      * Determine if this parallel team is executing a parallel region.
205      *
206      * @return True if this parallel team is executing a parallel region, false
207      * otherwise.
208      */
209     public final boolean isExecutingInParallel() {
210         return myRegion != null;
211     }
212 
213     /**
214      * Returns the parallel region of code that this parallel team is executing.
215      *
216      * @return Parallel region.
217      * @exception IllegalStateException (unchecked exception) Thrown if this
218      * parallel team is not executing a parallel region.
219      */
220     public final ParallelRegion region() {
221         if (myRegion == null) {
222             throw new IllegalStateException("ParallelTeam.region(): Not executing a parallel region");
223         }
224         return myRegion;
225     }
226 
227     /**
228      * Determine the number of threads in this parallel team.
229      *
230      * @return Number of threads in the team.
231      */
232     public final int getThreadCount() {
233         return K;
234     }
235 
236     /**
237      * Determine the default number of threads for a parallel team. If the
238      * <code>"pj.nt"</code> Java property is specified, that property gives the
239      * default number of threads, which must be an integer greater than or equal
240      * to 1. If the <code>"pj.nt"</code> Java property is not specified, the default
241      * number of threads is the value returned by the
242      * <code>Runtime.availableProcessors()</code> method. You can specify the
243      * default number of threads on the Java command line like this:
244      * <PRE>
245      *     java -Dpj.nt=4 . . .
246      * </PRE>
247      *
248      * @return Default number of threads for a parallel team.
249      * @exception IllegalArgumentException (unchecked exception) Thrown if the
250      * <code>"pj.nt"</code> property value is not an integer greater than or equal
251      * to 1.
252      */
253     public static int getDefaultThreadCount() {
254         int k = PJProperties.getPjNt();
255         if (k == 0) {
256             k = Runtime.getRuntime().availableProcessors();
257         }
258         return k;
259     }
260 
261 // Hidden operations.
262     /**
263      * Do the thread-0 portion of a barrier with no barrier action. This method
264      * is called by thread 0 of the parallel team.
265      */
266     void barrier() {
267         // Get the new team barrier flag.
268         int newBarrierFlag = myBarrierFlag ^ 1;
269 
270         // Wait until each team thread 1 .. K-1 has switched to the new
271         // barrier flag.
272         for (int i = 1; i < K; ++i) {
273             ParallelTeamThread thread_i = myThread[i];
274             if (thread_i.myBarrierFlag != newBarrierFlag) {
275                 Spinner spinner = new Spinner();
276                 while (thread_i.myBarrierFlag != newBarrierFlag) {
277                     spinner.spin();
278                 }
279             }
280         }
281 
282         // Switch to the new team barrier flag.
283         myBarrierFlag = newBarrierFlag;
284     }
285 
286     /**
287      * Do the thread-0 portion of a barrier with a barrier action. This method
288      * is called by thread 0 of the parallel team.
289      *
290      * @param action Barrier action.
291      *
292      * @exception Exception Thrown if the <code>action</code>'s <code>run()</code>
293      * method throws an exception.
294      */
295     void barrier(BarrierAction action)
296             throws Exception {
297         // Get the new team barrier flag.
298         int newBarrierFlag = myBarrierFlag ^ 1;
299 
300         // Wait until each team thread 1 .. K-1 has switched to the new
301         // barrier flag.
302         for (int i = 1; i < K; ++i) {
303             ParallelTeamThread thread_i = myThread[i];
304             if (thread_i.myBarrierFlag != newBarrierFlag) {
305                 Spinner spinner = new Spinner();
306                 while (thread_i.myBarrierFlag != newBarrierFlag) {
307                     spinner.spin();
308                 }
309             }
310         }
311 
312         try {
313             // Do the barrier action.
314             action.myTeam = this;
315             action.run();
316         } finally {
317             action.myTeam = null;
318 
319             // Switch to the new team barrier flag.
320             myBarrierFlag = newBarrierFlag;
321         }
322     }
323 
324     /**
325      * Re-throw the given object as a checked or unchecked exception. If the
326      * given object is null or is not throwable, do nothing.
327      */
328     static void rethrow(Object exc)
329             throws Exception {
330         if (exc instanceof RuntimeException) {
331             throw (RuntimeException) exc;
332         } else if (exc instanceof Exception) {
333             throw (Exception) exc;
334         } else if (exc instanceof Error) {
335             throw (Error) exc;
336         }
337     }
338     
339     /**
340      * Kills the team's threads run() methods so that they are no longer GC roots.
341      * Useful if you are repetitively constructing ParallelTeam objects, although
342      * it is slightly more elegant just to keep the same ParallelTeam objects through
343      * the entire execution of a program.
344      *
345      * @throws java.lang.Exception if any.
346      */
347     public void shutdown() throws Exception {
348         if (isActive) {
349             this.execute(new KillRegion());
350             isActive = false;
351         }
352     }
353 }