//******************************************************************************
//
// File:    WorkerIntegerForLoop.java
// Package: edu.rit.pj
// Unit:    Class edu.rit.pj.WorkerIntegerForLoop
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj; 41 42 import java.io.IOException; 43 44 import edu.rit.mp.ObjectBuf; 45 import edu.rit.mp.buf.ObjectItemBuf; 46 import edu.rit.util.Range; 47 48 /** 49 * Class WorkerIntegerForLoop is the abstract base class for one variation of a 50 * worker for loop that is executed inside a {@linkplain WorkerRegion}. The loop 51 * index data type is <code>int</code>. The loop stride is implicit (+1). 52 * <P> 53 * To execute a worker for loop, create a {@linkplain WorkerRegion} object; 54 * create an instance of a concrete subclass of class WorkerIntegerForLoop; and 55 * pass this instance to the worker region's <code>execute()</code> method. Either 56 * every worker team thread must call the worker region's <code>execute()</code> 57 * method with identical arguments, or every thread must not call the 58 * <code>execute()</code> method. You can do all this using an anonymous inner 59 * class; for example: 60 * <PRE> 61 * new WorkerRegion() 62 * { 63 * . . . 64 * public void run() 65 * { 66 * . . . 67 * execute (0, 99, new WorkerIntegerForLoop() 68 * { 69 * // Thread local variable declarations 70 * . . . 71 * public void start() 72 * { 73 * // Per-thread pre-loop initialization code 74 * . . . 75 * } 76 * public void run (int first, int last) 77 * { 78 * // Loop code 79 * . . . 80 * } 81 * public void finish() 82 * { 83 * // Per-thread post-loop finalization code 84 * . . . 85 * } 86 * }); 87 * } 88 * . . . 89 * } 90 * </PRE> 91 * <P> 92 * In each process of a cluster parallel program, the worker team has one or 93 * more worker threads. 
Every worker thread in every process has a unique worker 94 * index, going from index 0 for the first worker thread in the first process to 95 * index <I>K</I>−1 for the last worker thread in the last process, where 96 * <I>K</I> is the total number of worker threads in all the processes. In 97 * addition, in one process there is a master thread. The worker and master 98 * threads all call the worker region's <code>execute()</code> method to execute the 99 * worker for loop. However, the worker and master threads differ in their 100 * actions. 101 * <P> 102 * The master thread does the following. The master obtains the worker for 103 * loop's schedule as returned by the <code>schedule()</code> method. The range of 104 * loop indexes is divided into "chunks" and the chunks are apportioned among 105 * the workers in accordance with the schedule. The master repeatedly sends 106 * "tasks" to the workers and receives "responses" from the workers. To send a 107 * task to a particular worker, the master (1) sends a message containing the 108 * chunk index range to the worker's process; and (2) calls the worker for 109 * loop's <code>sendTaskInput()</code> method. This method's default implementation 110 * does nothing, but it can be overridden to send additional task input data to 111 * the worker. To receive a response from a particular worker, the master (1) 112 * receives a message containing the chunk index range from the worker's 113 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code> 114 * method. This method's default implementation does nothing, but it can be 115 * overridden to receive additional task output data from the worker. Once all 116 * tasks have been sent to the workers and all responses have been received from 117 * the workers, the master returns from the worker region's <code>execute()</code> 118 * method. 119 * <P> 120 * Each worker thread does the following. 
The worker calls the worker for loop's 121 * <code>start()</code> method once before beginning any loop iterations. The worker 122 * repeatedly receives tasks from the master and sends responses to the master. 123 * To receive a task from the master, the worker (1) receives a message 124 * containing the chunk index range from the master's process; and (2) calls the 125 * worker for loop's <code>receiveTaskInput()</code> method. This method's default 126 * implementation does nothing, but it can be overridden to receive additional 127 * task input data from the master. The worker now calls the worker for loop's 128 * <code>run()</code> method, passing in the chunk index range lower and upper 129 * bounds. When the <code>run()</code> method returns, the worker sends the response 130 * to the master. To send the response, the worker (1) sends a message 131 * containing the chunk index range to the master's process; and (2) calls the 132 * worker for loop's <code>sendTaskOutput()</code> method. This method's default 133 * implementation does nothing, but it can be overridden to send additional task 134 * output data to the master. Once all tasks have been received from the master 135 * and all responses have been sent to the master, the worker calls the worker 136 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s 137 * threads, the workers do <I>not</I> synchronize with each other at a barrier 138 * at this point.) The worker then returns from the worker region's 139 * <code>execute()</code> method. 140 * <P> 141 * If the worker for loop has a fixed schedule (in which there is exactly one 142 * chunk with a predetermined index range for each worker), then the messages 143 * containing the chunk index range are omitted, and each worker gets its chunk 144 * index range directly from the fixed schedule. However, the task input data 145 * (if any) and task output data (if any) are still sent and received. 
146 * <P> 147 * Each message described above is sent with a message tag equal to 148 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the 149 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but 150 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the 151 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where 152 * <I>K</I> is the total number of workers in all the processes. The program 153 * should not use message tags in this range except to send and receive the 154 * messages described above. 155 * <P> 156 * Note that each worker team thread actually creates its own instance of the 157 * worker for loop class and passes that instance to the worker region's 158 * <code>execute()</code> method. Thus, any fields declared in the worker for loop 159 * class will <I>not</I> be shared by all the workers, but instead will be 160 * private to each worker. 161 * <P> 162 * The <code>start()</code> method is intended for performing per-thread 163 * initialization before starting the loop iterations. If no such initialization 164 * is needed, omit the <code>start()</code> method. 165 * <P> 166 * The <code>run()</code> method contains the code for the loop. The first and last 167 * indexes for a chunk of loop iterations are passed in as arguments. The loop 168 * stride is implicit (+1). The worker for loop's <code>run()</code> method must be 169 * coded this way: 170 * <PRE> 171 * public void run (int first, int last) 172 * { 173 * for (int i = first; i <= last; ++ i) 174 * { 175 * // Loop body code 176 * . . . 177 * } 178 * } 179 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code> 180 * inclusive and increasing by +1 on each iteration. 181 * <P> 182 * The <code>finish()</code> method is intended for performing per-thread 183 * finalization after finishing the loop iterations. 
If no such finalization is 184 * needed, omit the <code>finish()</code> method. 185 * <P> 186 * If the worker for loop's <code>start()</code>, <code>run()</code>, or 187 * <code>finish()</code> method throws an exception in one of the worker threads, 188 * then that worker thread executes no further code in the loop, and the worker 189 * region's <code>execute()</code> method throws that same exception in that thread. 190 * However, the other worker threads in the worker team continue to execute. 191 * 192 * @author Alan Kaminsky 193 * @version 27-Jan-2010 194 */ 195 public abstract class WorkerIntegerForLoop 196 extends WorkerForLoop { 197 198 // Exported constructors. 199 /** 200 * Construct a new worker for loop. 201 */ 202 public WorkerIntegerForLoop() { 203 super(); 204 } 205 206 // Exported operations. 207 /** 208 * Determine this worker for loop's schedule. Called by the master and 209 * worker threads. The schedule determines how the loop iterations are 210 * apportioned among the worker team threads. For further information, see 211 * class {@linkplain IntegerSchedule}. 212 * <P> 213 * The <code>schedule()</code> method may be overridden in a subclass to return 214 * the desired schedule. If not overridden, the default is a runtime 215 * schedule (see {@link edu.rit.pj.IntegerSchedule#runtime()}). 216 * 217 * @return Schedule for this worker for loop. 218 */ 219 public IntegerSchedule schedule() { 220 return IntegerSchedule.runtime(); 221 } 222 223 /** 224 * Perform per-thread initialization actions before starting the loop 225 * iterations. Called by a worker thread. 226 * <P> 227 * The <code>start()</code> method may be overridden in a subclass. If not 228 * overridden, the <code>start()</code> method does nothing. 229 * 230 * @exception Exception The <code>start()</code> method may throw any exception. 231 * @throws java.lang.Exception if any. 
232 */ 233 public void start() 234 throws Exception { 235 } 236 237 /** 238 * Send additional input data associated with a task. Called by the master 239 * thread. The task is denoted by the given chunk of loop iterations. The 240 * input data must be sent using the given communicator, to the given worker 241 * process rank, with the given message tag. 242 * <P> 243 * The <code>sendTaskInput()</code> method may be overridden in a subclass. If 244 * not overridden, the <code>sendTaskInput()</code> method does nothing. 245 * 246 * @param range Chunk of loop iterations. 247 * @param comm Communicator. 248 * @param wRank Worker process rank. 249 * @param tag Message tag. 250 * @exception IOException Thrown if an I/O error occurred. 251 * @throws java.io.IOException if any. 252 */ 253 public void sendTaskInput(Range range, 254 Comm comm, 255 int wRank, 256 int tag) 257 throws IOException { 258 } 259 260 /** 261 * Receive additional input data associated with a task. Called by a worker 262 * thread. The task is denoted by the given chunk of loop iterations. The 263 * input data must be received using the given communicator, from the given 264 * master process rank, with the given message tag. 265 * <P> 266 * The <code>receiveTaskInput()</code> method may be overridden in a subclass. 267 * If not overridden, the <code>receiveTaskInput()</code> method does nothing. 268 * 269 * @param range Chunk of loop iterations. 270 * @param comm Communicator. 271 * @param mRank Master process rank. 272 * @param tag Message tag. 273 * @exception IOException Thrown if an I/O error occurred. 274 * @throws java.io.IOException if any. 275 */ 276 public void receiveTaskInput(Range range, 277 Comm comm, 278 int mRank, 279 int tag) 280 throws IOException { 281 } 282 283 /** 284 * Execute one chunk of iterations of this worker for loop. Called by a 285 * worker thread. 
The <code>run()</code> method must perform the loop body for 286 * indexes <code>first</code> through <code>last</code> inclusive, increasing the 287 * loop index by +1 after each iteration. 288 * <P> 289 * The <code>run()</code> method must be overridden in a subclass. 290 * 291 * @param first First loop index. 292 * @param last Last loop index. 293 * @exception Exception The <code>run()</code> method may throw any exception. 294 * @throws java.lang.Exception if any. 295 */ 296 public abstract void run(int first, 297 int last) 298 throws Exception; 299 300 /** 301 * Send additional output data associated with a task. Called by a worker 302 * thread. The task is denoted by the given chunk of loop iterations. The 303 * output data must be sent using the given communicator, to the given 304 * master process rank, with the given message tag. 305 * <P> 306 * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If 307 * not overridden, the <code>sendTaskOutput()</code> method does nothing. 308 * 309 * @param range Chunk of loop iterations. 310 * @param comm Communicator. 311 * @param mRank Master process rank. 312 * @param tag Message tag. 313 * @exception IOException Thrown if an I/O error occurred. 314 * @throws java.io.IOException if any. 315 */ 316 public void sendTaskOutput(Range range, 317 Comm comm, 318 int mRank, 319 int tag) 320 throws IOException { 321 } 322 323 /** 324 * Receive additional output data associated with a task. Called by the 325 * master thread. The task is denoted by the given chunk of loop iterations. 326 * The output data must be received using the given communicator, from the 327 * given worker process rank, with the given message tag. 328 * <P> 329 * The <code>receiveTaskOutput()</code> method may be overridden in a subclass. 330 * If not overridden, the <code>receiveTaskOutput()</code> method does nothing. 331 * 332 * @param range Chunk of loop iterations. 333 * @param comm Communicator. 
334 * @param wRank Worker process rank. 335 * @param tag Message tag. 336 * @exception IOException Thrown if an I/O error occurred. 337 * @throws java.io.IOException if any. 338 */ 339 public void receiveTaskOutput(Range range, 340 Comm comm, 341 int wRank, 342 int tag) 343 throws IOException { 344 } 345 346 /** 347 * Perform per-thread finalization actions after finishing the loop 348 * iterations. Called by a worker thread. 349 * <P> 350 * The <code>finish()</code> method may be overridden in a subclass. If not 351 * overridden, the <code>finish()</code> method does nothing. 352 * 353 * @exception Exception The <code>finish()</code> method may throw any 354 * exception. 355 * @throws java.lang.Exception if any. 356 */ 357 public void finish() 358 throws Exception { 359 } 360 361 /** 362 * Returns the tag offset for this worker for loop. Each message between the 363 * master and worker threads is sent with a message tag equal to 364 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the 365 * tag offset. 366 * <P> 367 * The <code>tagOffset()</code> method may be overridden in a subclass. If not 368 * overridden, the <code>tagOffset()</code> returns a default tag offset of 369 * <code>Integer.MIN_VALUE</code>. 370 * 371 * @return Tag offset. 372 */ 373 public int tagOffset() { 374 return Integer.MIN_VALUE; 375 } 376 377 // Hidden operations. 378 /** 379 * Execute this worker for loop in the master thread. 380 * 381 * @param range Loop index range. 382 * 383 * @exception IOException Thrown if an I/O error occurred. 384 */ 385 void masterExecute(Range range) 386 throws IOException { 387 IntegerSchedule sch = schedule(); 388 if (sch.isFixedSchedule()) { 389 masterExecuteFixed(range, sch); 390 } else { 391 masterExecuteNonFixed(range, sch); 392 } 393 } 394 395 /** 396 * Execute this worker for loop in the master thread with a fixed schedule. 397 * 398 * @param range Loop index range. 399 * @param sch Schedule. 
400 * 401 * @exception IOException Thrown if an I/O error occurred. 402 */ 403 void masterExecuteFixed(Range range, 404 IntegerSchedule sch) 405 throws IOException { 406 int count = myTeam.count; 407 Comm comm = myTeam.comm; 408 409 // Send additional task input to each worker. 410 sch.start(count, range); 411 for (int w = 0; w < count; ++w) { 412 Range chunk = sch.next(w); 413 if (chunk != null) { 414 sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w)); 415 } 416 } 417 418 // Receive additional task output from each worker. 419 sch.start(count, range); 420 for (int w = 0; w < count; ++w) { 421 Range chunk = sch.next(w); 422 if (chunk != null) { 423 receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w)); 424 } 425 } 426 } 427 428 /** 429 * Execute this worker for loop in the master thread with a non-fixed 430 * schedule. 431 * 432 * @param range Loop index range. 433 * @param sch Schedule. 434 * 435 * @exception IOException Thrown if an I/O error occurred. 436 */ 437 void masterExecuteNonFixed(Range range, 438 IntegerSchedule sch) 439 throws IOException { 440 int count = myTeam.count; 441 sch.start(count, range); 442 int remaining = count; 443 ObjectItemBuf<Range> buf = ObjectBuf.buffer(); 444 Range tagRange = new Range(tagFor(0), tagFor(count - 1)); 445 Comm comm = myTeam.comm; 446 447 // Send initial task to each worker. 448 for (int w = 0; w < count; ++w) { 449 Range chunk = sch.next(w); 450 buf.item = chunk; 451 buf.reset(); 452 int r = myTeam.workerRank(w); 453 int tag = tagFor(w); 454 comm.send(r, tag, buf); 455 if (chunk == null) { 456 --remaining; 457 } else { 458 sendTaskInput(chunk, comm, r, tag); 459 } 460 } 461 462 // Repeatedly receive a response from a worker and send next task to 463 // that worker. 
464 while (remaining > 0) { 465 CommStatus status = comm.receive(null, tagRange, buf); 466 Range chunk = buf.item; 467 int r = status.fromRank; 468 int tag = status.tag; 469 int w = workerFor(tag); 470 receiveTaskOutput(chunk, comm, r, tag); 471 chunk = sch.next(w); 472 buf.item = chunk; 473 buf.reset(); 474 comm.send(r, tag, buf); 475 if (chunk == null) { 476 --remaining; 477 } else { 478 sendTaskInput(chunk, comm, r, tag); 479 } 480 } 481 } 482 483 /** 484 * Execute this worker for loop in a worker thread. 485 * 486 * @param w Worker index. 487 * @param range Loop index range. 488 * 489 * @exception Exception This method may throw any exception. 490 */ 491 void workerExecute(int w, 492 Range range) 493 throws Exception { 494 IntegerSchedule sch = schedule(); 495 if (sch.isFixedSchedule()) { 496 sch.start(myTeam.count, range); 497 workerExecuteFixed(sch.next(w), w); 498 } else { 499 workerExecuteNonFixed(w); 500 } 501 } 502 503 /** 504 * Execute this worker for loop in a worker thread using a fixed schedule. 505 * 506 * @param range Chunk of loop iterations. 507 * @param w Worker index. 508 * 509 * @exception Exception This method may throw any exception. 510 */ 511 void workerExecuteFixed(Range range, 512 int w) 513 throws Exception { 514 start(); 515 if (range != null) { 516 Comm comm = myTeam.comm; 517 int r = myTeam.masterRank(); 518 int tag = tagFor(w); 519 receiveTaskInput(range, comm, r, tag); 520 run(range.lb(), range.ub()); 521 sendTaskOutput(range, comm, r, tag); 522 } 523 finish(); 524 } 525 526 /** 527 * Execute this worker for loop in a worker thread using a non-fixed 528 * schedule. 529 * 530 * @param w Worker index. 531 * 532 * @exception Exception This method may throw any exception. 
533 */ 534 void workerExecuteNonFixed(int w) 535 throws Exception { 536 Comm comm = myTeam.comm; 537 int r = myTeam.masterRank(); 538 int tag = tagFor(w); 539 start(); 540 ObjectItemBuf<Range> buf = ObjectBuf.buffer(); 541 for (;;) { 542 comm.receive(r, tag, buf); 543 Range range = buf.item; 544 if (range == null) { 545 break; 546 } 547 receiveTaskInput(range, comm, r, tag); 548 run(range.lb(), range.ub()); 549 550 // The next two statements constitute a critical section; other 551 // workers in this team must not send messages in between these two 552 // messages, or the master can deadlock. 553 synchronized (myTeam) { 554 comm.send(r, tag, buf); 555 sendTaskOutput(range, comm, r, tag); 556 } 557 } 558 finish(); 559 } 560 561 /** 562 * Returns the message tag for the given worker index. 563 * 564 * @param w Worker index. 565 * 566 * @return Message tag. 567 */ 568 private int tagFor(int w) { 569 return w + tagOffset(); 570 } 571 572 /** 573 * Returns the worker index for the given message tag. 574 * 575 * @param tag Message tag. 576 * 577 * @return Worker index. 578 */ 579 private int workerFor(int tag) { 580 return tag - tagOffset(); 581 } 582 583 }