Class WorkerLongForLoop
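Class WorkerLongForLoop is the abstract base class for a worker for loop
executed inside a WorkerRegion. The loop index data type is long. The loop
stride is implicit (+1).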
To execute a worker for loop, create a WorkerRegion object; create an
instance of a concrete subclass of class WorkerLongForLoop; and pass this
instance to the worker region's execute() method. Either every worker team
thread must call the worker region's execute() method with identical
arguments, or no thread may call it. You can do all this using an anonymous
inner class; for example:
    new WorkerRegion() {
        . . .
        public void run() {
            . . .
            execute (0L, 99L, new WorkerLongForLoop() {
                // Thread local variable declarations
                . . .
                public void start() {
                    // Per-thread pre-loop initialization code
                    . . .
                }
                public void run (long first, long last) {
                    // Loop code
                    . . .
                }
                public void finish() {
                    // Per-thread post-loop finalization code
                    . . .
                }
            });
        }
        . . .
    }
In each process of a cluster parallel program, the worker team has one or
more worker threads. Every worker thread in every process has a unique worker
tag, going from tag 0 for the first worker thread in the first process to tag
K−1 for the last worker thread in the last process, where
K is the total number of worker threads in all the processes. In
addition, in one process there is a master thread. The worker and master
threads all call the worker region's execute() method to execute the worker
for loop. However, the worker and master threads differ in their actions.
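The tag numbering can be illustrated with a small sketch. It assumes tags are handed out in process order and then in thread order within each process, as the description above implies. The class and method names (`WorkerTags`, `totalWorkers`, `tagOf`) are hypothetical and are not part of the library.

```java
// Hypothetical helper, NOT part of edu.rit.pj: shows how worker tags
// 0 .. K-1 could be laid out across the processes of a cluster program.
final class WorkerTags {
    // threadsPerProcess[p] is the number of worker threads in process p.
    static int totalWorkers(int[] threadsPerProcess) {
        int k = 0;
        for (int n : threadsPerProcess) {
            k += n;
        }
        return k;
    }

    // Tag of the worker with the given thread index inside the given process.
    static int tagOf(int[] threadsPerProcess, int process, int threadIndex) {
        int tag = 0;
        for (int p = 0; p < process; ++p) {
            tag += threadsPerProcess[p];  // skip the workers of earlier processes
        }
        return tag + threadIndex;
    }

    public static void main(String[] args) {
        int[] threads = {4, 4, 2};  // three processes, assumed thread counts
        System.out.println("K = " + totalWorkers(threads));
        System.out.println("first worker tag = " + tagOf(threads, 0, 0));
        System.out.println("last worker tag = " + tagOf(threads, 2, 1));
    }
}
```

With the assumed thread counts above, the last worker's tag equals K−1, matching the range stated in the text.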
The master thread does the following. The master obtains the worker for
loop's schedule as returned by the schedule() method. The range of loop
indexes is divided into "chunks" and the chunks are apportioned among the
workers in accordance with the schedule. The master repeatedly sends "tasks"
to the workers and receives "responses" from the workers. To send a task to a
particular worker, the master (1) sends a message containing the chunk index
range to the worker's process; and (2) calls the worker for loop's
sendTaskInput() method. This method's default implementation does nothing,
but it can be overridden to send additional task input data to the worker. To
receive a response from a particular worker, the master (1) receives a
message containing the chunk index range from the worker's process; and (2)
calls the worker for loop's receiveTaskOutput() method. This method's default
implementation does nothing, but it can be overridden to receive additional
task output data from the worker. Once all tasks have been sent to the
workers and all responses have been received from the workers, the master
returns from the worker region's execute() method.
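The master's bookkeeping can be sketched as a single-process simulation. This is an illustration only: it assumes equal-sized chunks and that responses arrive in the order the tasks were sent, neither of which the library guarantees, and it replaces message passing with log entries. `MasterSketch`, `chunks`, and `dispatch` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simulation of the master's send/receive cycle; NOT the library code.
final class MasterSketch {
    // Splits lb..ub (inclusive) into chunks of at most chunkSize iterations.
    static Deque<long[]> chunks(long lb, long ub, long chunkSize) {
        Deque<long[]> q = new ArrayDeque<>();
        for (long first = lb; first <= ub; first += chunkSize) {
            q.add(new long[] {first, Math.min(first + chunkSize - 1, ub)});
        }
        return q;
    }

    // Hands the chunks to k workers and returns the sequence of events.
    static List<String> dispatch(long lb, long ub, long chunkSize, int k) {
        Deque<long[]> pending = chunks(lb, ub, chunkSize);
        Deque<long[]> inFlight = new ArrayDeque<>();  // {worker, first, last}
        List<String> log = new ArrayList<>();
        // Give each worker one initial task.
        for (int w = 0; w < k && !pending.isEmpty(); ++w) {
            long[] c = pending.remove();
            inFlight.add(new long[] {w, c[0], c[1]});
            log.add("send w" + w + " " + c[0] + ".." + c[1]);  // then sendTaskInput()
        }
        // Each response frees that worker for the next pending chunk.
        while (!inFlight.isEmpty()) {
            long[] t = inFlight.remove();
            log.add("recv w" + t[0] + " " + t[1] + ".." + t[2]);  // then receiveTaskOutput()
            if (!pending.isEmpty()) {
                long[] c = pending.remove();
                inFlight.add(new long[] {t[0], c[0], c[1]});
                log.add("send w" + t[0] + " " + c[0] + ".." + c[1]);
            }
        }
        return log;  // the master returns once every task has a response
    }

    public static void main(String[] args) {
        for (String event : dispatch(0L, 99L, 40L, 2)) {
            System.out.println(event);
        }
    }
}
```

The loop ends only when no task is outstanding, which mirrors the rule that the master returns after all tasks have been sent and all responses received.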
Each worker thread does the following. The worker calls the worker for loop's
start() method once before beginning any loop iterations. The worker
repeatedly receives tasks from the master and sends responses to the master.
To receive a task from the master, the worker (1) receives a message
containing the chunk index range from the master's process; and (2) calls the
worker for loop's receiveTaskInput() method. This method's default
implementation does nothing, but it can be overridden to receive additional
task input data from the master. The worker now calls the worker for loop's
run() method, passing in the chunk index range lower and upper bounds. When
the run() method returns, the worker sends the response to the master. To
send the response, the worker (1) sends a message containing the chunk index
range to the master's process; and (2) calls the worker for loop's
sendTaskOutput() method. This method's default implementation does nothing,
but it can be overridden to send additional task output data to the master.
Once all tasks have been received from the master and all responses have been
sent to the master, the worker calls the worker for loop's finish() method.
(Unlike a ParallelTeam's threads, the workers do not synchronize with each
other at a barrier at this point.) The worker then returns from the worker
region's execute() method.
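The order of the worker-side calls can be sketched with a stand-in class. This is not the edu.rit.pj class: the hook methods here take only the chunk bounds (the real ones also take a communicator, a rank, and a tag), and message passing is replaced by direct calls so the sketch runs on its own. `SketchWorkerLoop` and `WorkerOrderDemo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the worker side of the protocol; NOT the edu.rit.pj class.
abstract class SketchWorkerLoop {
    final List<String> log = new ArrayList<>();

    public void start() { log.add("start"); }
    public void receiveTaskInput(long first, long last) { log.add("receiveTaskInput"); }
    public abstract void run(long first, long last);
    public void sendTaskOutput(long first, long last) { log.add("sendTaskOutput"); }
    public void finish() { log.add("finish"); }

    // Processes the chunks in the order a worker thread would.
    // Each chunk is a {first, last} pair with inclusive bounds.
    final void executeAsWorker(long[][] chunks) {
        start();                            // once, before any iterations
        for (long[] c : chunks) {
            receiveTaskInput(c[0], c[1]);   // after the chunk range message arrives
            run(c[0], c[1]);
            sendTaskOutput(c[0], c[1]);     // after the response message is sent
        }
        finish();                           // once, after the last response
    }
}

final class WorkerOrderDemo {
    static List<String> trace(long[][] chunks) {
        SketchWorkerLoop loop = new SketchWorkerLoop() {
            public void run(long first, long last) {
                log.add("run " + first + ".." + last);
            }
        };
        loop.executeAsWorker(chunks);
        return loop.log;
    }

    public static void main(String[] args) {
        System.out.println(trace(new long[][] {{0L, 49L}, {50L, 99L}}));
    }
}
```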
If the worker for loop has a fixed schedule (in which there is exactly one chunk with a predetermined index range for each worker), then the messages containing the chunk index range are omitted, and each worker gets its chunk index range directly from the fixed schedule. However, the task input data (if any) and task output data (if any) are still sent and received.
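Because a fixed schedule lets each worker derive its chunk locally, no range message is needed. The sketch below shows one way such a split could be computed; it assumes an even division with the remainder given to the lowest-numbered workers, which may differ from how the library's fixed schedule divides the range. `FixedSplit` and `chunkFor` are hypothetical names.

```java
// Hypothetical even split; the library's fixed schedule may divide differently.
final class FixedSplit {
    // Returns {first, last} (inclusive) for worker w of k over lb..ub.
    static long[] chunkFor(long lb, long ub, int k, int w) {
        long n = ub - lb + 1;   // total number of iterations
        long base = n / k;      // every worker gets at least this many
        long extra = n % k;     // the first `extra` workers get one more
        long first = lb + w * base + Math.min(w, extra);
        long size = base + (w < extra ? 1 : 0);
        return new long[] {first, first + size - 1};
    }

    public static void main(String[] args) {
        for (int w = 0; w < 4; ++w) {
            long[] c = chunkFor(0L, 99L, 4, w);
            System.out.println("worker " + w + ": " + c[0] + ".." + c[1]);
        }
    }
}
```

Every worker evaluates the same pure function with its own index, so all workers agree on the partition without communicating.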
Each message described above is sent with a message tag equal to W+T, where
W is the worker index and T is the "tag offset." The tag offset is
Integer.MIN_VALUE by default, but this can be changed by overriding the
tagOffset() method. Thus, the message tags fall in the range T .. K−1+T,
where K is the total number of workers in all the processes. The program
should not use message tags in this range except to send and receive the
messages described above.
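The tag arithmetic is simple enough to check by hand. The sketch below computes W+T and the reserved range T .. K−1+T for the default offset and for an overridden one. `MessageTags`, `tagFor`, and `reservedRange` are hypothetical names, and the offset 1000 is an arbitrary example value.

```java
// Hypothetical helper illustrating the W+T tag rule; NOT part of edu.rit.pj.
final class MessageTags {
    // Default tag offset stated in the documentation.
    static final int DEFAULT_TAG_OFFSET = Integer.MIN_VALUE;

    // Message tag for the worker with the given index.
    static int tagFor(int workerIndex, int tagOffset) {
        return workerIndex + tagOffset;
    }

    // Inclusive range {lowest, highest} of tags reserved for k workers.
    static int[] reservedRange(int k, int tagOffset) {
        return new int[] {tagOffset, k - 1 + tagOffset};
    }

    public static void main(String[] args) {
        int k = 8;  // assumed total number of workers
        int[] byDefault = reservedRange(k, DEFAULT_TAG_OFFSET);
        int[] overridden = reservedRange(k, 1000);
        System.out.println("default: " + byDefault[0] + " .. " + byDefault[1]);
        System.out.println("offset 1000: " + overridden[0] + " .. " + overridden[1]);
    }
}
```

With the default offset the reserved tags sit at the very bottom of the int range, which keeps them away from the small non-negative tags a program is likely to choose for its own messages.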
Note that each worker team thread actually creates its own instance of the
worker for loop class and passes that instance to the worker region's
execute() method. Thus, any fields declared in the worker for loop class are
not shared among the workers; they are private to each worker.
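The effect of per-thread instances can be sketched with plain Java threads. This does not use the library: `PerThreadFields` and `SumLoop` are hypothetical stand-ins, and the final combination through an `AtomicLong` is just one possible way to merge per-thread results.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in, NOT the edu.rit.pj class: each thread builds its own loop object,
// so the field `partial` is private to that thread and needs no locking.
final class PerThreadFields {
    static final class SumLoop {
        long partial;  // per-instance, hence per-thread

        void run(long first, long last) {
            for (long i = first; i <= last; ++i) {
                partial += i;
            }
        }
    }

    // Sums all indexes in the given inclusive chunks, one thread per chunk.
    static long parallelSum(long[][] chunks) {
        AtomicLong total = new AtomicLong();  // the only shared variable
        Thread[] threads = new Thread[chunks.length];
        for (int t = 0; t < chunks.length; ++t) {
            final long[] c = chunks[t];
            threads[t] = new Thread(() -> {
                SumLoop loop = new SumLoop();   // one instance per thread
                loop.run(c[0], c[1]);
                total.addAndGet(loop.partial);  // combine, as finish() might
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(new long[][] {{0L, 49L}, {50L, 99L}}));
    }
}
```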
The start() method is intended for performing per-thread initialization
before starting the loop iterations. If no such initialization is needed,
omit the start() method.
The run() method contains the code for the loop. The first and last indexes
for a chunk of loop iterations are passed in as arguments. The loop stride is
implicit (+1). The worker for loop's run() method must be coded this way:

    public void run (long first, long last) {
        for (long i = first; i <= last; ++ i) {
            // Loop body code
            . . .
        }
    }

with the loop indexes running from first to last inclusive and increasing by
+1 on each iteration.
The finish() method is intended for performing per-thread finalization after
finishing the loop iterations. If no such finalization is needed, omit the
finish() method.
If the worker for loop's start(), run(), or finish() method throws an
exception in one of the worker threads, then that worker thread executes no
further code in the loop, and the worker region's execute() method throws
that same exception in that thread. However, the other worker threads in the
worker team continue to execute.
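The isolation of a failing worker can be sketched with plain Java threads. This is only an analogy: in the library the exception propagates out of execute() in the failing thread, whereas here it is caught inside the thread so that the outcome can be counted. `FailureIsolation` and `runWorkers` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Analogy using plain threads; NOT the library's exception handling.
final class FailureIsolation {
    // Starts `workers` threads; the one whose index equals `failing` throws.
    // Returns {number that completed, number that failed}.
    static int[] runWorkers(int workers, int failing) {
        AtomicInteger finished = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; ++w) {
            final int id = w;
            threads[w] = new Thread(() -> {
                try {
                    if (id == failing) {
                        throw new IllegalStateException("worker " + id + " failed");
                    }
                    finished.incrementAndGet();  // stands in for the remaining loop code
                } catch (RuntimeException e) {
                    failed.incrementAndGet();    // this thread stops; the others are unaffected
                }
            });
            threads[w].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {finished.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] result = runWorkers(4, 2);
        System.out.println("completed = " + result[0] + ", failed = " + result[1]);
    }
}
```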
Version: 27-Jan-2010
Author: Alan Kaminsky
Constructor Summary
Constructor | Description
--- | ---
WorkerLongForLoop() | Construct a new worker for loop.
Method Summary
Modifier and Type | Method | Description
--- | --- | ---
void | finish() | Perform per-thread finalization actions after finishing the loop iterations.
void | receiveTaskInput(LongRange range, Comm comm, int mRank, int tag) | Receive additional input data associated with a task.
void | receiveTaskOutput(LongRange range, Comm comm, int wRank, int tag) | Receive additional output data associated with a task.
abstract void | run(long first, long last) | Execute one chunk of iterations of this worker for loop.
LongSchedule | schedule() | Determine this worker for loop's schedule.
void | sendTaskInput(LongRange range, Comm comm, int wRank, int tag) | Send additional input data associated with a task.
void | sendTaskOutput(LongRange range, Comm comm, int mRank, int tag) | Send additional output data associated with a task.
void | start() | Perform per-thread initialization actions before starting the loop iterations.
int | tagOffset() | Returns the tag offset for this worker for loop.

Methods inherited from class edu.rit.pj.WorkerConstruct:
getThreadCount, getThreadIndex, getTotalThreadCount, isExecutingInParallel, isMasterThread, region, team
Constructor Details
WorkerLongForLoop
public WorkerLongForLoop()
Construct a new worker for loop.

Method Details
schedule
Determine this worker for loop's schedule. Called by the master and worker threads. The schedule determines how the loop iterations are apportioned among the worker team threads. For further information, see class LongSchedule.
The schedule() method may be overridden in a subclass to return the desired schedule. If not overridden, the default is a runtime schedule (see LongSchedule.runtime()).
Returns:
Schedule for this worker for loop.
start
Perform per-thread initialization actions before starting the loop iterations. Called by a worker thread.
The start() method may be overridden in a subclass. If not overridden, the start() method does nothing.
sendTaskInput
Send additional input data associated with a task. Called by the master thread. The task is denoted by the given chunk of loop iterations. The input data must be sent using the given communicator, to the given worker process rank, with the given message tag.
The sendTaskInput() method may be overridden in a subclass. If not overridden, the sendTaskInput() method does nothing.
Parameters:
range - Chunk of loop iterations.
comm - Communicator.
wRank - Worker process rank.
tag - Message tag.
Throws:
IOException - Thrown if an I/O error occurred.
receiveTaskInput
Receive additional input data associated with a task. Called by a worker thread. The task is denoted by the given chunk of loop iterations. The input data must be received using the given communicator, from the given master process rank, with the given message tag.
The receiveTaskInput() method may be overridden in a subclass. If not overridden, the receiveTaskInput() method does nothing.
Parameters:
range - Chunk of loop iterations.
comm - Communicator.
mRank - Master process rank.
tag - Message tag.
Throws:
IOException - Thrown if an I/O error occurred.
run
Execute one chunk of iterations of this worker for loop. Called by a worker thread. The run() method must perform the loop body for indexes first through last inclusive, increasing the loop index by +1 after each iteration.
The run() method must be overridden in a subclass.
sendTaskOutput
Send additional output data associated with a task. Called by a worker thread. The task is denoted by the given chunk of loop iterations. The output data must be sent using the given communicator, to the given master process rank, with the given message tag.
The sendTaskOutput() method may be overridden in a subclass. If not overridden, the sendTaskOutput() method does nothing.
Parameters:
range - Chunk of loop iterations.
comm - Communicator.
mRank - Master process rank.
tag - Message tag.
Throws:
IOException - Thrown if an I/O error occurred.
receiveTaskOutput
Receive additional output data associated with a task. Called by the master thread. The task is denoted by the given chunk of loop iterations. The output data must be received using the given communicator, from the given worker process rank, with the given message tag.
The receiveTaskOutput() method may be overridden in a subclass. If not overridden, the receiveTaskOutput() method does nothing.
Parameters:
range - Chunk of loop iterations.
comm - Communicator.
wRank - Worker process rank.
tag - Message tag.
Throws:
IOException - Thrown if an I/O error occurred.
finish
Perform per-thread finalization actions after finishing the loop iterations. Called by a worker thread.
The finish() method may be overridden in a subclass. If not overridden, the finish() method does nothing.
tagOffset
public int tagOffset()
Returns the tag offset for this worker for loop. Each message between the master and worker threads is sent with a message tag equal to W+T, where W is the worker index and T is the tag offset.
The tagOffset() method may be overridden in a subclass. If not overridden, the tagOffset() method returns a default tag offset of Integer.MIN_VALUE.
Returns:
Tag offset.