1 //****************************************************************************** 2 // 3 // File: ParallelTeam.java 4 // Package: edu.rit.pj 5 // Unit: Class edu.rit.pj.ParallelTeam 6 // 7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 41 //****************************************************************************** 42 // File modified 10/8/2014 by Jacob Litman to enable garbage collection of 43 // ParallelTeamThreads. 44 //****************************************************************************** 45 package edu.rit.pj; 46 47 import java.util.concurrent.ConcurrentHashMap; 48 import java.util.concurrent.Semaphore; 49 import java.util.concurrent.atomic.AtomicInteger; 50 51 /** 52 * Class ParallelTeam provides a team of threads for executing a {@linkplain 53 * ParallelRegion} in parallel. 54 * <P> 55 * To execute a parallel region, create a ParallelTeam object; create an 56 * instance of a concrete subclass of class {@linkplain ParallelRegion}; and 57 * pass this instance to the parallel team's <code>execute()</code> method. For 58 * further information, see class {@linkplain ParallelRegion}. 59 * 60 * @author Alan Kaminsky 61 * @version 19-May-2008 62 */ 63 public class ParallelTeam { 64 65 // Hidden data members. 66 // Number of threads. 67 int K; 68 69 // Array of threads in the team. 70 ParallelTeamThread[] myThread; 71 72 // Parallel region being executed, or null if none is being executed. 73 ParallelRegion myRegion; 74 75 // Semaphore for synchronizing threads at the end of a parallel region. 76 Semaphore myRegionEndSemaphore = new Semaphore(0); 77 78 // Exception map for parallel region, or null if none is being executed. 79 ConcurrentHashMap<Integer, Throwable> myExceptionMap; 80 81 // Team barrier flag. Used by the ParallelRegion.barrier() method. 82 volatile int myBarrierFlag; 83 84 // Parallel construct counter. Counts how many parallel constructs have been 85 // encountered. 86 AtomicInteger myConstructCount = new AtomicInteger(0); 87 88 // Set false if the ParallelTeam is shut down. 89 boolean isActive = true; 90 91 // Exported constructors. 92 /** 93 * Construct a new parallel team with the default number of threads. If the 94 * <code>"pj.nt"</code> Java property is specified, that property gives the 95 * default number of threads, which must be an integer greater than or equal 96 * to 1. If the <code>"pj.nt"</code> Java property is not specified, the default 97 * number of threads is the value returned by the 98 * <code>Runtime.availableProcessors()</code> method. You can specify the 99 * default number of threads on the Java command line like this: 100 * <PRE> 101 * java -Dpj.nt=4 . . . 102 * </PRE> 103 * 104 * @exception IllegalArgumentException (unchecked exception) Thrown if the 105 * <code>"pj.nt"</code> property value is not an integer greater than or equal 106 * to 1. 107 */ 108 public ParallelTeam() { 109 this(getDefaultThreadCount()); 110 } 111 112 /** 113 * Construct a new parallel team with the given number of threads. 114 * 115 * @param K Number of threads. 116 * @exception IllegalArgumentException (unchecked exception) Thrown if 117 * <I>K</I> is less than 1. 118 */ 119 public ParallelTeam(int K) { 120 if (K < 1) { 121 throw new IllegalArgumentException("ParallelTeam(): K must be >= 1"); 122 } 123 this.K = K; 124 125 myThread = new ParallelTeamThread[K]; 126 myThread[0] = new ParallelTeamThread_0(this, 0); 127 for (int i = 1; i < K; ++i) { 128 myThread[i] = new ParallelTeamThread(this, i); 129 } 130 } 131 132 // Exported operations. 133 /** 134 * Execute the given parallel region. 135 * 136 * @param theRegion Parallel region. 137 * @exception NullPointerException (unchecked exception) Thrown if 138 * <code>theRegion</code> is null. 139 * @exception IllegalStateException (unchecked exception) Thrown if this 140 * parallel team is already executing a parallel region. Thrown if 141 * <code>theRegion</code> is already being executed by a parallel team. 142 * @exception Exception Exception thrown by the parallel region's 143 * <code>start()</code>, 144 * <code>run()</code>, or <code>finish()</code> methods. 145 * @throws java.lang.Exception if any. 146 */ 147 public final void execute(ParallelRegion theRegion) 148 throws Exception { 149 // Verify preconditions. 150 if (theRegion == null) { 151 throw new NullPointerException("ParallelTeam.execute(): theRegion is null"); 152 } 153 if (myRegion != null) { 154 throw new IllegalStateException("ParallelTeam.execute(): Already executing a parallel region"); 155 } 156 if (theRegion.myTeam != null) { 157 throw new IllegalStateException("ParallelTeam.execute(): theRegion already being executed by a parallel team"); 158 } 159 if (!isActive) { 160 throw new IllegalStateException("ParallelTeam.execute(): The team has been shut down."); 161 } 162 163 // Record parallel region. 164 myRegion = theRegion; 165 myExceptionMap = new ConcurrentHashMap<Integer, Throwable>(K, 0.75f, K); 166 theRegion.myTeam = this; 167 168 try { 169 // Perform the parallel region's start() method. Any exception 170 // aborts the execute() method. 171 myRegion.start(); 172 173 // Release the team threads to perform the parallel region's run() 174 // method. 175 for (ParallelTeamThread thread : myThread) { 176 thread.myRegionBeginSemaphore.release(); 177 } 178 179 // Wait until all team threads have returned from the parallel 180 // region's run() method. 181 myRegionEndSemaphore.acquireUninterruptibly(K); 182 183 // Propagate any exceptions thrown by the run() method. 184 if (myExceptionMap.isEmpty()) { 185 } else if (myExceptionMap.size() == 1) { 186 rethrow(myExceptionMap.values().iterator().next()); 187 } else { 188 throw new MultipleParallelException("ParallelTeam.execute(): Multiple threads threw exceptions", 189 myExceptionMap); 190 } 191 192 // Perform the parallel region's finish() method. Any exception 193 // aborts the execute() method. 194 myRegion.finish(); 195 } finally { 196 // Clean up. 197 myRegion.myTeam = null; 198 myExceptionMap = null; 199 myRegion = null; 200 } 201 } 202 203 /** 204 * Determine if this parallel team is executing a parallel region. 205 * 206 * @return True if this parallel team is executing a parallel region, false 207 * otherwise. 208 */ 209 public final boolean isExecutingInParallel() { 210 return myRegion != null; 211 } 212 213 /** 214 * Returns the parallel region of code that this parallel team is executing. 215 * 216 * @return Parallel region. 217 * @exception IllegalStateException (unchecked exception) Thrown if this 218 * parallel team is not executing a parallel region. 219 */ 220 public final ParallelRegion region() { 221 if (myRegion == null) { 222 throw new IllegalStateException("ParallelTeam.region(): Not executing a parallel region"); 223 } 224 return myRegion; 225 } 226 227 /** 228 * Determine the number of threads in this parallel team. 229 * 230 * @return Number of threads in the team. 231 */ 232 public final int getThreadCount() { 233 return K; 234 } 235 236 /** 237 * Determine the default number of threads for a parallel team. If the 238 * <code>"pj.nt"</code> Java property is specified, that property gives the 239 * default number of threads, which must be an integer greater than or equal 240 * to 1. If the <code>"pj.nt"</code> Java property is not specified, the default 241 * number of threads is the value returned by the 242 * <code>Runtime.availableProcessors()</code> method. You can specify the 243 * default number of threads on the Java command line like this: 244 * <PRE> 245 * java -Dpj.nt=4 . . . 246 * </PRE> 247 * 248 * @return Default number of threads for a parallel team. 249 * @exception IllegalArgumentException (unchecked exception) Thrown if the 250 * <code>"pj.nt"</code> property value is not an integer greater than or equal 251 * to 1. 252 */ 253 public static int getDefaultThreadCount() { 254 int k = PJProperties.getPjNt(); 255 if (k == 0) { 256 k = Runtime.getRuntime().availableProcessors(); 257 } 258 return k; 259 } 260 261 // Hidden operations. 262 /** 263 * Do the thread-0 portion of a barrier with no barrier action. This method 264 * is called by thread 0 of the parallel team. 265 */ 266 void barrier() { 267 // Get the new team barrier flag. 268 int newBarrierFlag = myBarrierFlag ^ 1; 269 270 // Wait until each team thread 1 .. K-1 has switched to the new 271 // barrier flag. 272 for (int i = 1; i < K; ++i) { 273 ParallelTeamThread thread_i = myThread[i]; 274 if (thread_i.myBarrierFlag != newBarrierFlag) { 275 Spinner spinner = new Spinner(); 276 while (thread_i.myBarrierFlag != newBarrierFlag) { 277 spinner.spin(); 278 } 279 } 280 } 281 282 // Switch to the new team barrier flag. 283 myBarrierFlag = newBarrierFlag; 284 } 285 286 /** 287 * Do the thread-0 portion of a barrier with a barrier action. This method 288 * is called by thread 0 of the parallel team. 289 * 290 * @param action Barrier action. 291 * 292 * @exception Exception Thrown if the <code>action</code>'s <code>run()</code> 293 * method throws an exception. 294 */ 295 void barrier(BarrierAction action) 296 throws Exception { 297 // Get the new team barrier flag. 298 int newBarrierFlag = myBarrierFlag ^ 1; 299 300 // Wait until each team thread 1 .. K-1 has switched to the new 301 // barrier flag. 302 for (int i = 1; i < K; ++i) { 303 ParallelTeamThread thread_i = myThread[i]; 304 if (thread_i.myBarrierFlag != newBarrierFlag) { 305 Spinner spinner = new Spinner(); 306 while (thread_i.myBarrierFlag != newBarrierFlag) { 307 spinner.spin(); 308 } 309 } 310 } 311 312 try { 313 // Do the barrier action. 314 action.myTeam = this; 315 action.run(); 316 } finally { 317 action.myTeam = null; 318 319 // Switch to the new team barrier flag. 320 myBarrierFlag = newBarrierFlag; 321 } 322 } 323 324 /** 325 * Re-throw the given object as a checked or unchecked exception. If the 326 * given object is null or is not throwable, do nothing. 327 */ 328 static void rethrow(Object exc) 329 throws Exception { 330 if (exc instanceof RuntimeException) { 331 throw (RuntimeException) exc; 332 } else if (exc instanceof Exception) { 333 throw (Exception) exc; 334 } else if (exc instanceof Error) { 335 throw (Error) exc; 336 } 337 } 338 339 /** 340 * Kills the team's threads run() methods so that they are no longer GC roots. 341 * Useful if you are repetitively constructing ParallelTeam objects, although 342 * it is slightly more elegant just to keep the same ParallelTeam objects through 343 * the entire execution of a program. 344 * 345 * @throws java.lang.Exception if any. 346 */ 347 public void shutdown() throws Exception { 348 if (isActive) { 349 this.execute(new KillRegion()); 350 isActive = false; 351 } 352 } 353 }