//******************************************************************************
//
// File:    WorkerIteration.java
// Package: edu.rit.pj
// Unit:    Class edu.rit.pj.WorkerIteration
//
// This Java source file is copyright (C) 2010 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
//
// A copy of the GNU General Public License is provided in the file gpl.txt. You
// may also obtain a copy of the GNU General Public License on the World Wide
// Web at http://www.gnu.org/licenses/gpl.html.
//
//******************************************************************************
package edu.rit.pj;

import java.io.IOException;

import edu.rit.mp.ObjectBuf;
import edu.rit.mp.buf.ObjectItemBuf;
import edu.rit.util.Range;

/**
 * Class WorkerIteration is the abstract base class for a worker iteration that
 * is executed inside a {@linkplain WorkerRegion}. The worker iteration lets you
 * iterate over a group of items, with a separate worker team thread processing
 * each item. The generic type parameter T specifies the items' data type. The
 * items can be the elements of an array, the items obtained from an {@linkplain
 * java.util.Iterator Iterator}, or the items contained in an {@linkplain
 * java.lang.Iterable Iterable} collection.
 * <P>
 * To execute a worker iteration, create a {@linkplain WorkerRegion} object;
 * create an instance of a concrete subclass of class WorkerIteration; and pass
 * this instance to the worker region's <code>execute()</code> method. Either every
 * worker team thread must call the worker region's <code>execute()</code> method
 * with identical arguments, or every thread must not call the
 * <code>execute()</code> method. You can do all this using an anonymous inner
 * class; for example:
 * <PRE>
 *     new WorkerRegion()
 *         {
 *         ArrayList&lt;String&gt; list = new ArrayList&lt;String&gt;();
 *         . . .
 *         public void run()
 *             {
 *             . . .
 *             execute (list, new WorkerIteration&lt;String&gt;()
 *                 {
 *                 // Thread local variable declarations
 *                 . . .
 *                 public void start()
 *                     {
 *                     // Per-thread pre-loop initialization code
 *                     . . .
 *                     }
 *                 public void run (String item)
 *                     {
 *                     // Loop code
 *                     . . .
 *                     }
 *                 public void finish()
 *                     {
 *                     // Per-thread post-loop finalization code
 *                     . . .
 *                     }
 *                 });
 *             }
 *         . . .
 *         }
 * </PRE>
 * <P>
 * In each process of a cluster parallel program, the worker team has one or
 * more worker threads. Every worker thread in every process has a unique worker
 * index, going from index 0 for the first worker thread in the first process to
 * index <I>K</I>&minus;1 for the last worker thread in the last process, where
 * <I>K</I> is the total number of worker threads in all the processes. In
 * addition, in one process there is a master thread. The worker and master
 * threads all call the worker region's <code>execute()</code> method to execute the
 * worker iteration. However, the worker and master threads differ in their
 * actions.
 * <P>
 * The master thread does the following. The master sets up the source of items
 * to be iterated over -- either an array's elements, an iterator's items, or an
 * iterable collection's contents. The master repeatedly sends "tasks" to the
 * workers and receives "responses" from the workers. To send a task to a
 * particular worker, the master (1) sends a message containing the next item to
 * the worker's process; and (2) calls the worker iteration's
 * <code>sendTaskInput()</code> method. This method's default implementation does
 * nothing, but it can be overridden to send additional task input data to the
 * worker. To receive a response from a particular worker, the master (1)
 * receives a message from the worker's process containing the item that was
 * processed (whose state may have changed); and (2) calls the worker
 * iteration's <code>receiveTaskOutput()</code> method. This method's default
 * implementation does nothing, but it can be overridden to receive additional
 * task output data from the worker. Once all tasks have been sent to the
 * workers and all responses have been received from the workers, the master
 * returns from the worker region's <code>execute()</code> method.
 * <P>
 * Each worker thread does the following. The worker calls the worker
 * iteration's <code>start()</code> method once before processing any items. The
 * worker repeatedly receives tasks from the master and sends responses to the
 * master. To receive a task from the master, the worker (1) receives a message
 * containing the next item from the master's process; and (2) calls the worker
 * iteration's <code>receiveTaskInput()</code> method. This method's default
 * implementation does nothing, but it can be overridden to receive additional
 * task input data from the master. The worker now calls the worker iteration's
 * <code>run()</code> method, passing in the item to be processed. When the
 * <code>run()</code> method returns, the worker sends the response to the master.
 * To send the response, the worker (1) sends a message to the master's process
 * containing the item that was processed (whose state may have changed); and
 * (2) calls the worker iteration's <code>sendTaskOutput()</code> method. This
 * method's default implementation does nothing, but it can be overridden to
 * send additional task output data to the master. Once all tasks have been
 * received from the master and all responses have been sent to the master, the
 * worker calls the worker iteration's <code>finish()</code> method. (Unlike a
 * {@linkplain ParallelIteration}'s threads, the workers do <I>not</I>
 * synchronize with each other at a barrier at this point.) The worker then
 * returns from the worker region's <code>execute()</code> method.
 * <P>
 * Each message described above is sent with a message tag equal to
 * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
 * "tag offset." The tag offset is <code>Integer.MIN_VALUE</code> by default, but
 * this can be changed by overriding the <code>tagOffset()</code> method. Thus, the
 * message tags fall in the range <I>T</I> .. <I>K</I>&minus;1+<I>T</I>, where
 * <I>K</I> is the total number of workers in all the processes. The program
 * should not use message tags in this range except to send and receive the
 * messages described above.
 * <P>
 * Note that each worker team thread actually creates its own instance of the
 * worker iteration class and passes that instance to the worker region's
 * <code>execute()</code> method. Thus, any fields declared in the worker iteration
 * class will <I>not</I> be shared by all the workers, but instead will be
 * private to each worker.
 * <P>
 * The <code>start()</code> method is intended for performing per-thread
 * initialization before starting the loop iterations. If no such initialization
 * is needed, omit the <code>start()</code> method.
 * <P>
 * The <code>run()</code> method contains the code for the loop body. It does
 * whatever processing is needed on the one item passed in as an argument. Note
 * that, unlike a worker for loop (class {@linkplain WorkerForLoop}), a worker
 * iteration is not "chunked;" each worker team thread always processes just one
 * item at a time.
 * <P>
 * The <code>finish()</code> method is intended for performing per-thread
 * finalization after finishing the loop iterations. If no such finalization is
 * needed, omit the <code>finish()</code> method.
 * <P>
 * If the worker iteration's <code>start()</code>, <code>run()</code>, or
 * <code>finish()</code> method throws an exception in one of the worker threads,
 * then that worker thread executes no further code in the loop, and the worker
 * region's <code>execute()</code> method throws that same exception in that thread.
 * However, the other worker threads in the worker team continue to execute.
 *
 * @param <T> Data type of the items iterated over.
 * @author Alan Kaminsky
 * @version 07-Oct-2010
 */
public abstract class WorkerIteration<T>
        extends WorkerConstruct {

// Exported constructors.
    /**
     * Construct a new worker iteration.
     */
    public WorkerIteration() {
        super();
    }

// Exported operations.
    /**
     * Perform per-thread initialization actions before starting the loop
     * iterations. Called by a worker thread.
     * <P>
     * The <code>start()</code> method may be overridden in a subclass. If not
     * overridden, the <code>start()</code> method does nothing.
     *
     * @throws Exception The <code>start()</code> method may throw any
     * exception.
     */
    public void start()
            throws Exception {
    }

    /**
     * Send additional input data associated with a task. Called by the master
     * thread. The task is denoted by the given item to be processed. The input
     * data must be sent using the given communicator, to the given worker
     * process rank, with the given message tag.
     * <P>
     * The <code>sendTaskInput()</code> method may be overridden in a subclass. If
     * not overridden, the <code>sendTaskInput()</code> method does nothing.
     *
     * @param item Item to be processed.
     * @param comm Communicator.
     * @param wRank Worker process rank.
     * @param tag Message tag.
     * @throws IOException Thrown if an I/O error occurred.
     */
    public void sendTaskInput(T item,
            Comm comm,
            int wRank,
            int tag)
            throws IOException {
    }

    /**
     * Receive input data associated with a task. Called by a worker thread. The
     * task is denoted by the given item to be processed. The input data must be
     * received using the given communicator, from the given master process
     * rank, with the given message tag.
     * <P>
     * The <code>receiveTaskInput()</code> method may be overridden in a subclass.
     * If not overridden, the <code>receiveTaskInput()</code> method does nothing.
     *
     * @param item Item to be processed.
     * @param comm Communicator.
     * @param mRank Master process rank.
     * @param tag Message tag.
     * @throws IOException Thrown if an I/O error occurred.
     */
    public void receiveTaskInput(T item,
            Comm comm,
            int mRank,
            int tag)
            throws IOException {
    }

    /**
     * Process one item in this worker iteration. The <code>run()</code> method must
     * perform the loop body for the given item.
     * <P>
     * The <code>run()</code> method must be overridden in a subclass.
     *
     * @param item Item.
     * @throws Exception The <code>run()</code> method may throw any exception.
     */
    public abstract void run(T item)
            throws Exception;

    /**
     * Send additional output data associated with a task. Called by a worker
     * thread. The task is denoted by the given item that was processed (whose
     * state may have changed during processing). The output data must be sent
     * using the given communicator, to the given master process rank, with the
     * given message tag.
     * <P>
     * The <code>sendTaskOutput()</code> method may be overridden in a subclass. If
     * not overridden, the <code>sendTaskOutput()</code> method does nothing.
     *
     * @param item Item that was processed.
     * @param comm Communicator.
     * @param mRank Master process rank.
     * @param tag Message tag.
     * @throws IOException Thrown if an I/O error occurred.
     */
    public void sendTaskOutput(T item,
            Comm comm,
            int mRank,
            int tag)
            throws IOException {
    }

    /**
     * Receive additional output data associated with a task. Called by the
     * master thread. The task is denoted by the given item that was processed
     * (whose state may have changed during processing). The output data must be
     * received using the given communicator, from the given worker process
     * rank, with the given message tag.
     * <P>
     * The <code>receiveTaskOutput()</code> method may be overridden in a subclass.
     * If not overridden, the <code>receiveTaskOutput()</code> method does nothing.
     *
     * @param item Item that was processed.
     * @param comm Communicator.
     * @param wRank Worker process rank.
     * @param tag Message tag.
     * @throws IOException Thrown if an I/O error occurred.
     */
    public void receiveTaskOutput(T item,
            Comm comm,
            int wRank,
            int tag)
            throws IOException {
    }

    /**
     * Perform per-thread finalization actions after finishing the loop
     * iterations. Called by a worker thread.
     * <P>
     * The <code>finish()</code> method may be overridden in a subclass. If not
     * overridden, the <code>finish()</code> method does nothing.
     *
     * @throws Exception The <code>finish()</code> method may throw any
     * exception.
     */
    public void finish()
            throws Exception {
    }

    /**
     * Returns the tag offset for this worker iteration. Each message between
     * the master and worker threads is sent with a message tag equal to
     * <I>W</I>+<I>T</I>, where <I>W</I> is the worker index and <I>T</I> is the
     * tag offset.
     * <P>
     * The <code>tagOffset()</code> method may be overridden in a subclass. If not
     * overridden, the <code>tagOffset()</code> returns a default tag offset of
     * <code>Integer.MIN_VALUE</code>.
     *
     * @return Tag offset.
     */
    public int tagOffset() {
        return Integer.MIN_VALUE;
    }

// Hidden operations.
    /**
     * Execute this worker iteration in the master thread. One message is sent
     * to every worker up front; afterwards, each response received from a
     * worker is answered with the next message for that same worker. A message
     * whose item holder is null is sent when the generator has no more items,
     * and the loop ends once every worker has been sent such a message.
     *
     * @param generator Item generator.
     *
     * @throws IOException Thrown if an I/O error occurred.
     */
    void masterExecute(ItemGenerator<T> generator)
            throws IOException {
        int count = myTeam.count;
        // Number of workers that have not yet been sent a null item holder.
        int remaining = count;
        ObjectItemBuf<ItemHolder<T>> buf = ObjectBuf.buffer();
        Range tagRange = new Range(tagFor(0), tagFor(count - 1));
        Comm comm = myTeam.comm;

        // Send initial task to each worker.
        for (int w = 0; w < count; ++w) {
            if (!sendNextTask(generator, buf, comm,
                    myTeam.workerRank(w), tagFor(w))) {
                --remaining;
            }
        }

        // Repeatedly receive a response from a worker and send next task to
        // that worker.
        while (remaining > 0) {
            CommStatus status = comm.receive(null, tagRange, buf);
            ItemHolder<T> holder = buf.item;
            int r = status.fromRank;
            int tag = status.tag;
            receiveTaskOutput(holder.myItem, comm, r, tag);
            // Reply on the same rank and tag the response arrived with.
            if (!sendNextTask(generator, buf, comm, r, tag)) {
                --remaining;
            }
        }
    }

    /**
     * Obtain the next item holder from the generator and send it to one
     * worker. The holder (possibly null) is always sent; if it is non-null,
     * <code>sendTaskInput()</code> is then called for its item.
     *
     * @param generator Item generator.
     * @param buf Buffer used to carry the item holder.
     * @param comm Communicator.
     * @param wRank Worker process rank.
     * @param tag Message tag.
     *
     * @return True if a non-null item holder was sent, false if a null item
     * holder was sent.
     *
     * @throws IOException Thrown if an I/O error occurred.
     */
    private boolean sendNextTask(ItemGenerator<T> generator,
            ObjectItemBuf<ItemHolder<T>> buf,
            Comm comm,
            int wRank,
            int tag)
            throws IOException {
        ItemHolder<T> holder = generator.nextItem();
        buf.item = holder;
        buf.reset();
        comm.send(wRank, tag, buf);
        if (holder == null) {
            return false;
        }
        sendTaskInput(holder.myItem, comm, wRank, tag);
        return true;
    }

    /**
     * Execute this worker iteration in a worker thread. Calls
     * <code>start()</code>, then processes received items until a null item
     * holder is received, then calls <code>finish()</code>.
     *
     * @param w Worker index.
     *
     * @throws Exception This method may throw any exception.
     */
    void workerExecute(int w)
            throws Exception {
        Comm comm = myTeam.comm;
        int r = myTeam.masterRank();
        int tag = tagFor(w);
        start();
        ObjectItemBuf<ItemHolder<T>> buf = ObjectBuf.buffer();
        for (;;) {
            comm.receive(r, tag, buf);
            ItemHolder<T> holder = buf.item;
            // A null item holder ends the loop.
            if (holder == null) {
                break;
            }
            receiveTaskInput(holder.myItem, comm, r, tag);
            run(holder.myItem);
            buf.reset();

            // The next two statements constitute a critical section; other
            // workers in this team must not send messages in between these two
            // messages, or the master can deadlock.
            synchronized (myTeam) {
                comm.send(r, tag, buf);
                sendTaskOutput(holder.myItem, comm, r, tag);
            }
        }
        finish();
    }

    /**
     * Returns the message tag for the given worker index.
     *
     * @param w Worker index.
     *
     * @return Message tag.
     */
    private int tagFor(int w) {
        return w + tagOffset();
    }

    /**
     * Returns the worker index for the given message tag. This is the inverse
     * of <code>tagFor()</code>.
     *
     * @param tag Message tag.
     *
     * @return Worker index.
     */
    private int workerFor(int tag) {
        return tag - tagOffset();
    }

}