Class WorkerIteration<T>
- Type Parameters:
- T - Data type of the items iterated over.
 To execute a worker iteration, create a WorkerRegion object;
 create an instance of a concrete subclass of class WorkerIteration; and pass
 this instance to the worker region's execute() method. Either every
 worker team thread must call the worker region's execute() method
 with identical arguments, or no worker team thread may call the
 execute() method. You can do all this using an anonymous inner
 class; for example:
 
     new WorkerRegion()
         {
         ArrayList<String> list = new ArrayList<String>();
         . . .
         public void run()
             {
             . . .
             execute (list, new WorkerIteration<String>()
                 {
                 // Thread local variable declarations
                 . . .
                 public void start()
                     {
                     // Per-thread pre-loop initialization code
                     . . .
                     }
                 public void run (String item)
                     {
                     // Loop code
                     . . .
                     }
                 public void finish()
                     {
                     // Per-thread post-loop finalization code
                     . . .
                     }
                 });
             }
         . . .
         }
 
 
 In each process of a cluster parallel program, the worker team has one or
 more worker threads. Every worker thread in every process has a unique worker
 index, going from index 0 for the first worker thread in the first process to
 index K−1 for the last worker thread in the last process, where
 K is the total number of worker threads in all the processes. In
 addition, in one process there is a master thread. The worker and master
 threads all call the worker region's execute() method to execute the
 worker for loop. However, the worker and master threads differ in their
 actions.
 
 The master thread does the following. The master sets up the source of items
 to be iterated over -- either an array's elements, an iterator's items, or an
 iterable collection's contents. The master repeatedly sends "tasks" to the
 workers and receives "responses" from the workers. To send a task to a
 particular worker, the master (1) sends a message containing the next item to
 the worker's process; and (2) calls the worker iteration's
 sendTaskInput() method. This method's default implementation does
 nothing, but it can be overridden to send additional task input data to the
 worker. To receive a response from a particular worker, the master (1)
 receives a message from the worker's process containing the item that was
 processed (whose state may have changed); and (2) calls the worker
 iteration's receiveTaskOutput() method. This method's default
 implementation does nothing, but it can be overridden to receive additional
 task output data from the worker. Once all tasks have been sent to the
 workers and all responses have been received from the workers, the master
 returns from the worker region's execute() method.
 
 Each worker thread does the following. The worker calls the worker
 iteration's start() method once before processing any items. The
 worker repeatedly receives tasks from the master and sends responses to the
 master. To receive a task from the master, the worker (1) receives a message
 containing the next item from the master's process; and (2) calls the worker
 iteration's receiveTaskInput() method. This method's default
 implementation does nothing, but it can be overridden to receive additional
 task input data from the master. The worker now calls the worker iteration's
 run() method, passing in the item to be processed. When the
 run() method returns, the worker sends the response to the master.
 To send the response, the worker (1) sends a message to the master's process
 containing the item that was processed (whose state may have changed); and
 (2) calls the worker iteration's sendTaskOutput() method. This
 method's default implementation does nothing, but it can be overridden to
 send additional task output data to the master. Once all tasks have been
 received from the master and all responses have been sent to the master, the
 worker calls the worker iteration's finish() method. (Unlike a
 ParallelIteration's threads, the workers do not
 synchronize with each other at a barrier at this point.) The worker then
 returns from the worker region's execute() method.
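
 The task and response exchange described above can be sketched without the library. The following is a minimal simulation, not the Parallel Java implementation: in-memory queues stand in for messages between processes, the class MasterWorkerSketch and its members are hypothetical names, and handing the next item to whichever worker has just responded is an assumption about the scheduling order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class MasterWorkerSketch {
    /** A response message: which worker finished, and the processed item. */
    static final class Response {
        final int worker;
        final String item;
        Response(int worker, String item) { this.worker = worker; this.item = item; }
    }

    /** Processes the items one at a time on workerCount threads and
     *  returns the processed items in completion order. */
    static List<String> process(List<String> items, int workerCount) {
        List<BlockingQueue<Optional<String>>> inboxes = new ArrayList<>();
        BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            BlockingQueue<Optional<String>> inbox = new LinkedBlockingQueue<>();
            inboxes.add(inbox);
            final int index = w;
            Thread t = new Thread(() -> {
                try {
                    // A real worker would call start() here.
                    while (true) {
                        Optional<String> task = inbox.take();   // receive a task
                        if (!task.isPresent()) break;           // empty = no more tasks
                        // Stands in for run(item): the loop body.
                        String result = task.get().toUpperCase(Locale.ROOT);
                        responses.put(new Response(index, result)); // send the response
                    }
                    // A real worker would call finish() here.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }

        List<String> done = new ArrayList<>();
        try {
            Iterator<String> it = items.iterator();
            int outstanding = 0;
            // Master: give each worker one initial task, if items remain.
            for (int w = 0; w < workerCount && it.hasNext(); w++) {
                inboxes.get(w).put(Optional.of(it.next()));
                outstanding++;
            }
            // Master: receive a response, then send that worker the next item.
            while (outstanding > 0) {
                Response r = responses.take();
                outstanding--;
                done.add(r.item);
                if (it.hasNext()) {
                    inboxes.get(r.worker).put(Optional.of(it.next()));
                    outstanding++;
                }
            }
            // All tasks sent and all responses received: tell the workers to stop.
            for (BlockingQueue<Optional<String>> inbox : inboxes) {
                inbox.put(Optional.empty());
            }
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return done;
    }
}
```

 Because workers finish at different times, the returned list is in completion order, not input order; sort it or carry an index with each item if order matters.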
 
 Each message described above is sent with a message tag equal to
 W+T, where W is the worker index and T is the
 "tag offset." The tag offset is Integer.MIN_VALUE by default, but
 this can be changed by overriding the tagOffset() method. Thus, the
 message tags fall in the range T .. K−1+T, where
 K is the total number of workers in all the processes. The program
 should not use message tags in this range except to send and receive the
 messages described above.
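
 As a small worked example of the tag arithmetic, the sketch below computes W+T for each worker and checks whether a tag falls in the reserved range T .. K-1+T. The class TagRangeSketch and its method names are hypothetical and are not part of the library.

```java
final class TagRangeSketch {
    /** Message tag for the given worker index W and tag offset T: W + T. */
    static int tagFor(int workerIndex, int tagOffset) {
        return workerIndex + tagOffset;
    }

    /** True if the tag lies in the reserved range T .. K-1+T. */
    static boolean isReserved(int tag, int totalWorkers, int tagOffset) {
        // Use long arithmetic so the upper bound cannot overflow an int.
        long low = tagOffset;
        long high = (long) tagOffset + totalWorkers - 1;
        return tag >= low && tag <= high;
    }

    public static void main(String[] args) {
        int k = 4;                    // total number of workers (example value)
        int t = Integer.MIN_VALUE;    // default tag offset
        for (int w = 0; w < k; w++) {
            System.out.println("worker " + w + " -> tag " + tagFor(w, t));
        }
    }
}
```

 With the default offset and K = 4, the reserved tags are Integer.MIN_VALUE through Integer.MIN_VALUE + 3, so ordinary nonnegative application tags stay clear of the range.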
 
 Note that each worker team thread actually creates its own instance of the
 worker iteration class and passes that instance to the worker region's
 execute() method. Thus, any fields declared in the worker iteration
 class will not be shared by all the workers, but instead will be
 private to each worker.
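
 The effect of one instance per thread can be illustrated with plain Java threads. This is a sketch under stated assumptions: CountingIteration is a hypothetical stand-in for a WorkerIteration subclass, and the threads are created by hand, not by a worker team.

```java
final class PerWorkerStateSketch {
    /** Hypothetical stand-in for a worker iteration subclass with one field. */
    static final class CountingIteration {
        int count;                       // belongs to the thread that owns this instance
        void run(String item) { count++; }
    }

    /** Each thread builds its own instance and processes itemsPerWorker items;
     *  returns the final count seen by each thread. */
    static int[] countsPerWorker(int workerCount, int itemsPerWorker) {
        int[] counts = new int[workerCount];
        Thread[] threads = new Thread[workerCount];
        for (int w = 0; w < workerCount; w++) {
            final int index = w;
            threads[w] = new Thread(() -> {
                // One instance per thread, so no synchronization is needed on count.
                CountingIteration iteration = new CountingIteration();
                for (int i = 0; i < itemsPerWorker; i++) {
                    iteration.run("item" + i);
                }
                counts[index] = iteration.count;
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
        }
        return counts;
    }
}
```

 Each thread ends with exactly itemsPerWorker in its own counter, which would not be guaranteed if a single unsynchronized instance were shared.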
 
 The start() method is intended for performing per-thread
 initialization before starting the loop iterations. If no such initialization
 is needed, omit the start() method.
 
 The run() method contains the code for the loop body. It does
 whatever processing is needed on the one item passed in as an argument. Note
 that, unlike a worker for loop (class WorkerForLoop), a worker
 iteration is not "chunked"; each worker team thread always processes just one
 item at a time.
 
 The finish() method is intended for performing per-thread
 finalization after finishing the loop iterations. If no such finalization is
 needed, omit the finish() method.
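
 The call order of the three methods can be shown with a small stand-in class. This is a sketch only: Iteration<T> and drive() are hypothetical names that mirror the shape of start(), run(), and finish() on a single thread, and they are not the library's classes.

```java
import java.util.List;

final class LifecycleSketch {
    /** Hypothetical stand-in mirroring the start()/run()/finish() shape. */
    abstract static class Iteration<T> {
        void start() {}                 // default: does nothing
        abstract void run(T item);      // loop body; must be supplied
        void finish() {}                // default: does nothing
    }

    /** Drives one thread's work: start() once, run() per item, finish() once. */
    static <T> void drive(Iteration<T> iteration, List<T> items) {
        iteration.start();
        for (T item : items) {
            iteration.run(item);
        }
        iteration.finish();
    }

    /** Example use: sum the lengths of the given strings. */
    static int totalLength(List<String> items) {
        final int[] result = new int[1];
        drive(new Iteration<String>() {
            int sum;                                    // per-instance accumulator
            @Override void start() { sum = 0; }         // pre-loop initialization
            @Override void run(String item) { sum += item.length(); }
            @Override void finish() { result[0] = sum; } // post-loop finalization
        }, items);
        return result[0];
    }
}
```

 Omitting start() or finish() in the anonymous class simply leaves the do-nothing defaults in place, which matches the advice above.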
 
 If the worker iteration's start(), run(), or
 finish() method throws an exception in one of the worker threads,
 then that worker thread executes no further code in the loop, and the worker
 region's execute() method throws that same exception in that thread.
 However, the other worker threads in the worker team continue to execute.
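
 The following sketch illustrates only the last point, that a failure in one worker thread does not stop the others. It uses plain Java threads; WorkerFailureSketch and runWorkers() are hypothetical names, and the outcome is recorded as a string where the library would instead propagate the exception out of execute().

```java
final class WorkerFailureSketch {
    /** Runs workerCount threads; the one at failingWorker throws.
     *  Returns "ok" or the exception message for each thread. */
    static String[] runWorkers(int workerCount, int failingWorker) {
        String[] outcome = new String[workerCount];
        Thread[] threads = new Thread[workerCount];
        for (int w = 0; w < workerCount; w++) {
            final int index = w;
            threads[w] = new Thread(() -> {
                try {
                    if (index == failingWorker) {
                        // This thread executes no further loop code.
                        throw new IllegalStateException("worker " + index + " failed");
                    }
                    outcome[index] = "ok";          // other threads carry on
                } catch (RuntimeException e) {
                    outcome[index] = e.getMessage(); // record the failure for this thread only
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            }
        }
        return outcome;
    }
}
```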
- Version:
- 07-Oct-2010
- Author:
- Alan Kaminsky

Constructor Summary
- WorkerIteration(): Construct a new worker iteration.

Method Summary
- void finish(): Perform per-thread finalization actions after finishing the loop iterations.
- void receiveTaskInput(T item, Comm comm, int mRank, int tag): Receive input data associated with a task.
- void receiveTaskOutput(T item, Comm comm, int wRank, int tag): Receive additional output data associated with a task.
- abstract void run(T item): Process one item in this worker iteration.
- void sendTaskInput(T item, Comm comm, int wRank, int tag): Send additional input data associated with a task.
- void sendTaskOutput(T item, Comm comm, int mRank, int tag): Send additional output data associated with a task.
- void start(): Perform per-thread initialization actions before starting the loop iterations.
- int tagOffset(): Returns the tag offset for this worker iteration.

Methods inherited from class edu.rit.pj.WorkerConstruct: getThreadCount, getThreadIndex, getTotalThreadCount, isExecutingInParallel, isMasterThread, region, team

Constructor Details

WorkerIteration
public WorkerIteration()
Construct a new worker iteration.

Method Details

start
Perform per-thread initialization actions before starting the loop iterations. Called by a worker thread.
The start() method may be overridden in a subclass. If not overridden, the start() method does nothing.

sendTaskInput
Send additional input data associated with a task. Called by the master thread. The task is denoted by the given item to be processed. The input data must be sent using the given communicator, to the given worker process rank, with the given message tag.
The sendTaskInput() method may be overridden in a subclass. If not overridden, the sendTaskInput() method does nothing.
- Parameters:
- item - Item to be processed.
- comm - Communicator.
- wRank - Worker process rank.
- tag - Message tag.
- Throws:
- IOException - Thrown if an I/O error occurred.

receiveTaskInput
Receive input data associated with a task. Called by a worker thread. The task is denoted by the given item to be processed. The input data must be received using the given communicator, from the given master process rank, with the given message tag.
The receiveTaskInput() method may be overridden in a subclass. If not overridden, the receiveTaskInput() method does nothing.
- Parameters:
- item - Item to be processed.
- comm - Communicator.
- mRank - Master process rank.
- tag - Message tag.
- Throws:
- IOException - Thrown if an I/O error occurred.

run
Process one item in this worker iteration. The run() method must perform the loop body for the given item.
The run() method must be overridden in a subclass.

sendTaskOutput
Send additional output data associated with a task. Called by a worker thread. The task is denoted by the given item that was processed (whose state may have changed during processing). The output data must be sent using the given communicator, to the given master process rank, with the given message tag.
The sendTaskOutput() method may be overridden in a subclass. If not overridden, the sendTaskOutput() method does nothing.
- Parameters:
- item - Item that was processed.
- comm - Communicator.
- mRank - Master process rank.
- tag - Message tag.
- Throws:
- IOException - Thrown if an I/O error occurred.

receiveTaskOutput
Receive additional output data associated with a task. Called by the master thread. The task is denoted by the given item that was processed (whose state may have changed during processing). The output data must be received using the given communicator, from the given worker process rank, with the given message tag.
The receiveTaskOutput() method may be overridden in a subclass. If not overridden, the receiveTaskOutput() method does nothing.
- Parameters:
- item - Item that was processed.
- comm - Communicator.
- wRank - Worker process rank.
- tag - Message tag.
- Throws:
- IOException - Thrown if an I/O error occurred.

finish
Perform per-thread finalization actions after finishing the loop iterations. Called by a worker thread.
The finish() method may be overridden in a subclass. If not overridden, the finish() method does nothing.

tagOffset
public int tagOffset()
Returns the tag offset for this worker iteration. Each message between the master and worker threads is sent with a message tag equal to W+T, where W is the worker index and T is the tag offset.
The tagOffset() method may be overridden in a subclass. If not overridden, the tagOffset() method returns a default tag offset of Integer.MIN_VALUE.
- Returns:
- Tag offset.