//******************************************************************************
//
// File:    WorkerTeam.java
// Package: edu.rit.pj
// Unit:    Class edu.rit.pj.WorkerTeam
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj; 41 42 import java.util.concurrent.ConcurrentHashMap; 43 import java.util.concurrent.Semaphore; 44 45 /** 46 * Class WorkerTeam provides a team of threads, distributed across the processes 47 * of a cluster parallel program, for executing a {@linkplain WorkerRegion} in 48 * parallel. 49 * <P> 50 * A worker team uses a communicator for message passing. The communicator is 51 * specified as a constructor argument; if not specified, the world communicator 52 * is used. Every process that is part of the communicator must create the 53 * worker team. In class WorkerTeam, there is one worker thread per process. (To 54 * get more than one worker thread per process, use class {@linkplain 55 * HybridTeam}.) Every worker thread in every process has a unique index, going 56 * from index 0 for the first thread in the first process to index 57 * <I>K</I>−1 for the last thread in the last process, where <I>K</I> is 58 * the total number of worker threads in all the processes. In process rank 0, 59 * there is an additional master thread. 60 * <P> 61 * To execute a worker region, create a WorkerTeam object; create an instance of 62 * a concrete subclass of class {@linkplain WorkerRegion}; and pass this 63 * instance to the worker team's <code>execute()</code> method. For further 64 * information, see class {@linkplain WorkerRegion}. 65 * 66 * @author Alan Kaminsky 67 * @version 19-Jan-2010 68 */ 69 public class WorkerTeam { 70 71 // Hidden data members. 72 // Number of worker threads in this process. 73 int K; 74 75 // Communicator for message passing, its size, this process's rank. 
76 Comm comm; 77 int size; 78 int rank; 79 80 // Number of worker threads in all processes. 81 int count; 82 83 // Array of worker and master team threads in this process. There are K 84 // worker threads. There is an additional master thread in the last process 85 // of the communicator. 86 WorkerTeamThread[] myThread; 87 88 // Worker region being executed, or null if none is being executed. 89 WorkerRegion myRegion; 90 91 // Semaphore for synchronizing threads at the end of a worker region. 92 Semaphore myRegionEndSemaphore = new Semaphore(0); 93 94 // Exception map for worker region, or null if none is being executed. 95 ConcurrentHashMap<Integer, Throwable> myExceptionMap; 96 97 // Hidden constructors. 98 /** 99 * Construct a new, uninitialized worker team. 100 * 101 * @param flag To distinguish this constructor from the others. 102 */ 103 WorkerTeam(boolean flag) { 104 } 105 106 // Exported constructors. 107 /** 108 * Construct a new worker team with one thread per process and using the 109 * world communicator for message passing. 110 */ 111 public WorkerTeam() { 112 this(Comm.world()); 113 } 114 115 /** 116 * Construct a new worker team with one thread per process and using the 117 * given communicator for message passing. 118 * 119 * @param comm Communicator to use for message passing. 120 * @exception NullPointerException (unchecked exception) Thrown if 121 * <code>comm</code> is null. 122 */ 123 public WorkerTeam(Comm comm) { 124 if (comm == null) { 125 throw new NullPointerException("WorkerTeam(): comm is null"); 126 } 127 initialize(/*K */1, 128 /*comm */ comm, 129 /*size */ comm.size(), 130 /*rank */ comm.rank(), 131 /*count*/ comm.size(), 132 /*wlb */ comm.rank()); 133 } 134 135 // Hidden initializers. 136 /** 137 * Initialize a new worker team. 138 * 139 * @param K Number of worker threads in this process. 140 * @param comm Communicator to use for message passing. 141 * @param size Communicator's size. 
142 * @param rank This process's rank in the communicator. 143 * @param count Number of worker threads in all processes. 144 * @param wlb First worker index in this process. 145 */ 146 void initialize(int K, 147 Comm comm, 148 int size, 149 int rank, 150 int count, 151 int wlb) { 152 // Record parameters. 153 this.K = K; 154 this.comm = comm; 155 this.size = size; 156 this.rank = rank; 157 this.count = count; 158 159 // Set up worker team threads. Additional master thread in process 0. 160 int WM = K + (rank == 0 ? 1 : 0); 161 myThread = new WorkerTeamThread[WM]; 162 for (int i = 0; i < K; ++i) { 163 myThread[i] = new WorkerTeamThread(this, wlb + i); 164 } 165 if (WM > K) { 166 myThread[K] = new WorkerTeamThread(this, -1); 167 } 168 } 169 170 // Exported operations. 171 /** 172 * Execute the given worker region. 173 * 174 * @param theRegion Worker region. 175 * @exception NullPointerException (unchecked exception) Thrown if 176 * <code>theRegion</code> is null. 177 * @exception IllegalStateException (unchecked exception) Thrown if this 178 * worker team is already executing a worker region. Thrown if 179 * <code>theRegion</code> is already being executed by a worker team. 180 * @exception Exception Exception thrown by the worker region's 181 * <code>start()</code>, 182 * <code>run()</code>, or <code>finish()</code> methods. 183 * @throws java.lang.Exception if any. 184 */ 185 public final void execute(WorkerRegion theRegion) 186 throws Exception { 187 // Verify preconditions. 188 if (theRegion == null) { 189 throw new NullPointerException("WorkerTeam.execute(): theRegion is null"); 190 } 191 if (myRegion != null) { 192 throw new IllegalStateException("WorkerTeam.execute(): Already executing a worker region"); 193 } 194 if (theRegion.myTeam != null) { 195 throw new IllegalStateException("WorkerTeam.execute(): theRegion already being executed by a worker team"); 196 } 197 198 // Record worker region. 
199 myRegion = theRegion; 200 myExceptionMap = new ConcurrentHashMap<Integer, Throwable>(K, 0.75f, K); 201 theRegion.myTeam = this; 202 203 try { 204 // Perform the worker region's start() method. Any exception aborts 205 // the execute() method. 206 myRegion.start(); 207 208 // Release the team threads to perform the worker region's run() 209 // method. 210 for (WorkerTeamThread thread : myThread) { 211 thread.myRegionBeginSemaphore.release(); 212 } 213 214 // Wait until all team threads have returned from the worker 215 // region's run() method. 216 myRegionEndSemaphore.acquireUninterruptibly(myThread.length); 217 218 // Propagate any exceptions thrown by the run() method. 219 if (myExceptionMap.isEmpty()) { 220 } else if (myExceptionMap.size() == 1) { 221 rethrow(myExceptionMap.values().iterator().next()); 222 } else { 223 throw new MultipleParallelException("WorkerTeam.execute(): Multiple threads threw exceptions", 224 myExceptionMap); 225 } 226 227 // Perform the worker region's finish() method. Any exception aborts 228 // the execute() method. 229 myRegion.finish(); 230 } finally { 231 // Clean up. 232 myRegion.myTeam = null; 233 myExceptionMap = null; 234 myRegion = null; 235 } 236 } 237 238 /** 239 * Determine if this worker team is executing a worker region. 240 * 241 * @return True if this worker team is executing a worker region, false 242 * otherwise. 243 */ 244 public final boolean isExecutingInParallel() { 245 return myRegion != null; 246 } 247 248 /** 249 * Returns the worker region of code that this worker team is executing. 250 * 251 * @return Worker region. 252 * @exception IllegalStateException (unchecked exception) Thrown if this 253 * worker team is not executing a worker region. 
254 */ 255 public final WorkerRegion region() { 256 if (myRegion == null) { 257 throw new IllegalStateException("WorkerTeam.region(): Not executing a worker region"); 258 } 259 return myRegion; 260 } 261 262 /** 263 * Determine the number of worker threads in this worker team in this 264 * process. This does not include the master thread if any. 265 * 266 * @return Number of worker threads in this process. 267 */ 268 public final int getThreadCount() { 269 return K; 270 } 271 272 /** 273 * Determine the total number of worker threads in this worker team in all 274 * processes. This does not include the master thread. 275 * 276 * @return Number of worker threads in all processes. 277 */ 278 public final int getTotalThreadCount() { 279 return count; 280 } 281 282 /** 283 * Determine the rank of the process that contains the master thread. At 284 * present, this is always rank 0. 285 * 286 * @return Master process rank. 287 */ 288 public int masterRank() { 289 return 0; 290 } 291 292 /** 293 * Determine the rank of the process that contains the worker thread with 294 * the given index. 295 * 296 * @param w Worker index. 297 * @return Worker process rank. 298 * @exception IllegalArgumentException (unchecked exception) Thrown if 299 * <code>w</code> is not in the range 0 .. 300 * <code>getTotalThreadCount()</code>−1. 301 */ 302 public int workerRank(int w) { 303 if (0 > w || w >= count) { 304 throw new IllegalArgumentException("WorkerTeam.workerRank(): w (= " + w + ") illegal"); 305 } 306 return w; 307 } 308 309 // Hidden operations. 310 /** 311 * Re-throw the given object as a checked or unchecked exception. If the 312 * given object is null or is not throwable, do nothing. 313 */ 314 static void rethrow(Object exc) 315 throws Exception { 316 if (exc instanceof RuntimeException) { 317 throw (RuntimeException) exc; 318 } else if (exc instanceof Exception) { 319 throw (Exception) exc; 320 } else if (exc instanceof Error) { 321 throw (Error) exc; 322 } 323 } 324 325 }