//******************************************************************************
//
// File: WorkerLongForLoop.java
// Package: edu.rit.pj
// Unit: Class edu.rit.pj.WorkerLongForLoop
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj; 41 42 import java.io.IOException; 43 44 import edu.rit.mp.ObjectBuf; 45 import edu.rit.mp.buf.ObjectItemBuf; 46 import edu.rit.util.LongRange; 47 import edu.rit.util.Range; 48 49 /** 50 * Class WorkerLongForLoop is the abstract base class for one variation of a 51 * worker for loop that is executed inside a {@linkplain WorkerRegion}. The loop 52 * index data type is <code>long</code>. The loop stride is implicit (+1). 53 * <P> 54 * To execute a worker for loop, create a {@linkplain WorkerRegion} object; 55 * create an instance of a concrete subclass of class WorkerLongForLoop; and 56 * pass this instance to the worker region's <code>execute()</code> method. Either 57 * every worker team thread must call the worker region's <code>execute()</code> 58 * method with identical arguments, or every thread must not call the 59 * <code>execute()</code> method. You can do all this using an anonymous inner 60 * class; for example: 61 * <PRE> 62 * new WorkerRegion() 63 * { 64 * . . . 65 * public void run() 66 * { 67 * . . . 68 * execute (0L, 99L, new WorkerLongForLoop() 69 * { 70 * // Thread local variable declarations 71 * . . . 72 * public void start() 73 * { 74 * // Per-thread pre-loop initialization code 75 * . . . 76 * } 77 * public void run (long first, long last) 78 * { 79 * // Loop code 80 * . . . 81 * } 82 * public void finish() 83 * { 84 * // Per-thread post-loop finalization code 85 * . . . 86 * } 87 * }); 88 * } 89 * . . . 90 * } 91 * </PRE> 92 * <P> 93 * In each process of a cluster parallel program, the worker team has one or 94 * more worker threads. 
Every worker thread in every process has a unique worker 95 * tag, going from tag 0 for the first worker thread in the first process to tag 96 * <I>K</I>−1 for the last worker thread in the last process, where 97 * <I>K</I> is the total number of worker threads in all the processes. In 98 * addition, in one process there is a master thread. The worker and master 99 * threads all call the worker region's <code>execute()</code> method to execute the 100 * worker for loop. However, the worker and master threads differ in their 101 * actions. 102 * <P> 103 * The master thread does the following. The master obtains the worker for 104 * loop's schedule as returned by the <code>schedule()</code> method. The range of 105 * loop indexes is divided into "chunks" and the chunks are apportioned among 106 * the workers in accordance with the schedule. The master repeatedly sends 107 * "tasks" to the workers and receives "responses" from the workers. To send a 108 * task to a particular worker, the master (1) sends a message containing the 109 * chunk index range to the worker's process; and (2) calls the worker for 110 * loop's <code>sendTaskInput()</code> method. This method's default implementation 111 * does nothing, but it can be overridden to send additional task input data to 112 * the worker. To receive a response from a particular worker, the master (1) 113 * receives a message containing the chunk index range from the worker's 114 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code> 115 * method. This method's default implementation does nothing, but it can be 116 * overridden to receive additional task output data from the worker. Once all 117 * tasks have been sent to the workers and all responses have been received from 118 * the workers, the master returns from the worker region's <code>execute()</code> 119 * method. 120 * <P> 121 * Each worker thread does the following. 
The worker calls the worker for loop's 122 * <code>start()</code> method once before beginning any loop iterations. The worker 123 * repeatedly receives tasks from the master and sends responses to the master. 124 * To receive a task from the master, the worker (1) receives a message 125 * containing the chunk index range from the master's process; and (2) calls the 126 * worker for loop's <code>receiveTaskInput()</code> method. This method's default 127 * implementation does nothing, but it can be overridden to receive additional 128 * task input data from the master. The worker now calls the worker for loop's 129 * <code>run()</code> method, passing in the chunk index range lower and upper 130 * bounds. When the <code>run()</code> method returns, the worker sends the response 131 * to the master. To send the response, the worker (1) sends a message 132 * containing the chunk index range to the master's process; and (2) calls the 133 * worker for loop's <code>sendTaskOutput()</code> method. This method's default 134 * implementation does nothing, but it can be overridden to send additional task 135 * output data to the master. Once all tasks have been received from the master 136 * and all responses have been sent to the master, the worker calls the worker 137 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s 138 * threads, the workers do <I>not</I> synchronize with each other at a barrier 139 * at this point.) The worker then returns from the worker region's 140 * <code>execute()</code> method. 141 * <P> 142 * If the worker for loop has a fixed schedule (in which there is exactly one 143 * chunk with a predetermined index range for each worker), then the messages 144 * containing the chunk index range are omitted, and each worker gets its chunk 145 * index range directly from the fixed schedule. However, the task input data 146 * (if any) and task output data (if any) are still sent and received. 
147 * <P> 148 * Each message described above is sent with a message tag equal to 149 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the 150 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but 151 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the 152 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where 153 * <I>K</I> is the total number of workers in all the processes. The program 154 * should not use message tags in this range except to send and receive the 155 * messages described above. 156 * <P> 157 * Note that each worker team thread actually creates its own instance of the 158 * worker for loop class and passes that instance to the worker region's 159 * <code>execute()</code> method. Thus, any fields declared in the worker for loop 160 * class will <I>not</I> be shared by all the workers, but instead will be 161 * private to each worker. 162 * <P> 163 * The <code>start()</code> method is intended for performing per-thread 164 * initialization before starting the loop iterations. If no such initialization 165 * is needed, omit the <code>start()</code> method. 166 * <P> 167 * The <code>run()</code> method contains the code for the loop. The first and last 168 * indexes for a chunk of loop iterations are passed in as arguments. The loop 169 * stride is implicit (+1). The worker for loop's <code>run()</code> method must be 170 * coded this way: 171 * <PRE> 172 * public void run (long first, long last) 173 * { 174 * for (long i = first; i <= last; ++ i) 175 * { 176 * // Loop body code 177 * . . . 178 * } 179 * } 180 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code> 181 * inclusive and increasing by +1 on each iteration. 182 * <P> 183 * The <code>finish()</code> method is intended for performing per-thread 184 * finalization after finishing the loop iterations. 
If no such finalization is 185 * needed, omit the <code>finish()</code> method. 186 * <P> 187 * If the worker for loop's <code>start()</code>, <code>run()</code>, or 188 * <code>finish()</code> method throws an exception in one of the worker threads, 189 * then that worker thread executes no further code in the loop, and the worker 190 * region's <code>execute()</code> method throws that same exception in that thread. 191 * However, the other worker threads in the worker team continue to execute. 192 * 193 * @author Alan Kaminsky 194 * @version 27-Jan-2010 195 */ 196 public abstract class WorkerLongForLoop 197 extends WorkerForLoop { 198 199 // Exported constructors. 200 /** 201 * Construct a new worker for loop. 202 */ 203 public WorkerLongForLoop() { 204 super(); 205 } 206 207 // Exported operations. 208 /** 209 * Determine this worker for loop's schedule. Called by the master and 210 * worker threads. The schedule determines how the loop iterations are 211 * apportioned among the worker team threads. For further information, see 212 * class {@linkplain LongSchedule}. 213 * <P> 214 * The <code>schedule()</code> method may be overridden in a subclass to return 215 * the desired schedule. If not overridden, the default is a runtime 216 * schedule (see {@link edu.rit.pj.LongSchedule#runtime()}). 217 * 218 * @return Schedule for this worker for loop. 219 */ 220 public LongSchedule schedule() { 221 return LongSchedule.runtime(); 222 } 223 224 /** 225 * Perform per-thread initialization actions before starting the loop 226 * iterations. Called by a worker thread. 227 * <P> 228 * The <code>start()</code> method may be overridden in a subclass. If not 229 * overridden, the <code>start()</code> method does nothing. 230 * 231 * @exception Exception The <code>start()</code> method may throw any exception. 232 * @throws java.lang.Exception if any. 233 */ 234 public void start() 235 throws Exception { 236 } 237 238 /** 239 * Send additional input data associated with a task. 
Called by the master 240 * thread. The task is denoted by the given chunk of loop iterations. The 241 * input data must be sent using the given communicator, to the given worker 242 * process rank, with the given message tag. 243 * <P> 244 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If 245 * not overridden, the <code>sendTaskInput()</code> method does nothing. 246 * 247 * @param range Chunk of loop iterations. 248 * @param comm Communicator. 249 * @param wRank Worker process rank. 250 * @param tag Message tag. 251 * @exception IOException Thrown if an I/O error occurred. 252 * @throws java.io.IOException if any. 253 */ 254 public void sendTaskInput(LongRange range, 255 Comm comm, 256 int wRank, 257 int tag) 258 throws IOException { 259 } 260 261 /** 262 * Receive additional input data associated with a task. Called by a worker 263 * thread. The task is denoted by the given chunk of loop iterations. The 264 * input data must be received using the given communicator, from the given 265 * master process rank, with the given message tag. 266 * <P> 267 * The <code>receiveTaskInput()</code> method may be overridden in a subclass. 268 * If not overridden, the <code>receiveTaskInput()</code> method does nothing. 269 * 270 * @param range Chunk of loop iterations. 271 * @param comm Communicator. 272 * @param mRank Master process rank. 273 * @param tag Message tag. 274 * @exception IOException Thrown if an I/O error occurred. 275 * @throws java.io.IOException if any. 276 */ 277 public void receiveTaskInput(LongRange range, 278 Comm comm, 279 int mRank, 280 int tag) 281 throws IOException { 282 } 283 284 /** 285 * Execute one chunk of iterations of this worker for loop. Called by a 286 * worker thread. The <code>run()</code> method must perform the loop body for 287 * indexes <code>first</code> through <code>last</code> inclusive, increasing the 288 * loop index by +1 after each iteration. 
289 * <P> 290 * The <code>run()</code> method must be overridden in a subclass. 291 * 292 * @param first First loop index. 293 * @param last Last loop index. 294 * @exception Exception The <code>run()</code> method may throw any exception. 295 * @throws java.lang.Exception if any. 296 */ 297 public abstract void run(long first, 298 long last) 299 throws Exception; 300 301 /** 302 * Send additional output data associated with a task. Called by a worker 303 * thread. The task is denoted by the given chunk of loop iterations. The 304 * output data must be sent using the given communicator, to the given 305 * master process rank, with the given message tag. 306 * <P> 307 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If 308 * not overridden, the <code>sendTaskOutput()</code> method does nothing. 309 * 310 * @param range Chunk of loop iterations. 311 * @param comm Communicator. 312 * @param mRank Master process rank. 313 * @param tag Message tag. 314 * @exception IOException Thrown if an I/O error occurred. 315 * @throws java.io.IOException if any. 316 */ 317 public void sendTaskOutput(LongRange range, 318 Comm comm, 319 int mRank, 320 int tag) 321 throws IOException { 322 } 323 324 /** 325 * Receive additional output data associated with a task. Called by the 326 * master thread. The task is denoted by the given chunk of loop iterations. 327 * The output data must be received using the given communicator, from the 328 * given worker process rank, with the given message tag. 329 * <P> 330 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass. 331 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing. 332 * 333 * @param range Chunk of loop iterations. 334 * @param comm Communicator. 335 * @param wRank Worker process rank. 336 * @param tag Message tag. 337 * @exception IOException Thrown if an I/O error occurred. 338 * @throws java.io.IOException if any. 
339 */ 340 public void receiveTaskOutput(LongRange range, 341 Comm comm, 342 int wRank, 343 int tag) 344 throws IOException { 345 } 346 347 /** 348 * Perform per-thread finalization actions after finishing the loop 349 * iterations. Called by a worker thread. 350 * <P> 351 * The <code>finish()</code> method may be overridden in a subclass. If not 352 * overridden, the <code>finish()</code> method does nothing. 353 * 354 * @exception Exception The <code>finish()</code> method may throw any 355 * exception. 356 * @throws java.lang.Exception if any. 357 */ 358 public void finish() 359 throws Exception { 360 } 361 362 /** 363 * Returns the tag offset for this worker for loop. Each message between the 364 * master and worker threads is sent with a message tag equal to 365 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the 366 * tag offset. 367 * <P> 368 * The <code>tagOffset()</code> method may be overridden in a subclass. If not 369 * overridden, the <code>tagOffset()</code> returns a default tag offset of 370 * <code>Integer.MIN_VALUE</code>. 371 * 372 * @return Tag offset. 373 */ 374 public int tagOffset() { 375 return Integer.MIN_VALUE; 376 } 377 378 // Hidden operations. 379 /** 380 * Execute this worker for loop in the master thread. 381 * 382 * @param range Loop index range. 383 * 384 * @exception IOException Thrown if an I/O error occurred. 385 */ 386 void masterExecute(LongRange range) 387 throws IOException { 388 LongSchedule sch = schedule(); 389 if (sch.isFixedSchedule()) { 390 masterExecuteFixed(range, sch); 391 } else { 392 masterExecuteNonFixed(range, sch); 393 } 394 } 395 396 /** 397 * Execute this worker for loop in the master thread with a fixed schedule. 398 * 399 * @param range Loop index range. 400 * @param sch Schedule. 401 * 402 * @exception IOException Thrown if an I/O error occurred. 
403 */ 404 void masterExecuteFixed(LongRange range, 405 LongSchedule sch) 406 throws IOException { 407 int count = myTeam.count; 408 Comm comm = myTeam.comm; 409 410 // Send additional task input to each worker. 411 sch.start(count, range); 412 for (int w = 0; w < count; ++w) { 413 LongRange chunk = sch.next(w); 414 if (chunk != null) { 415 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w)); 416 } 417 } 418 419 // Receive additional task output from each worker. 420 sch.start(count, range); 421 for (int w = 0; w < count; ++w) { 422 LongRange chunk = sch.next(w); 423 if (chunk != null) { 424 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w)); 425 } 426 } 427 } 428 429 /** 430 * Execute this worker for loop in the master thread with a non-fixed 431 * schedule. 432 * 433 * @param range Loop index range. 434 * @param sch Schedule. 435 * 436 * @exception IOException Thrown if an I/O error occurred. 437 */ 438 void masterExecuteNonFixed(LongRange range, 439 LongSchedule sch) 440 throws IOException { 441 int count = myTeam.count; 442 sch.start(count, range); 443 int remaining = count; 444 ObjectItemBuf<LongRange> buf = ObjectBuf.buffer(); 445 Range tagRange = new Range(tagFor(0), tagFor(count - 1)); 446 Comm comm = myTeam.comm; 447 448 // Send initial task to each worker. 449 for (int w = 0; w < count; ++w) { 450 LongRange chunk = sch.next(w); 451 buf.item = chunk; 452 buf.reset(); 453 int r = myTeam.workerRank(w); 454 int tag = tagFor(w); 455 comm.send(r, tag, buf); 456 if (chunk == null) { 457 --remaining; 458 } else { 459 sendTaskInput(chunk, comm, r, tag); 460 } 461 } 462 463 // Repeatedly receive a response from a worker and send next task to 464 // that worker. 
465 while (remaining > 0) { 466 CommStatus status = comm.receive(null, tagRange, buf); 467 LongRange chunk = buf.item; 468 int r = status.fromRank; 469 int tag = status.tag; 470 int w = workerFor(tag); 471 receiveTaskOutput(chunk, comm, r, tag); 472 chunk = sch.next(w); 473 buf.item = chunk; 474 buf.reset(); 475 comm.send(r, tag, buf); 476 if (chunk == null) { 477 --remaining; 478 } else { 479 sendTaskInput(chunk, comm, r, tag); 480 } 481 } 482 } 483 484 /** 485 * Execute this worker for loop in a worker thread. 486 * 487 * @param w Worker index. 488 * @param range Loop index range. 489 * 490 * @exception Exception This method may throw any exception. 491 */ 492 void workerExecute(int w, 493 LongRange range) 494 throws Exception { 495 LongSchedule sch = schedule(); 496 if (sch.isFixedSchedule()) { 497 sch.start(myTeam.count, range); 498 workerExecuteFixed(sch.next(w), w); 499 } else { 500 workerExecuteNonFixed(w); 501 } 502 } 503 504 /** 505 * Execute this worker for loop in a worker thread using a fixed schedule. 506 * 507 * @param range Chunk of loop iterations. 508 * @param w Worker index. 509 * 510 * @exception Exception This method may throw any exception. 511 */ 512 void workerExecuteFixed(LongRange range, 513 int w) 514 throws Exception { 515 start(); 516 if (range != null) { 517 Comm comm = myTeam.comm; 518 int r = myTeam.masterRank(); 519 int tag = tagFor(w); 520 receiveTaskInput(range, comm, r, tag); 521 run(range.lb(), range.ub()); 522 sendTaskOutput(range, comm, r, tag); 523 } 524 finish(); 525 } 526 527 /** 528 * Execute this worker for loop in a worker thread using a non-fixed 529 * schedule. 530 * 531 * @param w Worker index. 532 * 533 * @exception Exception This method may throw any exception. 
534 */ 535 void workerExecuteNonFixed(int w) 536 throws Exception { 537 Comm comm = myTeam.comm; 538 int r = myTeam.masterRank(); 539 int tag = tagFor(w); 540 start(); 541 ObjectItemBuf<LongRange> buf = ObjectBuf.buffer(); 542 for (;;) { 543 comm.receive(r, tag, buf); 544 LongRange range = buf.item; 545 if (range == null) { 546 break; 547 } 548 receiveTaskInput(range, comm, r, tag); 549 run(range.lb(), range.ub()); 550 551 // The next two statements constitute a critical section; other 552 // workers in this team must not send messages in between these two 553 // messages, or the master can deadlock. 554 synchronized (myTeam) { 555 comm.send(r, tag, buf); 556 sendTaskOutput(range, comm, r, tag); 557 } 558 } 559 finish(); 560 } 561 562 /** 563 * Returns the message tag for the given worker index. 564 * 565 * @param w Worker index. 566 * 567 * @return Message tag. 568 */ 569 private int tagFor(int w) { 570 return w + tagOffset(); 571 } 572 573 /** 574 * Returns the worker index for the given message tag. 575 * 576 * @param tag Message tag. 577 * 578 * @return Worker index. 579 */ 580 private int workerFor(int tag) { 581 return tag - tagOffset(); 582 } 583 584 }