//******************************************************************************
//
// File: WorkerRegion.java
// Package: edu.rit.pj
// Unit: Class edu.rit.pj.WorkerRegion
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
//
// A copy of the GNU General Public License is provided in the file gpl.txt. You
// may also obtain a copy of the GNU General Public License on the World Wide
// Web at http://www.gnu.org/licenses/gpl.html.
//
//******************************************************************************
package edu.rit.pj;

import java.util.Iterator;

import edu.rit.util.LongRange;
import edu.rit.util.Range;

/**
 * Class WorkerRegion is the abstract base class for a worker region that is
 * executed by a {@linkplain WorkerTeam} of threads distributed across the
 * processes of a cluster parallel program.
 * <P>
 * To execute a worker region, create a {@linkplain WorkerTeam} object; create
 * an instance of a concrete subclass of class WorkerRegion; and pass this
 * instance to the worker team's <code>execute()</code> method. You can do all this
 * using an anonymous inner class; for example:
 * <PRE>
 *     new WorkerTeam().execute (new WorkerRegion()
 *     {
 *         public void start()
 *         {
 *             // Initialization code
 *             . . .
 *         }
 *         public void run()
 *         {
 *             // Parallel code
 *             . . .
 *         }
 *         public void finish()
 *         {
 *             // Finalization code
 *             . . .
 *         }
 *     });
 * </PRE>
 * <P>
 * The worker team's <code>execute()</code> method does the following. In each
 * process, the worker team has a certain number of <B>worker threads</B>
 * <I>K</I>, where <I>K</I> was specified when the worker team was constructed.
 * In the highest-ranked process of the communicator, there is a <B>master
 * thread</B> in addition to the worker threads. The <B>main thread</B> is the
 * thread calling the worker team's <code>execute()</code> method. The main thread
 * calls the worker region's <code>start()</code> method. When the <code>start()</code>
 * method returns, all the worker threads, plus the master thread if any, call
 * the worker region's <code>run()</code> method concurrently.
 * When all the team threads have returned from the <code>run()</code> method,
 * the main thread calls the worker region's <code>finish()</code> method. When
 * the <code>finish()</code> method returns, the main thread returns from the
 * worker team's <code>execute()</code> method.
 * <P>
 * The chief purpose of a worker team is to execute a work-sharing parallel loop
 * in a cluster parallel program, partitioning the loop iterations among the
 * worker threads in all the processes. The worker team uses the
 * <B>master-worker pattern</B> to partition the iterations. The master thread
 * partitions the loop iterations and sends tasks to the worker threads; the
 * worker threads send results back to the master thread. The worker team uses a
 * certain <B>communicator</B> to do this message passing. The communicator was
 * specified when the worker team was constructed. For further information, see
 * classes {@linkplain WorkerIntegerForLoop} and {@linkplain WorkerLongForLoop}.
 * <P>
 * Within each process, variables to be shared by all threads in the team may be
 * declared as fields of the WorkerRegion subclass. (Variables cannot be shared
 * between processes.) The <code>start()</code> method is intended for performing
 * initialization in a single thread before parallel execution begins. If no
 * such initialization is needed, omit the <code>start()</code> method. The
 * <code>run()</code> method contains code to be executed in parallel by all threads
 * in the team. Variables that are private to each thread may be declared inside
 * the <code>run()</code> method. The <code>finish()</code> method is intended for
 * performing finalization in a single thread after parallel execution ends. If
 * no such finalization is needed, omit the <code>finish()</code> method.
 * <P>
 * If the worker region's <code>start()</code> method throws an exception, the
 * worker team's <code>execute()</code> method throws that same exception, and the
 * <code>run()</code> method is not called.
 * <P>
 * If the worker region's <code>run()</code> method throws an exception in one of
 * the team threads, the exception's stack trace is printed on the standard
 * error, the worker team waits until all the other team threads have returned
 * from the <code>run()</code> method, then the worker team's <code>execute()</code>
 * method throws that same exception, and the worker region's <code>finish()</code>
 * method is not called. If the worker region's <code>run()</code> method throws an
 * exception in more than one of the team threads, each exception's stack trace
 * is printed on the standard error, the worker team waits until all the other
 * team threads have returned from the <code>run()</code> method, then the worker
 * team's <code>execute()</code> method throws a {@linkplain
 * MultipleParallelException} wrapping all the thrown exceptions, and the worker
 * region's <code>finish()</code> method is not called.
 * <P>
 * If the worker region's <code>finish()</code> method throws an exception, the
 * worker team's <code>execute()</code> method throws that same exception.
 *
 * @author Alan Kaminsky
 * @version 07-Oct-2010
 */
public abstract class WorkerRegion
        extends WorkerConstruct {

    // Exported constructors.
    /**
     * Construct a new worker region.
     */
    public WorkerRegion() {
        super();
    }

    // Exported operations.
    /**
     * Perform initialization actions before parallel execution begins. Only one
     * thread in each process calls the <code>start()</code> method.
     * <P>
     * The <code>start()</code> method may be overridden in a subclass. If not
     * overridden, the <code>start()</code> method does nothing.
     *
     * @exception Exception The <code>start()</code> method may throw any exception.
     */
    public void start()
            throws Exception {
    }

    /**
     * Execute parallel code. All threads of the worker team in each process
     * call the <code>run()</code> method concurrently.
     * <P>
     * The <code>run()</code> method must be implemented in a subclass.
     *
     * @exception Exception The <code>run()</code> method may throw any exception.
     */
    public abstract void run()
            throws Exception;

    /**
     * Perform finalization actions after parallel execution ends. Only one
     * thread in each process calls the <code>finish()</code> method.
     * <P>
     * The <code>finish()</code> method may be overridden in a subclass. If not
     * overridden, the <code>finish()</code> method does nothing.
     *
     * @exception Exception The <code>finish()</code> method may throw any
     * exception.
     */
    public void finish()
            throws Exception {
    }

    /**
     * Execute a worker for loop within this worker region. For further
     * information, see class {@linkplain WorkerIntegerForLoop}. The loop index
     * goes from <code>first</code> (inclusive) to <code>last</code> (inclusive) in
     * steps of +1. If <code>first</code> is greater than <code>last</code>, then no
     * loop iterations are performed.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param first First loop index.
     * @param last Last loop index.
     * @param theLoop Worker for loop.
     * @exception NullPointerException (unchecked exception) Thrown if
     * <code>theLoop</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
     * an exception.
     */
    public final void execute(int first,
            int last,
            WorkerIntegerForLoop theLoop)
            throws Exception {
        // Verify preconditions.
        if (theLoop == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
        }
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }

        try {
            // Record parallel team.
            theLoop.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            Range range = new Range(first, last);
            if (w == -1) {
                theLoop.masterExecute(range);
            } else {
                theLoop.workerExecute(w, range);
            }
        } finally {
            // Forget parallel team.
            theLoop.myTeam = null;
        }
    }

    /**
     * Execute a worker for loop within this worker region. For further
     * information, see class {@linkplain WorkerIntegerStrideForLoop}. The loop
     * index goes from <code>first</code> (inclusive) to <code>last</code> (inclusive)
     * in steps of <code>stride</code>. The stride must be positive. If
     * <code>first</code> is greater than <code>last</code>, then no loop iterations are
     * performed.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param first First loop index.
     * @param last Last loop index.
     * @param stride Loop index stride, &gt;= 1.
     * @param theLoop Worker for loop.
     * @exception IllegalArgumentException (unchecked exception) Thrown if
     * <code>stride</code> &lt; 1.
     * @exception NullPointerException (unchecked exception) Thrown if
     * <code>theLoop</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
     * an exception.
     */
    public final void execute(int first,
            int last,
            int stride,
            WorkerIntegerStrideForLoop theLoop)
            throws Exception {
        // Verify preconditions.
        if (stride <= 0) {
            throw new IllegalArgumentException("WorkerRegion.execute(): Stride = " + stride + " illegal");
        }
        if (theLoop == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
        }
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }

        try {
            // Record parallel team.
            theLoop.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            Range range = new Range(first, last, stride);
            if (w == -1) {
                theLoop.masterExecute(range);
            } else {
                theLoop.workerExecute(w, range);
            }
        } finally {
            // Forget parallel team.
            theLoop.myTeam = null;
        }
    }

    /**
     * Execute a worker for loop within this worker region. For further
     * information, see class {@linkplain WorkerLongForLoop}. The loop index
     * goes from <code>first</code> (inclusive) to <code>last</code> (inclusive) in
     * steps of +1. If <code>first</code> is greater than <code>last</code>, then no
     * loop iterations are performed.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param first First loop index.
     * @param last Last loop index.
     * @param theLoop Worker for loop.
     * @exception NullPointerException (unchecked exception) Thrown if
     * <code>theLoop</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
     * an exception.
     */
    public final void execute(long first,
            long last,
            WorkerLongForLoop theLoop)
            throws Exception {
        // Verify preconditions.
        if (theLoop == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
        }
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }

        try {
            // Record parallel team.
            theLoop.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            LongRange range = new LongRange(first, last);
            if (w == -1) {
                theLoop.masterExecute(range);
            } else {
                theLoop.workerExecute(w, range);
            }
        } finally {
            // Forget parallel team.
            theLoop.myTeam = null;
        }
    }

    /**
     * Execute a worker for loop within this worker region. For further
     * information, see class {@linkplain WorkerLongStrideForLoop}. The loop
     * index goes from <code>first</code> (inclusive) to <code>last</code> (inclusive)
     * in steps of <code>stride</code>. The stride must be positive.
     * If <code>first</code> is greater than <code>last</code>, then no loop
     * iterations are performed.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param first First loop index.
     * @param last Last loop index.
     * @param stride Loop index stride, &gt;= 1.
     * @param theLoop Worker for loop.
     * @exception IllegalArgumentException (unchecked exception) Thrown if
     * <code>stride</code> &lt; 1.
     * @exception NullPointerException (unchecked exception) Thrown if
     * <code>theLoop</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theLoop</code>'s methods throws
     * an exception.
     */
    public final void execute(long first,
            long last,
            long stride,
            WorkerLongStrideForLoop theLoop)
            throws Exception {
        // Verify preconditions.
        if (stride <= 0) {
            throw new IllegalArgumentException("WorkerRegion.execute(): Stride = " + stride + " illegal");
        }
        if (theLoop == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker for loop is null");
        }
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }

        try {
            // Record parallel team.
            theLoop.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            LongRange range = new LongRange(first, last, stride);
            if (w == -1) {
                theLoop.masterExecute(range);
            } else {
                theLoop.workerExecute(w, range);
            }
        } finally {
            // Forget parallel team.
            theLoop.myTeam = null;
        }
    }

    /**
     * Execute a worker iteration within this worker region. For further
     * information, see class {@linkplain WorkerIteration}. The items processed
     * by the iteration are the elements of the given array. The iteration order
     * is from index 0 upwards.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param <T> Data type of the items iterated over.
     * @param theArray Array containing the items.
     * @param theIteration Worker iteration.
     * @exception NullPointerException (unchecked exception) Thrown if this is
     * the master process and <code>theArray</code> is null. Thrown if
     * <code>theIteration</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theIteration</code>'s methods
     * throws an exception.
     */
    public final <T> void execute(T[] theArray,
            WorkerIteration<T> theIteration)
            throws Exception {
        // Verify preconditions.
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }
        if (myTeam.rank == myTeam.masterRank() && theArray == null) {
            throw new NullPointerException("WorkerRegion.execute(): Array is null");
        }
        if (theIteration == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
        }

        try {
            // Record parallel team.
            theIteration.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            if (w == -1) {
                theIteration.masterExecute(new ArrayItemGenerator<T>(theArray));
            } else {
                theIteration.workerExecute(w);
            }
        } finally {
            // Forget parallel team.
            theIteration.myTeam = null;
        }
    }

    /**
     * Execute a worker iteration within this worker region. For further
     * information, see class {@linkplain WorkerIteration}. The items processed
     * by the iteration are the items returned by the given iterator. The
     * iteration order is that of the given iterator.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param <T> Data type of the items iterated over.
     * @param theIterator Iterator over the items.
     * @param theIteration Worker iteration.
     * @exception NullPointerException (unchecked exception) Thrown if this is
     * the master process and <code>theIterator</code> is null. Thrown if
     * <code>theIteration</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theIteration</code>'s methods
     * throws an exception.
     */
    public final <T> void execute(Iterator<T> theIterator,
            WorkerIteration<T> theIteration)
            throws Exception {
        // Verify preconditions.
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }
        if (myTeam.rank == myTeam.masterRank() && theIterator == null) {
            throw new NullPointerException("WorkerRegion.execute(): Iterator is null");
        }
        if (theIteration == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
        }

        try {
            // Record parallel team.
            theIteration.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            if (w == -1) {
                theIteration.masterExecute(new IteratorItemGenerator<T>(theIterator));
            } else {
                theIteration.workerExecute(w);
            }
        } finally {
            // Forget parallel team.
            theIteration.myTeam = null;
        }
    }

    /**
     * Execute a worker iteration within this worker region. For further
     * information, see class {@linkplain WorkerIteration}. The items processed
     * by the iteration are the items contained in the given iterable
     * collection. The iteration order is that of the given iterable
     * collection's iterator.
     * <P>
     * <I>Note:</I> Either all threads in the worker team must call the
     * <code>execute()</code> method with identical arguments, or none of the
     * threads must call the <code>execute()</code> method.
     *
     * @param <T> Data type of the items iterated over.
     * @param theIterable Iterable collection containing the items.
     * @param theIteration Worker iteration.
     * @exception NullPointerException (unchecked exception) Thrown if this is
     * the master process and <code>theIterable</code> is null. Thrown if
     * <code>theIteration</code> is null.
     * @exception IllegalStateException (unchecked exception) Thrown if no
     * worker team is executing this worker region.
     * @exception Exception Thrown if one of <code>theIteration</code>'s methods
     * throws an exception.
     */
    public final <T> void execute(Iterable<T> theIterable,
            WorkerIteration<T> theIteration)
            throws Exception {
        // Verify preconditions.
        if (myTeam == null) {
            throw new IllegalStateException("WorkerRegion.execute(): No parallel team executing");
        }
        if (myTeam.rank == myTeam.masterRank() && theIterable == null) {
            throw new NullPointerException("WorkerRegion.execute(): Iterable collection is null");
        }
        if (theIteration == null) {
            throw new NullPointerException("WorkerRegion.execute(): Worker iteration is null");
        }

        try {
            // Record parallel team.
            theIteration.myTeam = this.myTeam;

            // Get current parallel team thread.
            WorkerTeamThread currentThread = getCurrentThread();
            int w = currentThread.myIndex;

            // Do master or worker thread processing.
            if (w == -1) {
                theIteration.masterExecute(new IteratorItemGenerator<T>(theIterable.iterator()));
            } else {
                theIteration.workerExecute(w);
            }
        } finally {
            // Forget parallel team.
            theIteration.myTeam = null;
        }
    }

}