1 //******************************************************************************
2 //
3 // File: Comm.java
4 // Package: edu.rit.pj
5 // Unit: Class edu.rit.pj.Comm
6 //
7 // This Java source file is copyright (C) 2009 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj;
41
42 import java.io.IOException;
43 import java.io.InterruptedIOException;
44 import java.io.PrintStream;
45 import java.net.InetSocketAddress;
46 import java.util.LinkedList;
47
48 import edu.rit.mp.Buf;
49 import edu.rit.mp.Channel;
50 import edu.rit.mp.ChannelGroup;
51 import edu.rit.mp.ConnectListener;
52 import edu.rit.mp.IORequest;
53 import edu.rit.mp.IntegerBuf;
54 import edu.rit.mp.ObjectBuf;
55 import edu.rit.mp.Status;
56 import edu.rit.pj.cluster.CommPattern;
57 import edu.rit.pj.cluster.JobBackend;
58 import edu.rit.pj.cluster.JobFrontend;
59 import edu.rit.pj.cluster.JobSchedulerException;
60 import edu.rit.pj.reduction.IntegerOp;
61 import edu.rit.pj.reduction.Op;
62 import edu.rit.util.Range;
63
64 /**
65 * Class Comm provides a communicator for a PJ cluster parallel program. Class
66 * Comm provides a method to initialize the PJ message passing middleware and
67 * run the parallel program on multiple processors of a cluster parallel
68 * computer. Class Comm also provides methods for passing messages between the
69 * processes of the parallel program.
70 * <HR>
71 * <p>
72 * <B>BASIC CONCEPTS</B>
73 * <p>
74 * A <B>cluster parallel computer</B> typically consists of a <B>frontend
75 * processor</B> and a number of <B>backend processors</B> connected via a
76 * dedicated high-speed network. A user logs into the frontend processor and
77 * runs a PJ program there. The PJ message passing middleware causes copies of
78 * the PJ program to run in a number of separate processes, each process on a
79 * different backend processor. The backend processes run the PJ program, using
80 * the PJ middleware to send messages amongst themselves. The PJ middleware
81 * redirects the backend processes' standard output and standard error streams
82 * to the frontend process. The frontend process does not actually execute the
83 * PJ program, but merely displays all the backend processes' standard output
84 * and standard error streams on the frontend process's own standard output and
85 * standard error.
86 * <p>
87 * For the PJ message passing middleware to work, certain server processes must
88 * be running. See package {@linkplain edu.rit.pj.cluster} for further
89 * information.
91 * <p>
92 * To initialize the PJ message passing middleware, the program must first call
93 * the static <code>Comm.init()</code> method, passing in the command line
94 * arguments.
95 * <p>
96 * A <B>communicator</B> is associated with a group of backend processes. The
97 * communicator's <B>size</B> is the number of processes in the group. Each
98 * process in the communicator has a different <B>rank</B> in the range 0 ..
99 * <I>size</I>-1. A process may obtain the size and rank by calling the
100 * communicator's <code>size()</code> and <code>rank()</code> methods.
101 * <p>
102 * There is one predefined communicator, the <B>world communicator,</B>
103 * consisting of all the backend processes in the parallel program. A process
104 * may obtain a reference to the world communicator by calling the static
105 * <code>Comm.world()</code> method. Typically, the first few lines in a PJ cluster
106 * parallel program look like this:
107 * <PRE>
108 * public class AParallelProgram
109 *     {
110 *     public static void main
111 *         (String[] args)
112 *         throws Exception
113 *         {
114 *         Comm.init (args);
115 *         Comm world = Comm.world();
116 *         int size = world.size();
117 *         int rank = world.rank();
118 *         . . .</PRE>
119 * <p>
120 * The number of processes in the parallel program -- that is, the size of the
121 * world communicator -- is specified by the <code>"pj.np"</code> property, which
122 * must be an integer greater than or equal to 1. You can specify the number of
123 * processes on the Java command line like this:
124 * <PRE>
125 * java -Dpj.np=4 . . .</PRE>
126 * <p>
127 * If the <code>"pj.np"</code> property is not specified, the default is 1.
128 * <p>
129 * The PJ program will run on the specified number of backend processors as
130 * described above. To determine which backend processors to use, the PJ program
131 * interacts with a <B>Job Scheduler</B> server process running on the frontend
132 * processor. When the PJ program starts and calls the <code>Comm.init()</code>
133 * method, the middleware first prints the job number on the standard error. The
134 * middleware then waits until the required number of backend processors are
135 * ready to run a job. As each backend processor becomes ready, the middleware
136 * prints on the standard error the name of each backend processor assigned to
137 * the job. Once all are ready, the PJ program starts running on those backend
138 * processors, and all further output comes from the PJ program. Since each PJ
139 * program interacts with the Job Scheduler, the Job Scheduler can ensure that
140 * each backend processor is running a backend process for only one job at a
141 * time.
142 * <p>
143 * Depending on the system load, your PJ program may have to wait in the Job
144 * Scheduler's queue for a while until enough backend processors become ready.
145 * If you get tired of waiting, you can kill your PJ program (e.g., by typing
146 * CTRL-C), which will remove your PJ program from the Job Scheduler's queue.
147 * <p>
148 * The Job Scheduler has a web interface that lets you examine the cluster
149 * status. Just point your web browser at this URL:
150 *
151 * <code> http://&lt;hostname&gt;:8080/</code>
152 * <p>
153 * where <code>&lt;hostname&gt;</code> is replaced by the host name of the frontend
154 * processor. The default port for the cluster status web interface is port
155 * 8080. The Job Scheduler can be configured to use a different port. For
156 * further information, see package {@linkplain edu.rit.pj.cluster}.
157 * <p>
158 * If the PJ program is executed on a host where no Job Scheduler is running,
159 * the PJ program will run in <I>one</I> process on that host (i.e., the machine
160 * you're logged into), rather than on the backend processors. The message
161 * passing methods in class Comm will still work, though. This option can be
162 * useful for debugging a PJ program's logic on a non-parallel machine before
163 * running the PJ program on a cluster.
164 * <HR>
165 * <p>
166 * <B>MESSAGE PASSING</B>
167 * <p>
168 * PJ provides two categories of communication, <B>point-to-point
169 * communication</B> and <B>collective communication.</B> The following methods
170 * of class Comm are used for point-to-point communication:
171 * <UL>
172 * <LI><code>send()</code>
173 * <LI><code>receive()</code>
174 * <LI><code>sendReceive()</code>
175 * <LI><code>floodSend()</code>
176 * <LI><code>floodReceive()</code>
177 * </UL>
178 * The following methods are used for collective communication:
179 * <UL>
180 * <LI><code>broadcast()</code>
181 * <LI><code>scatter()</code>
182 * <LI><code>gather()</code>
183 * <LI><code>allGather()</code>
184 * <LI><code>reduce()</code>
185 * <LI><code>allReduce()</code>
186 * <LI><code>allToAll()</code>
187 * <LI><code>scan()</code>
188 * <LI><code>exclusiveScan()</code>
189 * <LI><code>barrier()</code>
190 * </UL>
191 * These methods are described further in the sections below.
192 * <p>
193 * In addition, you can create a new communicator consisting of all, or a subset
194 * of, the processes in an existing communicator. Message passing in the new
195 * communicator is completely independent of message passing in the original
196 * communicator. The following method creates a new communicator:
197 * <UL>
198 * <LI><code>createComm()</code>
199 * </UL>
200 * <HR>
201 * <p>
202 * <B>POINT-TO-POINT COMMUNICATION</B>
203 * <p>
204 * One process in a PJ cluster parallel program, the <B>source process</B>, may
205 * use a communicator to send a message to another process in the program, the
206 * <B>destination process</B>. This is called a <B>point-to-point
207 * communication</B> because just the two processes are involved (as opposed to
208 * a collective communication, which involves all the processes). Five
209 * point-to-point communication methods are available in this release: send,
210 * receive, send-receive, flood-send, and flood-receive.
211 * <p>
212 * <B>Send and Receive</B>
213 * <p>
214 * To do a point-to-point communication, the source process calls the
215 * <code>send()</code> method on a certain communicator, such as the world
216 * communicator. The source process specifies the destination process's rank,
217 * the <B>tag</B> for the message, and a <B>buffer</B> containing the data items
218 * to be sent (type {@linkplain edu.rit.mp.Buf}). Likewise, the destination
219 * process calls the <code>receive()</code> method on the same communicator as the
220 * source process. The destination process specifies the source process's rank,
221 * the message tag which must be the same as in the source process, and the
222 * buffer for the data items to be received.
223 * <p>
224 * A <code>send()</code> method call and a <code>receive()</code> method call are said
225 * to <B>match</B> if (a) the rank passed to the <code>send()</code> method equals
226 * the rank of the process calling <code>receive()</code>, (b) the rank passed to
227 * the <code>receive()</code> method equals the rank of the process calling
228 * <code>send()</code>, (c) the item data type in the source buffer is the same as
229 * the item data type in the destination buffer, and (d) the send message tag
230 * equals the receive message tag. A <code>receive()</code> method call will block
231 * until a matching <code>send()</code> method call occurs. If more than one
232 * <code>send()</code> method call matches a <code>receive()</code> method call, one of
233 * the matching <code>send()</code> method calls is picked in an unspecified manner.
234 * A <code>send()</code> method call <I>may</I> block until a matching
235 * <code>receive()</code> method call occurs due to flow control in the underlying
236 * network communication.
237 * <p>
238 * The message tag can be used to distinguish different kinds of messages. A
239 * <code>receive()</code> method call will only match a <code>send()</code> method call
240 * with the same tag. If there is no need to distinguish different kinds of
241 * messages, omit the tag (it will default to 0).
242 * <p>
243 * Once a <code>send()</code> method call and a <code>receive()</code> method call have
244 * been matched together, the actual message data transfer takes place. Each
245 * item in the source buffer, starting at index 0 and continuing for the entire
246 * length of the source buffer, is written to the message. At the other end,
247 * each item in the destination buffer, starting at index 0, is read from the
248 * message.
249 * <p>
250 * The <code>receive()</code> method returns a {@linkplain CommStatus} object. The
251 * status object gives the actual rank of the process that sent the message, the
252 * actual message tag that was received, and the actual number of data items in
253 * the message. If the actual number of data items in the message is less than
254 * the length of the destination buffer, nothing is stored into the extra data
255 * items at the end of the destination buffer. If the actual number of data
256 * items in the message is greater than the length of the destination buffer,
257 * the extra data items at the end of the message are discarded.
258 * <p>
259 * The <code>send()</code> method does not return until all the message elements
260 * have been written from the source buffer. Likewise, the <code>receive()</code>
261 * method does not return until all the message elements have been read into the
262 * destination buffer. However, you cannot assume that because the
263 * <code>send()</code> method has returned, the matching <code>receive()</code> method
264 * has also returned. Because of buffering in the underlying network
265 * communication, not all the destination items might have been received even
266 * though all the source items have been sent.
267 * <p>
268 * The destination process, instead of specifying a particular source process,
269 * can declare that it will receive a message from any source process by
270 * specifying null for the source process rank in the <code>receive()</code> method
271 * call. This is called a <B>wildcard source</B>. In this case the
272 * <code>receive()</code> method call's returned status object will indicate the
273 * actual source process that sent the message.
274 * <p>
275 * The destination process, instead of specifying a particular message tag, can
276 * declare that it will receive a message with any tag by specifying null for
277 * the tag in the <code>receive()</code> method call. This is called a <B>wildcard
278 * tag</B>. Alternatively, the destination process can specify a range of
279 * message tags, and it will receive a message with any tag in the given range.
280 * In these cases the <code>receive()</code> method call's returned status object
281 * will indicate the actual message tag that was sent.
282 * <p>
283 * A process can send a message to itself. In this case one thread must call
284 * <code>send()</code> (specifying the process's own rank as the destination) and a
285 * different thread must call <code>receive()</code> (specifying the process's own
286 * rank as the source), otherwise a deadlock will ensue.
287 * <p>
288 * <B>Send-Receive</B>
289 * <p>
290 * By calling the <code>sendReceive()</code> method, a process can send a buffer of
291 * outgoing message items to a destination process while simultaneously
292 * receiving a buffer of incoming message items from a source process. The
293 * destination process may be the same as the source process, or different from
294 * the source process. The outgoing message items must come from a different
295 * place than where the incoming message items will be stored, otherwise the
296 * incoming message items may overwrite the outgoing message items before they
297 * can be sent. When the <code>sendReceive()</code> method returns, the outgoing
298 * message items have been fully sent, but they may not yet have been fully
299 * received; and the incoming message items have been fully received.
300 * <p>
301 * With the <code>sendReceive()</code> method, a process cannot receive a message
302 * from a wildcard source, and a process cannot receive a message with a
303 * wildcard tag or a range of tags. The process calling <code>sendReceive()</code>
304 * must know the rank of the source process and the message tag (if not 0). The
305 * <code>sendReceive()</code> method does return a status object giving the outcome
306 * of the receive half of the send-receive operation, just as the
307 * <code>receive()</code> method does.
308 * <p>
309 * A process can send-receive messages with itself. In this case one thread must
310 * call <code>sendReceive()</code> (specifying the process's own rank as the source
311 * and destination) and a different thread must also call <code>sendReceive()</code>
312 * (specifying the process's own rank as the source and destination), otherwise
313 * a deadlock will ensue.
314 * <p>
315 * <B>Non-Blocking Communication</B>
316 * <p>
317 * The <code>send()</code>, <code>receive()</code>, and <code>sendReceive()</code> methods
318 * each have a non-blocking version. A non-blocking communication method
319 * initiates the communication operation and immediately returns, storing the
320 * state of the communication operation in a {@linkplain CommRequest} object.
321 * The communicator then performs the communication operation in a separate
322 * thread. This allows the calling thread to do other work while the
323 * communication operation is in progress. To wait for the send and receive
324 * operations to finish, call the CommRequest object's <code>waitForFinish()</code>
325 * method.
326 * <p>
327 * <B>Flood-Send and Flood-Receive</B>
328 * <p>
329 * Any process can send a message to all processes in the communicator. This is
330 * called "flooding" the message. First, all processes must start a
331 * flood-receive operation, either by calling the non-blocking
332 * <code>floodReceive()</code> method, or by having a separate thread call the
333 * blocking <code>floodReceive()</code> method. Then, one process (any process) must
334 * call the <code>floodSend()</code> method. The data items in the flood-send
335 * operation's outgoing buffer are copied into the flood-receive operation's
336 * incoming buffer in all processes.
337 * <p>
338 * Message flooding is similar to the "broadcast" collective communication
339 * operation (see below). The differences are these: Broadcasting combines
340 * sending and receiving in a single operation; flooding uses separate send and
341 * receive operations. For broadcasting, every process must know which process
342 * is sending the outgoing data items; for flooding, the receiving processes do
343 * not need to know which process is sending (any process can send).
344 * <HR>
345 * <p>
346 * <B>COLLECTIVE COMMUNICATION</B>
347 * <p>
348 * A PJ cluster parallel program may use a communicator to send a message among
349 * all the processes in the program at the same time. This is called a
350 * <B>collective communication</B> because all the processes in the communicator
351 * are involved (as opposed to a point-to-point communication). Ten collective
352 * communication methods are available in this release: broadcast, scatter,
353 * gather, all-gather, reduce, all-reduce, all-to-all, scan, exclusive-scan, and
354 * barrier. Further collective communication methods will be added to class Comm
355 * in a later release.
356 * <p>
357 * <B>Broadcast</B>
358 * <p>
359 * One process in the communicator, the <B>root</B> process, has a source buffer
360 * (type {@linkplain edu.rit.mp.Buf Buf}) filled with data. The other processes
361 * in the communicator each have a destination buffer with the same length and
362 * the same item data type as the source buffer. Each process calls the
363 * communicator's <code>broadcast()</code> method. Afterwards, all the destination
364 * buffers contain the same data as the source buffer.
365 * <TABLE>
366 * <CAPTION>Before and After Broadcast.</CAPTION>
367 * <TR>
368 * <TD style="vertical-align:top;">
369 * Before:
370 * <pre>
371 * Process Process Process Process
372 * 0 (root) 1 2 3
373 * +----+ +----+ +----+ +----+
374 * | 1 | | | | | | |
375 * | 2 | | | | | | |
376 * | 3 | | | | | | |
377 * | 4 | | | | | | |
378 * | 5 | | | | | | |
379 * | 6 | | | | | | |
380 * | 7 | | | | | | |
381 * | 8 | | | | | | |
382 * +----+ +----+ +----+ +----+</pre>
383 * </TD>
384 * <TD>
385 * <pre>
386 * ... </pre>
387 * </TD>
388 * <TD style="vertical-align:top;">
389 * After:
390 * <pre>
391 * Process Process Process Process
392 * 0 (root) 1 2 3
393 * +----+ +----+ +----+ +----+
394 * | 1 | | 1 | | 1 | | 1 |
395 * | 2 | | 2 | | 2 | | 2 |
396 * | 3 | | 3 | | 3 | | 3 |
397 * | 4 | | 4 | | 4 | | 4 |
398 * | 5 | | 5 | | 5 | | 5 |
399 * | 6 | | 6 | | 6 | | 6 |
400 * | 7 | | 7 | | 7 | | 7 |
401 * | 8 | | 8 | | 8 | | 8 |
402 * +----+ +----+ +----+ +----+</pre>
403 * </TD>
404 * </TR>
405 * </TABLE>
406 * <I>Note:</I> Any process can be the root of the broadcast. The above is only
407 * one example with process 0 as the root.
408 * <p>
409 * <B>Scatter</B>
410 * <p>
411 * One process in the communicator, the root process, has <I>K</I> source
412 * buffers (type {@linkplain edu.rit.mp.Buf Buf}) filled with data, where
413 * <I>K</I> is the size of the communicator. For example, the source buffers
414 * could be different portions of an array. Each process in the communicator
415 * (including the root process) has a destination buffer with the same length
416 * and the same item data type as the corresponding source buffer. Each process
417 * calls the communicator's <code>scatter()</code> method. Afterwards, each
418 * process's destination buffer contains the same data as the corresponding
419 * source buffer in the root process.
420 * <TABLE>
421 * <CAPTION>Scatter.</CAPTION>
422 * <TR>
423 * <TD style="vertical-align:top;">
424 * Before:
425 * <pre>
426 * Process Process Process Process
427 * 0 (root) 1 2 3
428 * +----+
429 * | 1 |
430 * | 2 |
431 * +----+
432 * | 3 |
433 * | 4 |
434 * +----+
435 * | 5 |
436 * | 6 |
437 * +----+
438 * | 7 |
439 * | 8 |
440 * +----+
441 *
442 * +----+ +----+ +----+ +----+
443 * | | | | | | | |
444 * | | | | | | | |
445 * +----+ +----+ +----+ +----+</pre>
446 * </TD>
447 * <TD>
448 * <pre>
449 * ... </pre>
450 * </TD>
451 * <TD style="vertical-align:top;">
452 * After:
453 * <pre>
454 * Process Process Process Process
455 * 0 (root) 1 2 3
456 * +----+
457 * | 1 |
458 * | 2 |
459 * +----+
460 * | 3 |
461 * | 4 |
462 * +----+
463 * | 5 |
464 * | 6 |
465 * +----+
466 * | 7 |
467 * | 8 |
468 * +----+
469 *
470 * +----+ +----+ +----+ +----+
471 * | 1 | | 3 | | 5 | | 7 |
472 * | 2 | | 4 | | 6 | | 8 |
473 * +----+ +----+ +----+ +----+</pre>
474 * </TD>
475 * </TR>
476 * </TABLE>
477 * In the root process, the destination buffer can be the same as the source
478 * buffer:
479 * <TABLE>
480 * <CAPTION>Scatter</CAPTION>
481 * <TR>
482 * <TD style="vertical-align:top;">
483 * Before:
484 * <pre>
485 * Process   Process   Process   Process
486 * 0 (root)  1         2         3
487 * +----+
488 * | 1  |
489 * | 2  |
490 * +----+    +----+
491 * | 3  |    |    |
492 * | 4  |    |    |
493 * +----+    +----+    +----+
494 * | 5  |              |    |
495 * | 6  |              |    |
496 * +----+              +----+    +----+
497 * | 7  |                        |    |
498 * | 8  |                        |    |
499 * +----+                        +----+</pre>
500 * </TD>
501 * <TD>
502 * <pre>
503 * ... </pre>
504 * </TD>
505 * <TD style="vertical-align:top;">
506 * After:
507 * <pre>
508 * Process   Process   Process   Process
509 * 0 (root)  1         2         3
510 * +----+
511 * | 1  |
512 * | 2  |
513 * +----+    +----+
514 * | 3  |    | 3  |
515 * | 4  |    | 4  |
516 * +----+    +----+    +----+
517 * | 5  |              | 5  |
518 * | 6  |              | 6  |
519 * +----+              +----+    +----+
520 * | 7  |                        | 7  |
521 * | 8  |                        | 8  |
522 * +----+                        +----+</pre>
523 * </TD>
524 * </TR>
525 * </TABLE>
526 * <I>Note:</I> Any process can be the root of the scatter. The above is only
527 * one example with process 0 as the root.
528 * <p>
529 * <B>Gather</B>
530 * <p>
531 * Gather is the opposite of scatter. One process in the communicator, the root
532 * process, has <I>K</I> destination buffers (type {@linkplain edu.rit.mp.Buf
533 * Buf}), where <I>K</I> is the size of the communicator. For example, the
534 * destination buffers could be different portions of an array. Each process in
535 * the communicator (including the root process) has a source buffer with the
536 * same length and the same item data type as the corresponding destination
537 * buffer, filled with data. Each process calls the communicator's
538 * <code>gather()</code> method. Afterwards, each destination buffer in the root
539 * process contains the same data as the corresponding source buffer.
540 * <TABLE>
541 * <CAPTION>Gather</CAPTION>
542 * <TR>
543 * <TD style="vertical-align:top;">
544 * Before:
545 * <pre>
546 * Process Process Process Process
547 * 0 (root) 1 2 3
548 * +----+ +----+ +----+ +----+
549 * | 1 | | 3 | | 5 | | 7 |
550 * | 2 | | 4 | | 6 | | 8 |
551 * +----+ +----+ +----+ +----+
552 *
553 * +----+
554 * | |
555 * | |
556 * +----+
557 * | |
558 * | |
559 * +----+
560 * | |
561 * | |
562 * +----+
563 * | |
564 * | |
565 * +----+</pre>
566 * </TD>
567 * <TD>
568 * <pre>
569 * ... </pre>
570 * </TD>
571 * <TD style="vertical-align:top;">
572 * After:
573 * <pre>
574 * Process Process Process Process
575 * 0 (root) 1 2 3
576 * +----+ +----+ +----+ +----+
577 * | 1 | | 3 | | 5 | | 7 |
578 * | 2 | | 4 | | 6 | | 8 |
579 * +----+ +----+ +----+ +----+
580 *
581 * +----+
582 * | 1 |
583 * | 2 |
584 * +----+
585 * | 3 |
586 * | 4 |
587 * +----+
588 * | 5 |
589 * | 6 |
590 * +----+
591 * | 7 |
592 * | 8 |
593 * +----+</pre>
594 * </TD>
595 * </TR>
596 * </TABLE>
597 * In the root process, the destination buffer can be the same as the source
598 * buffer:
599 * <TABLE>
600 * <CAPTION>Gather</CAPTION>
601 * <TR>
602 * <TD style="vertical-align:top;">
603 * Before:
604 * <pre>
605 * Process   Process   Process   Process
606 * 0 (root)  1         2         3
607 * +----+
608 * | 1  |
609 * | 2  |
610 * +----+    +----+
611 * |    |    | 3  |
612 * |    |    | 4  |
613 * +----+    +----+    +----+
614 * |    |              | 5  |
615 * |    |              | 6  |
616 * +----+              +----+    +----+
617 * |    |                        | 7  |
618 * |    |                        | 8  |
619 * +----+                        +----+</pre>
620 * </TD>
621 * <TD>
622 * <pre>
623 * ... </pre>
624 * </TD>
625 * <TD style="vertical-align:top;">
626 * After:
627 * <pre>
628 * Process   Process   Process   Process
629 * 0 (root)  1         2         3
630 * +----+
631 * | 1  |
632 * | 2  |
633 * +----+    +----+
634 * | 3  |    | 3  |
635 * | 4  |    | 4  |
636 * +----+    +----+    +----+
637 * | 5  |              | 5  |
638 * | 6  |              | 6  |
639 * +----+              +----+    +----+
640 * | 7  |                        | 7  |
641 * | 8  |                        | 8  |
642 * +----+                        +----+</pre>
643 * </TD>
644 * </TR>
645 * </TABLE>
646 * <I>Note:</I> Any process can be the root of the gather. The above is only one
647 * example with process 0 as the root.
648 * <p>
649 * <B>All-Gather</B>
650 * <p>
651 * All-gather is the same as gather, except that every process has an array of
652 * destination buffers, and every process receives the results of the gather.
653 * Each process in the communicator has a source buffer (type {@linkplain
654 * edu.rit.mp.Buf Buf}) filled with data. Each process in the communicator has
655 * <I>K</I> destination buffers, where <I>K</I> is the size of the communicator.
656 * For example, the destination buffers could be different portions of an array.
657 * Each destination buffer has the same length and the same item data type as
658 * the corresponding source buffer. Each process calls the communicator's
659 * <code>allGather()</code> method. Afterwards, each destination buffer in every
660 * process contains the same data as the corresponding source buffer.
661 * <TABLE>
662 * <CAPTION>All-Gather</CAPTION>
663 * <TR>
664 * <TD style="vertical-align:top;">
665 * Before:
666 * <pre>
667 * Process Process Process Process
668 * 0 1 2 3
669 * +----+ +----+ +----+ +----+
670 * | 1 | | 3 | | 5 | | 7 |
671 * | 2 | | 4 | | 6 | | 8 |
672 * +----+ +----+ +----+ +----+
673 *
674 * +----+ +----+ +----+ +----+
675 * | | | | | | | |
676 * | | | | | | | |
677 * +----+ +----+ +----+ +----+
678 * | | | | | | | |
679 * | | | | | | | |
680 * +----+ +----+ +----+ +----+
681 * | | | | | | | |
682 * | | | | | | | |
683 * +----+ +----+ +----+ +----+
684 * | | | | | | | |
685 * | | | | | | | |
686 * +----+ +----+ +----+ +----+</pre>
687 * </TD>
688 * <TD>
689 * <pre>
690 * ... </pre>
691 * </TD>
692 * <TD style="vertical-align:top;">
693 * After:
694 * <pre>
695 * Process Process Process Process
696 * 0 1 2 3
697 * +----+ +----+ +----+ +----+
698 * | 1 | | 3 | | 5 | | 7 |
699 * | 2 | | 4 | | 6 | | 8 |
700 * +----+ +----+ +----+ +----+
701 *
702 * +----+ +----+ +----+ +----+
703 * | 1 | | 1 | | 1 | | 1 |
704 * | 2 | | 2 | | 2 | | 2 |
705 * +----+ +----+ +----+ +----+
706 * | 3 | | 3 | | 3 | | 3 |
707 * | 4 | | 4 | | 4 | | 4 |
708 * +----+ +----+ +----+ +----+
709 * | 5 | | 5 | | 5 | | 5 |
710 * | 6 | | 6 | | 6 | | 6 |
711 * +----+ +----+ +----+ +----+
712 * | 7 | | 7 | | 7 | | 7 |
713 * | 8 | | 8 | | 8 | | 8 |
714 * +----+ +----+ +----+ +----+</pre>
715 * </TD>
716 * </TR>
717 * </TABLE>
718 * The destination buffer can be the same as the source buffer in each process:
719 * <TABLE>
720 * <CAPTION>All-Gather</CAPTION>
721 * <TR>
722 * <TD style="vertical-align:top;">
723 * Before:
724 * <pre>
725 * Process Process Process Process
726 * 0 1 2 3
727 * +----+ +----+ +----+ +----+
728 * | 1 | | | | | | |
729 * | 2 | | | | | | |
730 * +----+ +----+ +----+ +----+
731 * | | | 3 | | | | |
732 * | | | 4 | | | | |
733 * +----+ +----+ +----+ +----+
734 * | | | | | 5 | | |
735 * | | | | | 6 | | |
736 * +----+ +----+ +----+ +----+
737 * | | | | | | | 7 |
738 * | | | | | | | 8 |
739 * +----+ +----+ +----+ +----+</pre>
740 * </TD>
741 * <TD>
742 * <pre>
743 * ... </pre>
744 * </TD>
745 * <TD style="vertical-align:top;">
746 * After:
747 * <pre>
748 * Process Process Process Process
749 * 0 1 2 3
750 * +----+ +----+ +----+ +----+
751 * | 1 | | 1 | | 1 | | 1 |
752 * | 2 | | 2 | | 2 | | 2 |
753 * +----+ +----+ +----+ +----+
754 * | 3 | | 3 | | 3 | | 3 |
755 * | 4 | | 4 | | 4 | | 4 |
756 * +----+ +----+ +----+ +----+
757 * | 5 | | 5 | | 5 | | 5 |
758 * | 6 | | 6 | | 6 | | 6 |
759 * +----+ +----+ +----+ +----+
760 * | 7 | | 7 | | 7 | | 7 |
761 * | 8 | | 8 | | 8 | | 8 |
762 * +----+ +----+ +----+ +----+</pre>
763 * </TD>
764 * </TR>
765 * </TABLE>
766 * <p>
767 * <B>Reduce</B>
768 * <p>
769 * Reduce is like gather, except the buffers' contents are combined together
770 * instead of just copied. Each process in the communicator has a buffer (type
771 * {@linkplain edu.rit.mp.Buf Buf}) filled with data. Each process calls the
772 * communicator's <code>reduce()</code> method, specifying some binary operation
773 * (type {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data.
774 * Afterwards, each element of the buffer in the root process contains the
775 * result of combining all the corresponding elements in all the buffers using
776 * the specified binary operation. For example, if the operation is addition,
777 * each buffer element in the root process ends up being the sum of the
778 * corresponding buffer elements in all the processes. In the non-root
779 * processes, the buffers' contents may be changed from their original contents.
780 * <TABLE>
781 * <CAPTION>Reduce</CAPTION>
782 * <TR>
783 * <TD style="vertical-align:top;">
784 * Before:
785 * <pre>
786 * Process Process Process Process
787 * 0 (root) 1 2 3
788 * +----+ +----+ +----+ +----+
789 * | 1 | | 3 | | 5 | | 7 |
790 * | 2 | | 4 | | 6 | | 8 |
791 * +----+ +----+ +----+ +----+</pre>
792 * </TD>
793 * <TD>
794 * <pre>
795 * ... </pre>
796 * </TD>
797 * <TD style="vertical-align:top;">
798 * After:
799 * <pre>
800 * Process Process Process Process
801 * 0 (root) 1 2 3
802 * +----+ +----+ +----+ +----+
803 * | 16 | | ?? | | ?? | | ?? |
804 * | 20 | | ?? | | ?? | | ?? |
805 * +----+ +----+ +----+ +----+</pre>
806 * </TD>
807 * </TR>
808 * </TABLE>
809 * <I>Note:</I> Any process can be the root of the reduction. The above is only
810 * one example with process 0 as the root.
811 * <p>
812 * <B>All-Reduce</B>
813 * <p>
814 * All-reduce is the same as reduce, except that every process receives the
815 * results of the reduction. Each process in the communicator has a buffer (type
816 * {@linkplain edu.rit.mp.Buf Buf}) filled with data. Each process calls the
817 * communicator's <code>allReduce()</code> method, specifying some binary operation
818 * (type {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data.
819 * Afterwards, each element of the buffer in each process contains the result of
820 * combining all the corresponding elements in all the buffers using the
821 * specified binary operation. For example, if the operation is addition, each
822 * buffer element ends up being the sum of the corresponding buffer elements.
823 * <TABLE>
824 * <CAPTION>All-Reduce</CAPTION>
825 * <TR>
826 * <TD style="vertical-align:top;">
827 * Before:
828 * <pre>
829 * Process Process Process Process
830 * 0 1 2 3
831 * +----+ +----+ +----+ +----+
832 * | 1 | | 3 | | 5 | | 7 |
833 * | 2 | | 4 | | 6 | | 8 |
834 * +----+ +----+ +----+ +----+</pre>
835 * </TD>
836 * <TD>
837 * <pre>
838 * ... </pre>
839 * </TD>
840 * <TD style="vertical-align:top;">
841 * After:
842 * <pre>
843 * Process Process Process Process
844 * 0 1 2 3
845 * +----+ +----+ +----+ +----+
846 * | 16 | | 16 | | 16 | | 16 |
847 * | 20 | | 20 | | 20 | | 20 |
848 * +----+ +----+ +----+ +----+</pre>
849 * </TD>
850 * </TR>
851 * </TABLE>
852 * <p>
853 * <B>All-to-All</B>
854 * <p>
855 * Every process in the communicator has <I>K</I> source buffers (type
856 * {@linkplain edu.rit.mp.Buf Buf}) filled with data, where <I>K</I> is the size
857 * of the communicator. Every process in the communicator also has <I>K</I>
858 * destination buffers (type {@linkplain edu.rit.mp.Buf Buf}). The source
859 * buffers and the destination buffers must refer to different storage. For
860 * example, the source buffers could be portions of an array, and the
861 * destination buffers could be portions of a different array. Each process
862 * calls the communicator's <code>allToAll()</code> method. Afterwards, for each
 * process rank <I>k</I>, 0 &lt;= <I>k</I> &lt;= <I>K</I>-1, and each buffer
 * index <I>i</I>, 0 &lt;= <I>i</I> &lt;= <I>K</I>-1, destination buffer
865 * <I>i</I> in process <I>k</I> contains the same data as source buffer <I>k</I>
866 * in process <I>i</I>.
867 * <TABLE>
868 * <CAPTION>All-to-All</CAPTION>
869 * <TR>
870 * <TD style="vertical-align:top;">
871 * Before:
872 * <pre>
873 * Process Process Process Process
874 * 0 1 2 3
875 * +----+ +----+ +----+ +----+
876 * | 1 | | 9 | | 17 | | 25 |
877 * | 2 | | 10 | | 18 | | 26 |
878 * +----+ +----+ +----+ +----+
879 * | 3 | | 11 | | 19 | | 27 |
880 * | 4 | | 12 | | 20 | | 28 |
881 * +----+ +----+ +----+ +----+
882 * | 5 | | 13 | | 21 | | 29 |
883 * | 6 | | 14 | | 22 | | 30 |
884 * +----+ +----+ +----+ +----+
885 * | 7 | | 15 | | 23 | | 31 |
886 * | 8 | | 16 | | 24 | | 32 |
887 * +----+ +----+ +----+ +----+
888 *
889 * +----+ +----+ +----+ +----+
890 * | | | | | | | |
891 * | | | | | | | |
892 * +----+ +----+ +----+ +----+
893 * | | | | | | | |
894 * | | | | | | | |
895 * +----+ +----+ +----+ +----+
896 * | | | | | | | |
897 * | | | | | | | |
898 * +----+ +----+ +----+ +----+
899 * | | | | | | | |
900 * | | | | | | | |
901 * +----+ +----+ +----+ +----+</pre>
902 * </TD>
903 * <TD>
904 * <pre>
905 * ... </pre>
906 * </TD>
907 * <TD style="vertical-align:top;">
908 * After:
909 * <pre>
910 * Process Process Process Process
911 * 0 1 2 3
912 * +----+ +----+ +----+ +----+
913 * | 1 | | 9 | | 17 | | 25 |
914 * | 2 | | 10 | | 18 | | 26 |
915 * +----+ +----+ +----+ +----+
916 * | 3 | | 11 | | 19 | | 27 |
917 * | 4 | | 12 | | 20 | | 28 |
918 * +----+ +----+ +----+ +----+
919 * | 5 | | 13 | | 21 | | 29 |
920 * | 6 | | 14 | | 22 | | 30 |
921 * +----+ +----+ +----+ +----+
922 * | 7 | | 15 | | 23 | | 31 |
923 * | 8 | | 16 | | 24 | | 32 |
924 * +----+ +----+ +----+ +----+
925 *
926 * +----+ +----+ +----+ +----+
927 * | 1 | | 3 | | 5 | | 7 |
928 * | 2 | | 4 | | 6 | | 8 |
929 * +----+ +----+ +----+ +----+
930 * | 9 | | 11 | | 13 | | 15 |
931 * | 10 | | 12 | | 14 | | 16 |
932 * +----+ +----+ +----+ +----+
933 * | 17 | | 19 | | 21 | | 23 |
934 * | 18 | | 20 | | 22 | | 24 |
935 * +----+ +----+ +----+ +----+
936 * | 25 | | 27 | | 29 | | 31 |
937 * | 26 | | 28 | | 30 | | 32 |
938 * +----+ +----+ +----+ +----+</pre>
939 * </TD>
940 * </TR>
941 * </TABLE>
942 * <p>
943 * <B>Scan</B>
944 * <p>
945 * Each process in the communicator has a buffer (type {@linkplain
946 * edu.rit.mp.Buf Buf}) filled with data. Each process calls the communicator's
947 * <code>scan()</code> method, specifying some binary operation (type {@linkplain
948 * edu.rit.pj.reduction.Op Op}) for combining the data. Afterwards, each element
949 * of the buffer in a particular process contains the result of combining all
950 * the corresponding elements in its own and all lower-ranked processes' buffers
951 * using the specified binary operation. For example, if the operation is
952 * addition, each buffer element ends up being the sum of its own and all
953 * lower-ranked processes' buffer elements.
954 * <TABLE>
955 * <CAPTION>Scan</CAPTION>
956 * <TR>
957 * <TD style="vertical-align:top;">
958 * Before:
959 * <pre>
960 * Process Process Process Process
961 * 0 1 2 3
962 * +----+ +----+ +----+ +----+
963 * | 1 | | 3 | | 5 | | 7 |
964 * | 2 | | 4 | | 6 | | 8 |
965 * +----+ +----+ +----+ +----+</pre>
966 * </TD>
967 * <TD>
968 * <pre>
969 * ... </pre>
970 * </TD>
971 * <TD style="vertical-align:top;">
972 * After:
973 * <pre>
974 * Process Process Process Process
975 * 0 1 2 3
976 * +----+ +----+ +----+ +----+
977 * | 1 | | 4 | | 9 | | 16 |
978 * | 2 | | 6 | | 12 | | 20 |
979 * +----+ +----+ +----+ +----+</pre>
980 * </TD>
981 * </TR>
982 * </TABLE>
983 * The scan operation is also known as "prefix scan" or "inclusive prefix scan"
984 * -- "inclusive" because the process's own element is included in the result.
985 * <p>
986 * <B>Exclusive-Scan</B>
987 * <p>
988 * The exclusive-scan operation is a variation of the scan operation. Each
989 * process in the communicator has a buffer (type {@linkplain edu.rit.mp.Buf
990 * Buf}) filled with data. Each process calls the communicator's
991 * <code>exclusiveScan()</code> method, specifying some binary operation (type
992 * {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data, and
993 * specifying an initial data value. Afterwards, each element of the buffer in a
994 * particular process contains the result of combining all the corresponding
995 * elements in all lower-ranked processes' buffers using the specified binary
996 * operation, except in process 0 each element of the buffer contains the
997 * initial data value. For example, if the operation is addition and the initial
998 * data value is 0, each buffer element ends up being the sum of all
999 * lower-ranked processes' buffer elements.
1000 * <TABLE>
1001 * <CAPTION>Exclusive-Scan</CAPTION>
1002 * <TR>
1003 * <TD style="vertical-align:top;">
1004 * Before:
1005 * <pre>
1006 * Process Process Process Process
1007 * 0 1 2 3
1008 * +----+ +----+ +----+ +----+
1009 * | 1 | | 3 | | 5 | | 7 |
1010 * | 2 | | 4 | | 6 | | 8 |
1011 * +----+ +----+ +----+ +----+</pre>
1012 * </TD>
1013 * <TD>
1014 * <pre>
1015 * ... </pre>
1016 * </TD>
1017 * <TD style="vertical-align:top;">
1018 * After:
1019 * <pre>
1020 * Process Process Process Process
1021 * 0 1 2 3
1022 * +----+ +----+ +----+ +----+
1023 * | 0 | | 1 | | 4 | | 9 |
1024 * | 0 | | 2 | | 6 | | 12 |
1025 * +----+ +----+ +----+ +----+</pre>
1026 * </TD>
1027 * </TR>
1028 * </TABLE>
1029 * This version of the scan operation is also known as "exclusive prefix scan"
1030 * -- "exclusive" because the process's own element is excluded from the result.
1031 * <p>
1032 * <B>Barrier</B>
1033 * <p>
1034 * The barrier operation causes all the processes to synchronize with each
1035 * other. Each process calls the communicator's <code>barrier()</code> method. The
1036 * calling thread blocks until all processes in the communicator have called the
1037 * <code>barrier()</code> method. Then the calling thread unblocks and returns from
1038 * the <code>barrier()</code> method call.
1039 *
1040 * @author Alan Kaminsky
1041 * @version 21-Jan-2009
1042 */
1043 public class Comm {
1044
1045 // Hidden data members.
1046 // Predefined communicators.
1047 private static Comm theWorldCommunicator;
1048 private static Comm theFrontendCommunicator;
1049
1050 // This communicator's size, rank, and host.
1051 private int mySize;
1052 private int myRank;
1053 private String myHost;
1054
1055 // The largest power of 2 less than or equal to this communicator's size.
1056 private int mySizePowerOf2;
1057
1058 // Channel group for message passing in this communicator.
1059 private ChannelGroup myChannelGroup;
1060
1061 // Map from rank (array index) to channel group address (array element).
1062 private InetSocketAddress[] myAddressForRank;
1063
1064 // Map from rank (array index) to channel for communicating with the process
1065 // at that rank (array element).
1066 private Channel[] myChannelForRank;
1067
1068 // Broadcast trees for flood-send, flood-receive, broadcast, and reduce
1069 // operations, indexed by root.
1070 private int[][] myBroadcastTree;
1071
1072 // Hidden constructors.
1073
1074 /**
1075 * Construct a new communicator.
1076 *
1077 * @param size Communicator's size.
1078 * @param rank Current process's rank in the communicator.
1079 * @param host Host name.
1080 * @param channelgroup Channel group for message passing in this
1081 * communicator.
1082 * @param address Map from rank (array index) to channel group address
1083 * (array element).
1084 */
private Comm(int size,
        int rank,
        String host,
        ChannelGroup channelgroup,
        InetSocketAddress[] address) {
    // Record size, rank, host, and channel group.
    mySize = size;
    myRank = rank;
    myHost = host;
    myChannelGroup = channelgroup;

    // Determine the largest power of 2 less than or equal to this
    // communicator's size. Integer.highestOneBit() yields exactly that for
    // a positive size and, unlike a shift-until-greater loop, cannot
    // overflow (and then spin forever) when size >= 2^30. A non-positive
    // size maps to 0.
    mySizePowerOf2 = size > 0 ? Integer.highestOneBit(size) : 0;

    // Set channel group ID equal to rank.
    myChannelGroup.setChannelGroupId(rank);

    // Set up connect listener. Only far-end connections need handling;
    // near-end connections are deliberately ignored here.
    myChannelGroup.setConnectListener(new ConnectListener() {
        @Override
        public void nearEndConnected(ChannelGroup theChannelGroup,
                Channel theChannel)
                throws IOException {
        }

        @Override
        public void farEndConnected(ChannelGroup theChannelGroup,
                Channel theChannel)
                throws IOException {
            doFarEndConnected(theChannel);
        }
    });

    // Record socket address for each process rank.
    myAddressForRank = address;

    // Set up channel for each process rank; entries start out null.
    myChannelForRank = new Channel[size];

    // Populate channel at my own rank with the loopback channel.
    myChannelForRank[myRank] = channelgroup.loopbackChannel();

    // If there's more than one process, start listening for incoming
    // connections.
    if (mySize > 1) {
        myChannelGroup.startListening();
    }
}
1136
1137 // Exported operations.
1138
1139 /**
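 * Initialize the PJ message passing middleware. Certain Java system
 * properties specify the middleware's behavior; these properties are
 * typically given on the Java command line with the <code>"-D"</code> flag. For
 * further information, see class {@linkplain PJProperties}.
 * <p>
 * If this process is not a job backend process and either
 * <code>init()</code> was not called from a <code>main()</code> method at the
 * bottom of the calling thread's stack or the Job Scheduler could not be
 * contacted, the world communicator is set up as a single process (size 1,
 * rank 0, host <code>"&lt;unknown&gt;"</code>).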
1144 *
1145 * @param args Command line arguments.
1146 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1147 * <code>args</code> is null.
1148 * @throws java.lang.IllegalArgumentException (unchecked exception) Thrown if the
1149 * value of one of the Java system properties is illegal.
1150 * @throws java.io.IOException Thrown if an I/O error occurred.
1151 */
1152 public static void init(String[] args)
1153 throws IOException {
1154 // Verify preconditions.
1155 if (args == null) {
1156 throw new NullPointerException("Comm.init(): args is null");
1157 }
1158
1159 // Get the job backend object.
1160 JobBackend backend = JobBackend.getJobBackend();
1161
1162 if (backend == null) {
1163 // We're running on the frontend processor.
1164
1165 // Prepare constructor arguments for the Job Frontend object.
1166 String username = System.getProperty("user.name");
1167 int Nn = PJProperties.getPjNn();
1168 int Np = PJProperties.getPjNp();
1169 int Nt = PJProperties.getPjNt();
1170 boolean hasFrontendComm = false;
1171
1172 // Examine the call stack to find the main program class name.
1173 StackTraceElement[] stack
1174 = Thread.currentThread().getStackTrace();
1175 StackTraceElement bottom = stack[stack.length - 1];
1176
1177 if (!bottom.getMethodName().equals("main")) {
1178 // throw new IllegalStateException("Comm.init(): Not called from main program, but from " + bottom.getMethodName());
1179 // Set up world communicator in this process.
1180 theWorldCommunicator
1181 = new Comm(/*size */1,
1182 /*rank */ 0,
1183 /*host */ "<unknown>",
1184 /*channelgroup*/ new ChannelGroup(),
1185 /*address */
1186 new InetSocketAddress[]{new InetSocketAddress(0)});
1187 return;
1188 }
1189
1190 String mainClassName = bottom.getClassName();
1191
1192 // Set up the Job Frontend object.
1193 JobFrontend frontend = null;
1194 try {
1195 frontend
1196 = new JobFrontend(username, Nn, Np, Nt, hasFrontendComm, mainClassName,
1197 args);
1198
1199 // We were able to contact the Job Scheduler.
1200 // Run the job frontend in this process, then exit.
1201 frontend.run();
1202 System.exit(0);
1203 } catch (JobSchedulerException exc) {
1204 // We were not able to contact the Job Scheduler.
1205 // System.err.println(" No Job Scheduler at " + PJProperties.getPjHost() + ":" + PJProperties.getPjPort() + ", running in this (one) process");
1206
1207 // Set up world communicator.
1208 theWorldCommunicator
1209 = new Comm(/*size */1,
1210 /*rank */ 0,
1211 /*host */ "<unknown>",
1212 /*channelgroup*/ new ChannelGroup(),
1213 /*address */
1214 new InetSocketAddress[]{new InetSocketAddress(0)});
1215 }
1216 } else {
1217 // We're running on a backend processor.
1218
1219 // Set up world communicator.
1220 theWorldCommunicator
1221 = new Comm(/*size */backend.getK(),
1222 /*rank */ backend.getRank(),
1223 /*host */ backend.getBackendHost(),
1224 /*channelgroup*/ backend.getWorldChannelGroup(),
1225 /*address */ backend.getWorldAddress());
1226 }
1227 }
1228
1229 /**
1230 * Obtain a reference to the world communicator.
1231 *
1232 * @return World communicator.
1233 * @throws java.lang.IllegalStateException (unchecked exception) Thrown if
1234 * <code>Comm.init()</code> has not been called. Thrown if <code>world()</code> is
1235 * called in the job frontend process; the world communicator does not exist
1236 * in the job frontend process.
1237 */
1238 public static Comm world() {
1239 if (theWorldCommunicator != null) {
1240 return theWorldCommunicator;
1241 } else if (JobBackend.getJobBackend() != null) {
1242 throw new IllegalStateException("Comm.world(): Didn't call Comm.init()");
1243 } else {
1244 throw new IllegalStateException("Comm.world(): World communicator doesn't exist in job frontend process");
1245 }
1246 }
1247
1248 /**
1249 * Obtain the number of processes in this communicator.
1250 *
1251 * @return Size.
1252 */
1253 public int size() {
1254 return mySize;
1255 }
1256
1257 /**
1258 * Obtain the current process's rank in this communicator.
1259 *
1260 * @return Rank.
1261 */
1262 public int rank() {
1263 return myRank;
1264 }
1265
1266 /**
1267 * Obtain the host name of this communicator's backend processor. If this
1268 * communicator is not running on a cluster backend processor, the host name
 * is <code>"&lt;unknown&gt;"</code>.
1270 *
1271 * @return Host name.
1272 */
1273 public String host() {
1274 return myHost;
1275 }
1276
1277 /**
1278 * Create a new communicator. <I>Every</I> process in this communicator must
1279 * call the <code>createComm()</code> method. Each process passes true or false
1280 * for the <code>participate</code> argument to state whether the process will
1281 * participate in the new communicator. At least one process must
1282 * participate in the new communicator. Messages to set up the new
1283 * communicator are sent to all processes in this communicator, using a
1284 * message tag of 0.
1285 * <p>
1286 * In processes participating in the new communicator, the new communicator
1287 * is returned. The participating processes appear in the same order by rank
1288 * in the new communicator as in this communicator. The process can call the
1289 * new communicator's <code>rank()</code> method to determine the process's rank
1290 * in the new communicator.
1291 * <p>
1292 * In processes not participating in the new communicator, null is returned.
1293 *
1294 * @param participate True if this process will participate in the new
1295 * communicator; false otherwise.
1296 * @return New communicator if this process will participate in the new
1297 * communicator; null otherwise.
1298 * @throws java.io.IOException Thrown if an I/O error occurred.
1299 */
1300 public Comm createComm(boolean participate)
1301 throws IOException {
1302 return createComm(participate, 0);
1303 }
1304
1305 /**
1306 * Create a new communicator. <I>Every</I> process in this communicator must
1307 * call the <code>createComm()</code> method. Each process passes true or false
1308 * for the <code>participate</code> argument to state whether the process will
1309 * participate in the new communicator. At least one process must
1310 * participate in the new communicator. Messages to set up the new
1311 * communicator are sent to all processes in this communicator, using the
1312 * given message tag.
1313 * <p>
1314 * In processes participating in the new communicator, the new communicator
1315 * is returned. The participating processes appear in the same order by rank
1316 * in the new communicator as in this communicator. The process can call the
1317 * new communicator's <code>rank()</code> method to determine the process's rank
1318 * in the new communicator.
1319 * <p>
1320 * In processes not participating in the new communicator, null is returned.
1321 *
1322 * @param participate True if this process will participate in the new
1323 * communicator; false otherwise.
1324 * @param tag Message tag.
1325 * @return New communicator if this process will participate in the new
1326 * communicator; null otherwise.
1327 * @throws java.io.IOException Thrown if an I/O error occurred.
1328 */
1329 public Comm createComm(boolean participate,
1330 int tag)
1331 throws IOException {
1332 // Set up array of socket addresses for all processes.
1333 InetSocketAddress[] address = new InetSocketAddress[mySize];
1334 ObjectBuf<InetSocketAddress>[] addressbuf
1335 = ObjectBuf.sliceBuffers(address,
1336 new Range(0, mySize - 1).subranges(mySize));
1337
1338 // Create channel group for new communicator, if participating.
1339 ChannelGroup channelgroup = null;
1340 InetSocketAddress myaddress = null;
1341 if (participate) {
1342 channelgroup
1343 = new ChannelGroup(new InetSocketAddress(myChannelGroup.listenAddress().getAddress(), 0));
1344 myaddress = channelgroup.listenAddress();
1345 address[myRank] = myaddress;
1346 }
1347
1348 // All-gather channel group socket addresses into every process.
1349 allGather(tag, addressbuf[myRank], addressbuf);
1350
1351 // Close up gaps in the socket address array if any.
1352 int off = 0;
1353 int newsize = 0;
1354 int newrank = -1;
1355 for (int i = 0; i < mySize; ++i) {
1356 if (address[i] == null) {
1357 ++off;
1358 } else {
1359 if (i == myRank) {
1360 newrank = i - off;
1361 }
1362 address[i - off] = address[i];
1363 ++newsize;
1364 }
1365 }
1366
1367 // Verify size of new communicator.
1368 if (newsize == 0) {
1369 throw new IOException("Comm.createComm(): No processes in communicator");
1370 }
1371
1372 // Return new communicator if participating.
1373 if (participate) {
1374 return new Comm(newsize, newrank, myHost, channelgroup, address);
1375 } // Return null if not participating.
1376 else {
1377 return null;
1378 }
1379 }
1380
1381 /**
1382 * Send a message to the process at the given rank in this communicator. The
1383 * message uses a tag of 0. The message items come from the given buffer. To
1384 * receive the message, the destination process must call the
1385 * <code>receive()</code> method. When the <code>send()</code> method returns, the
1386 * message has been fully sent, but it may not yet have been fully received.
1387 * <p>
1388 * A process can send a message to itself; in this case a different thread
1389 * must call the <code>receive()</code> method on this communicator.
1390 *
1391 * @param toRank Destination process's rank in this communicator.
1392 * @param buffer Buffer of data items to be sent.
1393 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1394 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1395 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1396 * <code>buffer</code> is null.
1397 * @throws java.io.IOException Thrown if an I/O error occurred.
1398 */
1399 public void send(int toRank,
1400 Buf buffer)
1401 throws IOException {
1402 send(toRank, 0, buffer);
1403 }
1404
1405 /**
1406 * Send a message to the process at the given rank in this communicator with
1407 * the given message tag. The message items come from the given buffer. To
1408 * receive the message, the destination process must call the
1409 * <code>receive()</code> method. When the <code>send()</code> method returns, the
1410 * message has been fully sent, but it may not yet have been fully received.
1411 * <p>
1412 * A process can send a message to itself; in this case a different thread
1413 * must call the <code>receive()</code> method on this communicator.
1414 *
1415 * @param toRank Destination process's rank in this communicator.
1416 * @param tag Message tag.
1417 * @param buffer Buffer of data items to be sent.
1418 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1419 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1420 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1421 * <code>buffer</code> is null.
1422 * @throws java.io.IOException Thrown if an I/O error occurred.
1423 */
1424 public void send(int toRank,
1425 int tag,
1426 Buf buffer)
1427 throws IOException {
1428 myChannelGroup.send(getChannel(toRank), tag, buffer);
1429 }
1430
1431 /**
1432 * Send a message to the process at the given rank in this communicator
1433 * (non-blocking). A message tag of 0 is used. The message items come from
1434 * the given buffer. To receive the message, the destination process must
1435 * call the <code>receive()</code> method.
1436 * <p>
1437 * The <code>send()</code> method initiates the send operation and immediately
1438 * returns a {@linkplain CommRequest} object. The send operation is
1439 * performed by a separate thread. To wait for the send operation to finish,
1440 * call the returned {@linkplain CommRequest} object's
1441 * <code>waitForFinish()</code> method. When that method returns, the message
1442 * has been fully sent, but it may not yet have been fully received.
1443 * <p>
1444 * A process can send a message to itself; in this case a different thread
1445 * must call the <code>receive()</code> method on this communicator.
1446 *
1447 * @param toRank Destination process's rank in this communicator.
1448 * @param buffer Buffer of data items to be sent.
1449 * @param request CommRequest object to use to wait for the operation to
1450 * finish; in this case <code>request</code> is returned. If
1451 * <code>request</code> is null, a new CommRequest object is created and
1452 * returned.
1453 * @return CommRequest object to use to wait for the operation to finish.
1454 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1455 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1456 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1457 * <code>buffer</code> is null.
1458 * @throws java.io.IOException Thrown if an I/O error occurred.
1459 */
1460 public CommRequest send(int toRank,
1461 Buf buffer,
1462 CommRequest request)
1463 throws IOException {
1464 return send(toRank, 0, buffer, request);
1465 }
1466
1467 /**
1468 * Send a message to the process at the given rank in this communicator with
1469 * the given message tag (non-blocking). The message items come from the
1470 * given buffer. To receive the message, the destination process must call
1471 * the <code>receive()</code> method.
1472 * <p>
1473 * The <code>send()</code> method initiates the send operation and immediately
1474 * returns a {@linkplain CommRequest} object. The send operation is
1475 * performed by a separate thread. To wait for the send operation to finish,
1476 * call the returned {@linkplain CommRequest} object's
1477 * <code>waitForFinish()</code> method. When that method returns, the message
1478 * has been fully sent, but it may not yet have been fully received.
1479 * <p>
1480 * A process can send a message to itself; in this case a different thread
1481 * must call the <code>receive()</code> method on this communicator.
1482 *
1483 * @param toRank Destination process's rank in this communicator.
1484 * @param tag Message tag.
1485 * @param buffer Buffer of data items to be sent.
1486 * @param request CommRequest object to use to wait for the operation to
1487 * finish; in this case <code>request</code> is returned. If
1488 * <code>request</code> is null, a new CommRequest object is created and
1489 * returned.
1490 * @return CommRequest object to use to wait for the operation to finish.
1491 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1492 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1493 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1494 * <code>buffer</code> is null.
1495 * @throws java.io.IOException Thrown if an I/O error occurred.
1496 */
1497 public CommRequest send(int toRank,
1498 int tag,
1499 Buf buffer,
1500 CommRequest request)
1501 throws IOException {
1502 // Set up CommRequest object.
1503 CommRequest req = request == null ? new CommRequest() : request;
1504
1505 // Send message (non-blocking).
1506 req.mySendRequest = new IORequest();
1507 req.myRecvRequest = null;
1508 myChannelGroup.sendNoWait(getChannel(toRank), tag, buffer, req.mySendRequest);
1509
1510 // Return CommRequest object.
1511 return req;
1512 }
1513
1514 /**
 * Receive a message from the process at the given rank in this
 * communicator. If <code>fromRank</code> is null, a message will be received
 * from any process in this communicator. The message must have a tag of 0. The
1518 * received message items are stored in the given buffer. To send the
1519 * message, the source process must call the <code>send()</code> method. When
1520 * the <code>receive()</code> method returns, the message has been fully
1521 * received.
1522 * <p>
1523 * A {@linkplain CommStatus} object is returned. The status object gives the
1524 * actual rank of the process that sent the message, the actual message tag
1525 * that was received, and the actual number of data items in the message. If
1526 * the actual number of data items in the message is less than the length of
1527 * the buffer, nothing is stored into the extra data items at the end of the
1528 * buffer. If the actual number of data items in the message is greater than
1529 * the length of the buffer, the extra data items at the end of the message
1530 * are discarded.
1531 * <p>
1532 * A process can receive a message from itself; in this case a different
1533 * thread must call the <code>send()</code> method on this communicator.
1534 *
1535 * @param fromRank Source process's rank in this communicator, or null to
1536 * receive from any process.
1537 * @param buffer Buffer of data items to be received.
1538 * @return Status object giving the outcome of the message reception.
1539 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1540 * <code>fromRank</code> is not null and is not in the range 0 ..
1541 * <code>size()</code>-1.
1542 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1543 * <code>buffer</code> is null.
1544 * @throws java.io.IOException Thrown if an I/O error occurred.
1545 */
1546 public CommStatus receive(Integer fromRank,
1547 Buf buffer)
1548 throws IOException {
1549 return receive(fromRank, 0, buffer);
1550 }
1551
1552 /**
 * Receive a message from the process at the given rank in this communicator
 * with the given message tag. If <code>fromRank</code> is null, a message will
 * be received from any process in this communicator. The received message
1556 * items are stored in the given buffer. To send the message, the source
1557 * process must call the <code>send()</code> method. When the <code>receive()</code>
1558 * method returns, the message has been fully received.
1559 * <p>
1560 * A {@linkplain CommStatus} object is returned. The status object gives the
1561 * actual rank of the process that sent the message, the actual message tag
1562 * that was received, and the actual number of data items in the message. If
1563 * the actual number of data items in the message is less than the length of
1564 * the buffer, nothing is stored into the extra data items at the end of the
1565 * buffer. If the actual number of data items in the message is greater than
1566 * the length of the buffer, the extra data items at the end of the message
1567 * are discarded.
1568 * <p>
1569 * A process can receive a message from itself; in this case a different
1570 * thread must call the <code>send()</code> method on this communicator.
1571 *
1572 * @param fromRank Source process's rank in this communicator, or null to
1573 * receive from any process.
1574 * @param tag Message tag.
1575 * @param buffer Buffer of data items to be received.
1576 * @return Status object giving the outcome of the message reception.
1577 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1578 * <code>fromRank</code> is not null and is not in the range 0 ..
1579 * <code>size()</code>-1.
1580 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1581 * <code>buffer</code> is null.
1582 * @throws java.io.IOException Thrown if an I/O error occurred.
1583 */
1584 public CommStatus receive(Integer fromRank,
1585 int tag,
1586 Buf buffer)
1587 throws IOException {
1588 Status status;
1589
1590 // If source is a wildcard, ensure a channel to every process, then
1591 // receive from any process.
1592 if (fromRank == null) {
1593 for (int src = 0; src < mySize; ++src) {
1594 ensureChannel(src);
1595 }
1596 status = myChannelGroup.receive(null, tag, buffer);
1597 } // If source is not a wildcard, receive from that process.
1598 else {
1599 status
1600 = myChannelGroup.receive(getChannel(fromRank), tag, buffer);
1601 }
1602
1603 // Return CommStatus object.
1604 return new CommStatus(getFarRank(status.channel),
1605 status.tag,
1606 status.length);
1607 }
1608
1609 /**
 * Receive a message from the process at the given rank in this communicator
 * with the given message tag range. If <code>fromRank</code> is null, a
 * message will be received from any process in this communicator. If
1613 * <code>tagRange</code> is null, a message will be received with any tag. If
1614 * <code>tagRange</code> is not null, a message will be received with any tag in
1615 * the given range. The received message items are stored in the given
1616 * buffer. To send the message, the source process must call the
1617 * <code>send()</code> method. When the <code>receive()</code> method returns, the
1618 * message has been fully received.
1619 * <p>
1620 * A {@linkplain CommStatus} object is returned. The status object gives the
1621 * actual rank of the process that sent the message, the actual message tag
1622 * that was received, and the actual number of data items in the message. If
1623 * the actual number of data items in the message is less than the length of
1624 * the buffer, nothing is stored into the extra data items at the end of the
1625 * buffer. If the actual number of data items in the message is greater than
1626 * the length of the buffer, the extra data items at the end of the message
1627 * are discarded.
1628 * <p>
1629 * A process can receive a message from itself; in this case a different
1630 * thread must call the <code>send()</code> method on this communicator.
1631 *
1632 * @param fromRank Source process's rank in this communicator, or null to
1633 * receive from any process.
1634 * @param tagRange Message tag range, or null to receive any tag.
1635 * @param buffer Buffer of data items to be received.
1636 * @return Status object giving the outcome of the message reception.
1637 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1638 * <code>fromRank</code> is not null and is not in the range 0 ..
1639 * <code>size()</code>-1.
1640 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1641 * <code>buffer</code> is null.
1642 * @throws java.io.IOException Thrown if an I/O error occurred.
1643 */
1644 public CommStatus receive(Integer fromRank,
1645 Range tagRange,
1646 Buf buffer)
1647 throws IOException {
1648 Status status;
1649
1650 // If source is a wildcard, ensure a channel to every process, then
1651 // receive from any process.
1652 if (fromRank == null) {
1653 for (int src = 0; src < mySize; ++src) {
1654 ensureChannel(src);
1655 }
1656 status = myChannelGroup.receive(null, tagRange, buffer);
1657 } // If source is not a wildcard, receive from that process.
1658 else {
1659 status = myChannelGroup.receive(getChannel(fromRank), tagRange, buffer);
1660 }
1661
1662 // Return CommStatus object.
1663 return new CommStatus(getFarRank(status.channel),
1664 status.tag,
1665 status.length);
1666 }
1667
1668 /**
1669 * Receive a message from the process at the given rank in this communicator
 * (non-blocking). If <code>fromRank</code> is null, a message will be received from
1671 * any process in this communicator. The message must have a tag of 0. The
1672 * received message items are stored in the given buffer. To send the
1673 * message, the source process must call the <code>send()</code> method.
1674 * <p>
1675 * The <code>receive()</code> method initiates the receive operation and
1676 * immediately returns a {@linkplain CommRequest} object. The receive
1677 * operation is performed by a separate thread. To wait for the receive
1678 * operation to finish, call the returned {@linkplain CommRequest} object's
1679 * <code>waitForFinish()</code> method. When that method returns, the incoming
1680 * message items have been fully received.
1681 * <p>
1682 * A process can receive a message from itself; in this case a different
1683 * thread must call the <code>send()</code> method on this communicator.
1684 *
1685 * @param fromRank Source process's rank in this communicator, or null to
1686 * receive from any process.
1687 * @param buffer Buffer of data items to be received.
1688 * @param request CommRequest object to use to wait for the operation to
1689 * finish; in this case <code>request</code> is returned. If
1690 * <code>request</code> is null, a new CommRequest object is created and
1691 * returned.
1692 * @return CommRequest object to use to wait for the operation to finish.
1693 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1694 * <code>fromRank</code> is not null and is not in the range 0 ..
1695 * <code>size()</code>-1.
1696 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1697 * <code>buffer</code> is null.
1698 * @throws java.io.IOException Thrown if an I/O error occurred.
1699 */
1700 public CommRequest receive(Integer fromRank,
1701 Buf buffer,
1702 CommRequest request)
1703 throws IOException {
1704 return receive(fromRank, 0, buffer, request);
1705 }
1706
1707 /**
1708 * Receive a message from the process at the given rank in this communicator
 * with the given message tag (non-blocking). If <code>fromRank</code> is null,
 * a message will be received from any process in this communicator. The
 * <code>tag</code> parameter is a primitive <code>int</code> and so cannot be
 * null; to receive a message with any tag, use the overload that takes a
 * <code>tagRange</code>. The
1712 * received message items are stored in the given buffer. To send the
1713 * message, the source process must call the <code>send()</code> method.
1714 * <p>
1715 * The <code>receive()</code> method initiates the receive operation and
1716 * immediately returns a {@linkplain CommRequest} object. The receive
1717 * operation is performed by a separate thread. To wait for the receive
1718 * operation to finish, call the returned {@linkplain CommRequest} object's
1719 * <code>waitForFinish()</code> method. When that method returns, the incoming
1720 * message items have been fully received.
1721 * <p>
1722 * A process can receive a message from itself; in this case a different
1723 * thread must call the <code>send()</code> method on this communicator.
1724 *
1725 * @param fromRank Source process's rank in this communicator, or null to
1726 * receive from any process.
1727 * @param tag Message tag.
1728 * @param buffer Buffer of data items to be received.
1729 * @param request CommRequest object to use to wait for the operation to
1730 * finish; in this case <code>request</code> is returned. If
1731 * <code>request</code> is null, a new CommRequest object is created and
1732 * returned.
1733 * @return CommRequest object to use to wait for the operation to finish.
1734 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1735 * <code>fromRank</code> is not null and is not in the range 0 ..
1736 * <code>size()</code>-1.
1737 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1738 * <code>buffer</code> is null.
1739 * @throws java.io.IOException Thrown if an I/O error occurred.
1740 */
1741 public CommRequest receive(Integer fromRank,
1742 int tag,
1743 Buf buffer,
1744 CommRequest request)
1745 throws IOException {
1746 // Set up CommRequest object.
1747 CommRequest req = request == null ? new CommRequest() : request;
1748 req.mySendRequest = null;
1749 req.myRecvRequest = new IORequest();
1750
1751 // If source is a wildcard, ensure a channel to every process, then
1752 // receive (non-blocking) from any process.
1753 if (fromRank == null) {
1754 for (int src = 0; src < mySize; ++src) {
1755 ensureChannel(src);
1756 }
1757 myChannelGroup.receiveNoWait(null, tag, buffer, req.myRecvRequest);
1758 } // If source is not a wildcard, receive (non-blocking) from that
1759 // process.
1760 else {
1761 myChannelGroup.receiveNoWait(getChannel(fromRank), tag, buffer, req.myRecvRequest);
1762 }
1763
1764 // Return CommRequest object.
1765 return req;
1766 }
1767
1768 /**
1769 * Receive a message from the process at the given rank in this communicator
 * with the given message tag range (non-blocking). If <code>fromRank</code> is
1771 * null, a message will be received from any process in this communicator.
1772 * If <code>tagRange</code> is null, a message will be received with any tag. If
1773 * <code>tagRange</code> is not null, a message will be received with any tag in
1774 * the given range. The received message items are stored in the given
1775 * buffer. To send the message, the source process must call the
1776 * <code>send()</code> method.
1777 * <p>
1778 * The <code>receive()</code> method initiates the receive operation and
1779 * immediately returns a {@linkplain CommRequest} object. The receive
1780 * operation is performed by a separate thread. To wait for the receive
1781 * operation to finish, call the returned {@linkplain CommRequest} object's
1782 * <code>waitForFinish()</code> method. When that method returns, the incoming
1783 * message items have been fully received.
1784 * <p>
1785 * A process can receive a message from itself; in this case a different
1786 * thread must call the <code>send()</code> method on this communicator.
1787 *
1788 * @param fromRank Source process's rank in this communicator, or null to
1789 * receive from any process.
1790 * @param tagRange Message tag range, or null to receive any tag.
1791 * @param buffer Buffer of data items to be received.
1792 * @param request CommRequest object to use to wait for the operation to
1793 * finish; in this case <code>request</code> is returned. If
1794 * <code>request</code> is null, a new CommRequest object is created and
1795 * returned.
1796 * @return CommRequest object to use to wait for the operation to finish.
1797 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1798 * <code>fromRank</code> is not null and is not in the range 0 ..
1799 * <code>size()</code>-1.
1800 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1801 * <code>buffer</code> is null.
1802 * @throws java.io.IOException Thrown if an I/O error occurred.
1803 */
1804 public CommRequest receive(Integer fromRank,
1805 Range tagRange,
1806 Buf buffer,
1807 CommRequest request)
1808 throws IOException {
1809 // Set up CommRequest object.
1810 CommRequest req = request == null ? new CommRequest() : request;
1811 req.mySendRequest = null;
1812 req.myRecvRequest = new IORequest();
1813
1814 // If source is a wildcard, ensure a channel to every process, then
1815 // receive (non-blocking) from any process.
1816 if (fromRank == null) {
1817 for (int src = 0; src < mySize; ++src) {
1818 ensureChannel(src);
1819 }
1820 myChannelGroup.receiveNoWait(null, tagRange, buffer, req.myRecvRequest);
1821 } // If source is not a wildcard, receive (non-blocking) from that
1822 // process.
1823 else {
1824 myChannelGroup.receiveNoWait(getChannel(fromRank), tagRange, buffer, req.myRecvRequest);
1825 }
1826
1827 // Return CommRequest object.
1828 return req;
1829 }
1830
1831 /**
1832 * Send a message to the process at the given rank in this communicator, and
1833 * receive a message from the process at the given rank in this
1834 * communicator. A message tag of 0 is used. The outgoing message items come
 * from the buffer <code>sendBuf</code>. The incoming message items go into the
 * buffer <code>recvBuf</code>. The outgoing message items must come from a
1837 * different place than where the incoming message items will be stored. The
1838 * destination process (process <code>toRank</code>) must call a method to
1839 * receive this process's outgoing message items. The source process
1840 * (process <code>fromRank</code>) must call a method to send this process's
1841 * incoming message items. When the <code>sendReceive()</code> method returns,
1842 * the outgoing message items have been fully sent, but they may not yet
1843 * have been fully received; and the incoming message items have been fully
1844 * received.
1845 * <p>
1846 * A {@linkplain CommStatus} object is returned giving the results of the
1847 * receive half of the operation. The status object gives the rank of the
1848 * process that sent the incoming message, the message tag that was
1849 * received, and the actual number of data items in the message. If the
1850 * actual number of data items in the message is less than the length of the
1851 * receive buffer, nothing is stored into the extra data items at the end of
1852 * the receive buffer. If the actual number of data items in the message is
1853 * greater than the length of the receive buffer, the extra data items at
1854 * the end of the message are discarded.
1855 * <p>
1856 * A process can send-receive messages with itself; in this case a different
1857 * thread must call the <code>sendReceive()</code> method on this communicator.
1858 *
1859 * @param toRank Destination process's rank in this communicator.
1860 * @param sendBuf Buffer of data items to be sent.
1861 * @param fromRank Source process's rank in this communicator.
1862 * @param recvBuf Buffer of data items to be received.
1863 * @return Status object giving the outcome of the message reception.
1864 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1865 * <code>toRank</code> or <code>fromRank</code>
1866 * is not in the range 0 .. <code>size()</code>-1.
1867 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1868 * <code>sendBuf</code> or <code>recvBuf</code>
1869 * is null.
1870 * @throws java.io.IOException Thrown if an I/O error occurred.
1871 */
1872 public CommStatus sendReceive(int toRank,
1873 Buf sendBuf,
1874 int fromRank,
1875 Buf recvBuf)
1876 throws IOException {
1877 return sendReceive(toRank, 0, sendBuf, fromRank, 0, recvBuf);
1878 }
1879
1880 /**
1881 * Send a message to the process at the given rank in this communicator with
1882 * the given message tag, and receive a message from the process at the
1883 * given rank in this communicator with the given message tag. The outgoing
 * message items come from the buffer <code>sendBuf</code>. The incoming message
 * items go into the buffer <code>recvBuf</code>. The outgoing message items
1886 * must come from a different place than where the incoming message items
1887 * will be stored. The destination process (process <code>toRank</code>) must
1888 * call a method to receive this process's outgoing message items. The
1889 * source process (process <code>fromRank</code>) must call a method to send
1890 * this process's incoming message items. When the <code>sendReceive()</code>
1891 * method returns, the outgoing message items have been fully sent, but they
1892 * may not yet have been fully received; and the incoming message items have
1893 * been fully received.
1894 * <p>
1895 * A {@linkplain CommStatus} object is returned giving the results of the
1896 * receive half of the operation. The status object gives the rank of the
1897 * process that sent the incoming message, the message tag that was
1898 * received, and the actual number of data items in the message. If the
1899 * actual number of data items in the message is less than the length of the
1900 * receive buffer, nothing is stored into the extra data items at the end of
1901 * the receive buffer. If the actual number of data items in the message is
1902 * greater than the length of the receive buffer, the extra data items at
1903 * the end of the message are discarded.
1904 * <p>
1905 * A process can send-receive messages with itself; in this case a different
1906 * thread must call the <code>sendReceive()</code> method on this communicator.
1907 *
1908 * @param toRank Destination process's rank in this communicator.
1909 * @param sendTag Message tag for outgoing message.
1910 * @param sendBuf Buffer of data items to be sent.
1911 * @param fromRank Source process's rank in this communicator.
1912 * @param recvTag Message tag for incoming message.
1913 * @param recvBuf Buffer of data items to be received.
1914 * @return Status object giving the outcome of the message reception.
1915 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1916 * <code>toRank</code> or <code>fromRank</code>
1917 * is not in the range 0 .. <code>size()</code>-1.
1918 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1919 * <code>sendBuf</code> or <code>recvBuf</code>
1920 * is null.
1921 * @throws java.io.IOException Thrown if an I/O error occurred.
1922 */
1923 public CommStatus sendReceive(int toRank,
1924 int sendTag,
1925 Buf sendBuf,
1926 int fromRank,
1927 int recvTag,
1928 Buf recvBuf)
1929 throws IOException {
1930 // Send the outgoing message (non-blocking).
1931 IORequest sendRequest = new IORequest();
1932 myChannelGroup.sendNoWait(getChannel(toRank), sendTag, sendBuf, sendRequest);
1933
1934 // Receive the outgoing message (non-blocking).
1935 IORequest recvRequest = new IORequest();
1936 myChannelGroup.receiveNoWait(getChannel(fromRank), recvTag, recvBuf, recvRequest);
1937
1938 // Wait for both messages to finish.
1939 sendRequest.waitForFinish();
1940 Status status = recvRequest.waitForFinish();
1941
1942 return new CommStatus(getFarRank(status.channel),
1943 status.tag,
1944 status.length);
1945 }
1946
1947 /**
1948 * Send a message to the process at the given rank in this communicator, and
1949 * receive a message from the process at the given rank in this communicator
1950 * (non-blocking). A message tag of 0 is used. The outgoing message items
 * come from the buffer <code>sendBuf</code>. The incoming message items go into
 * the buffer <code>recvBuf</code>. The outgoing message items must come from a
1953 * different place than where the incoming message items will be stored. The
1954 * destination process (process <code>toRank</code>) must call a method to
1955 * receive this process's outgoing message items. The source process
1956 * (process <code>fromRank</code>) must call a method to send this process's
1957 * incoming message items.
1958 * <p>
1959 * The <code>sendReceive()</code> method initiates the send and receive
1960 * operations and immediately returns a {@linkplain CommRequest} object. The
1961 * send and receive operations are performed by a separate thread. To wait
1962 * for the send and receive operations to finish, call the returned
1963 * {@linkplain CommRequest} object's <code>waitForFinish()</code> method. When
1964 * that method returns, the outgoing message items have been fully sent, but
1965 * they may not yet have been fully received; and the incoming message items
1966 * have been fully received.
1967 * <p>
1968 * A process can send-receive messages with itself; in this case a different
1969 * thread must call the <code>sendReceive()</code> method on this communicator.
1970 *
1971 * @param toRank Destination process's rank in this communicator.
1972 * @param sendBuf Buffer of data items to be sent.
1973 * @param fromRank Source process's rank in this communicator.
1974 * @param recvBuf Buffer of data items to be received.
1975 * @param request CommRequest object to use to wait for the operation to
1976 * finish; in this case <code>request</code> is returned. If
1977 * <code>request</code> is null, a new CommRequest object is created and
1978 * returned.
1979 * @return CommRequest object to use to wait for the operation to finish.
1980 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1981 * <code>toRank</code> or <code>fromRank</code>
1982 * is not in the range 0 .. <code>size()</code>-1.
1983 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
1984 * <code>sendBuf</code> or <code>recvBuf</code>
1985 * is null.
1986 * @throws java.io.IOException Thrown if an I/O error occurred.
1987 */
1988 public CommRequest sendReceive(int toRank,
1989 Buf sendBuf,
1990 int fromRank,
1991 Buf recvBuf,
1992 CommRequest request)
1993 throws IOException {
1994 return sendReceive(toRank, 0, sendBuf, fromRank, 0, recvBuf, request);
1995 }
1996
1997 /**
1998 * Send a message to the process at the given rank in this communicator with
1999 * the given message tag, and receive a message from the process at the
2000 * given rank in this communicator with the given message tag
2001 * (non-blocking). The outgoing message items come from the buffer
 * <code>sendBuf</code>. The incoming message items go into the buffer
 * <code>recvBuf</code>. The outgoing message items must come from a different
2004 * place than where the incoming message items will be stored. The
2005 * destination process (process <code>toRank</code>) must call a method to
2006 * receive this process's outgoing message items. The source process
2007 * (process <code>fromRank</code>) must call a method to send this process's
2008 * incoming message items.
2009 * <p>
2010 * The <code>sendReceive()</code> method initiates the send and receive
2011 * operations and immediately returns a {@linkplain CommRequest} object. The
2012 * send and receive operations are performed by a separate thread. To wait
2013 * for the send and receive operations to finish, call the returned
2014 * {@linkplain CommRequest} object's <code>waitForFinish()</code> method. When
2015 * that method returns, the outgoing message items have been fully sent, but
2016 * they may not yet have been fully received; and the incoming message items
2017 * have been fully received.
2018 * <p>
2019 * A process can send-receive messages with itself; in this case a different
2020 * thread must call the <code>sendReceive()</code> method on this communicator.
2021 *
2022 * @param toRank Destination process's rank in this communicator.
2023 * @param sendTag Message tag for outgoing message.
2024 * @param sendBuf Buffer of data items to be sent.
2025 * @param fromRank Source process's rank in this communicator.
2026 * @param recvTag Message tag for incoming message.
2027 * @param recvBuf Buffer of data items to be received.
2028 * @param request CommRequest object to use to wait for the operation to
2029 * finish; in this case <code>request</code> is returned. If
2030 * <code>request</code> is null, a new CommRequest object is created and
2031 * returned.
2032 * @return CommRequest object to use to wait for the operation to finish.
2033 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2034 * <code>toRank</code> or <code>fromRank</code>
2035 * is not in the range 0 .. <code>size()</code>-1.
2036 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2037 * <code>sendBuf</code> or <code>recvBuf</code>
2038 * is null.
2039 * @throws java.io.IOException Thrown if an I/O error occurred.
2040 */
2041 public CommRequest sendReceive(int toRank,
2042 int sendTag,
2043 Buf sendBuf,
2044 int fromRank,
2045 int recvTag,
2046 Buf recvBuf,
2047 CommRequest request)
2048 throws IOException {
2049 // Set up CommRequest object.
2050 CommRequest req = request == null ? new CommRequest() : request;
2051
2052 // Send the outgoing message (non-blocking).
2053 req.mySendRequest = new IORequest();
2054 myChannelGroup.sendNoWait(getChannel(toRank), sendTag, sendBuf, req.mySendRequest);
2055
2056 // Receive the outgoing message (non-blocking).
2057 req.myRecvRequest = new IORequest();
2058 myChannelGroup.receiveNoWait(getChannel(fromRank), recvTag, recvBuf, req.myRecvRequest);
2059
2060 // Return CommRequest object.
2061 return req;
2062 }
2063
2064 /**
2065 * Flood-send a message to all processes in this communicator. The message
2066 * uses a tag of 0. The message items come from the given buffer. To receive
2067 * the message, every process (including the sending process) must call the
2068 * <code>floodReceive()</code> method. When the <code>floodSend()</code> method
2069 * returns, the message has been fully sent, but it may not yet have been
2070 * fully received in all processes.
2071 * <p>
2072 * <I>Note:</I> The length of the incoming buffer in the
2073 * <code>floodReceive()</code> method call must be the same as the length of the
2074 * outgoing buffer in the <code>floodSend()</code> method call.
2075 *
2076 * @param buffer Buffer of data items to be sent.
2077 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2078 * <code>buffer</code> is null.
2079 * @throws java.io.IOException Thrown if an I/O error occurred.
2080 */
2081 public void floodSend(Buf buffer)
2082 throws IOException {
2083 floodSend(0, buffer, null).waitForFinish();
2084 }
2085
2086 /**
2087 * Flood-send a message to all processes in this communicator with the given
2088 * message tag. The message items come from the given buffer. To receive the
2089 * message, every process (including the sending process) must call the
2090 * <code>floodReceive()</code> method. When the <code>floodSend()</code> method
2091 * returns, the message has been fully sent, but it may not yet have been
2092 * fully received in all processes.
2093 * <p>
2094 * <I>Note:</I> The length of the incoming buffer in the
2095 * <code>floodReceive()</code> method call must be the same as the length of the
2096 * outgoing buffer in the <code>floodSend()</code> method call.
2097 *
2098 * @param tag Message tag.
2099 * @param buffer Buffer of data items to be sent.
2100 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2101 * <code>buffer</code> is null.
2102 * @throws java.io.IOException Thrown if an I/O error occurred.
2103 */
2104 public void floodSend(int tag,
2105 Buf buffer)
2106 throws IOException {
2107 floodSend(tag, buffer, null).waitForFinish();
2108 }
2109
2110 /**
2111 * Flood-send a message to all processes in this communicator
2112 * (non-blocking). A message tag of 0 is used. The message items come from
2113 * the given buffer. To receive the message, every process (including the
2114 * sending process) must call the <code>floodReceive()</code> method.
2115 * <p>
2116 * The <code>floodSend()</code> method initiates the flood-send operation and
2117 * immediately returns a {@linkplain CommRequest} object. The flood-send
2118 * operation is performed by a separate thread. To wait for the flood-send
2119 * operation to finish, call the returned {@linkplain CommRequest} object's
2120 * <code>waitForFinish()</code> method. When that method returns, the message
2121 * has been fully sent, but it may not yet have been fully received in all
2122 * processes.
2123 * <p>
2124 * <I>Note:</I> The length of the incoming buffer in the
2125 * <code>floodReceive()</code> method call must be the same as the length of the
2126 * outgoing buffer in the <code>floodSend()</code> method call.
2127 *
2128 * @param buffer Buffer of data items to be sent.
2129 * @param request CommRequest object to use to wait for the operation to
2130 * finish; in this case <code>request</code> is returned. If
2131 * <code>request</code> is null, a new CommRequest object is created and
2132 * returned.
2133 * @return CommRequest object to use to wait for the operation to finish.
2134 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2135 * <code>buffer</code> is null.
2136 * @throws java.io.IOException Thrown if an I/O error occurred.
2137 */
2138 public CommRequest floodSend(Buf buffer,
2139 CommRequest request)
2140 throws IOException {
2141 return floodSend(0, buffer, request);
2142 }
2143
2144 /**
2145 * Flood-send a message to all processes in this communicator with the given
2146 * message tag (non-blocking). The message items come from the given buffer.
2147 * To receive the message, every process (including the sending process)
2148 * must call the <code>floodReceive()</code> method.
2149 * <p>
2150 * The <code>floodSend()</code> method initiates the flood-send operation and
2151 * immediately returns a {@linkplain CommRequest} object. The flood-send
2152 * operation is performed by a separate thread. To wait for the flood-send
2153 * operation to finish, call the returned {@linkplain CommRequest} object's
2154 * <code>waitForFinish()</code> method. When that method returns, the message
2155 * has been fully sent, but it may not yet have been fully received in all
2156 * processes.
2157 * <p>
2158 * <I>Note:</I> The length of the incoming buffer in the
2159 * <code>floodReceive()</code> method call must be the same as the length of the
2160 * outgoing buffer in the <code>floodSend()</code> method call.
2161 *
2162 * @param tag Message tag.
2163 * @param buffer Buffer of data items to be sent.
2164 * @param request CommRequest object to use to wait for the operation to
2165 * finish; in this case <code>request</code> is returned. If
2166 * <code>request</code> is null, a new CommRequest object is created and
2167 * returned.
2168 * @return CommRequest object to use to wait for the operation to finish.
2169 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2170 * <code>buffer</code> is null.
2171 * @throws java.io.IOException Thrown if an I/O error occurred.
2172 */
2173 public CommRequest floodSend(int tag,
2174 Buf buffer,
2175 CommRequest request)
2176 throws IOException {
2177 // Set up CommRequest object.
2178 CommRequest req = request == null ? new CommRequest() : request;
2179 req.mySendRequest = new IORequest();
2180 req.myRecvRequest = null;
2181
2182 // Send data to process 0. Process 0's flood-receive I/O request object
2183 // will forward the data to all the processes.
2184 myChannelGroup.sendNoWait(getChannel(0), tag, buffer, req.mySendRequest);
2185
2186 // Return CommRequest object.
2187 return req;
2188 }
2189
2190 /**
2191 * Flood-receive a message from any process in this communicator. The
2192 * message must have a tag of 0. The received message items are stored in
2193 * the given buffer. To send the message, the source process must call the
2194 * <code>floodSend()</code> method. When the <code>floodReceive()</code> method
2195 * returns, the message has been fully received.
2196 * <p>
2197 * A {@linkplain CommStatus} object is returned. The status object gives the
2198 * actual rank of the process that sent the message, the actual message tag
2199 * that was received, and the actual number of data items in the message.
2200 * <p>
2201 * <I>Note:</I> The length of the incoming buffer in the
2202 * <code>floodReceive()</code> method call must be the same as the length of the
2203 * outgoing buffer in the <code>floodSend()</code> method call.
2204 *
2205 * @param buffer Buffer of data items to be received.
2206 * @return Status object giving the outcome of the message reception.
2207 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2208 * <code>buffer</code> is null.
2209 * @throws java.io.IOException Thrown if an I/O error occurred.
2210 */
2211 public CommStatus floodReceive(Buf buffer)
2212 throws IOException {
2213 return floodReceive(0, buffer, null).waitForFinish();
2214 }
2215
2216 /**
2217 * Flood-receive a message from any process in this communicator with the
2218 * given message tag. If <code>tag</code> is null, a message will be received
2219 * with any tag. The received message items are stored in the given buffer.
2220 * To send the message, the source process must call the
2221 * <code>floodSend()</code> method. When the <code>floodReceive()</code> method
2222 * returns, the message has been fully received.
2223 * <p>
2224 * A {@linkplain CommStatus} object is returned. The status object gives the
2225 * actual rank of the process that sent the message, the actual message tag
2226 * that was received, and the actual number of data items in the message.
2227 * <p>
2228 * <I>Note:</I> The length of the incoming buffer in the
2229 * <code>floodReceive()</code> method call must be the same as the length of the
2230 * outgoing buffer in the <code>floodSend()</code> method call.
2231 *
2232 * @param tag Message tag, or null to receive any tag.
2233 * @param buffer Buffer of data items to be received.
2234 * @return Status object giving the outcome of the message reception.
2235 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2236 * <code>buffer</code> is null.
2237 * @throws java.io.IOException Thrown if an I/O error occurred.
2238 */
2239 public CommStatus floodReceive(Integer tag,
2240 Buf buffer)
2241 throws IOException {
2242 return floodReceive(tag, buffer, null).waitForFinish();
2243 }
2244
2245 /**
2246 * Flood-receive a message from any process in this communicator
 * (non-blocking). A message tag of 0 is used. The received message items are
 * stored in the given buffer. To send the message, the source process must
 * call the <code>floodSend()</code> method.
2251 * <p>
2252 * The <code>floodReceive()</code> method initiates the flood-receive operation
2253 * and immediately returns a {@linkplain CommRequest} object. The
2254 * flood-receive operation is performed by a separate thread. To wait for
2255 * the flood-receive operation to finish, call the returned {@linkplain
2256 * CommRequest} object's <code>waitForFinish()</code> method. When that method
2257 * returns, the incoming message items have been fully received.
2258 * <p>
2259 * <I>Note:</I> The length of the incoming buffer in the
2260 * <code>floodReceive()</code> method call must be the same as the length of the
2261 * outgoing buffer in the <code>floodSend()</code> method call.
2262 *
2263 * @param buffer Buffer of data items to be received.
2264 * @param request CommRequest object to use to wait for the operation to
2265 * finish; in this case <code>request</code> is returned. If
2266 * <code>request</code> is null, a new CommRequest object is created and
2267 * returned.
2268 * @return CommRequest object to use to wait for the operation to finish.
2269 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2270 * <code>buffer</code> is null.
2271 * @throws java.io.IOException Thrown if an I/O error occurred.
2272 */
2273 public CommRequest floodReceive(Buf buffer,
2274 CommRequest request)
2275 throws IOException {
2276 return floodReceive(0, buffer, request);
2277 }
2278
2279 /**
2280 * Flood-receive a message from any process in this communicator with the
2281 * given message tag (non-blocking). If <code>tag</code> is null, a message will
2282 * be received with any tag. The received message items are stored in the
2283 * given buffer. To send the message, the source process must call the
2284 * <code>floodSend()</code> method.
2285 * <p>
2286 * The <code>floodReceive()</code> method initiates the flood-receive operation
2287 * and immediately returns a {@linkplain CommRequest} object. The
2288 * flood-receive operation is performed by a separate thread. To wait for
2289 * the flood-receive operation to finish, call the returned {@linkplain
2290 * CommRequest} object's <code>waitForFinish()</code> method. When that method
2291 * returns, the incoming message items have been fully received.
2292 * <p>
2293 * <I>Note:</I> The length of the incoming buffer in the
2294 * <code>floodReceive()</code> method call must be the same as the length of the
2295 * outgoing buffer in the <code>floodSend()</code> method call.
2296 *
2297 * @param tag Message tag, or null to receive any tag.
2298 * @param buffer Buffer of data items to be received.
2299 * @param request CommRequest object to use to wait for the operation to
2300 * finish; in this case <code>request</code> is returned. If
2301 * <code>request</code> is null, a new CommRequest object is created and
2302 * returned.
2303 * @return CommRequest object to use to wait for the operation to finish.
2304 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2305 * <code>buffer</code> is null.
2306 * @throws java.io.IOException Thrown if an I/O error occurred.
2307 */
2308 public CommRequest floodReceive(Integer tag,
2309 Buf buffer,
2310 CommRequest request)
2311 throws IOException {
2312 // Get broadcast tree for root=0.
2313 int[] tree = getBroadcastTree(0);
2314
2315 // Set up CommRequest object with a special I/O request object that
2316 // forwards the message down the broadcast tree.
2317 CommRequest req = request == null ? new CommRequest() : request;
2318 req.mySendRequest = null;
2319 req.myRecvRequest = new FloodReceiveIORequest(tree);
2320
2321 // In process 0, ensure a channel to every process, then receive
2322 // (non-blocking) a message from any process.
2323 if (myRank == 0) {
2324 for (int src = 0; src < mySize; ++src) {
2325 ensureChannel(src);
2326 }
2327 myChannelGroup.receiveNoWait(null, tag, buffer, req.myRecvRequest);
2328 } // In other processes, ensure a channel to the child processes in the
2329 // broadcast tree, then receive (non-blocking) a message from the parent
2330 // process in the broadcast tree.
2331 else {
2332 for (int i = 1; i < tree.length; ++i) {
2333 ensureChannel(tree[i]);
2334 }
2335 myChannelGroup.receiveNoWait(getChannel(tree[0]), tag, buffer, req.myRecvRequest);
2336 }
2337
2338 // Return CommRequest object.
2339 return req;
2340 }
2341
2342 /**
2343 * Class FloodReceiveIORequest overrides the methods of class IORequest with
2344 * additional processing to forward the message when a message is received.
2345 */
2346 private class FloodReceiveIORequest
2347 extends IORequest {
2348
2349 // Broadcast tree.
2350 private int[] tree;
2351
2352 // List of zero or more additional I/O requests to forward copies of the
2353 // received message.
2354 private LinkedList<IORequest> myForwardedIORequests
2355 = new LinkedList<IORequest>();
2356
2357 /**
2358 * Construct a new I/O request object.
2359 *
2360 * @param tree Broadcast tree.
2361 */
2362 public FloodReceiveIORequest(int[] tree) {
2363 super();
2364 this.tree = tree;
2365 }
2366
2367 /**
2368 * Determine if this I/O request has finished.
2369 *
2370 * @return False if this I/O request has not finished, true if this I/O
2371 * request has finished successfully.
2372 * @throws IOException Thrown if this I/O request has finished and an
2373 * I/O error occurred.
2374 */
2375 public synchronized boolean isFinished()
2376 throws IOException {
2377 if (!super.isFinished()) {
2378 return false;
2379 }
2380 for (IORequest req : myForwardedIORequests) {
2381 if (!req.isFinished()) {
2382 return false;
2383 }
2384 }
2385 return true;
2386 }
2387
2388 /**
2389 * Wait until the send or receive operation corresponding to this I/O
2390 * request has finished. For a receive operation, a {@linkplain Status}
2391 * object containing the results of the receive operation is returned;
2392 * for a send operation, null is returned.
2393 *
2394 * @return Receive status for a receive operation, or null for a send
2395 * operation.
2396 * @throws IOException Thrown if an I/O error occurred.
2397 */
2398 public synchronized Status waitForFinish()
2399 throws IOException {
2400 Status status = super.waitForFinish();
2401 for (IORequest req : myForwardedIORequests) {
2402 req.waitForFinish();
2403 }
2404 return status;
2405 }
2406
2407 /**
2408 * Report that this I/O request succeeded.
2409 */
2410 protected synchronized void reportSuccess() {
2411 try {
2412 super.reportSuccess();
2413
2414 // Get message tag.
2415 int msgtag = myStatus.tag;
2416
2417 // Flood the message to every child process in the broadcast
2418 // tree.
2419 int n = tree.length;
2420 for (int i = 1; i < n; ++i) {
2421 IORequest req = new IORequest();
2422 myForwardedIORequests.add(req);
2423 myChannelGroup.sendNoWait(getChannel(tree[i]), msgtag, myBuf, req);
2424 }
2425 } catch (IOException exc) {
2426 reportFailure(exc);
2427 }
2428 }
2429 }
2430
2431 /**
2432 * Broadcast a message to all processes in this communicator. The broadcast
2433 * uses a message tag of 0. All processes must call <code>broadcast()</code>
2434 * with the same value for <code>root</code> and with a buffer of the same
2435 * length and the same item data type.
2436 * <p>
2437 * The root process (the process whose rank in this communicator is
2438 * <code>root</code>) sends the message items. The message items come from the
2439 * given buffer. When the <code>broadcast()</code> method returns, the message
2440 * has been fully sent, but it may not yet have been fully received by all
2441 * processes.
2442 * <p>
2443 * Each non-root process receives the message items. The message items are
2444 * stored in the given buffer. When the <code>broadcast()</code> method returns,
2445 * the message has been fully received.
2446 *
2447 * @param root Root process's rank in this communicator.
2448 * @param buffer Buffer of data items to be sent (root process) or received
2449 * (non-root processes).
2450 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2451 * <code>root</code> is not in the range 0 .. <code>size()</code>-1.
2452 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2453 * <code>buffer</code> is null.
2454 * @throws java.io.IOException Thrown if an I/O error occurred.
2455 */
2456 public void broadcast(int root,
2457 Buf buffer)
2458 throws IOException {
2459 broadcast(root, 0, buffer);
2460 }
2461
2462 /**
2463 * Broadcast a message to all processes in this communicator using the given
2464 * message tag. All processes must call <code>broadcast()</code> with the same
2465 * values for <code>root</code> and <code>tag</code> and with a buffer of the same
2466 * length and the same item data type.
2467 * <p>
2468 * The root process (the process whose rank in this communicator is
2469 * <code>root</code>) sends the message items. The message items come from the
2470 * given buffer. When the <code>broadcast()</code> method returns, the message
2471 * has been fully sent, but it may not yet have been fully received by all
2472 * processes.
2473 * <p>
2474 * Each non-root process receives the message items. The message items are
2475 * stored in the given buffer. When the <code>broadcast()</code> method returns,
2476 * the message has been fully received.
2477 *
2478 * @param root Root process's rank in this communicator.
2479 * @param tag Message tag.
2480 * @param buffer Buffer of data items to be sent (root process) or received
2481 * (non-root processes).
2482 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2483 * <code>root</code> is not in the range 0 .. <code>size()</code>-1.
2484 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2485 * <code>buffer</code> is null.
2486 * @throws java.io.IOException Thrown if an I/O error occurred.
2487 */
2488 public void broadcast(int root,
2489 int tag,
2490 Buf buffer)
2491 throws IOException {
2492 // Verify preconditions.
2493 if (0 > root || root >= mySize) {
2494 throw new IndexOutOfBoundsException("Comm.broadcast(): root = " + root + " out of bounds");
2495 }
2496
2497 // Early return if only one process.
2498 if (mySize == 1) {
2499 return;
2500 }
2501
2502 // A broadcast is done as a series of point-to-point messages. The
2503 // messages are organized into rounds. The number of rounds is
2504 // ceil(log_2(mySize)). In each round, processes send messages to other
2505 // processes in parallel. Here is the message pattern for a communicator
2506 // with 8 processes doing a broadcast from root process 0:
2507 //
2508 // Process
2509 // 0 1 2 3 4 5 6 7
2510 // | | | | | | | |
2511 // |---->| | | | | | | Round 1
2512 // | | | | | | | |
2513 // |---------->| | | | | | Round 2
2514 // | |---------->| | | | |
2515 // | | | | | | | |
2516 // |---------------------->| | | | Round 3
2517 // | |---------------------->| | |
2518 // | | |---------------------->| |
2519 // | | | |---------------------->|
2520 // | | | | | | | |
2521 //
2522 // If a process other than process 0 is the root, the message pattern is
2523 // the same, except the process ranks are circularly rotated.
2524 // Get array of process ranks in the broadcast tree.
2525 int[] broadcasttree = getBroadcastTree(root);
2526 int n = broadcasttree.length;
2527
2528 // Receive data from parent if any (blocking).
2529 int parent = broadcasttree[0];
2530 if (parent != -1) {
2531 myChannelGroup.receive(getChannel(parent), tag, buffer);
2532 }
2533
2534 // Send data to children if any (non-blocking).
2535 IORequest[] iorequest = new IORequest[n];
2536 for (int i = 1; i < n; ++i) {
2537 int child = broadcasttree[i];
2538 iorequest[i] = new IORequest();
2539 myChannelGroup.sendNoWait(getChannel(child), tag, buffer, iorequest[i]);
2540 }
2541
2542 // Wait for sends to finish if any.
2543 for (int i = 1; i < n; ++i) {
2544 iorequest[i].waitForFinish();
2545 }
2546 }
2547
2548 /**
2549 * Scatter messages to all processes in this communicator. The scatter uses
2550 * a message tag of 0. All processes must call <code>scatter()</code>
2551 * with the same value for <code>root</code>.
2552 * <p>
2553 * The root process (the process whose rank in this communicator is
2554 * <code>root</code>) sends the message items. The message items sent to process
2555 * <I>i</I> come from the source buffer at index <I>i</I> in the given array
2556 * of source buffers. When the <code>scatter()</code> method returns, the
2557 * messages have been fully sent, but they may not yet have been fully
2558 * received by all processes.
2559 * <p>
2560 * Each process, including the root process, receives the message items. The
2561 * message items are stored in the given destination buffer. This must have
2562 * the same length and the same item data type as the corresponding source
2563 * buffer. When the <code>scatter()</code> method returns, the message has been
2564 * fully received.
2565 * <p>
2566 * In the non-root processes, the source buffer array is ignored and may be
2567 * null.
2568 *
2569 * @param root Root process's rank in this communicator.
2570 * @param srcarray Array of source buffers to be sent by the root process.
2571 * Ignored in the non-root processes.
2572 * @param dst Destination buffer to be received.
2573 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2574 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2575 * <code>srcarray</code>'s length does not equal the size of this communicator.
2576 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2577 * <code>srcarray</code> or any element thereof is null. Thrown if <code>dst</code>
2578 * is null.
2579 * @throws java.io.IOException Thrown if an I/O error occurred.
2580 */
2581 public void scatter(int root,
2582 Buf[] srcarray,
2583 Buf dst)
2584 throws IOException {
2585 scatter(root, 0, srcarray, dst);
2586 }
2587
2588 /**
2589 * Scatter messages to all processes in this communicator using the given
2590 * message tag. All processes must call <code>scatter()</code> with the same
2591 * values for <code>root</code> and <code>tag</code>.
2592 * <p>
2593 * The root process (the process whose rank in this communicator is
2594 * <code>root</code>) sends the message items. The message items sent to process
2595 * <I>i</I> come from the source buffer at index <I>i</I> in the given array
2596 * of source buffers. When the <code>scatter()</code> method returns, the
2597 * messages have been fully sent, but they may not yet have been fully
2598 * received by all processes.
2599 * <p>
2600 * Each process, including the root process, receives the message items. The
2601 * message items are stored in the given destination buffer. This must have
2602 * the same length and the same item data type as the corresponding source
2603 * buffer. When the <code>scatter()</code> method returns, the message has been
2604 * fully received.
2605 * <p>
2606 * In the non-root processes, the source buffer array is ignored and may be
2607 * null.
2608 *
2609 * @param root Root process's rank in this communicator.
2610 * @param tag Message tag.
2611 * @param srcarray Array of source buffers to be sent by the root process.
2612 * Ignored in the non-root processes.
2613 * @param dst Destination buffer to be received.
2614 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2615 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2616 * <code>srcarray</code>'s length does not equal the size of this communicator.
2617 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2618 * <code>srcarray</code> or any element thereof is null. Thrown if <code>dst</code>
2619 * is null.
2620 * @throws java.io.IOException Thrown if an I/O error occurred.
2621 */
2622 public void scatter(int root,
2623 int tag,
2624 Buf[] srcarray,
2625 Buf dst)
2626 throws IOException {
2627 // Verify preconditions.
2628 if (0 > root || root >= mySize) {
2629 throw new IndexOutOfBoundsException("Comm.scatter(): root = " + root + " out of bounds");
2630 }
2631
2632 // A scatter is done as a series of point-to-point messages. The root
2633 // process sends a separate message to every other process. Here is the
2634 // message pattern for a communicator with 8 processes scattering from
2635 // root process 0:
2636 //
2637 // Process
2638 // 0 1 2 3 4 5 6 7
2639 // | | | | | | | |
2640 // |---->| | | | | | |
2641 // | | | | | | | |
2642 // |---------->| | | | | |
2643 // | | | | | | | |
2644 // |---------------->| | | | |
2645 // | | | | | | | |
2646 // |---------------------->| | | |
2647 // | | | | | | | |
2648 // |---------------------------->| | |
2649 // | | | | | | | |
2650 // |---------------------------------->| |
2651 // | | | | | | | |
2652 // |---------------------------------------->|
2653 // | | | | | | | |
2654 // Root process sends all messages.
2655 if (myRank == root) {
2656 // Array of IORequest objects for non-blocking sends.
2657 IORequest[] iorequest = new IORequest[mySize];
2658
2659 // Initiate sends to lower-ranked processes.
2660 for (int rank = 0; rank < myRank; ++rank) {
2661 iorequest[rank] = new IORequest();
2662 myChannelGroup.sendNoWait(getChannel(rank), tag, srcarray[rank], iorequest[rank]);
2663 }
2664
2665 // Initiate sends to higher-ranked processes.
2666 for (int rank = myRank + 1; rank < mySize; ++rank) {
2667 iorequest[rank] = new IORequest();
2668 myChannelGroup.sendNoWait(getChannel(rank), tag, srcarray[rank], iorequest[rank]);
2669 }
2670
2671 // Copy to itself.
2672 dst.copy(srcarray[myRank]);
2673
2674 // Wait for completion of sends to lower-ranked processes.
2675 for (int rank = 0; rank < myRank; ++rank) {
2676 iorequest[rank].waitForFinish();
2677 }
2678
2679 // Wait for completion of sends to higher-ranked processes.
2680 for (int rank = myRank + 1; rank < mySize; ++rank) {
2681 iorequest[rank].waitForFinish();
2682 }
2683 } // Non-root process receives one message.
2684 else {
2685 myChannelGroup.receive(getChannel(root), tag, dst);
2686 }
2687 }
2688
2689 /**
2690 * Gather messages from all processes in this communicator. The gather uses
2691 * a message tag of 0. All processes must call <code>gather()</code> with the
2692 * same value for <code>root</code>.
2693 * <p>
2694 * The root process (the process whose rank in this communicator is
2695 * <code>root</code>) receives the message items. The message items received
2696 * from process <I>i</I> are stored in the destination buffer at index
2697 * <I>i</I> in the given array of destination buffers. When the
2698 * <code>gather()</code> method returns, all the messages have been fully
2699 * received.
2700 * <p>
2701 * Each process, including the root process, sends the message items. The
2702 * message items come from the given source buffer. This must have the same
2703 * length and the same item data type as the corresponding destination
2704 * buffer. When the <code>gather()</code> method returns, the message has been
2705 * fully sent, but it may not yet have been fully received by the root
2706 * process.
2707 * <p>
2708 * In the non-root processes, the destination buffer array is ignored and
2709 * may be null.
2710 *
2711 * @param root Root process's rank in this communicator.
2712 * @param src Source buffer to be sent.
2713 * @param dstarray Array of destination buffers to be received by the root
2714 * process. Ignored in the non-root processes.
2715 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2716 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2717 * <code>dstarray</code>'s length does not equal the size of this communicator.
2718 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2719 * <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2720 * is null.
2721 * @throws java.io.IOException Thrown if an I/O error occurred.
2722 */
2723 public void gather(int root,
2724 Buf src,
2725 Buf[] dstarray)
2726 throws IOException {
2727 gather(root, 0, src, dstarray);
2728 }
2729
2730 /**
2731 * Gather messages from all processes in this communicator using the given
2732 * message tag. All processes must call <code>gather()</code> with the same
2733 * values for <code>root</code> and <code>tag</code>.
2734 * <p>
2735 * The root process (the process whose rank in this communicator is
2736 * <code>root</code>) receives the message items. The message items received
2737 * from process <I>i</I> are stored in the destination buffer at index
2738 * <I>i</I> in the given array of destination buffers. When the
2739 * <code>gather()</code> method returns, all the messages have been fully
2740 * received.
2741 * <p>
2742 * Each process, including the root process, sends the message items. The
2743 * message items come from the given source buffer. This must have the same
2744 * length and the same item data type as the corresponding destination
2745 * buffer. When the <code>gather()</code> method returns, the message has been
2746 * fully sent, but it may not yet have been fully received by the root
2747 * process.
2748 * <p>
2749 * In the non-root processes, the destination buffer array is ignored and
2750 * may be null.
2751 *
2752 * @param root Root process's rank in this communicator.
2753 * @param tag Message tag.
2754 * @param src Source buffer to be sent.
2755 * @param dstarray Array of destination buffers to be received by the root
2756 * process. Ignored in the non-root processes.
2757 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2758 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2759 * <code>dstarray</code>'s length does not equal the size of this communicator.
2760 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2761 * <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2762 * is null.
2763 * @throws java.io.IOException Thrown if an I/O error occurred.
2764 */
2765 public void gather(int root,
2766 int tag,
2767 Buf src,
2768 Buf[] dstarray)
2769 throws IOException {
2770 // Verify preconditions.
2771 if (0 > root || root >= mySize) {
2772 throw new IndexOutOfBoundsException("Comm.gather(): root = " + root + " out of bounds");
2773 }
2774
2775 // A gather is done as a series of point-to-point messages. The root
2776 // process receives a separate message from every other process. Here is
2777 // the message pattern for a communicator with 8 processes gathering
2778 // into root process 0:
2779 //
2780 // Process
2781 // 0 1 2 3 4 5 6 7
2782 // | | | | | | | |
2783 // |<----| | | | | | |
2784 // | | | | | | | |
2785 // |<----------| | | | | |
2786 // | | | | | | | |
2787 // |<----------------| | | | |
2788 // | | | | | | | |
2789 // |<----------------------| | | |
2790 // | | | | | | | |
2791 // |<----------------------------| | |
2792 // | | | | | | | |
2793 // |<----------------------------------| |
2794 // | | | | | | | |
2795 // |<----------------------------------------|
2796 // | | | | | | | |
2797 // Root process receives all messages.
2798 if (myRank == root) {
2799 // Array of IORequest objects for non-blocking receives.
2800 IORequest[] iorequest = new IORequest[mySize];
2801
2802 // Initiate receives from lower-ranked processes.
2803 for (int rank = 0; rank < myRank; ++rank) {
2804 iorequest[rank] = new IORequest();
2805 myChannelGroup.receiveNoWait(getChannel(rank), tag, dstarray[rank], iorequest[rank]);
2806 }
2807
2808 // Initiate receives from higher-ranked processes.
2809 for (int rank = myRank + 1; rank < mySize; ++rank) {
2810 iorequest[rank] = new IORequest();
2811 myChannelGroup.receiveNoWait(getChannel(rank), tag, dstarray[rank], iorequest[rank]);
2812 }
2813
2814 // Copy to itself.
2815 dstarray[myRank].copy(src);
2816
2817 // Wait for completion of receives from lower-ranked processes.
2818 for (int rank = 0; rank < myRank; ++rank) {
2819 iorequest[rank].waitForFinish();
2820 }
2821
2822 // Wait for completion of receives from higher-ranked processes.
2823 for (int rank = myRank + 1; rank < mySize; ++rank) {
2824 iorequest[rank].waitForFinish();
2825 }
2826 } // Non-root process sends one message.
2827 else {
2828 myChannelGroup.send(getChannel(root), tag, src);
2829 }
2830 }
2831
2832 /**
2833 * All-gather messages from each process to all processes in this
2834 * communicator. A message tag of 0 is used. All processes must call
2835 * <code>allGather()</code>.
2836 * <p>
2837 * Each process sends the message items in the given source buffer. When the
2838 * <code>allGather()</code> method returns, the source buffer has been fully
2839 * sent.
2840 * <p>
2841 * Each process receives message items from the other processes. The message
2842 * items received from process <I>i</I> are stored in the destination buffer
2843 * at index <I>i</I> in the given array of destination buffers. This
2844 * destination buffer must have the same length and the same item data type
2845 * as the source buffer in process <I>i</I>. When the <code>allGather()</code>
2846 * method returns, all the destination buffers have been fully received.
2847 * <p>
2848 * All-gather is the same as gather, except that every process has an array
2849 * of destination buffers, and every process receives the results of the
2850 * gather.
2851 *
2852 * @param src Source buffer to be sent.
2853 * @param dstarray Array of destination buffers to be received.
2854 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2855 * <code>dstarray</code>'s length does not equal the size of this communicator.
2856 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2857 * <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2858 * is null.
2859 * @throws java.io.IOException Thrown if an I/O error occurred.
2860 */
2861 public void allGather(Buf src,
2862 Buf[] dstarray)
2863 throws IOException {
2864 allGather(0, src, dstarray);
2865 }
2866
2867 /**
2868 * All-gather messages from each process to all processes in this
2869 * communicator using the given message tag. All processes must call
2870 * <code>allGather()</code> with the same value for <code>tag</code>.
2871 * <p>
2872 * Each process sends the message items in the given source buffer. When the
2873 * <code>allGather()</code> method returns, the source buffer has been fully
2874 * sent.
2875 * <p>
2876 * Each process receives message items from the other processes. The message
2877 * items received from process <I>i</I> are stored in the destination buffer
2878 * at index <I>i</I> in the given array of destination buffers. This
2879 * destination buffer must have the same length and the same item data type
2880 * as the source buffer in process <I>i</I>. When the <code>allGather()</code>
2881 * method returns, all the destination buffers have been fully received.
2882 * <p>
2883 * All-gather is the same as gather, except that every process has an array
2884 * of destination buffers, and every process receives the results of the
2885 * gather.
2886 *
2887 * @param tag Message tag.
2888 * @param src Source buffer to be sent.
2889 * @param dstarray Array of destination buffers to be received.
2890 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2891 * <code>dstarray</code>'s length does not equal the size of this communicator.
2892 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2893 * <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2894 * is null.
2895 * @throws java.io.IOException Thrown if an I/O error occurred.
2896 */
2897 public void allGather(int tag,
2898 Buf src,
2899 Buf[] dstarray)
2900 throws IOException {
2901 // Get ranks of predecessor and successor processes.
2902 int pred = (myRank - 1 + mySize) % mySize;
2903 int succ = (myRank + 1) % mySize;
2904
2905 // Copy source buffer into destination buffer at my own rank.
2906 dstarray[myRank].copy(src);
2907
2908 // Do (mySize-1) message rounds. Messages are sent in a pipelined
2909 // fashion from each process to its predecessor until each process's
2910 // source data has arrived in every process. Each outgoing message is
2911 // overlapped with an incoming message.
2912 for (int i = 1; i < mySize; ++i) {
2913 sendReceive(/*toRank */pred,
2914 /*sendTag */ tag,
2915 /*sendBuf */ dstarray[(myRank + i - 1) % mySize],
2916 /*fromRank*/ succ,
2917 /*recvTag */ tag,
2918 /*recvBuf */ dstarray[(myRank + i) % mySize]);
2919 }
2920 }
2921
2922 /**
2923 * Perform a reduction on all processes in this communicator. The reduction
2924 * uses a message tag of 0. All processes must call <code>reduce()</code> with
2925 * the same value for <code>root</code>, with a buffer of the same length and
2926 * the same item data type, and with the same binary operation (class
2927 * {@linkplain edu.rit.pj.reduction.Op Op}).
2928 * <p>
2929 * Before calling <code>reduce()</code>, each process has a buffer filled with
2930 * data items. After <code>reduce()</code> returns, each data item in the root
2931 * process's buffer has been set to the <B>reduction</B> of the
2932 * corresponding data items in all the processes' buffers. The reduction is
2933 * calculated by this formula:
2934 * <p>
2935 * <I>item</I><SUB>0</SUB> <I>op</I>
2936 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
2937 * <p>
2938 * where <I>op</I> is the binary operation passed in as an argument and
2939 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
2940 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
2941 * process rank 0, 1, 2, and so on. However, the order in which the data
2942 * items actually are combined is not specified. Therefore, the binary
2943 * operation must be such that the answer will be the same regardless of the
2944 * order in which the data items are combined; that is, the binary operation
2945 * must be commutative and associative.
2946 * <p>
2947 * In the root process, the reduce operation always changes the buffer's
2948 * contents as described above. In the non-root processes, the reduce
2949 * operation may or may not change the buffer's contents; the final contents
2950 * of the buffer in the non-root processes is not specified.
2951 * <p>
2952 * When the <code>reduce()</code> method returns in the root process, the
2953 * reduction has been fully performed as described above. When the
2954 * <code>reduce()</code> method returns in a non-root process, the non-root
2955 * process has sent all its data items into the reduction, but the reduction
2956 * may not be fully complete in the root process yet.
2957 *
2958 * @param root Root process's rank in this communicator.
2959 * @param buffer Buffer of data items to be reduced.
2960 * @param op Binary operation.
2961 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2962 * <code>root</code> is not in the range 0 .. <code>size()</code>-1.
2963 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
 * <code>buffer</code> is null or <code>op</code> is null.
 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
 * <code>buffer</code> and <code>op</code> do not use the same item data type.
2968 * @throws java.io.IOException Thrown if an I/O error occurred.
2969 */
2970 public void reduce(int root,
2971 Buf buffer,
2972 Op op)
2973 throws IOException {
2974 reduce(root, 0, buffer, op);
2975 }
2976
2977 /**
2978 * Perform a reduction on all processes in this communicator using the given
2979 * message tag. All processes must call <code>reduce()</code> with the same
2980 * value for <code>root</code>, with a buffer of the same length and the same
2981 * item data type, and with the same binary operation (class {@linkplain
2982 * edu.rit.pj.reduction.Op Op}).
2983 * <p>
2984 * Before calling <code>reduce()</code>, each process has a buffer filled with
2985 * data items. After <code>reduce()</code> returns, each data item in the root
2986 * process's buffer has been set to the <B>reduction</B> of the
2987 * corresponding data items in all the processes' buffers. The reduction is
2988 * calculated by this formula:
2989 * <p>
2990 * <I>item</I><SUB>0</SUB> <I>op</I>
2991 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
2992 * <p>
2993 * where <I>op</I> is the binary operation passed in as an argument and
2994 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
2995 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
2996 * process rank 0, 1, 2, and so on. However, the order in which the data
2997 * items actually are combined is not specified. Therefore, the binary
2998 * operation must be such that the answer will be the same regardless of the
2999 * order in which the data items are combined; that is, the binary operation
3000 * must be commutative and associative.
3001 * <p>
3002 * In the root process, the reduce operation always changes the buffer's
3003 * contents as described above. In the non-root processes, the reduce
3004 * operation may or may not change the buffer's contents; the final contents
3005 * of the buffer in the non-root processes is not specified.
3006 * <p>
3007 * When the <code>reduce()</code> method returns in the root process, the
3008 * reduction has been fully performed as described above. When the
3009 * <code>reduce()</code> method returns in a non-root process, the non-root
3010 * process has sent all its data items into the reduction, but the reduction
3011 * may not be fully complete in the root process yet.
3012 *
3013 * @param root Root process's rank in this communicator.
3014 * @param tag Message tag.
3015 * @param buffer Buffer of data items to be reduced.
3016 * @param op Binary operation.
3017 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
3018 * <code>root</code> is not in the range 0 .. <code>size()</code>-1.
3019 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3020 * <code>buf</code> is null or <code>op</code>
3021 * is null.
3022 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3023 * <code>buf</code> and <code>op</code> do not use the same item data type.
3024 * @throws java.io.IOException Thrown if an I/O error occurred.
3025 */
3026 public void reduce(int root,
3027 int tag,
3028 Buf buffer,
3029 Op op)
3030 throws IOException {
3031 // Verify preconditions.
3032 if (0 > root || root >= mySize) {
3033 throw new IndexOutOfBoundsException("Comm.reduce(): root = " + root + " out of bounds");
3034 }
3035
3036 // Early return if only one process.
3037 if (mySize == 1) {
3038 return;
3039 }
3040
3041 // A reduction is done as a series of point-to-point messages. The
3042 // messages are organized into rounds. The number of rounds is
3043 // ceil(log_2(mySize)). The message pattern is the reverse of the
3044 // broadcast message pattern. In each round, processes receive messages
3045 // from other processes and reduce the data items into their accumulator
3046 // buffers in parallel. When a process has received all messages, it
3047 // sends the reduced results on. Here is the message pattern for a
3048 // communicator with 8 processes doing a reduction into root process 0:
3049 //
3050 // Process
3051 // 0 1 2 3 4 5 6 7
3052 // | | | | | | | |
3053 // |<----------------------| | | | Round 1
3054 // | |<----------------------| | |
3055 // | | |<----------------------| |
3056 // | | | |<----------------------|
3057 // | | | | | | | |
3058 // |<----------| | | | | | Round 2
3059 // | |<----------| | | | |
3060 // | | | | | | | |
3061 // |<----| | | | | | | Round 3
3062 // | | | | | | | |
3063 //
3064 // If a process other than process 0 is the root, the message pattern is
3065 // the same, except the process ranks are circularly rotated.
3066 // Get array of process ranks in the broadcast tree.
3067 int[] broadcasttree = getBroadcastTree(root);
3068 int n = broadcasttree.length;
3069
3070 // Set up reduction buffer on top of source buffer.
3071 Buf reductionbuf = buffer.getReductionBuf(op);
3072
3073 // Receive data from children if any, one at a time in reverse order.
3074 for (int i = n - 1; i >= 1; --i) {
3075 int child = broadcasttree[i];
3076 myChannelGroup.receive(getChannel(child), tag, reductionbuf);
3077 }
3078
3079 // Send data to parent if any.
3080 int parent = broadcasttree[0];
3081 if (parent != -1) {
3082 myChannelGroup.send(getChannel(parent), tag, buffer);
3083 }
3084 }
3085
3086 /**
3087 * Perform an all-reduce on all processes in this communicator. The
3088 * all-reduce uses a message tag of 0. All processes must call
3089 * <code>allReduce()</code> with a buffer of the same length and the same item
3090 * data type, and with the same binary operation (class {@linkplain
3091 * edu.rit.pj.reduction.Op Op}).
3092 * <p>
3093 * Before calling <code>allReduce()</code>, each process has a buffer filled
3094 * with data items. After <code>allReduce()</code> returns, each data item in
3095 * the calling process's buffer has been set to the <B>reduction</B> of the
3096 * corresponding data items in all the processes' buffers. The reduction is
3097 * calculated by this formula:
3098 * <p>
3099 * <I>item</I><SUB>0</SUB> <I>op</I>
3100 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3101 * <p>
3102 * where <I>op</I> is the binary operation passed in as an argument and
3103 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3104 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3105 * process rank 0, 1, 2, and so on. However, the order in which the data
3106 * items actually are combined is not specified. Therefore, the binary
3107 * operation must be such that the answer will be the same regardless of the
3108 * order in which the data items are combined; that is, the binary operation
3109 * must be commutative and associative.
3110 * <p>
3111 * The <code>allReduce()</code> method is similar to the <code>reduce()</code>
3112 * method, except the results are stored in all the processes' buffers, not
3113 * just the one root process's buffer.
3114 *
3115 * @param buffer Buffer of data items to be reduced.
3116 * @param op Binary operation.
3117 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3118 * <code>buf</code> is null or <code>op</code>
3119 * is null.
3120 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3121 * <code>buf</code> and <code>op</code> do not use the same item data type.
3122 * @throws java.io.IOException Thrown if an I/O error occurred.
3123 */
3124 public void allReduce(Buf buffer,
3125 Op op)
3126 throws IOException {
3127 allReduce(0, buffer, op);
3128 }
3129
3130 /**
3131 * Perform an all-reduce on all processes in this communicator using the
3132 * given message tag. All processes must call <code>allReduce()</code> with a
3133 * buffer of the same length and the same item data type, and with the same
3134 * binary operation (class {@linkplain edu.rit.pj.reduction.Op Op}).
3135 * <p>
3136 * Before calling <code>allReduce()</code>, each process has a buffer filled
3137 * with data items. After <code>allReduce()</code> returns, each data item in
3138 * the calling process's buffer has been set to the <B>reduction</B> of the
3139 * corresponding data items in all the processes' buffers. The reduction is
3140 * calculated by this formula:
3141 * <p>
3142 * <I>item</I><SUB>0</SUB> <I>op</I>
3143 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3144 * <p>
3145 * where <I>op</I> is the binary operation passed in as an argument and
3146 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3147 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3148 * process rank 0, 1, 2, and so on. However, the order in which the data
3149 * items actually are combined is not specified. Therefore, the binary
3150 * operation must be such that the answer will be the same regardless of the
3151 * order in which the data items are combined; that is, the binary operation
3152 * must be commutative and associative.
3153 * <p>
3154 * The <code>allReduce()</code> method is similar to the <code>reduce()</code>
3155 * method, except the results are stored in all the processes' buffers, not
3156 * just the one root process's buffer.
3157 *
3158 * @param tag Message tag.
3159 * @param buffer Buffer of data items to be reduced.
3160 * @param op Binary operation.
3161 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3162 * <code>buf</code> is null or <code>op</code>
3163 * is null.
3164 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3165 * <code>buf</code> and <code>op</code> do not use the same item data type.
3166 * @throws java.io.IOException Thrown if an I/O error occurred.
3167 */
3168 public void allReduce(int tag,
3169 Buf buffer,
3170 Op op)
3171 throws IOException {
3172 // An all-reduce is done using a "butterfly" message passing pattern.
3173 // Consider the case of K=8 processes. In the first round, processes one
3174 // rank apart exchange data, then each processes accumulates the data
3175 // from the other process using the reduction operator. In the second
3176 // round, processes two ranks apart exchange and accumulate data. In the
3177 // third round, processes four ranks apart exchange and accumulate data.
3178 //
3179 // Process
3180 // 0 1 2 3 4 5 6 7
3181 // | | | | | | | |
3182 // |<--->| |<--->| |<--->| |<--->| Round 1
3183 // | | | | | | | |
3184 // |<--------->| | |<--------->| | Round 2
3185 // | |<--------->| | |<--------->|
3186 // | | | | | | | |
3187 // |<--------------------->| | | | Round 3
3188 // | |<--------------------->| | |
3189 // | | |<--------------------->| |
3190 // | | | |<--------------------->|
3191 // | | | | | | | |
3192 //
3193 // The butterfly pattern works only if the number of processes is a
3194 // power of two. If this is not the case, there are two extra message
3195 // rounds. Each process outside the butterfly pattern sends its data to
3196 // its counterpart inside the butterfly pattern, which accumulates the
3197 // data using the reduction operator. Then the butterfly pattern takes
3198 // place. Afterwards, each process outside the butterfly pattern
3199 // receives the final result from its counterpart inside the butterfly
3200 // pattern. For the case of K=10 processes:
3201 //
3202 // Process
3203 // 0 1 2 3 4 5 6 7 8 9
3204 // | | | | | | | | | |
3205 // |<----------------------------------------------| | Pre
3206 // | |<----------------------------------------------|
3207 // | | | | | | | | | |
3208 // |<--->| |<--->| |<--->| |<--->| | | Round 1
3209 // | | | | | | | | | |
3210 // |<--------->| | |<--------->| | | | Round 2
3211 // | |<--------->| | |<--------->| | |
3212 // | | | | | | | | | |
3213 // |<--------------------->| | | | | | Round 3
3214 // | |<--------------------->| | | | |
3215 // | | |<--------------------->| | | |
3216 // | | | |<--------------------->| | |
3217 // | | | | | | | | | |
3218 // |---------------------------------------------->| | Post
3219 // | |---------------------------------------------->|
3220 // | | | | | | | | | |
3221 //
3222 // If K is a power of two, the all-reduce takes (log_2 K) rounds. If K
3223 // is not a power of two, the all-reduce takes floor(log_2 K)+2 rounds.
3224
3225 // Early exit if only one process.
3226 if (mySize == 1) {
3227 return;
3228 }
3229
3230 // Determine the highest power of 2 less than or equal to this
3231 // communicator's size. Processes at this rank and above will be outside
3232 // the butterfly message passing pattern.
3233 int outside = mySizePowerOf2;
3234
3235 // For processes outside the butterfly:
3236 if (myRank >= outside) {
3237 int insideRank = myRank - outside;
3238
3239 // Send initial data to counterpart inside.
3240 send(insideRank, tag, buffer);
3241
3242 // Receive reduced result from counterpart inside.
3243 receive(insideRank, tag, buffer);
3244 } // For processes inside the butterfly:
3245 else {
3246 // Set up temporary receive buffer.
3247 Buf receiveBuf = buffer.getTemporaryBuf();
3248
3249 // Set up reduction buffer on top of data buffer.
3250 Buf reductionBuf = buffer.getReductionBuf(op);
3251
3252 // If there is a counterpart process outside, receive and accumulate
3253 // its initial data.
3254 int outsideRank = myRank + outside;
3255 if (outsideRank < mySize) {
3256 receive(outsideRank, tag, reductionBuf);
3257 }
3258
3259 // Perform butterfly message passing rounds with counterpart
3260 // processes inside.
3261 int round = 1;
3262 while (round < outside) {
3263 int otherRank = myRank ^ round;
3264 sendReceive(otherRank, tag, buffer, otherRank, tag, receiveBuf);
3265 reductionBuf.copy(receiveBuf);
3266 round <<= 1;
3267 }
3268
3269 // If there is a counterpart process outside, send the reduced
3270 // result.
3271 if (outsideRank < mySize) {
3272 send(outsideRank, tag, buffer);
3273 }
3274 }
3275 }
3276
3277 /**
3278 * Do an all-to-all among all processes in this communicator. A message tag
3279 * of 0 is used.
3280 * <p>
3281 * <code>srcarray</code> must be an array of <I>K</I> buffers, where <I>K</I> is
3282 * the size of this communicator. <code>dstarray</code> must be an array of
3283 * <I>K</I> buffers referring to different storage from the source buffers.
3284 * For each process rank <I>k</I>, 0 <= <I>k</I> <= <I>K</I>, and each
3285 * buffer index <I>i</I>, 0 <= <I>i</I> <= <I>K</I>, the contents of
3286 * <code>srcarray[k]</code> in process <I>i</I> are sent to <code>dstarray[i]</code>
3287 * in process <I>k</I>.
3288 *
3289 * @param srcarray Array of source buffers to be sent by this process.
3290 * @param dstarray Array of destination buffers to be received by this
3291 * process.
3292 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
3293 * <code>srcarray</code>'s length does not equal the size of this communicator.
3294 * Thrown if <code>dstarray</code>'s length does not equal the size of this
3295 * communicator.
3296 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3297 * <code>srcarray</code> or any element thereof is null. Thrown if
3298 * <code>dstarray</code> or any element thereof is null.
3299 * @throws java.io.IOException Thrown if an I/O error occurred.
3300 */
3301 public void allToAll(Buf[] srcarray,
3302 Buf[] dstarray)
3303 throws IOException {
3304 allToAll(0, srcarray, dstarray);
3305 }
3306
3307 /**
3308 * Do an all-to-all among all processes in this communicator using the given
3309 * message tag. All processes must call <code>allToAll()</code> with the same
3310 * value for <code>tag</code>.
3311 * <p>
3312 * <code>srcarray</code> must be an array of <I>K</I> buffers, where <I>K</I> is
3313 * the size of this communicator. <code>dstarray</code> must be an array of
3314 * <I>K</I> buffers referring to different storage from the source buffers.
3315 * For each process rank <I>k</I>, 0 <= <I>k</I> <= <I>K</I>, and each
3316 * buffer index <I>i</I>, 0 <= <I>i</I> <= <I>K</I>, the contents of
3317 * <code>srcarray[k]</code> in process <I>i</I> are sent to <code>dstarray[i]</code>
3318 * in process <I>k</I>.
3319 *
3320 * @param tag Message tag.
3321 * @param srcarray Array of source buffers to be sent by this process.
3322 * @param dstarray Array of destination buffers to be received by this
3323 * process.
3324 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
3325 * <code>srcarray</code>'s length does not equal the size of this communicator.
3326 * Thrown if <code>dstarray</code>'s length does not equal the size of this
3327 * communicator.
3328 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3329 * <code>srcarray</code> or any element thereof is null. Thrown if
3330 * <code>dstarray</code> or any element thereof is null.
3331 * @throws java.io.IOException Thrown if an I/O error occurred.
3332 */
3333 public void allToAll(int tag,
3334 Buf[] srcarray,
3335 Buf[] dstarray)
3336 throws IOException {
3337 // An all-to-all is done as a series of send-receives. Each process
3338 // sends the appropriate buffer to the process one ahead and receives
3339 // the appropriate buffer from the process one behind. Then each process
3340 // sends the appropriate buffer to the process two ahead and receives
3341 // the appropriate buffer from the process two behind. And so on. Here
3342 // is the message pattern for a communicator with 4 processes doing an
3343 // all-to-all:
3344 //
3345 // Process
3346 // 0 1 2 3
3347 // | | | |
3348 // |---->| | | Round 1
3349 // | |---->| |
3350 // | | |---->|
3351 // - -->| | | |--- -
3352 // | | | |
3353 // | | | |
3354 // |---------->| | Round 2
3355 // | |---------->|
3356 // - -->| | |--------- -
3357 // - -------->| | |--- -
3358 // | | | |
3359 // | | | |
3360 // |---------------->| Round 3
3361 // - -->| |--------------- -
3362 // - -------->| |--------- -
3363 // - -------------->| |--- -
3364 // | | | |
3365
3366 // Copy source to destination at this process's own rank.
3367 dstarray[myRank].copy(srcarray[myRank]);
3368
3369 // Initiate K-1 non-blocking send-receives.
3370 CommRequest[] commrequest = new CommRequest[mySize];
3371 for (int i = 1; i < mySize; ++i) {
3372 int toRank = (myRank + i) % mySize;
3373 int fromRank = (myRank - i + mySize) % mySize;
3374 commrequest[i] = sendReceive(toRank, tag, srcarray[toRank],
3375 fromRank, tag, dstarray[fromRank], null);
3376 }
3377
3378 // Wait for completion of all send-receives.
3379 for (int i = 1; i < mySize; ++i) {
3380 commrequest[i].waitForFinish();
3381 }
3382 }
3383
3384 /**
3385 * Perform a scan on all processes in this communicator. A message tag of 0
3386 * is used. All processes must call <code>scan()</code> with a buffer of the
3387 * same length and the same item data type, and with the same binary
3388 * operation (class {@linkplain edu.rit.pj.reduction.Op Op}).
3389 * <p>
3390 * Before calling <code>scan()</code>, each process has a buffer filled with
3391 * data items. After <code>scan()</code> returns, each data item in the buffer
3392 * of process rank <I>i</I> has been set to the <B>reduction</B> of the
3393 * corresponding data items in the buffers of process ranks 0 through
3394 * <I>i</I>. The reduction is calculated by this formula:
3395 * <p>
3396 * <I>item</I><SUB>0</SUB> <I>op</I>
3397 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3398 * <p>
3399 * where <I>op</I> is the binary operation passed in as an argument and
3400 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3401 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3402 * process ranks 0 through <I>i</I>. However, the order in which the data
3403 * items actually are combined is not specified. Therefore, the binary
3404 * operation must be such that the answer will be the same regardless of the
3405 * order in which the data items are combined; that is, the binary operation
3406 * must be commutative and associative.
3407 *
3408 * @param buf Buffer of data items to be scanned.
3409 * @param op Binary operation.
3410 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3411 * <code>buf</code> is null or <code>op</code>
3412 * is null.
3413 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3414 * <code>buf</code> and <code>op</code> do not use the same item data type.
3415 * @throws java.io.IOException Thrown if an I/O error occurred.
3416 */
3417 public void scan(Buf buf,
3418 Op op)
3419 throws IOException {
3420 scan(0, buf, op);
3421 }
3422
3423 /**
3424 * Perform a scan on all processes in this communicator using the given
3425 * message tag. All processes must call <code>scan()</code> with the same value
3426 * for <code>tag</code>, with a buffer of the same length and the same item data
3427 * type, and with the same binary operation (class {@linkplain
3428 * edu.rit.pj.reduction.Op Op}).
3429 * <p>
3430 * Before calling <code>scan()</code>, each process has a buffer filled with
3431 * data items. After <code>scan()</code> returns, each data item in the buffer
3432 * of process rank <I>i</I> has been set to the <B>reduction</B> of the
3433 * corresponding data items in the buffers of process ranks 0 through
3434 * <I>i</I>. The reduction is calculated by this formula:
3435 * <p>
3436 * <I>item</I><SUB>0</SUB> <I>op</I>
3437 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3438 * <p>
3439 * where <I>op</I> is the binary operation passed in as an argument and
3440 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3441 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3442 * process ranks 0 through <I>i</I>. However, the order in which the data
3443 * items actually are combined is not specified. Therefore, the binary
3444 * operation must be such that the answer will be the same regardless of the
3445 * order in which the data items are combined; that is, the binary operation
3446 * must be commutative and associative.
3447 *
3448 * @param tag Message tag.
3449 * @param buf Buffer of data items to be scanned.
3450 * @param op Binary operation.
3451 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3452 * <code>buf</code> is null or <code>op</code>
3453 * is null.
3454 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3455 * <code>buf</code> and <code>op</code> do not use the same item data type.
3456 * @throws java.io.IOException Thrown if an I/O error occurred.
3457 */
3458 public void scan(int tag,
3459 Buf buf,
3460 Op op)
3461 throws IOException {
3462 // Early return if only one process.
3463 if (mySize == 1) {
3464 return;
3465 }
3466
3467 // A scan is done as a series of point-to-point messages. The messages
3468 // are organized into rounds. The number of rounds is
3469 // ceil(log_2(mySize)). In the first round, each process sends its data
3470 // to the process one rank ahead, and the incoming data is combined with
3471 // the process's data using the reduction operator. In the second round,
3472 // each process sends its data to the process two ranks ahead. In the
3473 // third round, each process sends its data to process four ranks ahead.
3474 // And so on. Here is the message pattern for a communicator with 8
3475 // processes:
3476 //
3477 // Process
3478 // 0 1 2 3 4 5 6 7
3479 // | | | | | | | |
3480 // |---->|---->|---->|---->|---->|---->|---->| Round 1
3481 // | | | | | | | |
3482 // |---------->| | | | | | Round 2
3483 // | |---------->| | | | |
3484 // | | |---------->| | | |
3485 // | | | |---------->| | |
3486 // | | | | |---------->| |
3487 // | | | | | |---------->|
3488 // | | | | | | | |
3489 // |---------------------->| | | | Round 3
3490 // | |---------------------->| | |
3491 // | | |---------------------->| |
3492 // | | | |---------------------->|
3493 //
3494 // Get temporary buffer for holding incoming data items.
3495 Buf tempbuf = buf.getTemporaryBuf();
3496
3497 // Get reduction buffer for combining data items.
3498 Buf reductionbuf = buf.getReductionBuf(op);
3499
3500 // Do rounds of message passing and reduction.
3501 int skip = 1;
3502 for (; ; ) {
3503 int toRank = myRank + skip;
3504 int fromRank = myRank - skip;
3505 boolean toExists = 0 <= toRank && toRank < mySize;
3506 boolean fromExists = 0 <= fromRank && fromRank < mySize;
3507 if (toExists && fromExists) {
3508 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf);
3509 reductionbuf.copy(tempbuf);
3510 } else if (fromExists) {
3511 receive(fromRank, tag, reductionbuf);
3512 } else if (toExists) {
3513 send(toRank, tag, buf);
3514 } else {
3515 break;
3516 }
3517 skip <<= 1;
3518 }
3519 }
3520
3521 /**
3522 * Perform an exclusive scan on all processes in this communicator. A
3523 * message tag of 0 is used. All processes must call
3524 * <code>exclusiveScan()</code> with a buffer of the same length and the same
3525 * item data type, with the same binary operation (class {@linkplain
3526 * edu.rit.pj.reduction.Op Op}), and with the same initial data value.
3527 * <p>
3528 * Before calling <code>exclusiveScan()</code>, each process has a buffer filled
3529 * with data items. After <code>exclusiveScan()</code> returns, each data item
3530 * in the buffer of process rank <I>i</I> > 0 has been set to the
3531 * <B>reduction</B> of the corresponding data items in the buffers of
3532 * process ranks 0 through <I>i</I>-1. The reduction is calculated by this
3533 * formula:
3534 * <p>
3535 * <I>item</I><SUB>0</SUB> <I>op</I>
3536 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3537 * <p>
3538 * where <I>op</I> is the binary operation passed in as an argument and
3539 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3540 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3541 * process ranks 0 through <I>i</I>-1. However, the order in which the data
3542 * items actually are combined is not specified. Therefore, the binary
3543 * operation must be such that the answer will be the same regardless of the
3544 * order in which the data items are combined; that is, the binary operation
3545 * must be commutative and associative.
3546 * <p>
3547 * In process 0, each data item in the buffer has been set to the initial
3548 * data value using the buffer's <code>fill()</code> method.
3549 * <p>
3550 * If the buffer's item data type is a primitive type, the <code>item</code>
3551 * must be an instance of the corresponding primitive wrapper class -- class
3552 * Integer for type <code>int</code>, class Double for type <code>double</code>, and
3553 * so on. If the <code>item</code> is null, the item data type's default initial
3554 * value is assigned to each element in the buffer.
3555 * <p>
3556 * If the buffer's item data type is a nonprimitive type, the <code>item</code>
3557 * must be an instance of the item class or a subclass thereof. The
3558 * <code>item</code> may be null. Note that since <code>item</code> is
3559 * <I>assigned</I> to every buffer element, every buffer element ends up
3560 * referring to the same <code>item</code>.
3561 *
3562 * @param buf Buffer of data items to be scanned.
3563 * @param op Binary operation.
3564 * @param item Initial data value.
3565 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3566 * <code>buf</code> is null or <code>op</code>
3567 * is null.
3568 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3569 * <code>buf</code> and <code>op</code> do not use the same item data type.
3570 * @throws java.io.IOException Thrown if an I/O error occurred.
3571 */
3572 public void exclusiveScan(Buf buf,
3573 Op op,
3574 Object item)
3575 throws IOException {
3576 exclusiveScan(0, buf, op, item);
3577 }
3578
3579 /**
3580 * Perform an exclusive scan on all processes in this communicator using the
3581 * given message tag. All processes must call <code>exclusiveScan()</code> with
3582 * the same value for <code>tag</code>, with a buffer of the same length and the
3583 * same item data type, with the same binary operation (class {@linkplain
3584 * edu.rit.pj.reduction.Op Op}), and with the same initial data value.
3585 * <p>
3586 * Before calling <code>exclusiveScan()</code>, each process has a buffer filled
3587 * with data items. After <code>exclusiveScan()</code> returns, each data item
3588 * in the buffer of process rank <I>i</I> > 0 has been set to the
3589 * <B>reduction</B> of the corresponding data items in the buffers of
3590 * process ranks 0 through <I>i</I>-1. The reduction is calculated by this
3591 * formula:
3592 * <p>
3593 * <I>item</I><SUB>0</SUB> <I>op</I>
3594 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3595 * <p>
3596 * where <I>op</I> is the binary operation passed in as an argument and
3597 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3598 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3599 * process ranks 0 through <I>i</I>-1. However, the order in which the data
3600 * items actually are combined is not specified. Therefore, the binary
3601 * operation must be such that the answer will be the same regardless of the
3602 * order in which the data items are combined; that is, the binary operation
3603 * must be commutative and associative.
3604 * <p>
3605 * In process 0, each data item in the buffer has been set to the initial
3606 * data value using the buffer's <code>fill()</code> method.
3607 * <p>
3608 * If the buffer's item data type is a primitive type, the <code>item</code>
3609 * must be an instance of the corresponding primitive wrapper class -- class
3610 * Integer for type <code>int</code>, class Double for type <code>double</code>, and
3611 * so on. If the <code>item</code> is null, the item data type's default initial
3612 * value is assigned to each element in the buffer.
3613 * <p>
3614 * If the buffer's item data type is a nonprimitive type, the <code>item</code>
3615 * must be an instance of the item class or a subclass thereof. The
3616 * <code>item</code> may be null. Note that since <code>item</code> is
3617 * <I>assigned</I> to every buffer element, every buffer element ends up
3618 * referring to the same <code>item</code>.
3619 *
3620 * @param tag Message tag.
3621 * @param buf Buffer of data items to be scanned.
3622 * @param op Binary operation.
3623 * @param item Initial data value.
3624 * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3625 * <code>buf</code> is null or <code>op</code>
3626 * is null.
3627 * @throws java.lang.ClassCastException (unchecked exception) Thrown if
3628 * <code>buf</code> and <code>op</code> do not use the same item data type.
3629 * @throws java.io.IOException Thrown if an I/O error occurred.
3630 */
3631 public void exclusiveScan(int tag,
3632 Buf buf,
3633 Op op,
3634 Object item)
3635 throws IOException {
3636 // An exclusive scan begins with each process sending its buffer to the
3637 // next higher process. Then process 0 fills its buffer with the initial
3638 // data value, while processes 1 and higher do an inclusive scan.
3639
3640 int toRank;
3641 int fromRank;
3642 boolean toExists;
3643 boolean fromExists;
3644
3645 // Process 0 does this.
3646 if (myRank == 0) {
3647 // Send buffer to next higher process.
3648 toRank = 1;
3649 toExists = toRank < mySize;
3650 if (toExists) {
3651 send(toRank, tag, buf);
3652 }
3653
3654 // Fill buffer with initial data value.
3655 buf.fill(item);
3656 } // Processes 1 and higher do this.
3657 else {
3658 // Get temporary buffer for holding incoming data items.
3659 Buf tempbuf = buf.getTemporaryBuf();
3660
3661 // Get reduction buffer for combining data items.
3662 Buf reductionbuf = buf.getReductionBuf(op);
3663
3664 // Send buffer to next higher process.
3665 toRank = myRank + 1;
3666 fromRank = myRank - 1;
3667 toExists = 0 <= toRank && toRank < mySize;
3668 if (toExists) {
3669 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf);
3670 buf.copy(tempbuf);
3671 } else {
3672 receive(fromRank, tag, buf);
3673 }
3674
3675 // Do rounds of message passing and reduction.
3676 int skip = 1;
3677 for (; ; ) {
3678 toRank = myRank + skip;
3679 fromRank = myRank - skip;
3680 toExists = 1 <= toRank && toRank < mySize;
3681 fromExists = 1 <= fromRank && fromRank < mySize;
3682 if (toExists && fromExists) {
3683 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf);
3684 reductionbuf.copy(tempbuf);
3685 } else if (fromExists) {
3686 receive(fromRank, tag, reductionbuf);
3687 } else if (toExists) {
3688 send(toRank, tag, buf);
3689 } else {
3690 break;
3691 }
3692 skip <<= 1;
3693 }
3694 }
3695 }
3696
3697 /**
3698 * Cause all processes in this communicator to wait at a barrier. The
3699 * barrier uses a message tag of 0. All processes must call
3700 * <code>barrier()</code>. The calling thread blocks until every process has
3701 * called <code>barrier()</code>, then the calling thread unblocks and returns
3702 * from the <code>barrier()</code> call.
3703 *
3704 * @throws java.io.IOException Thrown if an I/O error occurred.
3705 */
3706 public void barrier()
3707 throws IOException {
3708 barrier(0);
3709 }
3710
3711 /**
3712 * Cause all processes in this communicator to wait at a barrier, using the
3713 * given message tag. All processes must call <code>barrier()</code> with the
3714 * same tag. The calling thread blocks until every process has called
3715 * <code>barrier()</code>, then the calling thread unblocks and returns from the
3716 * <code>barrier()</code> call.
3717 *
3718 * @param tag Message tag.
3719 * @throws java.io.IOException Thrown if an I/O error occurred.
3720 */
3721 public void barrier(int tag)
3722 throws IOException {
3723 // A barrier is done as an all-reduce of an empty buffer.
3724 allReduce(tag, IntegerBuf.emptyBuffer(), IntegerOp.SUM);
3725 }
3726
3727 /**
3728 * Returns a string version of this communicator. The string includes the
3729 * communicator's size, the current process's rank, and the host and port of
3730 * each backend process.
3731 *
3732 * @return String version.
3733 */
3734 public String toString() {
3735 StringBuilder buf = new StringBuilder();
3736 buf.append("Comm(size=");
3737 buf.append(mySize);
3738 buf.append(",rank=");
3739 buf.append(myRank);
3740 buf.append(",backend");
3741 for (int i = 0; i < mySize; ++i) {
3742 if (i > 0) {
3743 buf.append(',');
3744 }
3745 buf.append('[');
3746 buf.append(i);
3747 buf.append("]=");
3748 buf.append(myAddressForRank[i]);
3749 }
3750 buf.append(')');
3751 return buf.toString();
3752 }
3753
3754 /**
3755 * Dump the state of this communicator on the given print stream. For
3756 * debugging.
3757 *
3758 * @param out Print stream.
3759 * @param prefix String to print at the beginning of each line.
3760 */
3761 public void dump(PrintStream out,
3762 String prefix) {
3763 out.println();
3764 out.println(prefix + getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(this)));
3765 out.println(prefix + "mySize = " + mySize);
3766 out.println(prefix + "myRank = " + myRank);
3767 out.println(prefix + "myHost = " + myHost);
3768 out.println(prefix + "mySizePowerOf2 = " + mySizePowerOf2);
3769 out.println(prefix + "myChannelGroup = " + myChannelGroup);
3770 out.println(prefix + "myAddressForRank:");
3771 for (int i = 0; i < myAddressForRank.length; ++i) {
3772 out.println(prefix + "\t[" + i + "] " + myAddressForRank[i]);
3773 }
3774 out.println(prefix + "myChannelForRank:");
3775 for (int i = 0; i < myChannelForRank.length; ++i) {
3776 out.println(prefix + "\t[" + i + "] " + myChannelForRank[i]);
3777 }
3778 out.println(prefix + "myBroadcastTree:");
3779 for (int i = 0; i < myBroadcastTree.length; ++i) {
3780 out.print(prefix + "\t[" + i + "]");
3781 int[] tree = myBroadcastTree[i];
3782 if (tree == null) {
3783 out.print(" null");
3784 } else {
3785 for (int j = 0; j < tree.length; ++j) {
3786 out.print(" " + tree[j]);
3787 }
3788 }
3789 out.println();
3790 }
3791 out.println();
3792 myChannelGroup.dump(out, prefix);
3793 }
3794
3795 // Hidden operations.
3796
3797 /**
3798 * Notify that another process connected a channel to this process.
3799 *
3800 * @param theChannel Channel.
3801 * @throws IOException Thrown if an I/O error occurred.
3802 */
3803 private synchronized void doFarEndConnected(Channel theChannel)
3804 throws IOException {
3805 // Record channel and rank.
3806 myChannelForRank[getFarRank(theChannel)] = theChannel;
3807
3808 // Notify any threads waiting in getChannel().
3809 notifyAll();
3810 }
3811
3812 /**
3813 * Ensure that a channel for communicating with the process at the given
3814 * rank is or will be set up.
3815 *
3816 * @param farrank Rank of far end process.
3817 * @throws IOException Thrown if an I/O error occurred.
3818 */
3819 private synchronized void ensureChannel(int farrank)
3820 throws IOException {
3821 // Get channel from channel array.
3822 Channel channel = myChannelForRank[farrank];
3823
3824 // If the channel does not exist:
3825 if (channel == null) {
3826 // If this is the lower-ranked process, set up the connection.
3827 if (myRank < farrank) {
3828 myChannelForRank[farrank]
3829 = myChannelGroup.connect(myAddressForRank[farrank]);
3830 }
3831
3832 // If this is the higher-ranked process, the lower-ranked process
3833 // will set up the connection.
3834 }
3835 }
3836
3837 /**
3838 * Get the channel for communicating with the process at the given rank.
3839 *
3840 * @param farrank Rank of far end process.
3841 * @return Channel.
3842 * @throws IOException Thrown if an I/O error occurred.
3843 */
3844 private synchronized Channel getChannel(int farrank)
3845 throws IOException {
3846 // Get channel from channel array.
3847 Channel channel = myChannelForRank[farrank];
3848
3849 // If the channel does not exist:
3850 if (channel == null) {
3851 // If this is the lower-ranked process, set up the connection.
3852 if (myRank < farrank) {
3853 channel = myChannelGroup.connect(myAddressForRank[farrank]);
3854 myChannelForRank[farrank] = channel;
3855 } // If this is the higher-ranked process, wait for the lower-ranked
3856 // process to set up the connection.
3857 else {
3858 try {
3859 while (channel == null) {
3860 wait();
3861 channel = myChannelForRank[farrank];
3862 }
3863 } catch (InterruptedException exc) {
3864 IOException exc2 = new InterruptedIOException();
3865 exc2.initCause(exc);
3866 throw exc2;
3867 }
3868 }
3869 }
3870
3871 return channel;
3872 }
3873
3874 /**
3875 * Get the rank of the process at the far end of the given channel.
3876 *
3877 * @param channel Channel.
3878 * @return Far end process rank.
3879 */
3880 static int getFarRank(Channel channel) {
3881 return channel.farEndChannelGroupId();
3882 }
3883
3884 /**
3885 * Get an array of process ranks in the broadcast tree for the given root.
3886 * The broadcast tree is cached in the field myBroadcastTree for later use.
3887 *
3888 * @param root Root process's rank.
3889 * @return Broadcast tree.
3890 */
3891 private synchronized int[] getBroadcastTree(int root) {
3892 if (myBroadcastTree == null) {
3893 myBroadcastTree = new int[mySize][];
3894 }
3895 int[] broadcasttree = myBroadcastTree[root];
3896 if (broadcasttree == null) {
3897 broadcasttree = CommPattern.broadcastPattern(mySize, myRank, root);
3898 myBroadcastTree[root] = broadcasttree;
3899 }
3900 return broadcasttree;
3901 }
3902
3903 }