//******************************************************************************
//
// File: WorkerLongStrideForLoop.java
// Package: edu.rit.pj
// Unit: Class edu.rit.pj.WorkerLongStrideForLoop
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
//
// A copy of the GNU General Public License is provided in the file gpl.txt. You
// may also obtain a copy of the GNU General Public License on the World Wide
// Web at http://www.gnu.org/licenses/gpl.html.
//
//******************************************************************************
package edu.rit.pj;

import java.io.IOException;

import edu.rit.mp.ObjectBuf;
import edu.rit.mp.buf.ObjectItemBuf;
import edu.rit.util.LongRange;
import edu.rit.util.Range;

/**
 * Class WorkerLongStrideForLoop is the abstract base class for one variation of
 * a worker for loop that is executed inside a {@linkplain WorkerRegion}. The
 * loop index data type is <code>long</code>. The loop stride is explicitly
 * specified.
 * <P>
 * To execute a worker for loop, create a {@linkplain WorkerRegion} object;
 * create an instance of a concrete subclass of class WorkerLongStrideForLoop;
 * and pass this instance to the worker region's <code>execute()</code> method.
 * Either every worker team thread must call the worker region's
 * <code>execute()</code> method with identical arguments, or no thread may call
 * the <code>execute()</code> method. You can do all this using an anonymous
 * inner class; for example:
 * <PRE>
 *     new WorkerRegion()
 *         {
 *         . . .
 *         public void run()
 *             {
 *             . . .
 *             execute (0L, 98L, 2L, new WorkerLongStrideForLoop()
 *                 {
 *                 // Thread local variable declarations
 *                 . . .
 *                 public void start()
 *                     {
 *                     // Per-thread pre-loop initialization code
 *                     . . .
 *                     }
 *                 public void run (long first, long last, long stride)
 *                     {
 *                     // Loop code
 *                     . . .
 *                     }
 *                 public void finish()
 *                     {
 *                     // Per-thread post-loop finalization code
 *                     . . .
 *                     }
 *                 });
 *             }
 *         . . .
 *         }
 * </PRE>
 * <P>
 * In each process of a cluster parallel program, the worker team has one or
 * more worker threads.
 * Every worker thread in every process has a unique worker index, going from 0
 * for the first worker thread in the first process to <I>K</I>−1 for the last
 * worker thread in the last process, where <I>K</I> is the total number of
 * worker threads in all the processes. In addition, in one process there is a
 * master thread. The worker and master threads all call the worker region's
 * <code>execute()</code> method to execute the worker for loop. However, the
 * worker and master threads differ in their actions.
 * <P>
 * The master thread does the following. The master obtains the worker for
 * loop's schedule as returned by the <code>schedule()</code> method. The range of
 * loop indexes is divided into "chunks" and the chunks are apportioned among
 * the workers in accordance with the schedule. The master repeatedly sends
 * "tasks" to the workers and receives "responses" from the workers. To send a
 * task to a particular worker, the master (1) sends a message containing the
 * chunk index range to the worker's process; and (2) calls the worker for
 * loop's <code>sendTaskInput()</code> method. This method's default implementation
 * does nothing, but it can be overridden to send additional task input data to
 * the worker. To receive a response from a particular worker, the master (1)
 * receives a message containing the chunk index range from the worker's
 * process; and (2) calls the worker for loop's <code>receiveTaskOutput()</code>
 * method. This method's default implementation does nothing, but it can be
 * overridden to receive additional task output data from the worker. Once all
 * tasks have been sent to the workers and all responses have been received from
 * the workers, the master returns from the worker region's <code>execute()</code>
 * method.
 * <P>
 * Each worker thread does the following.
 * The worker calls the worker for loop's
 * <code>start()</code> method once before beginning any loop iterations. The worker
 * repeatedly receives tasks from the master and sends responses to the master.
 * To receive a task from the master, the worker (1) receives a message
 * containing the chunk index range from the master's process; and (2) calls the
 * worker for loop's <code>receiveTaskInput()</code> method. This method's default
 * implementation does nothing, but it can be overridden to receive additional
 * task input data from the master. The worker then calls the worker for loop's
 * <code>run()</code> method, passing in the chunk's lower bound, upper bound, and
 * stride. When the <code>run()</code> method returns, the worker sends the response
 * to the master. To send the response, the worker (1) sends a message
 * containing the chunk index range to the master's process; and (2) calls the
 * worker for loop's <code>sendTaskOutput()</code> method. This method's default
 * implementation does nothing, but it can be overridden to send additional task
 * output data to the master. Once all tasks have been received from the master
 * and all responses have been sent to the master, the worker calls the worker
 * for loop's <code>finish()</code> method. (Unlike a {@linkplain ParallelTeam}'s
 * threads, the workers do <I>not</I> synchronize with each other at a barrier
 * at this point.) The worker then returns from the worker region's
 * <code>execute()</code> method.
 * <P>
 * If the worker for loop has a fixed schedule (in which there is exactly one
 * chunk with a predetermined index range for each worker), then the messages
 * containing the chunk index range are omitted, and each worker gets its chunk
 * index range directly from the fixed schedule. However, the task input data
 * (if any) and task output data (if any) are still sent and received.
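 * <P>
 * For example, a subclass might pass each worker an extra parameter along
 * with every task. The master's <code>sendTaskInput()</code> and the worker's
 * <code>receiveTaskInput()</code> must exchange the same messages in the same
 * order, using the rank and tag they are given. The sketch below is
 * illustrative only. The field <code>scale</code> and the value 3.0 are
 * hypothetical, and the buffer calls mirror the ones this class uses
 * internally to send chunk ranges.
 * <PRE>
 *     new WorkerLongStrideForLoop()
 *         {
 *         double scale;   // Hypothetical per-task parameter (worker side)
 *         public void sendTaskInput (LongRange range, Comm comm, int wRank, int tag)
 *             throws IOException
 *             {
 *             // Master side: sent right after the chunk message (if any)
 *             ObjectItemBuf&lt;Double&gt; buf = ObjectBuf.buffer();
 *             buf.item = 3.0;
 *             buf.reset();
 *             comm.send (wRank, tag, buf);
 *             }
 *         public void receiveTaskInput (LongRange range, Comm comm, int mRank, int tag)
 *             throws IOException
 *             {
 *             // Worker side: receive the parameter with the same rank and tag
 *             ObjectItemBuf&lt;Double&gt; buf = ObjectBuf.buffer();
 *             comm.receive (mRank, tag, buf);
 *             scale = buf.item;
 *             }
 *         public void run (long first, long last, long stride)
 *             {
 *             for (long i = first; i &lt;= last; i += stride)
 *                 {
 *                 // Loop body code using scale
 *                 . . .
 *                 }
 *             }
 *         }
 * </PRE>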
 * <P>
 * Each message described above is sent with a message tag equal to
 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
 * message tags fall in the range <I>T</I> .. <I>K</I>−1+<I>T</I>, where
 * <I>K</I> is the total number of workers in all the processes. The program
 * should not use message tags in this range except to send and receive the
 * messages described above.
 * <P>
 * Note that each worker team thread actually creates its own instance of the
 * worker for loop class and passes that instance to the worker region's
 * <code>execute()</code> method. Thus, any fields declared in the worker for loop
 * class will <I>not</I> be shared by all the workers, but instead will be
 * private to each worker.
 * <P>
 * The <code>start()</code> method is intended for performing per-thread
 * initialization before starting the loop iterations. If no such initialization
 * is needed, omit the <code>start()</code> method.
 * <P>
 * The <code>run()</code> method contains the code for the loop. The first and last
 * indexes for a chunk of loop iterations are passed in as arguments. The loop
 * stride, which is always positive, is also explicitly specified as an
 * argument. The worker for loop's <code>run()</code> method must be coded this way:
 * <PRE>
 *     public void run (long first, long last, long stride)
 *         {
 *         for (long i = first; i &lt;= last; i += stride)
 *             {
 *             // Loop body code
 *             . . .
 *             }
 *         }
 * </PRE> with the loop indexes running from <code>first</code> to <code>last</code>
 * inclusive and increasing by <code>stride</code> on each iteration.
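 * <P>
 * As a worked example, take the call <code>execute (0L, 98L, 2L, ...)</code> in
 * the class-level example above. Assuming its arguments are the first index,
 * last index, and stride, the loop as a whole covers the indexes 0, 2, 4,
 * . . ., 98, which is (98 - 0)/2 + 1 = 50 iterations. Each call of
 * <code>run()</code> covers only one chunk of those indexes. In the sketch
 * below, <code>count</code> is a hypothetical thread-local field that tallies
 * the iterations executed by this thread. Suppose the schedule handed the
 * thread the hypothetical chunk <code>first</code> = 10, <code>last</code> =
 * 18, <code>stride</code> = 2; the actual chunks depend on the schedule. The
 * loop body would then execute for indexes 10, 12, 14, 16, and 18, adding 5
 * to <code>count</code>.
 * <PRE>
 *     // Hypothetical thread local field; not shared with other threads
 *     long count;
 *     public void run (long first, long last, long stride)
 *         {
 *         for (long i = first; i &lt;= last; i += stride)
 *             {
 *             ++count;   // Executed once per index in this chunk
 *             }
 *         }
 * </PRE>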
 * <P>
 * The <code>finish()</code> method is intended for performing per-thread
 * finalization after finishing the loop iterations. If no such finalization is
 * needed, omit the <code>finish()</code> method.
 * <P>
 * If the worker for loop's <code>start()</code>, <code>run()</code>, or
 * <code>finish()</code> method throws an exception in one of the worker threads,
 * then that worker thread executes no further code in the loop, and the worker
 * region's <code>execute()</code> method throws that same exception in that thread.
 * However, the other worker threads in the worker team continue to execute.
 *
 * @author Alan Kaminsky
 * @version 27-Jan-2010
 */
public abstract class WorkerLongStrideForLoop
        extends WorkerForLoop {

    // Exported constructors.
    /**
     * Construct a new worker for loop.
     */
    public WorkerLongStrideForLoop() {
        super();
    }

    // Exported operations.
    /**
     * Determine this worker for loop's schedule. Called by the master and
     * worker threads. The schedule determines how the loop iterations are
     * apportioned among the worker team threads. For further information, see
     * class {@linkplain LongSchedule}.
     * <P>
     * The <code>schedule()</code> method may be overridden in a subclass to return
     * the desired schedule. If not overridden, the default is a runtime
     * schedule (see {@link edu.rit.pj.LongSchedule#runtime()}).
     *
     * @return Schedule for this worker for loop.
     */
    public LongSchedule schedule() {
        return LongSchedule.runtime();
    }

    /**
     * Perform per-thread initialization actions before starting the loop
     * iterations. Called by a worker thread.
     * <P>
     * The <code>start()</code> method may be overridden in a subclass. If not
     * overridden, the <code>start()</code> method does nothing.
     *
     * @exception Exception The <code>start()</code> method may throw any exception.
     * @throws java.lang.Exception if any.
     */
    public void start()
            throws Exception {
    }

    /**
     * Send additional input data associated with a task. Called by the master
     * thread. The task is denoted by the given chunk of loop iterations. The
     * input data must be sent using the given communicator, to the given worker
     * process rank, with the given message tag.
     * <P>
     * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
     * not overridden, the <code>sendTaskInput()</code> method does nothing.
     *
     * @param range Chunk of loop iterations.
     * @param comm Communicator.
     * @param wRank Worker process rank.
     * @param tag Message tag.
     * @exception IOException Thrown if an I/O error occurred.
     * @throws java.io.IOException if any.
     */
    public void sendTaskInput(LongRange range,
            Comm comm,
            int wRank,
            int tag)
            throws IOException {
    }

    /**
     * Receive additional input data associated with a task. Called by a worker
     * thread. The task is denoted by the given chunk of loop iterations. The
     * input data must be received using the given communicator, from the given
     * master process rank, with the given message tag.
     * <P>
     * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
     * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
     *
     * @param range Chunk of loop iterations.
     * @param comm Communicator.
     * @param mRank Master process rank.
     * @param tag Message tag.
     * @exception IOException Thrown if an I/O error occurred.
     * @throws java.io.IOException if any.
     */
    public void receiveTaskInput(LongRange range,
            Comm comm,
            int mRank,
            int tag)
            throws IOException {
    }

    /**
     * Execute one chunk of iterations of this worker for loop. Called by a
     * worker thread.
     * The <code>run()</code> method must perform the loop body for
     * indexes <code>first</code> through <code>last</code> inclusive, increasing the
     * loop index by <code>stride</code> after each iteration.
     * <P>
     * The <code>run()</code> method must be overridden in a subclass.
     *
     * @param first First loop index.
     * @param last Last loop index.
     * @param stride Loop index stride, always positive.
     * @exception Exception The <code>run()</code> method may throw any exception.
     * @throws java.lang.Exception if any.
     */
    public abstract void run(long first,
            long last,
            long stride)
            throws Exception;

    /**
     * Send additional output data associated with a task. Called by a worker
     * thread. The task is denoted by the given chunk of loop iterations. The
     * output data must be sent using the given communicator, to the given
     * master process rank, with the given message tag.
     * <P>
     * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
     * not overridden, the <code>sendTaskOutput()</code> method does nothing.
     *
     * @param range Chunk of loop iterations.
     * @param comm Communicator.
     * @param mRank Master process rank.
     * @param tag Message tag.
     * @exception IOException Thrown if an I/O error occurred.
     * @throws java.io.IOException if any.
     */
    public void sendTaskOutput(LongRange range,
            Comm comm,
            int mRank,
            int tag)
            throws IOException {
    }

    /**
     * Receive additional output data associated with a task. Called by the
     * master thread. The task is denoted by the given chunk of loop iterations.
     * The output data must be received using the given communicator, from the
     * given worker process rank, with the given message tag.
     * <P>
     * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
     * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
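     * <P>
     * An override of <code>receiveTaskOutput()</code> must receive exactly the
     * messages that the matching <code>sendTaskOutput()</code> override sends,
     * in the same order and with the given rank and tag. The following is a
     * minimal sketch that assumes each task yields one <code>long</code> partial
     * result. The fields <code>partial</code> and <code>total</code> are
     * hypothetical. Each thread creates its own instance of the loop class, so
     * <code>total</code> grows only in the instance used by the master thread.
     * <PRE>
     *     long partial;   // Hypothetical: set by run() in a worker thread
     *     long total;     // Hypothetical: summed in the master thread
     *
     *     public void sendTaskOutput (LongRange range, Comm comm, int mRank, int tag)
     *         throws IOException
     *         {
     *         // Worker side: send this task's partial result to the master
     *         ObjectItemBuf&lt;Long&gt; buf = ObjectBuf.buffer();
     *         buf.item = partial;
     *         buf.reset();
     *         comm.send (mRank, tag, buf);
     *         }
     *
     *     public void receiveTaskOutput (LongRange range, Comm comm, int wRank, int tag)
     *         throws IOException
     *         {
     *         // Master side: receive the matching message from the same worker
     *         ObjectItemBuf&lt;Long&gt; buf = ObjectBuf.buffer();
     *         comm.receive (wRank, tag, buf);
     *         total += buf.item;
     *         }
     * </PRE>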
     *
     * @param range Chunk of loop iterations.
     * @param comm Communicator.
     * @param wRank Worker process rank.
     * @param tag Message tag.
     * @exception IOException Thrown if an I/O error occurred.
     * @throws java.io.IOException if any.
     */
    public void receiveTaskOutput(LongRange range,
            Comm comm,
            int wRank,
            int tag)
            throws IOException {
    }

    /**
     * Perform per-thread finalization actions after finishing the loop
     * iterations. Called by a worker thread.
     * <P>
     * The <code>finish()</code> method may be overridden in a subclass. If not
     * overridden, the <code>finish()</code> method does nothing.
     *
     * @exception Exception The <code>finish()</code> method may throw any
     * exception.
     * @throws java.lang.Exception if any.
     */
    public void finish()
            throws Exception {
    }

    /**
     * Returns the tag offset for this worker for loop. Each message between the
     * master and worker threads is sent with a message tag equal to
     * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
     * tag offset.
     * <P>
     * The <code>tagOffset()</code> method may be overridden in a subclass. If not
     * overridden, the <code>tagOffset()</code> method returns a default tag offset
     * of <code>Integer.MIN_VALUE</code>.
     *
     * @return Tag offset.
     */
    public int tagOffset() {
        return Integer.MIN_VALUE;
    }

    // Hidden operations.
    /**
     * Execute this worker for loop in the master thread.
     *
     * @param range Loop index range.
     *
     * @exception IOException Thrown if an I/O error occurred.
     */
    void masterExecute(LongRange range)
            throws IOException {
        LongSchedule sch = schedule();
        if (sch.isFixedSchedule()) {
            masterExecuteFixed(range, sch);
        } else {
            masterExecuteNonFixed(range, sch);
        }
    }

    /**
     * Execute this worker for loop in the master thread with a fixed schedule.
     *
     * @param range Loop index range.
     * @param sch Schedule.
     *
     * @exception IOException Thrown if an I/O error occurred.
     */
    void masterExecuteFixed(LongRange range,
            LongSchedule sch)
            throws IOException {
        int count = myTeam.count;
        Comm comm = myTeam.comm;

        // Send additional task input to each worker.
        sch.start(count, range);
        for (int w = 0; w < count; ++w) {
            LongRange chunk = sch.next(w);
            if (chunk != null) {
                sendTaskInput(chunk, comm, myTeam.workerRank(w), tagFor(w));
            }
        }

        // Receive additional task output from each worker.
        sch.start(count, range);
        for (int w = 0; w < count; ++w) {
            LongRange chunk = sch.next(w);
            if (chunk != null) {
                receiveTaskOutput(chunk, comm, myTeam.workerRank(w), tagFor(w));
            }
        }
    }

    /**
     * Execute this worker for loop in the master thread with a non-fixed
     * schedule.
     *
     * @param range Loop index range.
     * @param sch Schedule.
     *
     * @exception IOException Thrown if an I/O error occurred.
     */
    void masterExecuteNonFixed(LongRange range,
            LongSchedule sch)
            throws IOException {
        int count = myTeam.count;
        sch.start(count, range);
        int remaining = count;
        ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
        Range tagRange = new Range(tagFor(0), tagFor(count - 1));
        Comm comm = myTeam.comm;

        // Send initial task to each worker.
        for (int w = 0; w < count; ++w) {
            LongRange chunk = sch.next(w);
            buf.item = chunk;
            buf.reset();
            int r = myTeam.workerRank(w);
            int tag = tagFor(w);
            comm.send(r, tag, buf);
            if (chunk == null) {
                --remaining;
            } else {
                sendTaskInput(chunk, comm, r, tag);
            }
        }

        // Repeatedly receive a response from a worker and send next task to
        // that worker.
        while (remaining > 0) {
            CommStatus status = comm.receive(null, tagRange, buf);
            LongRange chunk = buf.item;
            int r = status.fromRank;
            int tag = status.tag;
            int w = workerFor(tag);
            receiveTaskOutput(chunk, comm, r, tag);
            chunk = sch.next(w);
            buf.item = chunk;
            buf.reset();
            comm.send(r, tag, buf);
            if (chunk == null) {
                --remaining;
            } else {
                sendTaskInput(chunk, comm, r, tag);
            }
        }
    }

    /**
     * Execute this worker for loop in a worker thread.
     *
     * @param w Worker index.
     * @param range Loop index range.
     *
     * @exception Exception This method may throw any exception.
     */
    void workerExecute(int w,
            LongRange range)
            throws Exception {
        LongSchedule sch = schedule();
        if (sch.isFixedSchedule()) {
            sch.start(myTeam.count, range);
            workerExecuteFixed(sch.next(w), w);
        } else {
            workerExecuteNonFixed(w);
        }
    }

    /**
     * Execute this worker for loop in a worker thread using a fixed schedule.
     *
     * @param range Chunk of loop iterations.
     * @param w Worker index.
     *
     * @exception Exception This method may throw any exception.
     */
    void workerExecuteFixed(LongRange range,
            int w)
            throws Exception {
        start();
        if (range != null) {
            Comm comm = myTeam.comm;
            int r = myTeam.masterRank();
            int tag = tagFor(w);
            receiveTaskInput(range, comm, r, tag);
            run(range.lb(), range.ub(), range.stride());
            sendTaskOutput(range, comm, r, tag);
        }
        finish();
    }

    /**
     * Execute this worker for loop in a worker thread using a non-fixed
     * schedule.
     *
     * @param w Worker index.
     *
     * @exception Exception This method may throw any exception.
     */
    void workerExecuteNonFixed(int w)
            throws Exception {
        Comm comm = myTeam.comm;
        int r = myTeam.masterRank();
        int tag = tagFor(w);
        start();
        ObjectItemBuf<LongRange> buf = ObjectBuf.buffer();
        for (;;) {
            comm.receive(r, tag, buf);
            LongRange range = buf.item;
            if (range == null) {
                break;
            }
            receiveTaskInput(range, comm, r, tag);
            run(range.lb(), range.ub(), range.stride());

            // The next two statements constitute a critical section; other
            // workers in this team must not send messages in between these two
            // messages, or the master can deadlock.
            synchronized (myTeam) {
                comm.send(r, tag, buf);
                sendTaskOutput(range, comm, r, tag);
            }
        }
        finish();
    }

    /**
     * Returns the message tag for the given worker index.
     *
     * @param w Worker index.
     *
     * @return Message tag.
     */
    private int tagFor(int w) {
        return w + tagOffset();
    }

    /**
     * Returns the worker index for the given message tag.
     *
     * @param tag Message tag.
     *
     * @return Worker index.
     */
    private int workerFor(int tag) {
        return tag - tagOffset();
    }

}