View Javadoc
1   //******************************************************************************
2   //
3   // File:    Comm.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.Comm
6   //
7   // This Java source file is copyright (C) 2009 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj;
41  
42  import java.io.IOException;
43  import java.io.InterruptedIOException;
44  import java.io.PrintStream;
45  import java.net.InetSocketAddress;
46  import java.util.LinkedList;
47  
48  import edu.rit.mp.Buf;
49  import edu.rit.mp.Channel;
50  import edu.rit.mp.ChannelGroup;
51  import edu.rit.mp.ConnectListener;
52  import edu.rit.mp.IORequest;
53  import edu.rit.mp.IntegerBuf;
54  import edu.rit.mp.ObjectBuf;
55  import edu.rit.mp.Status;
56  import edu.rit.pj.cluster.CommPattern;
57  import edu.rit.pj.cluster.JobBackend;
58  import edu.rit.pj.cluster.JobFrontend;
59  import edu.rit.pj.cluster.JobSchedulerException;
60  import edu.rit.pj.reduction.IntegerOp;
61  import edu.rit.pj.reduction.Op;
62  import edu.rit.util.Range;
63  
64  /**
65   * Class Comm provides a communicator for a PJ cluster parallel program. Class
66   * Comm provides a method to initialize the PJ message passing middleware and
67   * run the parallel program on multiple processors of a cluster parallel
68   * computer. Class Comm also provides methods for passing messages between the
69   * processes of the parallel program.
70   * <HR>
71   * <p>
72   * <B>BASIC CONCEPTS</B>
73   * <p>
74   * A <B>cluster parallel computer</B> typically consists of a <B>frontend
75   * processor</B> and a number of <B>backend processors</B> connected via a
76   * dedicated high-speed network. A user logs into the frontend processor and
77   * runs a PJ program there. The PJ message passing middleware causes copies of
78   * the PJ program to run in a number of separate processes, each process on a
79   * different backend processor. The backend processes run the PJ program, using
80   * the PJ middleware to send messages amongst themselves. The PJ middleware
81   * redirects the backend processes' standard output and standard error streams
82   * to the frontend process. The frontend process does not actually execute the
83   * PJ program, but merely displays all the backend processes' standard output
84   * and standard error streams on the frontend process's own standard output and
85   * standard error.
86   * <p>
87   * For the PJ message passing middleware to work, certain server processes must
88   * be running. See package
89   * {@linkplain edu.rit.pj.cluster} for further
90   * information.
91   * <p>
92   * To initialize the PJ message passing middleware, the program must first call
93   * the static <code>Comm.init()</code> method, passing in the command line
94   * arguments.
95   * <p>
96   * A <B>communicator</B> is associated with a group of backend processes. The
97   * communicator's <B>size</B> is the number of processes in the group. Each
98   * process in the communicator has a different <B>rank</B> in the range 0 ..
99   * <I>size</I>-1. A process may obtain the size and rank by calling the
100  * communicator's <code>size()</code> and <code>rank()</code> methods.
101  * <p>
102  * There is one predefined communicator, the <B>world communicator,</B>
103  * consisting of all the backend processes in the parallel program. A process
104  * may obtain a reference to the world communicator by calling the static
105  * <code>Comm.world()</code> method. Typically, the first few lines in a PJ cluster
106  * parallel program look like this:
107  * <PRE>
108  * public class AParallelProgram
109  * {
110  * public static void main
111  * (String[] args)
112  * throws Exception
113  * {
114  * Comm.init (args);
115  * Comm world = Comm.world();
116  * int size = world.size();
117  * int rank = world.rank();
118  * . . .</PRE>
119  * <p>
120  * The number of processes in the parallel program -- that is, the size of the
121  * world communicator -- is specified by the <code>"pj.np"</code> property, which
122  * must be an integer greater than or equal to 1. You can specify the number of
123  * processes on the Java command line like this:
124  * <PRE>
125  * java -Dpj.np=4 . . .</PRE>
126  * <p>
127  * If the <code>"pj.np"</code> property is not specified, the default is 1.
128  * <p>
129  * The PJ program will run on the specified number of backend processors as
130  * described above. To determine which backend processors to use, the PJ program
131  * interacts with a <B>Job Scheduler</B> server process running on the frontend
132  * processor. When the PJ program starts and calls the <code>Comm.init()</code>
133  * method, the middleware first prints the job number on the standard error. The
134  * middleware then waits until the required number of backend processors are
135  * ready to run a job. As each backend processor becomes ready, the middleware
136  * prints on the standard error the name of each backend processor assigned to
137  * the job. Once all are ready, the PJ program starts running on those backend
138  * processors, and all further output comes from the PJ program. Since each PJ
139  * program interacts with the Job Scheduler, the Job Scheduler can ensure that
140  * each backend processor is running a backend process for only one job at a
141  * time.
142  * <p>
143  * Depending on the system load, your PJ program may have to wait in the Job
144  * Scheduler's queue for a while until enough backend processors become ready.
145  * If you get tired of waiting, you can kill your PJ program (e.g., by typing
146  * CTRL-C), which will remove your PJ program from the Job Scheduler's queue.
147  * <p>
148  * The Job Scheduler has a web interface that lets you examine the cluster
149  * status. Just point your web browser at this URL:
150  *
151  * <code>&nbsp;&nbsp;&nbsp;&nbsp;http://&lt;hostname&gt;:8080/</code>
152  * <p>
153  * where <code>&lt;hostname&gt;</code> is replaced by the host name of the frontend
154  * processor. The default port for the cluster status web interface is port
155  * 8080. The Job Scheduler can be configured to use a different port. For
156  * further information, see package {@linkplain edu.rit.pj.cluster}.
157  * <p>
158  * If the PJ program is executed on a host where no Job Scheduler is running,
159  * the PJ program will run in <I>one</I> process on that host (i.e., the machine
160  * you're logged into), rather than on the backend processors. The message
161  * passing methods in class Comm will still work, though. This option can be
162  * useful for debugging a PJ program's logic on a non-parallel machine before
163  * running the PJ program on a cluster.
164  * <HR>
165  * <p>
166  * <B>MESSAGE PASSING</B>
167  * <p>
168  * PJ provides two categories of communication, <B>point-to-point
169  * communication</B> and <B>collective communication.</B> The following methods
170  * of class Comm are used for point-to-point communication:
171  * <UL>
172  * <LI><code>send()</code>
173  * <LI><code>receive()</code>
174  * <LI><code>sendReceive()</code>
175  * <LI><code>floodSend()</code>
176  * <LI><code>floodReceive()</code>
177  * </UL>
178  * The following methods are used for collective communication:
179  * <UL>
180  * <LI><code>broadcast()</code>
181  * <LI><code>scatter()</code>
182  * <LI><code>gather()</code>
183  * <LI><code>allGather()</code>
184  * <LI><code>reduce()</code>
185  * <LI><code>allReduce()</code>
186  * <LI><code>allToAll()</code>
187  * <LI><code>scan()</code>
188  * <LI><code>exclusiveScan()</code>
189  * <LI><code>barrier()</code>
190  * </UL>
191  * These methods are described further in the sections below.
192  * <p>
193  * In addition, you can create a new communicator consisting of all, or a subset
194  * of, the processes in an existing communicator. Message passing in the new
195  * communicator is completely independent of message passing in the original
196  * communicator. The following method creates a new communicator:
197  * <UL>
198  * <LI><code>createComm()</code>
199  * </UL>
200  * <HR>
201  * <p>
202  * <B>POINT-TO-POINT COMMUNICATION</B>
203  * <p>
204  * One process in a PJ cluster parallel program, the <B>source process</B>, may
205  * use a communicator to send a message to another process in the program, the
206  * <B>destination process</B>. This is called a <B>point-to-point
207  * communication</B> because just the two processes are involved (as opposed to
208  * a collective communication, which involves all the processes). Five
209  * point-to-point communication methods are available in this release: send,
210  * receive, send-receive, flood-send, and flood-receive.
211  * <p>
212  * <B>Send and Receive</B>
213  * <p>
214  * To do a point-to-point communication, the source process calls the
215  * <code>send()</code> method on a certain communicator, such as the world
216  * communicator. The source process specifies the destination process's rank,
217  * the <B>tag</B> for the message, and a <B>buffer</B> containing the data items
218  * to be sent (type {@linkplain edu.rit.mp.Buf}). Likewise, the destination
219  * process calls the <code>receive()</code> method on the same communicator as the
220  * source process. The destination process specifies the source process's rank,
221  * the message tag (which must be the same as in the source process), and the
222  * buffer for the data items to be received.
223  * <p>
224  * A <code>send()</code> method call and a <code>receive()</code> method call are said
225  * to <B>match</B> if (a) the rank passed to the <code>send()</code> method equals
226  * the rank of the process calling <code>receive()</code>, (b) the rank passed to
227  * the <code>receive()</code> method equals the rank of the process calling
228  * <code>send()</code>, (c) the item data type in the source buffer is the same as
229  * the item data type in the destination buffer, and (d) the send message tag
230  * equals the receive message tag. A <code>receive()</code> method call will block
231  * until a matching <code>send()</code> method call occurs. If more than one
232  * <code>send()</code> method call matches a <code>receive()</code> method call, one of
233  * the matching <code>send()</code> method calls is picked in an unspecified manner.
234  * A <code>send()</code> method call <I>may</I> block until a matching
235  * <code>receive()</code> method call occurs due to flow control in the underlying
236  * network communication.
237  * <p>
238  * The message tag can be used to distinguish different kinds of messages. A
239  * <code>receive()</code> method call will only match a <code>send()</code> method call
240  * with the same tag. If there is no need to distinguish different kinds of
241  * messages, omit the tag (it will default to 0).
242  * <p>
243  * Once a <code>send()</code> method call and a <code>receive()</code> method call have
244  * been matched together, the actual message data transfer takes place. Each
245  * item in the source buffer, starting at index 0 and continuing for the entire
246  * length of the source buffer, is written to the message. At the other end,
247  * each item in the destination buffer, starting at index 0, is read from the
248  * message.
249  * <p>
250  * The <code>receive()</code> method returns a {@linkplain CommStatus} object. The
251  * status object gives the actual rank of the process that sent the message, the
252  * actual message tag that was received, and the actual number of data items in
253  * the message. If the actual number of data items in the message is less than
254  * the length of the destination buffer, nothing is stored into the extra data
255  * items at the end of the destination buffer. If the actual number of data
256  * items in the message is greater than the length of the destination buffer,
257  * the extra data items at the end of the message are discarded.
258  * <p>
259  * The <code>send()</code> method does not return until all the message elements
260  * have been written from the source buffer. Likewise, the <code>receive()</code>
261  * method does not return until all the message elements have been read into the
262  * destination buffer. However, you cannot assume that because the
263  * <code>send()</code> method has returned, the matching <code>receive()</code> method
264  * has also returned. Because of buffering in the underlying network
265  * communication, not all the destination items might have been received even
266  * though all the source items have been sent.
267  * <p>
268  * The destination process, instead of specifying a particular source process,
269  * can declare that it will receive a message from any source process by
270  * specifying null for the source process rank in the <code>receive()</code> method
271  * call. This is called a <B>wildcard source</B>. In this case the
272  * <code>receive()</code> method call's returned status object will indicate the
273  * actual source process that sent the message.
274  * <p>
275  * The destination process, instead of specifying a particular message tag, can
276  * declare that it will receive a message with any tag by specifying null for
277  * the tag in the <code>receive()</code> method call. This is called a <B>wildcard
278  * tag</B>. Alternatively, the destination process can specify a range of
279  * message tags, and it will receive a message with any tag in the given range.
280  * In these cases the <code>receive()</code> method call's returned status object
281  * will indicate the actual message tag that was sent.
282  * <p>
283  * A process can send a message to itself. In this case one thread must call
284  * <code>send()</code> (specifying the process's own rank as the destination) and a
285  * different thread must call <code>receive()</code> (specifying the process's own
286  * rank as the source), otherwise a deadlock will ensue.
287  * <p>
288  * <B>Send-Receive</B>
289  * <p>
290  * By calling the <code>sendReceive()</code> method, a process can send a buffer of
291  * outgoing message items to a destination process while simultaneously
292  * receiving a buffer of incoming message items from a source process. The
293  * destination process may be the same as the source process, or different from
294  * the source process. The outgoing message items must come from a different
295  * place than where the incoming message items will be stored, otherwise the
296  * incoming message items may overwrite the outgoing message items before they
297  * can be sent. When the <code>sendReceive()</code> method returns, the outgoing
298  * message items have been fully sent, but they may not yet have been fully
299  * received; and the incoming message items have been fully received.
300  * <p>
301  * With the <code>sendReceive()</code> method, a process cannot receive a message
302  * from a wildcard source, and a process cannot receive a message with a
303  * wildcard tag or a range of tags. The process calling <code>sendReceive()</code>
304  * must know the rank of the source process and the message tag (if not 0). The
305  * <code>sendReceive()</code> method does return a status object giving the outcome
306  * of the receive half of the send-receive operation, just as the
307  * <code>receive()</code> method does.
308  * <p>
309  * A process can send-receive messages with itself. In this case one thread must
310  * call <code>sendReceive()</code> (specifying the process's own rank as the source
311  * and destination) and a different thread must also call <code>sendReceive()</code>
312  * (specifying the process's own rank as the source and destination), otherwise
313  * a deadlock will ensue.
314  * <p>
315  * <B>Non-Blocking Communication</B>
316  * <p>
317  * The <code>send()</code>, <code>receive()</code>, and <code>sendReceive()</code> methods
318  * each have a non-blocking version. A non-blocking communication method
319  * initiates the communication operation and immediately returns, storing the
320  * state of the communication operation in a {@linkplain CommRequest} object.
321  * The communicator then performs the communication operation in a separate
322  * thread. This allows the calling thread to do other work while the
323  * communication operation is in progress. To wait for the send and receive
324  * operations to finish, call the CommRequest object's <code>waitForFinish()</code>
325  * method.
326  * <p>
327  * <B>Flood-Send and Flood-Receive</B>
328  * <p>
329  * Any process can send a message to all processes in the communicator. This is
330  * called "flooding" the message. First, all processes must start a
331  * flood-receive operation, either by calling the non-blocking
332  * <code>floodReceive()</code> method, or by having a separate thread call the
333  * blocking <code>floodReceive()</code> method. Then, one process (any process) must
334  * call the <code>floodSend()</code> method. The data items in the flood-send
335  * operation's outgoing buffer are copied into the flood-receive operation's
336  * incoming buffer in all processes.
337  * <p>
338  * Message flooding is similar to the "broadcast" collective communication
339  * operation (see below). The differences are these: Broadcasting combines
340  * sending and receiving in a single operation; flooding uses separate send and
341  * receive operations. For broadcasting, every process must know which process
342  * is sending the outgoing data items; for flooding, the receiving processes do
343  * not need to know which process is sending (any process can send).
344  * <HR>
345  * <p>
346  * <B>COLLECTIVE COMMUNICATION</B>
347  * <p>
348  * A PJ cluster parallel program may use a communicator to send a message among
349  * all the processes in the program at the same time. This is called a
350  * <B>collective communication</B> because all the processes in the communicator
351  * are involved (as opposed to a point-to-point communication). Ten collective
352  * communication methods are available in this release: broadcast, scatter,
353  * gather, all-gather, reduce, all-reduce, all-to-all, scan, exclusive-scan, and
354  * barrier. Further collective communication methods will be added to class Comm
355  * in a later release.
356  * <p>
357  * <B>Broadcast</B>
358  * <p>
359  * One process in the communicator, the <B>root</B> process, has a source buffer
360  * (type {@linkplain edu.rit.mp.Buf Buf}) filled with data. The other processes
361  * in the communicator each have a destination buffer with the same length and
362  * the same item data type as the source buffer. Each process calls the
363  * communicator's <code>broadcast()</code> method. Afterwards, all the destination
364  * buffers contain the same data as the source buffer.
365  * <TABLE>
366  * <CAPTION>Before and After Broadcast.</CAPTION>
367  * <TR>
368  * <TD style="vertical-align:top;">
369  * Before:
370  * <pre>
371  * Process   Process   Process   Process
372  * 0 (root)     1         2         3
373  * +----+    +----+    +----+    +----+
374  * |  1 |    |    |    |    |    |    |
375  * |  2 |    |    |    |    |    |    |
376  * |  3 |    |    |    |    |    |    |
377  * |  4 |    |    |    |    |    |    |
378  * |  5 |    |    |    |    |    |    |
379  * |  6 |    |    |    |    |    |    |
380  * |  7 |    |    |    |    |    |    |
381  * |  8 |    |    |    |    |    |    |
382  * +----+    +----+    +----+    +----+</pre>
383  * </TD>
384  * <TD>
385  * <pre>
386  *  ... </pre>
387  * </TD>
388  * <TD style="vertical-align:top;">
389  * After:
390  * <pre>
391  * Process   Process   Process   Process
392  * 0 (root)     1         2         3
393  * +----+    +----+    +----+    +----+
394  * |  1 |    |  1 |    |  1 |    |  1 |
395  * |  2 |    |  2 |    |  2 |    |  2 |
396  * |  3 |    |  3 |    |  3 |    |  3 |
397  * |  4 |    |  4 |    |  4 |    |  4 |
398  * |  5 |    |  5 |    |  5 |    |  5 |
399  * |  6 |    |  6 |    |  6 |    |  6 |
400  * |  7 |    |  7 |    |  7 |    |  7 |
401  * |  8 |    |  8 |    |  8 |    |  8 |
402  * +----+    +----+    +----+    +----+</pre>
403  * </TD>
404  * </TR>
405  * </TABLE>
406  * <I>Note:</I> Any process can be the root of the broadcast. The above is only
407  * one example with process 0 as the root.
408  * <p>
409  * <B>Scatter</B>
410  * <p>
411  * One process in the communicator, the root process, has <I>K</I> source
412  * buffers (type {@linkplain edu.rit.mp.Buf Buf}) filled with data, where
413  * <I>K</I> is the size of the communicator. For example, the source buffers
414  * could be different portions of an array. Each process in the communicator
415  * (including the root process) has a destination buffer with the same length
416  * and the same item data type as the corresponding source buffer. Each process
417  * calls the communicator's <code>scatter()</code> method. Afterwards, each
418  * process's destination buffer contains the same data as the corresponding
419  * source buffer in the root process.
420  * <TABLE>
421  * <CAPTION>Scatter.</CAPTION>
422  * <TR>
423  * <TD style="vertical-align:top;">
424  * Before:
425  * <pre>
426  * Process   Process   Process   Process
427  * 0 (root)     1         2         3
428  * +----+
429  * |  1 |
430  * |  2 |
431  * +----+
432  * |  3 |
433  * |  4 |
434  * +----+
435  * |  5 |
436  * |  6 |
437  * +----+
438  * |  7 |
439  * |  8 |
440  * +----+
441  *
442  * +----+    +----+    +----+    +----+
443  * |    |    |    |    |    |    |    |
444  * |    |    |    |    |    |    |    |
445  * +----+    +----+    +----+    +----+</pre>
446  * </TD>
447  * <TD>
448  * <pre>
449  *  ... </pre>
450  * </TD>
451  * <TD style="vertical-align:top;">
452  * After:
453  * <pre>
454  * Process   Process   Process   Process
455  * 0 (root)     1         2         3
456  * +----+
457  * |  1 |
458  * |  2 |
459  * +----+
460  * |  3 |
461  * |  4 |
462  * +----+
463  * |  5 |
464  * |  6 |
465  * +----+
466  * |  7 |
467  * |  8 |
468  * +----+
469  *
470  * +----+    +----+    +----+    +----+
471  * |  1 |    |  3 |    |  5 |    |  7 |
472  * |  2 |    |  4 |    |  6 |    |  8 |
473  * +----+    +----+    +----+    +----+</pre>
474  * </TD>
475  * </TR>
476  * </TABLE>
477  * In the root process, the destination buffer can be the same as the
478  * corresponding source buffer:
479  * <TABLE>
480  * <CAPTION>Scatter</CAPTION>
481  * <TR>
482  * <TD style="vertical-align:top;">
483  * Before:
484  * <pre>
485  * Process   Process   Process   Process
486  * 0 (root)     1         2         3
487  * +----+
488  * |  1 |
489  * |  2 |
490  * +----+    +----+
491  * |  3 |    |    |
492  * |  4 |    |    |
493  * +----+    +----+    +----+
494  * |  5 |              |    |
495  * |  6 |              |    |
496  * +----+              +----+    +----+
497  * |  7 |                        |    |
498  * |  8 |                        |    |
499  * +----+                        +----+</pre>
500  * </TD>
501  * <TD>
502  * <pre>
503  *  ... </pre>
504  * </TD>
505  * <TD style="vertical-align:top;">
506  * After:
507  * <pre>
508  * Process   Process   Process   Process
509  * 0 (root)     1         2         3
510  * +----+
511  * |  1 |
512  * |  2 |
513  * +----+    +----+
514  * |  3 |    |  3 |
515  * |  4 |    |  4 |
516  * +----+    +----+    +----+
517  * |  5 |              |  5 |
518  * |  6 |              |  6 |
519  * +----+              +----+    +----+
520  * |  7 |                        |  7 |
521  * |  8 |                        |  8 |
522  * +----+                        +----+</pre>
523  * </TD>
524  * </TR>
525  * </TABLE>
526  * <I>Note:</I> Any process can be the root of the scatter. The above is only
527  * one example with process 0 as the root.
528  * <p>
529  * <B>Gather</B>
530  * <p>
531  * Gather is the opposite of scatter. One process in the communicator, the root
532  * process, has <I>K</I> destination buffers (type {@linkplain edu.rit.mp.Buf
533  * Buf}), where <I>K</I> is the size of the communicator. For example, the
534  * destination buffers could be different portions of an array. Each process in
535  * the communicator (including the root process) has a source buffer with the
536  * same length and the same item data type as the corresponding destination
537  * buffer, filled with data. Each process calls the communicator's
538  * <code>gather()</code> method. Afterwards, each destination buffer in the root
539  * process contains the same data as the corresponding source buffer.
540  * <TABLE>
541  * <CAPTION>Gather</CAPTION>
542  * <TR>
543  * <TD style="vertical-align:top;">
544  * Before:
545  * <pre>
546  * Process   Process   Process   Process
547  * 0 (root)     1         2         3
548  * +----+    +----+    +----+    +----+
549  * |  1 |    |  3 |    |  5 |    |  7 |
550  * |  2 |    |  4 |    |  6 |    |  8 |
551  * +----+    +----+    +----+    +----+
552  *
553  * +----+
554  * |    |
555  * |    |
556  * +----+
557  * |    |
558  * |    |
559  * +----+
560  * |    |
561  * |    |
562  * +----+
563  * |    |
564  * |    |
565  * +----+</pre>
566  * </TD>
567  * <TD>
568  * <pre>
569  *  ... </pre>
570  * </TD>
571  * <TD style="vertical-align:top;">
572  * After:
573  * <pre>
574  * Process   Process   Process   Process
575  * 0 (root)     1         2         3
576  * +----+    +----+    +----+    +----+
577  * |  1 |    |  3 |    |  5 |    |  7 |
578  * |  2 |    |  4 |    |  6 |    |  8 |
579  * +----+    +----+    +----+    +----+
580  *
581  * +----+
582  * |  1 |
583  * |  2 |
584  * +----+
585  * |  3 |
586  * |  4 |
587  * +----+
588  * |  5 |
589  * |  6 |
590  * +----+
591  * |  7 |
592  * |  8 |
593  * +----+</pre>
594  * </TD>
595  * </TR>
596  * </TABLE>
597  * In the root process, the corresponding destination buffer can be the same
598  * as the source buffer:
599  * <TABLE>
600  * <CAPTION>Gather</CAPTION>
601  * <TR>
602  * <TD style="vertical-align:top;">
603  * Before:
604  * <pre>
605  * Process   Process   Process   Process
606  * 0 (root)     1         2         3
607  * +----+
608  * |  1 |
609  * |  2 |
610  * +----+    +----+
611  * |    |    |  3 |
612  * |    |    |  4 |
613  * +----+    +----+    +----+
614  * |    |              |  5 |
615  * |    |              |  6 |
616  * +----+              +----+    +----+
617  * |    |                        |  7 |
618  * |    |                        |  8 |
619  * +----+                        +----+</pre>
620  * </TD>
621  * <TD>
622  * <pre>
623  *  ... </pre>
624  * </TD>
625  * <TD style="vertical-align:top;">
626  * After:
627  * <pre>
628  * Process   Process   Process   Process
629  * 0 (root)     1         2         3
630  * +----+
631  * |  1 |
632  * |  2 |
633  * +----+    +----+
634  * |  3 |    |  3 |
635  * |  4 |    |  4 |
636  * +----+    +----+    +----+
637  * |  5 |              |  5 |
638  * |  6 |              |  6 |
639  * +----+              +----+    +----+
640  * |  7 |                        |  7 |
641  * |  8 |                        |  8 |
642  * +----+                        +----+</pre>
643  * </TD>
644  * </TR>
645  * </TABLE>
646  * <I>Note:</I> Any process can be the root of the gather. The above is only one
647  * example with process 0 as the root.
648  * <p>
649  * <B>All-Gather</B>
650  * <p>
651  * All-gather is the same as gather, except that every process has an array of
652  * destination buffers, and every process receives the results of the gather.
653  * Each process in the communicator has a source buffer (type {@linkplain
654  * edu.rit.mp.Buf Buf}) filled with data. Each process in the communicator has
655  * <I>K</I> destination buffers, where <I>K</I> is the size of the communicator.
656  * For example, the destination buffers could be different portions of an array.
657  * Each destination buffer has the same length and the same item data type as
658  * the corresponding source buffer. Each process calls the communicator's
659  * <code>allGather()</code> method. Afterwards, each destination buffer in every
660  * process contains the same data as the corresponding source buffer.
661  * <TABLE>
662  * <CAPTION>All-Gather</CAPTION>
663  * <TR>
664  * <TD style="vertical-align:top;">
665  * Before:
666  * <pre>
667  * Process   Process   Process   Process
668  * 0         1         2         3
669  * +----+    +----+    +----+    +----+
670  * |  1 |    |  3 |    |  5 |    |  7 |
671  * |  2 |    |  4 |    |  6 |    |  8 |
672  * +----+    +----+    +----+    +----+
673  *
674  * +----+    +----+    +----+    +----+
675  * |    |    |    |    |    |    |    |
676  * |    |    |    |    |    |    |    |
677  * +----+    +----+    +----+    +----+
678  * |    |    |    |    |    |    |    |
679  * |    |    |    |    |    |    |    |
680  * +----+    +----+    +----+    +----+
681  * |    |    |    |    |    |    |    |
682  * |    |    |    |    |    |    |    |
683  * +----+    +----+    +----+    +----+
684  * |    |    |    |    |    |    |    |
685  * |    |    |    |    |    |    |    |
686  * +----+    +----+    +----+    +----+</pre>
687  * </TD>
688  * <TD>
689  * <pre>
690  *  ... </pre>
691  * </TD>
692  * <TD style="vertical-align:top;">
693  * After:
694  * <pre>
695  * Process   Process   Process   Process
696  * 0         1         2         3
697  * +----+    +----+    +----+    +----+
698  * |  1 |    |  3 |    |  5 |    |  7 |
699  * |  2 |    |  4 |    |  6 |    |  8 |
700  * +----+    +----+    +----+    +----+
701  *
702  * +----+    +----+    +----+    +----+
703  * |  1 |    |  1 |    |  1 |    |  1 |
704  * |  2 |    |  2 |    |  2 |    |  2 |
705  * +----+    +----+    +----+    +----+
706  * |  3 |    |  3 |    |  3 |    |  3 |
707  * |  4 |    |  4 |    |  4 |    |  4 |
708  * +----+    +----+    +----+    +----+
709  * |  5 |    |  5 |    |  5 |    |  5 |
710  * |  6 |    |  6 |    |  6 |    |  6 |
711  * +----+    +----+    +----+    +----+
712  * |  7 |    |  7 |    |  7 |    |  7 |
713  * |  8 |    |  8 |    |  8 |    |  8 |
714  * +----+    +----+    +----+    +----+</pre>
715  * </TD>
716  * </TR>
717  * </TABLE>
718  * The destination buffer can be the same as the source buffer in each process:
719  * <TABLE>
720  * <CAPTION>All-Gather</CAPTION>
721  * <TR>
722  * <TD style="vertical-align:top;">
723  * Before:
724  * <pre>
725  * Process   Process   Process   Process
726  * 0         1         2         3
727  * +----+    +----+    +----+    +----+
728  * |  1 |    |    |    |    |    |    |
729  * |  2 |    |    |    |    |    |    |
730  * +----+    +----+    +----+    +----+
731  * |    |    |  3 |    |    |    |    |
732  * |    |    |  4 |    |    |    |    |
733  * +----+    +----+    +----+    +----+
734  * |    |    |    |    |  5 |    |    |
735  * |    |    |    |    |  6 |    |    |
736  * +----+    +----+    +----+    +----+
737  * |    |    |    |    |    |    |  7 |
738  * |    |    |    |    |    |    |  8 |
739  * +----+    +----+    +----+    +----+</pre>
740  * </TD>
741  * <TD>
742  * <pre>
743  *  ... </pre>
744  * </TD>
745  * <TD style="vertical-align:top;">
746  * After:
747  * <pre>
748  * Process   Process   Process   Process
749  * 0         1         2         3
750  * +----+    +----+    +----+    +----+
751  * |  1 |    |  1 |    |  1 |    |  1 |
752  * |  2 |    |  2 |    |  2 |    |  2 |
753  * +----+    +----+    +----+    +----+
754  * |  3 |    |  3 |    |  3 |    |  3 |
755  * |  4 |    |  4 |    |  4 |    |  4 |
756  * +----+    +----+    +----+    +----+
757  * |  5 |    |  5 |    |  5 |    |  5 |
758  * |  6 |    |  6 |    |  6 |    |  6 |
759  * +----+    +----+    +----+    +----+
760  * |  7 |    |  7 |    |  7 |    |  7 |
761  * |  8 |    |  8 |    |  8 |    |  8 |
762  * +----+    +----+    +----+    +----+</pre>
763  * </TD>
764  * </TR>
765  * </TABLE>
766  * <p>
767  * <B>Reduce</B>
768  * <p>
769  * Reduce is like gather, except the buffers' contents are combined together
770  * instead of just copied. Each process in the communicator has a buffer (type
771  * {@linkplain edu.rit.mp.Buf Buf}) filled with data. Each process calls the
772  * communicator's <code>reduce()</code> method, specifying some binary operation
773  * (type {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data.
774  * Afterwards, each element of the buffer in the root process contains the
775  * result of combining all the corresponding elements in all the buffers using
776  * the specified binary operation. For example, if the operation is addition,
777  * each buffer element in the root process ends up being the sum of the
778  * corresponding buffer elements in all the processes. In the non-root
779  * processes, the buffers' contents may be changed from their original contents.
780  * <TABLE>
781  * <CAPTION>Reduce</CAPTION>
782  * <TR>
783  * <TD style="vertical-align:top;">
784  * Before:
785  * <pre>
786  * Process   Process   Process   Process
787  * 0 (root)  1         2         3
788  * +----+    +----+    +----+    +----+
789  * |  1 |    |  3 |    |  5 |    |  7 |
790  * |  2 |    |  4 |    |  6 |    |  8 |
791  * +----+    +----+    +----+    +----+</pre>
792  * </TD>
793  * <TD>
794  * <pre>
795  *  ... </pre>
796  * </TD>
797  * <TD style="vertical-align:top;">
798  * After:
799  * <pre>
800  * Process   Process   Process   Process
801  * 0 (root)  1         2         3
802  * +----+    +----+    +----+    +----+
803  * | 16 |    | ?? |    | ?? |    | ?? |
804  * | 20 |    | ?? |    | ?? |    | ?? |
805  * +----+    +----+    +----+    +----+</pre>
806  * </TD>
807  * </TR>
808  * </TABLE>
809  * <I>Note:</I> Any process can be the root of the reduction. The above is only
810  * one example with process 0 as the root.
811  * <p>
812  * <B>All-Reduce</B>
813  * <p>
814  * All-reduce is the same as reduce, except that every process receives the
815  * results of the reduction. Each process in the communicator has a buffer (type
816  * {@linkplain edu.rit.mp.Buf Buf}) filled with data. Each process calls the
817  * communicator's <code>allReduce()</code> method, specifying some binary operation
818  * (type {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data.
819  * Afterwards, each element of the buffer in each process contains the result of
820  * combining all the corresponding elements in all the buffers using the
821  * specified binary operation. For example, if the operation is addition, each
822  * buffer element ends up being the sum of the corresponding buffer elements.
823  * <TABLE>
824  * <CAPTION>All-Reduce</CAPTION>
825  * <TR>
826  * <TD style="vertical-align:top;">
827  * Before:
828  * <pre>
829  * Process   Process   Process   Process
830  * 0         1         2         3
831  * +----+    +----+    +----+    +----+
832  * |  1 |    |  3 |    |  5 |    |  7 |
833  * |  2 |    |  4 |    |  6 |    |  8 |
834  * +----+    +----+    +----+    +----+</pre>
835  * </TD>
836  * <TD>
837  * <pre>
838  *  ... </pre>
839  * </TD>
840  * <TD style="vertical-align:top;">
841  * After:
842  * <pre>
843  * Process   Process   Process   Process
844  * 0         1         2         3
845  * +----+    +----+    +----+    +----+
846  * | 16 |    | 16 |    | 16 |    | 16 |
847  * | 20 |    | 20 |    | 20 |    | 20 |
848  * +----+    +----+    +----+    +----+</pre>
849  * </TD>
850  * </TR>
851  * </TABLE>
852  * <p>
853  * <B>All-to-All</B>
854  * <p>
855  * Every process in the communicator has <I>K</I> source buffers (type
856  * {@linkplain edu.rit.mp.Buf Buf}) filled with data, where <I>K</I> is the size
857  * of the communicator. Every process in the communicator also has <I>K</I>
858  * destination buffers (type {@linkplain edu.rit.mp.Buf Buf}). The source
859  * buffers and the destination buffers must refer to different storage. For
860  * example, the source buffers could be portions of an array, and the
861  * destination buffers could be portions of a different array. Each process
862  * calls the communicator's <code>allToAll()</code> method. Afterwards, for each
863  * process rank <I>k</I>, 0 &lt;= <I>k</I> &lt;= <I>K</I>-1, and each buffer
864  * index <I>i</I>, 0 &lt;= <I>i</I> &lt;= <I>K</I>-1, destination buffer
865  * <I>i</I> in process <I>k</I> contains the same data as source buffer <I>k</I>
866  * in process <I>i</I>.
867  * <TABLE>
868  * <CAPTION>All-to-All</CAPTION>
869  * <TR>
870  * <TD style="vertical-align:top;">
871  * Before:
872  * <pre>
873  * Process   Process   Process   Process
874  * 0         1         2         3
875  * +----+    +----+    +----+    +----+
876  * |  1 |    |  9 |    | 17 |    | 25 |
877  * |  2 |    | 10 |    | 18 |    | 26 |
878  * +----+    +----+    +----+    +----+
879  * |  3 |    | 11 |    | 19 |    | 27 |
880  * |  4 |    | 12 |    | 20 |    | 28 |
881  * +----+    +----+    +----+    +----+
882  * |  5 |    | 13 |    | 21 |    | 29 |
883  * |  6 |    | 14 |    | 22 |    | 30 |
884  * +----+    +----+    +----+    +----+
885  * |  7 |    | 15 |    | 23 |    | 31 |
886  * |  8 |    | 16 |    | 24 |    | 32 |
887  * +----+    +----+    +----+    +----+
888  *
889  * +----+    +----+    +----+    +----+
890  * |    |    |    |    |    |    |    |
891  * |    |    |    |    |    |    |    |
892  * +----+    +----+    +----+    +----+
893  * |    |    |    |    |    |    |    |
894  * |    |    |    |    |    |    |    |
895  * +----+    +----+    +----+    +----+
896  * |    |    |    |    |    |    |    |
897  * |    |    |    |    |    |    |    |
898  * +----+    +----+    +----+    +----+
899  * |    |    |    |    |    |    |    |
900  * |    |    |    |    |    |    |    |
901  * +----+    +----+    +----+    +----+</pre>
902  * </TD>
903  * <TD>
904  * <pre>
905  *  ... </pre>
906  * </TD>
907  * <TD style="vertical-align:top;">
908  * After:
909  * <pre>
910  * Process   Process   Process   Process
911  * 0         1         2         3
912  * +----+    +----+    +----+    +----+
913  * |  1 |    |  9 |    | 17 |    | 25 |
914  * |  2 |    | 10 |    | 18 |    | 26 |
915  * +----+    +----+    +----+    +----+
916  * |  3 |    | 11 |    | 19 |    | 27 |
917  * |  4 |    | 12 |    | 20 |    | 28 |
918  * +----+    +----+    +----+    +----+
919  * |  5 |    | 13 |    | 21 |    | 29 |
920  * |  6 |    | 14 |    | 22 |    | 30 |
921  * +----+    +----+    +----+    +----+
922  * |  7 |    | 15 |    | 23 |    | 31 |
923  * |  8 |    | 16 |    | 24 |    | 32 |
924  * +----+    +----+    +----+    +----+
925  *
926  * +----+    +----+    +----+    +----+
927  * |  1 |    |  3 |    |  5 |    |  7 |
928  * |  2 |    |  4 |    |  6 |    |  8 |
929  * +----+    +----+    +----+    +----+
930  * |  9 |    | 11 |    | 13 |    | 15 |
931  * | 10 |    | 12 |    | 14 |    | 16 |
932  * +----+    +----+    +----+    +----+
933  * | 17 |    | 19 |    | 21 |    | 23 |
934  * | 18 |    | 20 |    | 22 |    | 24 |
935  * +----+    +----+    +----+    +----+
936  * | 25 |    | 27 |    | 29 |    | 31 |
937  * | 26 |    | 28 |    | 30 |    | 32 |
938  * +----+    +----+    +----+    +----+</pre>
939  * </TD>
940  * </TR>
941  * </TABLE>
942  * <p>
943  * <B>Scan</B>
944  * <p>
945  * Each process in the communicator has a buffer (type {@linkplain
946  * edu.rit.mp.Buf Buf}) filled with data. Each process calls the communicator's
947  * <code>scan()</code> method, specifying some binary operation (type {@linkplain
948  * edu.rit.pj.reduction.Op Op}) for combining the data. Afterwards, each element
949  * of the buffer in a particular process contains the result of combining all
950  * the corresponding elements in its own and all lower-ranked processes' buffers
951  * using the specified binary operation. For example, if the operation is
952  * addition, each buffer element ends up being the sum of its own and all
953  * lower-ranked processes' buffer elements.
954  * <TABLE>
955  * <CAPTION>Scan</CAPTION>
956  * <TR>
957  * <TD style="vertical-align:top;">
958  * Before:
959  * <pre>
960  * Process   Process   Process   Process
961  * 0         1         2         3
962  * +----+    +----+    +----+    +----+
963  * |  1 |    |  3 |    |  5 |    |  7 |
964  * |  2 |    |  4 |    |  6 |    |  8 |
965  * +----+    +----+    +----+    +----+</pre>
966  * </TD>
967  * <TD>
968  * <pre>
969  *  ... </pre>
970  * </TD>
971  * <TD style="vertical-align:top;">
972  * After:
973  * <pre>
974  * Process   Process   Process   Process
975  * 0         1         2         3
976  * +----+    +----+    +----+    +----+
977  * |  1 |    |  4 |    |  9 |    | 16 |
978  * |  2 |    |  6 |    | 12 |    | 20 |
979  * +----+    +----+    +----+    +----+</pre>
980  * </TD>
981  * </TR>
982  * </TABLE>
983  * The scan operation is also known as "prefix scan" or "inclusive prefix scan"
984  * -- "inclusive" because the process's own element is included in the result.
985  * <p>
986  * <B>Exclusive-Scan</B>
987  * <p>
988  * The exclusive-scan operation is a variation of the scan operation. Each
989  * process in the communicator has a buffer (type {@linkplain edu.rit.mp.Buf
990  * Buf}) filled with data. Each process calls the communicator's
991  * <code>exclusiveScan()</code> method, specifying some binary operation (type
992  * {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data, and
993  * specifying an initial data value. Afterwards, each element of the buffer in a
994  * particular process contains the result of combining all the corresponding
995  * elements in all lower-ranked processes' buffers using the specified binary
996  * operation, except in process 0 each element of the buffer contains the
997  * initial data value. For example, if the operation is addition and the initial
998  * data value is 0, each buffer element ends up being the sum of all
999  * lower-ranked processes' buffer elements.
1000  * <TABLE>
1001  * <CAPTION>Exclusive-Scan</CAPTION>
1002  * <TR>
1003  * <TD style="vertical-align:top;">
1004  * Before:
1005  * <pre>
1006  * Process   Process   Process   Process
1007  * 0         1         2         3
1008  * +----+    +----+    +----+    +----+
1009  * |  1 |    |  3 |    |  5 |    |  7 |
1010  * |  2 |    |  4 |    |  6 |    |  8 |
1011  * +----+    +----+    +----+    +----+</pre>
1012  * </TD>
1013  * <TD>
1014  * <pre>
1015  *  ... </pre>
1016  * </TD>
1017  * <TD style="vertical-align:top;">
1018  * After:
1019  * <pre>
1020  * Process   Process   Process   Process
1021  * 0         1         2         3
1022  * +----+    +----+    +----+    +----+
1023  * |  0 |    |  1 |    |  4 |    |  9 |
1024  * |  0 |    |  2 |    |  6 |    | 12 |
1025  * +----+    +----+    +----+    +----+</pre>
1026  * </TD>
1027  * </TR>
1028  * </TABLE>
1029  * This version of the scan operation is also known as "exclusive prefix scan"
1030  * -- "exclusive" because the process's own element is excluded from the result.
1031  * <p>
1032  * <B>Barrier</B>
1033  * <p>
1034  * The barrier operation causes all the processes to synchronize with each
1035  * other. Each process calls the communicator's <code>barrier()</code> method. The
1036  * calling thread blocks until all processes in the communicator have called the
1037  * <code>barrier()</code> method. Then the calling thread unblocks and returns from
1038  * the <code>barrier()</code> method call.
1039  *
1040  * @author Alan Kaminsky
1041  * @version 21-Jan-2009
1042  */
1043 public class Comm {
1044 
1045     // Hidden data members.
1046     // Predefined communicators. The world communicator is assigned by init() and returned by world().
1047     private static Comm theWorldCommunicator;
1048     private static Comm theFrontendCommunicator;
1049 
1050     // This communicator's size, the current process's rank in it, and the host name; assigned in the constructor.
1051     private int mySize;
1052     private int myRank;
1053     private String myHost;
1054 
1055     // The largest power of 2 less than or equal to this communicator's size; computed in the constructor.
1056     private int mySizePowerOf2;
1057 
1058     // Channel group for message passing in this communicator; the constructor sets its channel group ID to the rank.
1059     private ChannelGroup myChannelGroup;
1060 
1061     // Map from rank (array index) to channel group address (array element).
1062     private InetSocketAddress[] myAddressForRank;
1063 
1064     // Map from rank (array index) to channel for communicating with the process at that rank
1065     // (array element). The constructor fills in only this process's own rank, with the loopback channel.
1066     private Channel[] myChannelForRank;
1067 
1068     // Broadcast trees for flood-send, flood-receive, broadcast, and reduce
1069     // operations, indexed by root. Not populated by the constructor.
1070     private int[][] myBroadcastTree;
1071 
1072 // Hidden constructors.
1073 
1074     /**
1075      * Construct a new communicator.
1076      *
1077      * @param size         Communicator's size.
1078      * @param rank         Current process's rank in the communicator.
1079      * @param host         Host name.
1080      * @param channelgroup Channel group for message passing in this
1081      *                     communicator.
1082      * @param address      Map from rank (array index) to channel group address
1083      *                     (array element).
1084      */
1085     private Comm(int size,
1086                  int rank,
1087                  String host,
1088                  ChannelGroup channelgroup,
1089                  InetSocketAddress[] address) {
1090         // Record size, rank, channel group.
1091         mySize = size;
1092         myRank = rank;
1093         myHost = host;
1094         myChannelGroup = channelgroup;
1095 
1096         // Determine the largest power of 2 less than or equal to this
1097         // communicator's size.
1098         int p2 = 1;
1099         while (p2 <= size) {
1100             p2 <<= 1;
1101         }
1102         mySizePowerOf2 = p2 >>> 1;
1103 
1104         // Set channel group ID equal to rank.
1105         myChannelGroup.setChannelGroupId(rank);
1106 
1107         // Set up connect listener.
1108         myChannelGroup.setConnectListener(new ConnectListener() {
1109             public void nearEndConnected(ChannelGroup theChannelGroup,
1110                                          Channel theChannel)
1111                     throws IOException {
1112             }
1113 
1114             public void farEndConnected(ChannelGroup theChannelGroup,
1115                                         Channel theChannel)
1116                     throws IOException {
1117                 doFarEndConnected(theChannel);
1118             }
1119         });
1120 
1121         // Record socket address for each process rank.
1122         myAddressForRank = address;
1123 
1124         // Set up channel for each process rank.
1125         myChannelForRank = new Channel[size];
1126 
1127         // Populate channel at my own rank with the loopback channel.
1128         myChannelForRank[myRank] = channelgroup.loopbackChannel();
1129 
1130         // If there's more than one process, start listening for incoming
1131         // connections.
1132         if (mySize > 1) {
1133             myChannelGroup.startListening();
1134         }
1135     }
1136 
1137 // Exported operations.
1138 
1139     /**
1140      * Initialize the PJ message passing middleware. Certain Java system
1141      * properties specify the middleware's behavior; these properties are
1142      * typically given on the Java command line with the <code>"-D"</code> flag. For
1143      * further information, see class {@linkplain PJProperties}.
1144      *
1145      * @param args Command line arguments.
1146      * @throws java.lang.NullPointerException     (unchecked exception) Thrown if
1147      *                                  <code>args</code> is null.
1148      * @throws java.lang.IllegalArgumentException (unchecked exception) Thrown if the
1149      *                                  value of one of the Java system properties is illegal.
1150      * @throws java.io.IOException              Thrown if an I/O error occurred.
1151      */
1152     public static void init(String[] args)
1153             throws IOException {
1154         // Verify preconditions.
1155         if (args == null) {
1156             throw new NullPointerException("Comm.init(): args is null");
1157         }
1158 
1159         // Get the job backend object.
1160         JobBackend backend = JobBackend.getJobBackend();
1161 
1162         if (backend == null) {
1163             // We're running on the frontend processor.
1164 
1165             // Prepare constructor arguments for the Job Frontend object.
1166             String username = System.getProperty("user.name");
1167             int Nn = PJProperties.getPjNn();
1168             int Np = PJProperties.getPjNp();
1169             int Nt = PJProperties.getPjNt();
1170             boolean hasFrontendComm = false;
1171 
1172             // Examine the call stack to find the main program class name.
1173             StackTraceElement[] stack
1174                     = Thread.currentThread().getStackTrace();
1175             StackTraceElement bottom = stack[stack.length - 1];
1176 
1177             if (!bottom.getMethodName().equals("main")) {
1178                 // throw new IllegalStateException("Comm.init(): Not called from main program, but from " + bottom.getMethodName());
1179                 // Set up world communicator in this process.
1180                 theWorldCommunicator
1181                         = new Comm(/*size        */1,
1182                         /*rank        */ 0,
1183                         /*host        */ "<unknown>",
1184                         /*channelgroup*/ new ChannelGroup(),
1185                         /*address     */
1186                         new InetSocketAddress[]{new InetSocketAddress(0)});
1187                 return;
1188             }
1189 
1190             String mainClassName = bottom.getClassName();
1191 
1192             // Set up the Job Frontend object.
1193             JobFrontend frontend = null;
1194             try {
1195                 frontend
1196                         = new JobFrontend(username, Nn, Np, Nt, hasFrontendComm, mainClassName,
1197                         args);
1198 
1199                 // We were able to contact the Job Scheduler.
1200                 // Run the job frontend in this process, then exit.
1201                 frontend.run();
1202                 System.exit(0);
1203             } catch (JobSchedulerException exc) {
1204                 // We were not able to contact the Job Scheduler.
1205                 // System.err.println(" No Job Scheduler at " + PJProperties.getPjHost() + ":" + PJProperties.getPjPort() + ", running in this (one) process");
1206 
1207                 // Set up world communicator.
1208                 theWorldCommunicator
1209                         = new Comm(/*size        */1,
1210                         /*rank        */ 0,
1211                         /*host        */ "<unknown>",
1212                         /*channelgroup*/ new ChannelGroup(),
1213                         /*address     */
1214                         new InetSocketAddress[]{new InetSocketAddress(0)});
1215             }
1216         } else {
1217             // We're running on a backend processor.
1218 
1219             // Set up world communicator.
1220             theWorldCommunicator
1221                     = new Comm(/*size        */backend.getK(),
1222                     /*rank        */ backend.getRank(),
1223                     /*host        */ backend.getBackendHost(),
1224                     /*channelgroup*/ backend.getWorldChannelGroup(),
1225                     /*address     */ backend.getWorldAddress());
1226         }
1227     }
1228 
1229     /**
1230      * Obtain a reference to the world communicator.
1231      *
1232      * @return World communicator.
1233      * @throws java.lang.IllegalStateException (unchecked exception) Thrown if
1234      *                               <code>Comm.init()</code> has not been called. Thrown if <code>world()</code> is
1235      *                               called in the job frontend process; the world communicator does not exist
1236      *                               in the job frontend process.
1237      */
1238     public static Comm world() {
1239         if (theWorldCommunicator != null) {
1240             return theWorldCommunicator;
1241         } else if (JobBackend.getJobBackend() != null) {
1242             throw new IllegalStateException("Comm.world(): Didn't call Comm.init()");
1243         } else {
1244             throw new IllegalStateException("Comm.world(): World communicator doesn't exist in job frontend process");
1245         }
1246     }
1247 
1248     /**
1249      * Obtain the number of processes in this communicator.
1250      *
1251      * @return Size.
1252      */
1253     public int size() {
1254         return mySize;
1255     }
1256 
1257     /**
1258      * Obtain the current process's rank in this communicator.
1259      *
1260      * @return Rank.
1261      */
1262     public int rank() {
1263         return myRank;
1264     }
1265 
1266     /**
1267      * Obtain the host name of this communicator's backend processor. If this
1268      * communicator is not running on a cluster backend processor, the host name
1269      * is <code>"&lt;unknown&gt;"</code>.
1270      *
1271      * @return Host name.
1272      */
1273     public String host() {
1274         return myHost;
1275     }
1276 
1277     /**
1278      * Create a new communicator. <I>Every</I> process in this communicator must
1279      * call the <code>createComm()</code> method. Each process passes true or false
1280      * for the <code>participate</code> argument to state whether the process will
1281      * participate in the new communicator. At least one process must
1282      * participate in the new communicator. Messages to set up the new
1283      * communicator are sent to all processes in this communicator, using a
1284      * message tag of 0.
1285      * <p>
1286      * In processes participating in the new communicator, the new communicator
1287      * is returned. The participating processes appear in the same order by rank
1288      * in the new communicator as in this communicator. The process can call the
1289      * new communicator's <code>rank()</code> method to determine the process's rank
1290      * in the new communicator.
1291      * <p>
1292      * In processes not participating in the new communicator, null is returned.
1293      *
1294      * @param participate True if this process will participate in the new
1295      *                    communicator; false otherwise.
1296      * @return New communicator if this process will participate in the new
1297      * communicator; null otherwise.
1298      * @throws java.io.IOException Thrown if an I/O error occurred.
1299      */
1300     public Comm createComm(boolean participate)
1301             throws IOException {
1302         return createComm(participate, 0);
1303     }
1304 
1305     /**
1306      * Create a new communicator. <I>Every</I> process in this communicator must
1307      * call the <code>createComm()</code> method. Each process passes true or false
1308      * for the <code>participate</code> argument to state whether the process will
1309      * participate in the new communicator. At least one process must
1310      * participate in the new communicator. Messages to set up the new
1311      * communicator are sent to all processes in this communicator, using the
1312      * given message tag.
1313      * <p>
1314      * In processes participating in the new communicator, the new communicator
1315      * is returned. The participating processes appear in the same order by rank
1316      * in the new communicator as in this communicator. The process can call the
1317      * new communicator's <code>rank()</code> method to determine the process's rank
1318      * in the new communicator.
1319      * <p>
1320      * In processes not participating in the new communicator, null is returned.
1321      *
1322      * @param participate True if this process will participate in the new
1323      *                    communicator; false otherwise.
1324      * @param tag         Message tag.
1325      * @return New communicator if this process will participate in the new
1326      * communicator; null otherwise.
1327      * @throws java.io.IOException Thrown if an I/O error occurred.
1328      */
1329     public Comm createComm(boolean participate,
1330                            int tag)
1331             throws IOException {
1332         // Set up array of socket addresses for all processes.
1333         InetSocketAddress[] address = new InetSocketAddress[mySize];
1334         ObjectBuf<InetSocketAddress>[] addressbuf
1335                 = ObjectBuf.sliceBuffers(address,
1336                 new Range(0, mySize - 1).subranges(mySize));
1337 
1338         // Create channel group for new communicator, if participating.
1339         ChannelGroup channelgroup = null;
1340         InetSocketAddress myaddress = null;
1341         if (participate) {
1342             channelgroup
1343                     = new ChannelGroup(new InetSocketAddress(myChannelGroup.listenAddress().getAddress(), 0));
1344             myaddress = channelgroup.listenAddress();
1345             address[myRank] = myaddress;
1346         }
1347 
1348         // All-gather channel group socket addresses into every process.
1349         allGather(tag, addressbuf[myRank], addressbuf);
1350 
1351         // Close up gaps in the socket address array if any.
1352         int off = 0;
1353         int newsize = 0;
1354         int newrank = -1;
1355         for (int i = 0; i < mySize; ++i) {
1356             if (address[i] == null) {
1357                 ++off;
1358             } else {
1359                 if (i == myRank) {
1360                     newrank = i - off;
1361                 }
1362                 address[i - off] = address[i];
1363                 ++newsize;
1364             }
1365         }
1366 
1367         // Verify size of new communicator.
1368         if (newsize == 0) {
1369             throw new IOException("Comm.createComm(): No processes in communicator");
1370         }
1371 
1372         // Return new communicator if participating.
1373         if (participate) {
1374             return new Comm(newsize, newrank, myHost, channelgroup, address);
1375         } // Return null if not participating.
1376         else {
1377             return null;
1378         }
1379     }
1380 
1381     /**
1382      * Send a message to the process at the given rank in this communicator. The
1383      * message uses a tag of 0. The message items come from the given buffer. To
1384      * receive the message, the destination process must call the
1385      * <code>receive()</code> method. When the <code>send()</code> method returns, the
1386      * message has been fully sent, but it may not yet have been fully received.
1387      * <p>
1388      * A process can send a message to itself; in this case a different thread
1389      * must call the <code>receive()</code> method on this communicator.
1390      *
1391      * @param toRank Destination process's rank in this communicator.
1392      * @param buffer Buffer of data items to be sent.
1393      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1394      *                                   <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1395      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1396      *                                   <code>buffer</code> is null.
1397      * @throws java.io.IOException               Thrown if an I/O error occurred.
1398      */
1399     public void send(int toRank,
1400                      Buf buffer)
1401             throws IOException {
1402         send(toRank, 0, buffer);
1403     }
1404 
1405     /**
1406      * Send a message to the process at the given rank in this communicator with
1407      * the given message tag. The message items come from the given buffer. To
1408      * receive the message, the destination process must call the
1409      * <code>receive()</code> method. When the <code>send()</code> method returns, the
1410      * message has been fully sent, but it may not yet have been fully received.
1411      * <p>
1412      * A process can send a message to itself; in this case a different thread
1413      * must call the <code>receive()</code> method on this communicator.
1414      *
1415      * @param toRank Destination process's rank in this communicator.
1416      * @param tag    Message tag.
1417      * @param buffer Buffer of data items to be sent.
1418      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1419      *                                   <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1420      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1421      *                                   <code>buffer</code> is null.
1422      * @throws java.io.IOException               Thrown if an I/O error occurred.
1423      */
1424     public void send(int toRank,
1425                      int tag,
1426                      Buf buffer)
1427             throws IOException {
1428         myChannelGroup.send(getChannel(toRank), tag, buffer);
1429     }
1430 
1431     /**
1432      * Send a message to the process at the given rank in this communicator
1433      * (non-blocking). A message tag of 0 is used. The message items come from
1434      * the given buffer. To receive the message, the destination process must
1435      * call the <code>receive()</code> method.
1436      * <p>
1437      * The <code>send()</code> method initiates the send operation and immediately
1438      * returns a {@linkplain CommRequest} object. The send operation is
1439      * performed by a separate thread. To wait for the send operation to finish,
1440      * call the returned {@linkplain CommRequest} object's
1441      * <code>waitForFinish()</code> method. When that method returns, the message
1442      * has been fully sent, but it may not yet have been fully received.
1443      * <p>
1444      * A process can send a message to itself; in this case a different thread
1445      * must call the <code>receive()</code> method on this communicator.
1446      *
1447      * @param toRank  Destination process's rank in this communicator.
1448      * @param buffer  Buffer of data items to be sent.
1449      * @param request CommRequest object to use to wait for the operation to
1450      *                finish; in this case <code>request</code> is returned. If
1451      *                <code>request</code> is null, a new CommRequest object is created and
1452      *                returned.
1453      * @return CommRequest object to use to wait for the operation to finish.
1454      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1455      *                                   <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1456      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1457      *                                   <code>buffer</code> is null.
1458      * @throws java.io.IOException               Thrown if an I/O error occurred.
1459      */
1460     public CommRequest send(int toRank, Buf buffer, CommRequest request) throws IOException {
1461         // Convenience overload: forwards to the tagged non-blocking send() using
1462         // the default message tag of 0.
1463         final int defaultTag = 0;
1464         return send(toRank, defaultTag, buffer, request);
1465     }
1466 
1467     /**
1468      * Send a message to the process at the given rank in this communicator with
1469      * the given message tag (non-blocking). The message items come from the
1470      * given buffer. To receive the message, the destination process must call
1471      * the <code>receive()</code> method.
1472      * <p>
1473      * The <code>send()</code> method initiates the send operation and immediately
1474      * returns a {@linkplain CommRequest} object. The send operation is
1475      * performed by a separate thread. To wait for the send operation to finish,
1476      * call the returned {@linkplain CommRequest} object's
1477      * <code>waitForFinish()</code> method. When that method returns, the message
1478      * has been fully sent, but it may not yet have been fully received.
1479      * <p>
1480      * A process can send a message to itself; in this case a different thread
1481      * must call the <code>receive()</code> method on this communicator.
1482      *
1483      * @param toRank  Destination process's rank in this communicator.
1484      * @param tag     Message tag.
1485      * @param buffer  Buffer of data items to be sent.
1486      * @param request CommRequest object to use to wait for the operation to
1487      *                finish; in this case <code>request</code> is returned. If
1488      *                <code>request</code> is null, a new CommRequest object is created and
1489      *                returned.
1490      * @return CommRequest object to use to wait for the operation to finish.
1491      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1492      *                                   <code>toRank</code> is not in the range 0 .. <code>size()</code>-1.
1493      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1494      *                                   <code>buffer</code> is null.
1495      * @throws java.io.IOException               Thrown if an I/O error occurred.
1496      */
1497     public CommRequest send(int toRank,
1498                             int tag,
1499                             Buf buffer,
1500                             CommRequest request)
1501             throws IOException {
1502         // Set up CommRequest object.
1503         CommRequest req = request == null ? new CommRequest() : request;
1504 
1505         // Send message (non-blocking).
1506         req.mySendRequest = new IORequest();
1507         req.myRecvRequest = null;
1508         myChannelGroup.sendNoWait(getChannel(toRank), tag, buffer, req.mySendRequest);
1509 
1510         // Return CommRequest object.
1511         return req;
1512     }
1513 
1514     /**
1515      * Receive a message from the process at the given rank in this
1516      * communicator. If <code>fromRank</code> is null, a message will be received from
1517      * any process in this communicator. The message must have a tag of 0. The
1518      * received message items are stored in the given buffer. To send the
1519      * message, the source process must call the <code>send()</code> method. When
1520      * the <code>receive()</code> method returns, the message has been fully
1521      * received.
1522      * <p>
1523      * A {@linkplain CommStatus} object is returned. The status object gives the
1524      * actual rank of the process that sent the message, the actual message tag
1525      * that was received, and the actual number of data items in the message. If
1526      * the actual number of data items in the message is less than the length of
1527      * the buffer, nothing is stored into the extra data items at the end of the
1528      * buffer. If the actual number of data items in the message is greater than
1529      * the length of the buffer, the extra data items at the end of the message
1530      * are discarded.
1531      * <p>
1532      * A process can receive a message from itself; in this case a different
1533      * thread must call the <code>send()</code> method on this communicator.
1534      *
1535      * @param fromRank Source process's rank in this communicator, or null to
1536      *                 receive from any process.
1537      * @param buffer   Buffer of data items to be received.
1538      * @return Status object giving the outcome of the message reception.
1539      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1540      *                                   <code>fromRank</code> is not null and is not in the range 0 ..
1541      *                                   <code>size()</code>-1.
1542      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1543      *                                   <code>buffer</code> is null.
1544      * @throws java.io.IOException               Thrown if an I/O error occurred.
1545      */
1546     public CommStatus receive(Integer fromRank, Buf buffer) throws IOException {
1547         // Convenience overload: blocking receive restricted to message tag 0.
1548         final int defaultTag = 0;
1549         return receive(fromRank, defaultTag, buffer);
1550     }
1551 
1552     /**
1553      * Receive a message from the process at the given rank in this communicator
1554      * with the given message tag. If <code>fromRank</code> is null, a message will be
1555      * received from any process in this communicator. The received message
1556      * items are stored in the given buffer. To send the message, the source
1557      * process must call the <code>send()</code> method. When the <code>receive()</code>
1558      * method returns, the message has been fully received.
1559      * <p>
1560      * A {@linkplain CommStatus} object is returned. The status object gives the
1561      * actual rank of the process that sent the message, the actual message tag
1562      * that was received, and the actual number of data items in the message. If
1563      * the actual number of data items in the message is less than the length of
1564      * the buffer, nothing is stored into the extra data items at the end of the
1565      * buffer. If the actual number of data items in the message is greater than
1566      * the length of the buffer, the extra data items at the end of the message
1567      * are discarded.
1568      * <p>
1569      * A process can receive a message from itself; in this case a different
1570      * thread must call the <code>send()</code> method on this communicator.
1571      *
1572      * @param fromRank Source process's rank in this communicator, or null to
1573      *                 receive from any process.
1574      * @param tag      Message tag.
1575      * @param buffer   Buffer of data items to be received.
1576      * @return Status object giving the outcome of the message reception.
1577      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1578      *                                   <code>fromRank</code> is not null and is not in the range 0 ..
1579      *                                   <code>size()</code>-1.
1580      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1581      *                                   <code>buffer</code> is null.
1582      * @throws java.io.IOException               Thrown if an I/O error occurred.
1583      */
1584     public CommStatus receive(Integer fromRank,
1585                               int tag,
1586                               Buf buffer)
1587             throws IOException {
1588         Status status;
1589 
1590         // If source is a wildcard, ensure a channel to every process, then
1591         // receive from any process.
1592         if (fromRank == null) {
1593             for (int src = 0; src < mySize; ++src) {
1594                 ensureChannel(src);
1595             }
1596             status = myChannelGroup.receive(null, tag, buffer);
1597         } // If source is not a wildcard, receive from that process.
1598         else {
1599             status
1600                     = myChannelGroup.receive(getChannel(fromRank), tag, buffer);
1601         }
1602 
1603         // Return CommStatus object.
1604         return new CommStatus(getFarRank(status.channel),
1605                 status.tag,
1606                 status.length);
1607     }
1608 
1609     /**
1610      * Receive a message from the process at the given rank in this communicator
1611      * with the given message tag range. If <code>fromRank</code> is null, a message
1612      * will be received from any process in this communicator. If
1613      * <code>tagRange</code> is null, a message will be received with any tag. If
1614      * <code>tagRange</code> is not null, a message will be received with any tag in
1615      * the given range. The received message items are stored in the given
1616      * buffer. To send the message, the source process must call the
1617      * <code>send()</code> method. When the <code>receive()</code> method returns, the
1618      * message has been fully received.
1619      * <p>
1620      * A {@linkplain CommStatus} object is returned. The status object gives the
1621      * actual rank of the process that sent the message, the actual message tag
1622      * that was received, and the actual number of data items in the message. If
1623      * the actual number of data items in the message is less than the length of
1624      * the buffer, nothing is stored into the extra data items at the end of the
1625      * buffer. If the actual number of data items in the message is greater than
1626      * the length of the buffer, the extra data items at the end of the message
1627      * are discarded.
1628      * <p>
1629      * A process can receive a message from itself; in this case a different
1630      * thread must call the <code>send()</code> method on this communicator.
1631      *
1632      * @param fromRank Source process's rank in this communicator, or null to
1633      *                 receive from any process.
1634      * @param tagRange Message tag range, or null to receive any tag.
1635      * @param buffer   Buffer of data items to be received.
1636      * @return Status object giving the outcome of the message reception.
1637      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1638      *                                   <code>fromRank</code> is not null and is not in the range 0 ..
1639      *                                   <code>size()</code>-1.
1640      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1641      *                                   <code>buffer</code> is null.
1642      * @throws java.io.IOException               Thrown if an I/O error occurred.
1643      */
1644     public CommStatus receive(Integer fromRank,
1645                               Range tagRange,
1646                               Buf buffer)
1647             throws IOException {
1648         Status status;
1649 
1650         // If source is a wildcard, ensure a channel to every process, then
1651         // receive from any process.
1652         if (fromRank == null) {
1653             for (int src = 0; src < mySize; ++src) {
1654                 ensureChannel(src);
1655             }
1656             status = myChannelGroup.receive(null, tagRange, buffer);
1657         } // If source is not a wildcard, receive from that process.
1658         else {
1659             status = myChannelGroup.receive(getChannel(fromRank), tagRange, buffer);
1660         }
1661 
1662         // Return CommStatus object.
1663         return new CommStatus(getFarRank(status.channel),
1664                 status.tag,
1665                 status.length);
1666     }
1667 
1668     /**
1669      * Receive a message from the process at the given rank in this communicator
1670      * (non-blocking). If <code>fromRank</code> is null, a message will be received from
1671      * any process in this communicator. The message must have a tag of 0. The
1672      * received message items are stored in the given buffer. To send the
1673      * message, the source process must call the <code>send()</code> method.
1674      * <p>
1675      * The <code>receive()</code> method initiates the receive operation and
1676      * immediately returns a {@linkplain CommRequest} object. The receive
1677      * operation is performed by a separate thread. To wait for the receive
1678      * operation to finish, call the returned {@linkplain CommRequest} object's
1679      * <code>waitForFinish()</code> method. When that method returns, the incoming
1680      * message items have been fully received.
1681      * <p>
1682      * A process can receive a message from itself; in this case a different
1683      * thread must call the <code>send()</code> method on this communicator.
1684      *
1685      * @param fromRank Source process's rank in this communicator, or null to
1686      *                 receive from any process.
1687      * @param buffer   Buffer of data items to be received.
1688      * @param request  CommRequest object to use to wait for the operation to
1689      *                 finish; in this case <code>request</code> is returned. If
1690      *                 <code>request</code> is null, a new CommRequest object is created and
1691      *                 returned.
1692      * @return CommRequest object to use to wait for the operation to finish.
1693      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1694      *                                   <code>fromRank</code> is not null and is not in the range 0 ..
1695      *                                   <code>size()</code>-1.
1696      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1697      *                                   <code>buffer</code> is null.
1698      * @throws java.io.IOException               Thrown if an I/O error occurred.
1699      */
1700     public CommRequest receive(Integer fromRank, Buf buffer, CommRequest request)
1701             throws IOException {
1702         // Convenience overload: non-blocking receive restricted to message tag 0.
1703         final int defaultTag = 0;
1704         return receive(fromRank, defaultTag, buffer, request);
1705     }
1706 
1707     /**
1708      * Receive a message from the process at the given rank in this communicator
1709      * with the given message tag (non-blocking). If <code>fromRank</code> is null, a
1710      * message will be received from any process in this communicator. The
1711      * message must have the given tag (an <code>int</code>, so it cannot be null). The
1712      * received message items are stored in the given buffer. To send the
1713      * message, the source process must call the <code>send()</code> method.
1714      * <p>
1715      * The <code>receive()</code> method initiates the receive operation and
1716      * immediately returns a {@linkplain CommRequest} object. The receive
1717      * operation is performed by a separate thread. To wait for the receive
1718      * operation to finish, call the returned {@linkplain CommRequest} object's
1719      * <code>waitForFinish()</code> method. When that method returns, the incoming
1720      * message items have been fully received.
1721      * <p>
1722      * A process can receive a message from itself; in this case a different
1723      * thread must call the <code>send()</code> method on this communicator.
1724      *
1725      * @param fromRank Source process's rank in this communicator, or null to
1726      *                 receive from any process.
1727      * @param tag      Message tag.
1728      * @param buffer   Buffer of data items to be received.
1729      * @param request  CommRequest object to use to wait for the operation to
1730      *                 finish; in this case <code>request</code> is returned. If
1731      *                 <code>request</code> is null, a new CommRequest object is created and
1732      *                 returned.
1733      * @return CommRequest object to use to wait for the operation to finish.
1734      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1735      *                                   <code>fromRank</code> is not null and is not in the range 0 ..
1736      *                                   <code>size()</code>-1.
1737      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1738      *                                   <code>buffer</code> is null.
1739      * @throws java.io.IOException               Thrown if an I/O error occurred.
1740      */
1741     public CommRequest receive(Integer fromRank,
1742                                int tag,
1743                                Buf buffer,
1744                                CommRequest request)
1745             throws IOException {
1746         // Set up CommRequest object.
1747         CommRequest req = request == null ? new CommRequest() : request;
1748         req.mySendRequest = null;
1749         req.myRecvRequest = new IORequest();
1750 
1751         // If source is a wildcard, ensure a channel to every process, then
1752         // receive (non-blocking) from any process.
1753         if (fromRank == null) {
1754             for (int src = 0; src < mySize; ++src) {
1755                 ensureChannel(src);
1756             }
1757             myChannelGroup.receiveNoWait(null, tag, buffer, req.myRecvRequest);
1758         } // If source is not a wildcard, receive (non-blocking) from that
1759         // process.
1760         else {
1761             myChannelGroup.receiveNoWait(getChannel(fromRank), tag, buffer, req.myRecvRequest);
1762         }
1763 
1764         // Return CommRequest object.
1765         return req;
1766     }
1767 
1768     /**
1769      * Receive a message from the process at the given rank in this communicator
1770      * with the given message tag range (non-blocking). If <code>fromRank</code> is
1771      * null, a message will be received from any process in this communicator.
1772      * If <code>tagRange</code> is null, a message will be received with any tag. If
1773      * <code>tagRange</code> is not null, a message will be received with any tag in
1774      * the given range. The received message items are stored in the given
1775      * buffer. To send the message, the source process must call the
1776      * <code>send()</code> method.
1777      * <p>
1778      * The <code>receive()</code> method initiates the receive operation and
1779      * immediately returns a {@linkplain CommRequest} object. The receive
1780      * operation is performed by a separate thread. To wait for the receive
1781      * operation to finish, call the returned {@linkplain CommRequest} object's
1782      * <code>waitForFinish()</code> method. When that method returns, the incoming
1783      * message items have been fully received.
1784      * <p>
1785      * A process can receive a message from itself; in this case a different
1786      * thread must call the <code>send()</code> method on this communicator.
1787      *
1788      * @param fromRank Source process's rank in this communicator, or null to
1789      *                 receive from any process.
1790      * @param tagRange Message tag range, or null to receive any tag.
1791      * @param buffer   Buffer of data items to be received.
1792      * @param request  CommRequest object to use to wait for the operation to
1793      *                 finish; in this case <code>request</code> is returned. If
1794      *                 <code>request</code> is null, a new CommRequest object is created and
1795      *                 returned.
1796      * @return CommRequest object to use to wait for the operation to finish.
1797      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1798      *                                   <code>fromRank</code> is not null and is not in the range 0 ..
1799      *                                   <code>size()</code>-1.
1800      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1801      *                                   <code>buffer</code> is null.
1802      * @throws java.io.IOException               Thrown if an I/O error occurred.
1803      */
1804     public CommRequest receive(Integer fromRank,
1805                                Range tagRange,
1806                                Buf buffer,
1807                                CommRequest request)
1808             throws IOException {
1809         // Set up CommRequest object.
1810         CommRequest req = request == null ? new CommRequest() : request;
1811         req.mySendRequest = null;
1812         req.myRecvRequest = new IORequest();
1813 
1814         // If source is a wildcard, ensure a channel to every process, then
1815         // receive (non-blocking) from any process.
1816         if (fromRank == null) {
1817             for (int src = 0; src < mySize; ++src) {
1818                 ensureChannel(src);
1819             }
1820             myChannelGroup.receiveNoWait(null, tagRange, buffer, req.myRecvRequest);
1821         } // If source is not a wildcard, receive (non-blocking) from that
1822         // process.
1823         else {
1824             myChannelGroup.receiveNoWait(getChannel(fromRank), tagRange, buffer, req.myRecvRequest);
1825         }
1826 
1827         // Return CommRequest object.
1828         return req;
1829     }
1830 
1831     /**
1832      * Send a message to the process at the given rank in this communicator, and
1833      * receive a message from the process at the given rank in this
1834      * communicator. A message tag of 0 is used. The outgoing message items come
1835      * from the buffer <code>sendBuf</code>. The incoming message items go into the
1836      * buffer <code>recvBuf</code>. The outgoing message items must come from a
1837      * different place than where the incoming message items will be stored. The
1838      * destination process (process <code>toRank</code>) must call a method to
1839      * receive this process's outgoing message items. The source process
1840      * (process <code>fromRank</code>) must call a method to send this process's
1841      * incoming message items. When the <code>sendReceive()</code> method returns,
1842      * the outgoing message items have been fully sent, but they may not yet
1843      * have been fully received; and the incoming message items have been fully
1844      * received.
1845      * <p>
1846      * A {@linkplain CommStatus} object is returned giving the results of the
1847      * receive half of the operation. The status object gives the rank of the
1848      * process that sent the incoming message, the message tag that was
1849      * received, and the actual number of data items in the message. If the
1850      * actual number of data items in the message is less than the length of the
1851      * receive buffer, nothing is stored into the extra data items at the end of
1852      * the receive buffer. If the actual number of data items in the message is
1853      * greater than the length of the receive buffer, the extra data items at
1854      * the end of the message are discarded.
1855      * <p>
1856      * A process can send-receive messages with itself; in this case a different
1857      * thread must call the <code>sendReceive()</code> method on this communicator.
1858      *
1859      * @param toRank   Destination process's rank in this communicator.
1860      * @param sendBuf  Buffer of data items to be sent.
1861      * @param fromRank Source process's rank in this communicator.
1862      * @param recvBuf  Buffer of data items to be received.
1863      * @return Status object giving the outcome of the message reception.
1864      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1865      *                                   <code>toRank</code> or <code>fromRank</code>
1866      *                                   is not in the range 0 .. <code>size()</code>-1.
1867      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1868      *                                   <code>sendBuf</code> or <code>recvBuf</code>
1869      *                                   is null.
1870      * @throws java.io.IOException               Thrown if an I/O error occurred.
1871      */
1872     public CommStatus sendReceive(int toRank,
1873                                   Buf sendBuf,
1874                                   int fromRank,
1875                                   Buf recvBuf)
1876             throws IOException {
1877         return sendReceive(toRank, 0, sendBuf, fromRank, 0, recvBuf);
1878     }
1879 
1880     /**
1881      * Send a message to the process at the given rank in this communicator with
1882      * the given message tag, and receive a message from the process at the
1883      * given rank in this communicator with the given message tag. The outgoing
1884      * message items come from the buffer <code>sendBuf</code>. The incoming message
1885      * items go into the buffer <code>recvBuf</code>. The outgoing message items
1886      * must come from a different place than where the incoming message items
1887      * will be stored. The destination process (process <code>toRank</code>) must
1888      * call a method to receive this process's outgoing message items. The
1889      * source process (process <code>fromRank</code>) must call a method to send
1890      * this process's incoming message items. When the <code>sendReceive()</code>
1891      * method returns, the outgoing message items have been fully sent, but they
1892      * may not yet have been fully received; and the incoming message items have
1893      * been fully received.
1894      * <p>
1895      * A {@linkplain CommStatus} object is returned giving the results of the
1896      * receive half of the operation. The status object gives the rank of the
1897      * process that sent the incoming message, the message tag that was
1898      * received, and the actual number of data items in the message. If the
1899      * actual number of data items in the message is less than the length of the
1900      * receive buffer, nothing is stored into the extra data items at the end of
1901      * the receive buffer. If the actual number of data items in the message is
1902      * greater than the length of the receive buffer, the extra data items at
1903      * the end of the message are discarded.
1904      * <p>
1905      * A process can send-receive messages with itself; in this case a different
1906      * thread must call the <code>sendReceive()</code> method on this communicator.
1907      *
1908      * @param toRank   Destination process's rank in this communicator.
1909      * @param sendTag  Message tag for outgoing message.
1910      * @param sendBuf  Buffer of data items to be sent.
1911      * @param fromRank Source process's rank in this communicator.
1912      * @param recvTag  Message tag for incoming message.
1913      * @param recvBuf  Buffer of data items to be received.
1914      * @return Status object giving the outcome of the message reception.
1915      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1916      *                                   <code>toRank</code> or <code>fromRank</code>
1917      *                                   is not in the range 0 .. <code>size()</code>-1.
1918      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1919      *                                   <code>sendBuf</code> or <code>recvBuf</code>
1920      *                                   is null.
1921      * @throws java.io.IOException               Thrown if an I/O error occurred.
1922      */
1923     public CommStatus sendReceive(int toRank, int sendTag, Buf sendBuf,
1924                                   int fromRank, int recvTag, Buf recvBuf)
1925             throws IOException {
1926         // Start sending the outgoing message (non-blocking).
1927         final IORequest outgoing = new IORequest();
1928         myChannelGroup.sendNoWait(getChannel(toRank), sendTag, sendBuf, outgoing);
1929 
1930         // Start receiving the incoming message (non-blocking). Both halves are
1931         // initiated before either one is waited on.
1932         final IORequest incoming = new IORequest();
1933         myChannelGroup.receiveNoWait(getChannel(fromRank), recvTag, recvBuf, incoming);
1934 
1935         // Wait for the send to finish first, then for the receive. The result
1936         // of the send's waitForFinish() is not used; only the receive's Status
1937         // is kept.
1938         outgoing.waitForFinish();
1939         final Status received = incoming.waitForFinish();
1940 
1941         // Report the receive half as a CommStatus: getFarRank() translates the
1942         // channel into the sender's rank; tag and length are copied from the
1943         // Status.
1944         return new CommStatus(getFarRank(received.channel), received.tag, received.length);
1945     }
1946 
1947     /**
1948      * Send a message to the process at the given rank in this communicator, and
1949      * receive a message from the process at the given rank in this communicator
1950      * (non-blocking). A message tag of 0 is used. The outgoing message items
1951      * come from the buffer <code>sendBuf</code>. The incoming message items go into
1952      * the buffer <code>recvBuf</code>. The outgoing message items must come from a
1953      * different place than where the incoming message items will be stored. The
1954      * destination process (process <code>toRank</code>) must call a method to
1955      * receive this process's outgoing message items. The source process
1956      * (process <code>fromRank</code>) must call a method to send this process's
1957      * incoming message items.
1958      * <p>
1959      * The <code>sendReceive()</code> method initiates the send and receive
1960      * operations and immediately returns a {@linkplain CommRequest} object. The
1961      * send and receive operations are performed by a separate thread. To wait
1962      * for the send and receive operations to finish, call the returned
1963      * {@linkplain CommRequest} object's <code>waitForFinish()</code> method. When
1964      * that method returns, the outgoing message items have been fully sent, but
1965      * they may not yet have been fully received; and the incoming message items
1966      * have been fully received.
1967      * <p>
1968      * A process can send-receive messages with itself; in this case a different
1969      * thread must call the <code>sendReceive()</code> method on this communicator.
1970      *
1971      * @param toRank   Destination process's rank in this communicator.
1972      * @param sendBuf  Buffer of data items to be sent.
1973      * @param fromRank Source process's rank in this communicator.
1974      * @param recvBuf  Buffer of data items to be received.
1975      * @param request  CommRequest object to use to wait for the operation to
1976      *                 finish; in this case <code>request</code> is returned. If
1977      *                 <code>request</code> is null, a new CommRequest object is created and
1978      *                 returned.
1979      * @return CommRequest object to use to wait for the operation to finish.
1980      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
1981      *                                   <code>toRank</code> or <code>fromRank</code>
1982      *                                   is not in the range 0 .. <code>size()</code>-1.
1983      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
1984      *                                   <code>sendBuf</code> or <code>recvBuf</code>
1985      *                                   is null.
1986      * @throws java.io.IOException               Thrown if an I/O error occurred.
1987      */
1988     public CommRequest sendReceive(int toRank,
1989                                    Buf sendBuf,
1990                                    int fromRank,
1991                                    Buf recvBuf,
1992                                    CommRequest request)
1993             throws IOException {
1994         return sendReceive(toRank, 0, sendBuf, fromRank, 0, recvBuf, request);
1995     }
1996 
1997     /**
1998      * Send a message to the process at the given rank in this communicator with
1999      * the given message tag, and receive a message from the process at the
2000      * given rank in this communicator with the given message tag
2001      * (non-blocking). The outgoing message items come from the buffer
2002      * <code>sendBuf</code>. The incoming message items go into the buffer
2003      * <code>recvBuf</code>. The outgoing message items must come from a different
2004      * place than where the incoming message items will be stored. The
2005      * destination process (process <code>toRank</code>) must call a method to
2006      * receive this process's outgoing message items. The source process
2007      * (process <code>fromRank</code>) must call a method to send this process's
2008      * incoming message items.
2009      * <p>
2010      * The <code>sendReceive()</code> method initiates the send and receive
2011      * operations and immediately returns a {@linkplain CommRequest} object. The
2012      * send and receive operations are performed by a separate thread. To wait
2013      * for the send and receive operations to finish, call the returned
2014      * {@linkplain CommRequest} object's <code>waitForFinish()</code> method. When
2015      * that method returns, the outgoing message items have been fully sent, but
2016      * they may not yet have been fully received; and the incoming message items
2017      * have been fully received.
2018      * <p>
2019      * A process can send-receive messages with itself; in this case a different
2020      * thread must call the <code>sendReceive()</code> method on this communicator.
2021      *
2022      * @param toRank   Destination process's rank in this communicator.
2023      * @param sendTag  Message tag for outgoing message.
2024      * @param sendBuf  Buffer of data items to be sent.
2025      * @param fromRank Source process's rank in this communicator.
2026      * @param recvTag  Message tag for incoming message.
2027      * @param recvBuf  Buffer of data items to be received.
2028      * @param request  CommRequest object to use to wait for the operation to
2029      *                 finish; in this case <code>request</code> is returned. If
2030      *                 <code>request</code> is null, a new CommRequest object is created and
2031      *                 returned.
2032      * @return CommRequest object to use to wait for the operation to finish.
2033      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2034      *                                   <code>toRank</code> or <code>fromRank</code>
2035      *                                   is not in the range 0 .. <code>size()</code>-1.
2036      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2037      *                                   <code>sendBuf</code> or <code>recvBuf</code>
2038      *                                   is null.
2039      * @throws java.io.IOException               Thrown if an I/O error occurred.
2040      */
2041     public CommRequest sendReceive(int toRank,
2042                                    int sendTag,
2043                                    Buf sendBuf,
2044                                    int fromRank,
2045                                    int recvTag,
2046                                    Buf recvBuf,
2047                                    CommRequest request)
2048             throws IOException {
2049         // Set up CommRequest object.
2050         CommRequest req = request == null ? new CommRequest() : request;
2051 
2052         // Send the outgoing message (non-blocking).
2053         req.mySendRequest = new IORequest();
2054         myChannelGroup.sendNoWait(getChannel(toRank), sendTag, sendBuf, req.mySendRequest);
2055 
2056         // Receive the outgoing message (non-blocking).
2057         req.myRecvRequest = new IORequest();
2058         myChannelGroup.receiveNoWait(getChannel(fromRank), recvTag, recvBuf, req.myRecvRequest);
2059 
2060         // Return CommRequest object.
2061         return req;
2062     }
2063 
2064     /**
2065      * Flood-send a message to all processes in this communicator. The message
2066      * uses a tag of 0. The message items come from the given buffer. To receive
2067      * the message, every process (including the sending process) must call the
2068      * <code>floodReceive()</code> method. When the <code>floodSend()</code> method
2069      * returns, the message has been fully sent, but it may not yet have been
2070      * fully received in all processes.
2071      * <p>
2072      * <I>Note:</I> The length of the incoming buffer in the
2073      * <code>floodReceive()</code> method call must be the same as the length of the
2074      * outgoing buffer in the <code>floodSend()</code> method call.
2075      *
2076      * @param buffer Buffer of data items to be sent.
2077      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2078      *                              <code>buffer</code> is null.
2079      * @throws java.io.IOException          Thrown if an I/O error occurred.
2080      */
2081     public void floodSend(Buf buffer)
2082             throws IOException {
2083         floodSend(0, buffer, null).waitForFinish();
2084     }
2085 
2086     /**
2087      * Flood-send a message to all processes in this communicator with the given
2088      * message tag. The message items come from the given buffer. To receive the
2089      * message, every process (including the sending process) must call the
2090      * <code>floodReceive()</code> method. When the <code>floodSend()</code> method
2091      * returns, the message has been fully sent, but it may not yet have been
2092      * fully received in all processes.
2093      * <p>
2094      * <I>Note:</I> The length of the incoming buffer in the
2095      * <code>floodReceive()</code> method call must be the same as the length of the
2096      * outgoing buffer in the <code>floodSend()</code> method call.
2097      *
2098      * @param tag    Message tag.
2099      * @param buffer Buffer of data items to be sent.
2100      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2101      *                              <code>buffer</code> is null.
2102      * @throws java.io.IOException          Thrown if an I/O error occurred.
2103      */
2104     public void floodSend(int tag,
2105                           Buf buffer)
2106             throws IOException {
2107         floodSend(tag, buffer, null).waitForFinish();
2108     }
2109 
2110     /**
2111      * Flood-send a message to all processes in this communicator
2112      * (non-blocking). A message tag of 0 is used. The message items come from
2113      * the given buffer. To receive the message, every process (including the
2114      * sending process) must call the <code>floodReceive()</code> method.
2115      * <p>
2116      * The <code>floodSend()</code> method initiates the flood-send operation and
2117      * immediately returns a {@linkplain CommRequest} object. The flood-send
2118      * operation is performed by a separate thread. To wait for the flood-send
2119      * operation to finish, call the returned {@linkplain CommRequest} object's
2120      * <code>waitForFinish()</code> method. When that method returns, the message
2121      * has been fully sent, but it may not yet have been fully received in all
2122      * processes.
2123      * <p>
2124      * <I>Note:</I> The length of the incoming buffer in the
2125      * <code>floodReceive()</code> method call must be the same as the length of the
2126      * outgoing buffer in the <code>floodSend()</code> method call.
2127      *
2128      * @param buffer  Buffer of data items to be sent.
2129      * @param request CommRequest object to use to wait for the operation to
2130      *                finish; in this case <code>request</code> is returned. If
2131      *                <code>request</code> is null, a new CommRequest object is created and
2132      *                returned.
2133      * @return CommRequest object to use to wait for the operation to finish.
2134      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2135      *                              <code>buffer</code> is null.
2136      * @throws java.io.IOException          Thrown if an I/O error occurred.
2137      */
2138     public CommRequest floodSend(Buf buffer,
2139                                  CommRequest request)
2140             throws IOException {
2141         return floodSend(0, buffer, request);
2142     }
2143 
2144     /**
2145      * Flood-send a message to all processes in this communicator with the given
2146      * message tag (non-blocking). The message items come from the given buffer.
2147      * To receive the message, every process (including the sending process)
2148      * must call the <code>floodReceive()</code> method.
2149      * <p>
2150      * The <code>floodSend()</code> method initiates the flood-send operation and
2151      * immediately returns a {@linkplain CommRequest} object. The flood-send
2152      * operation is performed by a separate thread. To wait for the flood-send
2153      * operation to finish, call the returned {@linkplain CommRequest} object's
2154      * <code>waitForFinish()</code> method. When that method returns, the message
2155      * has been fully sent, but it may not yet have been fully received in all
2156      * processes.
2157      * <p>
2158      * <I>Note:</I> The length of the incoming buffer in the
2159      * <code>floodReceive()</code> method call must be the same as the length of the
2160      * outgoing buffer in the <code>floodSend()</code> method call.
2161      *
2162      * @param tag     Message tag.
2163      * @param buffer  Buffer of data items to be sent.
2164      * @param request CommRequest object to use to wait for the operation to
2165      *                finish; in this case <code>request</code> is returned. If
2166      *                <code>request</code> is null, a new CommRequest object is created and
2167      *                returned.
2168      * @return CommRequest object to use to wait for the operation to finish.
2169      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2170      *                              <code>buffer</code> is null.
2171      * @throws java.io.IOException          Thrown if an I/O error occurred.
2172      */
2173     public CommRequest floodSend(int tag,
2174                                  Buf buffer,
2175                                  CommRequest request)
2176             throws IOException {
2177         // Set up CommRequest object.
2178         CommRequest req = request == null ? new CommRequest() : request;
2179         req.mySendRequest = new IORequest();
2180         req.myRecvRequest = null;
2181 
2182         // Send data to process 0. Process 0's flood-receive I/O request object
2183         // will forward the data to all the processes.
2184         myChannelGroup.sendNoWait(getChannel(0), tag, buffer, req.mySendRequest);
2185 
2186         // Return CommRequest object.
2187         return req;
2188     }
2189 
2190     /**
2191      * Flood-receive a message from any process in this communicator. The
2192      * message must have a tag of 0. The received message items are stored in
2193      * the given buffer. To send the message, the source process must call the
2194      * <code>floodSend()</code> method. When the <code>floodReceive()</code> method
2195      * returns, the message has been fully received.
2196      * <p>
2197      * A {@linkplain CommStatus} object is returned. The status object gives the
2198      * actual rank of the process that sent the message, the actual message tag
2199      * that was received, and the actual number of data items in the message.
2200      * <p>
2201      * <I>Note:</I> The length of the incoming buffer in the
2202      * <code>floodReceive()</code> method call must be the same as the length of the
2203      * outgoing buffer in the <code>floodSend()</code> method call.
2204      *
2205      * @param buffer Buffer of data items to be received.
2206      * @return Status object giving the outcome of the message reception.
2207      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2208      *                              <code>buffer</code> is null.
2209      * @throws java.io.IOException          Thrown if an I/O error occurred.
2210      */
2211     public CommStatus floodReceive(Buf buffer)
2212             throws IOException {
2213         return floodReceive(0, buffer, null).waitForFinish();
2214     }
2215 
2216     /**
2217      * Flood-receive a message from any process in this communicator with the
2218      * given message tag. If <code>tag</code> is null, a message will be received
2219      * with any tag. The received message items are stored in the given buffer.
2220      * To send the message, the source process must call the
2221      * <code>floodSend()</code> method. When the <code>floodReceive()</code> method
2222      * returns, the message has been fully received.
2223      * <p>
2224      * A {@linkplain CommStatus} object is returned. The status object gives the
2225      * actual rank of the process that sent the message, the actual message tag
2226      * that was received, and the actual number of data items in the message.
2227      * <p>
2228      * <I>Note:</I> The length of the incoming buffer in the
2229      * <code>floodReceive()</code> method call must be the same as the length of the
2230      * outgoing buffer in the <code>floodSend()</code> method call.
2231      *
2232      * @param tag    Message tag, or null to receive any tag.
2233      * @param buffer Buffer of data items to be received.
2234      * @return Status object giving the outcome of the message reception.
2235      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2236      *                              <code>buffer</code> is null.
2237      * @throws java.io.IOException          Thrown if an I/O error occurred.
2238      */
2239     public CommStatus floodReceive(Integer tag,
2240                                    Buf buffer)
2241             throws IOException {
2242         return floodReceive(tag, buffer, null).waitForFinish();
2243     }
2244 
2245     /**
2246      * Flood-receive a message from any process in this communicator
2247      * (non-blocking). The message must have a tag of 0 (this overload takes no
2248      * <code>tag</code> argument). The received message items are
2249      * stored in the given buffer. To send the message, the source process must
2250      * call the <code>floodSend()</code> method.
2251      * <p>
2252      * The <code>floodReceive()</code> method initiates the flood-receive operation
2253      * and immediately returns a {@linkplain CommRequest} object. The
2254      * flood-receive operation is performed by a separate thread. To wait for
2255      * the flood-receive operation to finish, call the returned {@linkplain
2256      * CommRequest} object's <code>waitForFinish()</code> method. When that method
2257      * returns, the incoming message items have been fully received.
2258      * <p>
2259      * <I>Note:</I> The length of the incoming buffer in the
2260      * <code>floodReceive()</code> method call must be the same as the length of the
2261      * outgoing buffer in the <code>floodSend()</code> method call.
2262      *
2263      * @param buffer  Buffer of data items to be received.
2264      * @param request CommRequest object to use to wait for the operation to
2265      *                finish; in this case <code>request</code> is returned. If
2266      *                <code>request</code> is null, a new CommRequest object is created and
2267      *                returned.
2268      * @return CommRequest object to use to wait for the operation to finish.
2269      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2270      *                              <code>buffer</code> is null.
2271      * @throws java.io.IOException          Thrown if an I/O error occurred.
2272      */
2273     public CommRequest floodReceive(Buf buffer,
2274                                     CommRequest request)
2275             throws IOException {
2276         return floodReceive(0, buffer, request);
2277     }
2278 
2279     /**
2280      * Flood-receive a message from any process in this communicator with the
2281      * given message tag (non-blocking). If <code>tag</code> is null, a message will
2282      * be received with any tag. The received message items are stored in the
2283      * given buffer. To send the message, the source process must call the
2284      * <code>floodSend()</code> method.
2285      * <p>
2286      * The <code>floodReceive()</code> method initiates the flood-receive operation
2287      * and immediately returns a {@linkplain CommRequest} object. The
2288      * flood-receive operation is performed by a separate thread. To wait for
2289      * the flood-receive operation to finish, call the returned {@linkplain
2290      * CommRequest} object's <code>waitForFinish()</code> method. When that method
2291      * returns, the incoming message items have been fully received.
2292      * <p>
2293      * <I>Note:</I> The length of the incoming buffer in the
2294      * <code>floodReceive()</code> method call must be the same as the length of the
2295      * outgoing buffer in the <code>floodSend()</code> method call.
2296      *
2297      * @param tag     Message tag, or null to receive any tag.
2298      * @param buffer  Buffer of data items to be received.
2299      * @param request CommRequest object to use to wait for the operation to
2300      *                finish; in this case <code>request</code> is returned. If
2301      *                <code>request</code> is null, a new CommRequest object is created and
2302      *                returned.
2303      * @return CommRequest object to use to wait for the operation to finish.
2304      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
2305      *                              <code>buffer</code> is null.
2306      * @throws java.io.IOException          Thrown if an I/O error occurred.
2307      */
2308     public CommRequest floodReceive(Integer tag,
2309                                     Buf buffer,
2310                                     CommRequest request)
2311             throws IOException {
2312         // Get broadcast tree for root=0.
2313         int[] tree = getBroadcastTree(0);
2314 
2315         // Set up CommRequest object with a special I/O request object that
2316         // forwards the message down the broadcast tree.
2317         CommRequest req = request == null ? new CommRequest() : request;
2318         req.mySendRequest = null;
2319         req.myRecvRequest = new FloodReceiveIORequest(tree);
2320 
2321         // In process 0, ensure a channel to every process, then receive
2322         // (non-blocking) a message from any process.
2323         if (myRank == 0) {
2324             for (int src = 0; src < mySize; ++src) {
2325                 ensureChannel(src);
2326             }
2327             myChannelGroup.receiveNoWait(null, tag, buffer, req.myRecvRequest);
2328         } // In other processes, ensure a channel to the child processes in the
2329         // broadcast tree, then receive (non-blocking) a message from the parent
2330         // process in the broadcast tree.
2331         else {
2332             for (int i = 1; i < tree.length; ++i) {
2333                 ensureChannel(tree[i]);
2334             }
2335             myChannelGroup.receiveNoWait(getChannel(tree[0]), tag, buffer, req.myRecvRequest);
2336         }
2337 
2338         // Return CommRequest object.
2339         return req;
2340     }
2341 
2342     /**
2343      * Class FloodReceiveIORequest overrides the methods of class IORequest with
2344      * additional processing to forward the message when a message is received.
2345      */
2346     private class FloodReceiveIORequest
2347             extends IORequest {
2348 
2349         // Broadcast tree.
2350         private int[] tree;
2351 
2352         // List of zero or more additional I/O requests to forward copies of the
2353         // received message.
2354         private LinkedList<IORequest> myForwardedIORequests
2355                 = new LinkedList<IORequest>();
2356 
2357         /**
2358          * Construct a new I/O request object.
2359          *
2360          * @param tree Broadcast tree.
2361          */
2362         public FloodReceiveIORequest(int[] tree) {
2363             super();
2364             this.tree = tree;
2365         }
2366 
2367         /**
2368          * Determine if this I/O request has finished.
2369          *
2370          * @return False if this I/O request has not finished, true if this I/O
2371          * request has finished successfully.
2372          * @throws IOException Thrown if this I/O request has finished and an
2373          *                     I/O error occurred.
2374          */
2375         public synchronized boolean isFinished()
2376                 throws IOException {
2377             if (!super.isFinished()) {
2378                 return false;
2379             }
2380             for (IORequest req : myForwardedIORequests) {
2381                 if (!req.isFinished()) {
2382                     return false;
2383                 }
2384             }
2385             return true;
2386         }
2387 
2388         /**
2389          * Wait until the send or receive operation corresponding to this I/O
2390          * request has finished. For a receive operation, a {@linkplain Status}
2391          * object containing the results of the receive operation is returned;
2392          * for a send operation, null is returned.
2393          *
2394          * @return Receive status for a receive operation, or null for a send
2395          * operation.
2396          * @throws IOException Thrown if an I/O error occurred.
2397          */
2398         public synchronized Status waitForFinish()
2399                 throws IOException {
2400             Status status = super.waitForFinish();
2401             for (IORequest req : myForwardedIORequests) {
2402                 req.waitForFinish();
2403             }
2404             return status;
2405         }
2406 
2407         /**
2408          * Report that this I/O request succeeded.
2409          */
2410         protected synchronized void reportSuccess() {
2411             try {
2412                 super.reportSuccess();
2413 
2414                 // Get message tag.
2415                 int msgtag = myStatus.tag;
2416 
2417                 // Flood the message to every child process in the broadcast
2418                 // tree.
2419                 int n = tree.length;
2420                 for (int i = 1; i < n; ++i) {
2421                     IORequest req = new IORequest();
2422                     myForwardedIORequests.add(req);
2423                     myChannelGroup.sendNoWait(getChannel(tree[i]), msgtag, myBuf, req);
2424                 }
2425             } catch (IOException exc) {
2426                 reportFailure(exc);
2427             }
2428         }
2429     }
2430 
2431     /**
2432      * Broadcast a message to all processes in this communicator. The broadcast
2433      * uses a message tag of 0. All processes must call <code>broadcast()</code>
2434      * with the same value for <code>root</code> and with a buffer of the same
2435      * length and the same item data type.
2436      * <p>
2437      * The root process (the process whose rank in this communicator is
2438      * <code>root</code>) sends the message items. The message items come from the
2439      * given buffer. When the <code>broadcast()</code> method returns, the message
2440      * has been fully sent, but it may not yet have been fully received by all
2441      * processes.
2442      * <p>
2443      * Each non-root process receives the message items. The message items are
2444      * stored in the given buffer. When the <code>broadcast()</code> method returns,
2445      * the message has been fully received.
2446      *
2447      * @param root   Root process's rank in this communicator.
2448      * @param buffer Buffer of data items to be sent (root process) or received
2449      *               (non-root processes).
2450      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2451      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1.
2452      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2453      *                                   <code>buffer</code> is null.
2454      * @throws java.io.IOException               Thrown if an I/O error occurred.
2455      */
2456     public void broadcast(int root,
2457                           Buf buffer)
2458             throws IOException {
2459         broadcast(root, 0, buffer);
2460     }
2461 
2462     /**
2463      * Broadcast a message to all processes in this communicator using the given
2464      * message tag. All processes must call <code>broadcast()</code> with the same
2465      * values for <code>root</code> and <code>tag</code> and with a buffer of the same
2466      * length and the same item data type.
2467      * <p>
2468      * The root process (the process whose rank in this communicator is
2469      * <code>root</code>) sends the message items. The message items come from the
2470      * given buffer. When the <code>broadcast()</code> method returns, the message
2471      * has been fully sent, but it may not yet have been fully received by all
2472      * processes.
2473      * <p>
2474      * Each non-root process receives the message items. The message items are
2475      * stored in the given buffer. When the <code>broadcast()</code> method returns,
2476      * the message has been fully received.
2477      *
2478      * @param root   Root process's rank in this communicator.
2479      * @param tag    Message tag.
2480      * @param buffer Buffer of data items to be sent (root process) or received
2481      *               (non-root processes).
2482      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2483      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1.
2484      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2485      *                                   <code>buffer</code> is null.
2486      * @throws java.io.IOException               Thrown if an I/O error occurred.
2487      */
2488     public void broadcast(int root,
2489                           int tag,
2490                           Buf buffer)
2491             throws IOException {
2492         // Verify preconditions.
2493         if (0 > root || root >= mySize) {
2494             throw new IndexOutOfBoundsException("Comm.broadcast(): root = " + root + " out of bounds");
2495         }
2496 
2497         // Early return if only one process.
2498         if (mySize == 1) {
2499             return;
2500         }
2501 
2502         // A broadcast is done as a series of point-to-point messages. The
2503         // messages are organized into rounds. The number of rounds is
2504         // ceil(log_2(mySize)). In each round, processes send messages to other
2505         // processes in parallel. Here is the message pattern for a communicator
2506         // with 8 processes doing a broadcast from root process 0:
2507         //
2508         //     Process
2509         //     0     1     2     3     4     5     6     7
2510         //     |     |     |     |     |     |     |     |
2511         //     |---->|     |     |     |     |     |     |  Round 1
2512         //     |     |     |     |     |     |     |     |
2513         //     |---------->|     |     |     |     |     |  Round 2
2514         //     |     |---------->|     |     |     |     |
2515         //     |     |     |     |     |     |     |     |
2516         //     |---------------------->|     |     |     |  Round 3
2517         //     |     |---------------------->|     |     |
2518         //     |     |     |---------------------->|     |
2519         //     |     |     |     |---------------------->|
2520         //     |     |     |     |     |     |     |     |
2521         //
2522         // If a process other than process 0 is the root, the message pattern is
2523         // the same, except the process ranks are circularly rotated.
2524         // Get array of process ranks in the broadcast tree.
2525         int[] broadcasttree = getBroadcastTree(root);
2526         int n = broadcasttree.length;
2527 
2528         // Receive data from parent if any (blocking).
2529         int parent = broadcasttree[0];
2530         if (parent != -1) {
2531             myChannelGroup.receive(getChannel(parent), tag, buffer);
2532         }
2533 
2534         // Send data to children if any (non-blocking).
2535         IORequest[] iorequest = new IORequest[n];
2536         for (int i = 1; i < n; ++i) {
2537             int child = broadcasttree[i];
2538             iorequest[i] = new IORequest();
2539             myChannelGroup.sendNoWait(getChannel(child), tag, buffer, iorequest[i]);
2540         }
2541 
2542         // Wait for sends to finish if any.
2543         for (int i = 1; i < n; ++i) {
2544             iorequest[i].waitForFinish();
2545         }
2546     }
2547 
2548     /**
2549      * Scatter messages to all processes in this communicator. The scatter uses
2550      * a message tag of 0. All processes must call <code>scatter()</code>
2551      * with the same value for <code>root</code>.
2552      * <p>
2553      * The root process (the process whose rank in this communicator is
2554      * <code>root</code>) sends the message items. The message items sent to process
2555      * <I>i</I> come from the source buffer at index <I>i</I> in the given array
2556      * of source buffers. When the <code>scatter()</code> method returns, the
2557      * messages have been fully sent, but they may not yet have been fully
2558      * received by all processes.
2559      * <p>
2560      * Each process, including the root process, receives the message items. The
2561      * message items are stored in the given destination buffer. This must have
2562      * the same length and the same item data type as the corresponding source
2563      * buffer. When the <code>scatter()</code> method returns, the message has been
2564      * fully received.
2565      * <p>
2566      * In the non-root processes, the source buffer array is ignored and may be
2567      * null.
2568      *
2569      * @param root     Root process's rank in this communicator.
2570      * @param srcarray Array of source buffers to be sent by the root process.
2571      *                 Ignored in the non-root processes.
2572      * @param dst      Destination buffer to be received.
2573      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2574      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2575      *                                   <code>srcarray</code>'s length does not equal the size of this communicator.
2576      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2577      *                                   <code>srcarray</code> or any element thereof is null. Thrown if <code>dst</code>
2578      *                                   is null.
2579      * @throws java.io.IOException               Thrown if an I/O error occurred.
2580      */
2581     public void scatter(int root,
2582                         Buf[] srcarray,
2583                         Buf dst)
2584             throws IOException {
2585         scatter(root, 0, srcarray, dst);
2586     }
2587 
2588     /**
2589      * Scatter messages to all processes in this communicator using the given
2590      * message tag. All processes must call <code>scatter()</code> with the same
2591      * values for <code>root</code> and <code>tag</code>.
2592      * <p>
2593      * The root process (the process whose rank in this communicator is
2594      * <code>root</code>) sends the message items. The message items sent to process
2595      * <I>i</I> come from the source buffer at index <I>i</I> in the given array
2596      * of source buffers. When the <code>scatter()</code> method returns, the
2597      * messages have been fully sent, but they may not yet have been fully
2598      * received by all processes.
2599      * <p>
2600      * Each process, including the root process, receives the message items. The
2601      * message items are stored in the given destination buffer. This must have
2602      * the same length and the same item data type as the corresponding source
2603      * buffer. When the <code>scatter()</code> method returns, the message has been
2604      * fully received.
2605      * <p>
2606      * In the non-root processes, the source buffer array is ignored and may be
2607      * null.
2608      *
2609      * @param root     Root process's rank in this communicator.
2610      * @param tag      Message tag.
2611      * @param srcarray Array of source buffers to be sent by the root process.
2612      *                 Ignored in the non-root processes.
2613      * @param dst      Destination buffer to be received.
2614      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2615      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2616      *                                   <code>srcarray</code>'s length does not equal the size of this communicator.
2617      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2618      *                                   <code>srcarray</code> or any element thereof is null. Thrown if <code>dst</code>
2619      *                                   is null.
2620      * @throws java.io.IOException               Thrown if an I/O error occurred.
2621      */
2622     public void scatter(int root,
2623                         int tag,
2624                         Buf[] srcarray,
2625                         Buf dst)
2626             throws IOException {
2627         // Verify preconditions.
2628         if (0 > root || root >= mySize) {
2629             throw new IndexOutOfBoundsException("Comm.scatter(): root = " + root + " out of bounds");
2630         }
2631 
2632         // A scatter is done as a series of point-to-point messages. The root
2633         // process sends a separate message to every other process. Here is the
2634         // message pattern for a communicator with 8 processes scattering from
2635         // root process 0:
2636         //
2637         //     Process
2638         //     0     1     2     3     4     5     6     7
2639         //     |     |     |     |     |     |     |     |
2640         //     |---->|     |     |     |     |     |     |
2641         //     |     |     |     |     |     |     |     |
2642         //     |---------->|     |     |     |     |     |
2643         //     |     |     |     |     |     |     |     |
2644         //     |---------------->|     |     |     |     |
2645         //     |     |     |     |     |     |     |     |
2646         //     |---------------------->|     |     |     |
2647         //     |     |     |     |     |     |     |     |
2648         //     |---------------------------->|     |     |
2649         //     |     |     |     |     |     |     |     |
2650         //     |---------------------------------->|     |
2651         //     |     |     |     |     |     |     |     |
2652         //     |---------------------------------------->|
2653         //     |     |     |     |     |     |     |     |
2654         // Root process sends all messages.
2655         if (myRank == root) {
2656             // Array of IORequest objects for non-blocking sends.
2657             IORequest[] iorequest = new IORequest[mySize];
2658 
2659             // Initiate sends to lower-ranked processes.
2660             for (int rank = 0; rank < myRank; ++rank) {
2661                 iorequest[rank] = new IORequest();
2662                 myChannelGroup.sendNoWait(getChannel(rank), tag, srcarray[rank], iorequest[rank]);
2663             }
2664 
2665             // Initiate sends to higher-ranked processes.
2666             for (int rank = myRank + 1; rank < mySize; ++rank) {
2667                 iorequest[rank] = new IORequest();
2668                 myChannelGroup.sendNoWait(getChannel(rank), tag, srcarray[rank], iorequest[rank]);
2669             }
2670 
2671             // Copy to itself.
2672             dst.copy(srcarray[myRank]);
2673 
2674             // Wait for completion of sends to lower-ranked processes.
2675             for (int rank = 0; rank < myRank; ++rank) {
2676                 iorequest[rank].waitForFinish();
2677             }
2678 
2679             // Wait for completion of sends to higher-ranked processes.
2680             for (int rank = myRank + 1; rank < mySize; ++rank) {
2681                 iorequest[rank].waitForFinish();
2682             }
2683         } // Non-root process receives one message.
2684         else {
2685             myChannelGroup.receive(getChannel(root), tag, dst);
2686         }
2687     }
2688 
2689     /**
2690      * Gather messages from all processes in this communicator. The gather uses
2691      * a message tag of 0. All processes must call <code>gather()</code> with the
2692      * same value for <code>root</code>.
2693      * <p>
2694      * The root process (the process whose rank in this communicator is
2695      * <code>root</code>) receives the message items. The message items received
2696      * from process <I>i</I> are stored in the destination buffer at index
2697      * <I>i</I> in the given array of destination buffers. When the
2698      * <code>gather()</code> method returns, all the messages have been fully
2699      * received.
2700      * <p>
2701      * Each process, including the root process, sends the message items. The
2702      * message items come from the given source buffer. This must have the same
2703      * length and the same item data type as the corresponding destination
2704      * buffer. When the <code>gather()</code> method returns, the message has been
2705      * fully sent, but it may not yet have been fully received by the root
2706      * process.
2707      * <p>
2708      * In the non-root processes, the destination buffer array is ignored and
2709      * may be null.
2710      *
2711      * @param root     Root process's rank in this communicator.
2712      * @param src      Source buffer to be sent.
2713      * @param dstarray Array of destination buffers to be received by the root
2714      *                 process. Ignored in the non-root processes.
2715      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2716      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2717      *                                   <code>dstarray</code>'s length does not equal the size of this communicator.
2718      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2719      *                                   <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2720      *                                   is null.
2721      * @throws java.io.IOException               Thrown if an I/O error occurred.
2722      */
2723     public void gather(int root,
2724                        Buf src,
2725                        Buf[] dstarray)
2726             throws IOException {
2727         gather(root, 0, src, dstarray);
2728     }
2729 
2730     /**
2731      * Gather messages from all processes in this communicator using the given
2732      * message tag. All processes must call <code>gather()</code> with the same
2733      * values for <code>root</code> and <code>tag</code>.
2734      * <p>
2735      * The root process (the process whose rank in this communicator is
2736      * <code>root</code>) receives the message items. The message items received
2737      * from process <I>i</I> are stored in the destination buffer at index
2738      * <I>i</I> in the given array of destination buffers. When the
2739      * <code>gather()</code> method returns, all the messages have been fully
2740      * received.
2741      * <p>
2742      * Each process, including the root process, sends the message items. The
2743      * message items come from the given source buffer. This must have the same
2744      * length and the same item data type as the corresponding destination
2745      * buffer. When the <code>gather()</code> method returns, the message has been
2746      * fully sent, but it may not yet have been fully received by the root
2747      * process.
2748      * <p>
2749      * In the non-root processes, the destination buffer array is ignored and
2750      * may be null.
2751      *
2752      * @param root     Root process's rank in this communicator.
2753      * @param tag      Message tag.
2754      * @param src      Source buffer to be sent.
2755      * @param dstarray Array of destination buffers to be received by the root
2756      *                 process. Ignored in the non-root processes.
2757      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2758      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if
2759      *                                   <code>dstarray</code>'s length does not equal the size of this communicator.
2760      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2761      *                                   <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2762      *                                   is null.
2763      * @throws java.io.IOException               Thrown if an I/O error occurred.
2764      */
2765     public void gather(int root,
2766                        int tag,
2767                        Buf src,
2768                        Buf[] dstarray)
2769             throws IOException {
2770         // Verify preconditions.
2771         if (0 > root || root >= mySize) {
2772             throw new IndexOutOfBoundsException("Comm.gather(): root = " + root + " out of bounds");
2773         }
2774 
2775         // A gather is done as a series of point-to-point messages. The root
2776         // process receives a separate message from every other process. Here is
2777         // the message pattern for a communicator with 8 processes gathering
2778         // into root process 0:
2779         //
2780         //     Process
2781         //     0     1     2     3     4     5     6     7
2782         //     |     |     |     |     |     |     |     |
2783         //     |<----|     |     |     |     |     |     |
2784         //     |     |     |     |     |     |     |     |
2785         //     |<----------|     |     |     |     |     |
2786         //     |     |     |     |     |     |     |     |
2787         //     |<----------------|     |     |     |     |
2788         //     |     |     |     |     |     |     |     |
2789         //     |<----------------------|     |     |     |
2790         //     |     |     |     |     |     |     |     |
2791         //     |<----------------------------|     |     |
2792         //     |     |     |     |     |     |     |     |
2793         //     |<----------------------------------|     |
2794         //     |     |     |     |     |     |     |     |
2795         //     |<----------------------------------------|
2796         //     |     |     |     |     |     |     |     |
2797         // Root process receives all messages.
2798         if (myRank == root) {
2799             // Array of IORequest objects for non-blocking receives.
2800             IORequest[] iorequest = new IORequest[mySize];
2801 
2802             // Initiate receives from lower-ranked processes.
2803             for (int rank = 0; rank < myRank; ++rank) {
2804                 iorequest[rank] = new IORequest();
2805                 myChannelGroup.receiveNoWait(getChannel(rank), tag, dstarray[rank], iorequest[rank]);
2806             }
2807 
2808             // Initiate receives from higher-ranked processes.
2809             for (int rank = myRank + 1; rank < mySize; ++rank) {
2810                 iorequest[rank] = new IORequest();
2811                 myChannelGroup.receiveNoWait(getChannel(rank), tag, dstarray[rank], iorequest[rank]);
2812             }
2813 
2814             // Copy to itself.
2815             dstarray[myRank].copy(src);
2816 
2817             // Wait for completion of receives from lower-ranked processes.
2818             for (int rank = 0; rank < myRank; ++rank) {
2819                 iorequest[rank].waitForFinish();
2820             }
2821 
2822             // Wait for completion of receives from higher-ranked processes.
2823             for (int rank = myRank + 1; rank < mySize; ++rank) {
2824                 iorequest[rank].waitForFinish();
2825             }
2826         } // Non-root process sends one message.
2827         else {
2828             myChannelGroup.send(getChannel(root), tag, src);
2829         }
2830     }
2831 
2832     /**
2833      * All-gather messages from each process to all processes in this
2834      * communicator. A message tag of 0 is used. All processes must call
2835      * <code>allGather()</code>.
2836      * <p>
2837      * Each process sends the message items in the given source buffer. When the
2838      * <code>allGather()</code> method returns, the source buffer has been fully
2839      * sent.
2840      * <p>
2841      * Each process receives message items from the other processes. The message
2842      * items received from process <I>i</I> are stored in the destination buffer
2843      * at index <I>i</I> in the given array of destination buffers. This
2844      * destination buffer must have the same length and the same item data type
2845      * as the source buffer in process <I>i</I>. When the <code>allGather()</code>
2846      * method returns, all the destination buffers have been fully received.
2847      * <p>
2848      * All-gather is the same as gather, except that every process has an array
2849      * of destination buffers, and every process receives the results of the
2850      * gather.
2851      *
2852      * @param src      Source buffer to be sent.
2853      * @param dstarray Array of destination buffers to be received.
2854      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2855      *                                   <code>dstarray</code>'s length does not equal the size of this communicator.
2856      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2857      *                                   <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2858      *                                   is null.
2859      * @throws java.io.IOException               Thrown if an I/O error occurred.
2860      */
2861     public void allGather(Buf src,
2862                           Buf[] dstarray)
2863             throws IOException {
2864         allGather(0, src, dstarray);
2865     }
2866 
2867     /**
2868      * All-gather messages from each process to all processes in this
2869      * communicator using the given message tag. All processes must call
2870      * <code>allGather()</code> with the same value for <code>tag</code>.
2871      * <p>
2872      * Each process sends the message items in the given source buffer. When the
2873      * <code>allGather()</code> method returns, the source buffer has been fully
2874      * sent.
2875      * <p>
2876      * Each process receives message items from the other processes. The message
2877      * items received from process <I>i</I> are stored in the destination buffer
2878      * at index <I>i</I> in the given array of destination buffers. This
2879      * destination buffer must have the same length and the same item data type
2880      * as the source buffer in process <I>i</I>. When the <code>allGather()</code>
2881      * method returns, all the destination buffers have been fully received.
2882      * <p>
2883      * All-gather is the same as gather, except that every process has an array
2884      * of destination buffers, and every process receives the results of the
2885      * gather.
2886      *
2887      * @param tag      Message tag.
2888      * @param src      Source buffer to be sent.
2889      * @param dstarray Array of destination buffers to be received.
2890      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2891      *                                   <code>dstarray</code>'s length does not equal the size of this communicator.
2892      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2893      *                                   <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code>
2894      *                                   is null.
2895      * @throws java.io.IOException               Thrown if an I/O error occurred.
2896      */
2897     public void allGather(int tag,
2898                           Buf src,
2899                           Buf[] dstarray)
2900             throws IOException {
2901         // Get ranks of predecessor and successor processes.
2902         int pred = (myRank - 1 + mySize) % mySize;
2903         int succ = (myRank + 1) % mySize;
2904 
2905         // Copy source buffer into destination buffer at my own rank.
2906         dstarray[myRank].copy(src);
2907 
2908         // Do (mySize-1) message rounds. Messages are sent in a pipelined
2909         // fashion from each process to its predecessor until each process's
2910         // source data has arrived in every process. Each outgoing message is
2911         // overlapped with an incoming message.
2912         for (int i = 1; i < mySize; ++i) {
2913             sendReceive(/*toRank  */pred,
2914                     /*sendTag */ tag,
2915                     /*sendBuf */ dstarray[(myRank + i - 1) % mySize],
2916                     /*fromRank*/ succ,
2917                     /*recvTag */ tag,
2918                     /*recvBuf */ dstarray[(myRank + i) % mySize]);
2919         }
2920     }
2921 
2922     /**
2923      * Perform a reduction on all processes in this communicator. The reduction
2924      * uses a message tag of 0. All processes must call <code>reduce()</code> with
2925      * the same value for <code>root</code>, with a buffer of the same length and
2926      * the same item data type, and with the same binary operation (class
2927      * {@linkplain edu.rit.pj.reduction.Op Op}).
2928      * <p>
2929      * Before calling <code>reduce()</code>, each process has a buffer filled with
2930      * data items. After <code>reduce()</code> returns, each data item in the root
2931      * process's buffer has been set to the <B>reduction</B> of the
2932      * corresponding data items in all the processes' buffers. The reduction is
2933      * calculated by this formula:
2934      * <p>
2935      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
2936      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
2937      * <p>
2938      * where <I>op</I> is the binary operation passed in as an argument and
2939      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
2940      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
2941      * process rank 0, 1, 2, and so on. However, the order in which the data
2942      * items actually are combined is not specified. Therefore, the binary
2943      * operation must be such that the answer will be the same regardless of the
2944      * order in which the data items are combined; that is, the binary operation
2945      * must be commutative and associative.
2946      * <p>
2947      * In the root process, the reduce operation always changes the buffer's
2948      * contents as described above. In the non-root processes, the reduce
2949      * operation may or may not change the buffer's contents; the final contents
2950      * of the buffer in the non-root processes is not specified.
2951      * <p>
2952      * When the <code>reduce()</code> method returns in the root process, the
2953      * reduction has been fully performed as described above. When the
2954      * <code>reduce()</code> method returns in a non-root process, the non-root
2955      * process has sent all its data items into the reduction, but the reduction
2956      * may not be fully complete in the root process yet.
2957      *
2958      * @param root   Root process's rank in this communicator.
2959      * @param buffer Buffer of data items to be reduced.
2960      * @param op     Binary operation.
2961      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
2962      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1.
2963      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
2964      *                                   <code>buf</code> is null or <code>op</code>
2965      *                                   is null.
2966      * @throws java.lang.ClassCastException        (unchecked exception) Thrown if
2967      *                                   <code>buf</code> and <code>op</code> do not use the same item data type.
2968      * @throws java.io.IOException               Thrown if an I/O error occurred.
2969      */
2970     public void reduce(int root,
2971                        Buf buffer,
2972                        Op op)
2973             throws IOException {
2974         reduce(root, 0, buffer, op);
2975     }
2976 
2977     /**
2978      * Perform a reduction on all processes in this communicator using the given
2979      * message tag. All processes must call <code>reduce()</code> with the same
2980      * value for <code>root</code>, with a buffer of the same length and the same
2981      * item data type, and with the same binary operation (class {@linkplain
2982      * edu.rit.pj.reduction.Op Op}).
2983      * <p>
2984      * Before calling <code>reduce()</code>, each process has a buffer filled with
2985      * data items. After <code>reduce()</code> returns, each data item in the root
2986      * process's buffer has been set to the <B>reduction</B> of the
2987      * corresponding data items in all the processes' buffers. The reduction is
2988      * calculated by this formula:
2989      * <p>
2990      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
2991      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
2992      * <p>
2993      * where <I>op</I> is the binary operation passed in as an argument and
2994      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
2995      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
2996      * process rank 0, 1, 2, and so on. However, the order in which the data
2997      * items actually are combined is not specified. Therefore, the binary
2998      * operation must be such that the answer will be the same regardless of the
2999      * order in which the data items are combined; that is, the binary operation
3000      * must be commutative and associative.
3001      * <p>
3002      * In the root process, the reduce operation always changes the buffer's
3003      * contents as described above. In the non-root processes, the reduce
3004      * operation may or may not change the buffer's contents; the final contents
3005      * of the buffer in the non-root processes is not specified.
3006      * <p>
3007      * When the <code>reduce()</code> method returns in the root process, the
3008      * reduction has been fully performed as described above. When the
3009      * <code>reduce()</code> method returns in a non-root process, the non-root
3010      * process has sent all its data items into the reduction, but the reduction
3011      * may not be fully complete in the root process yet.
3012      *
3013      * @param root   Root process's rank in this communicator.
3014      * @param tag    Message tag.
3015      * @param buffer Buffer of data items to be reduced.
3016      * @param op     Binary operation.
3017      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
3018      *                                   <code>root</code> is not in the range 0 .. <code>size()</code>-1.
3019      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
3020      *                                   <code>buf</code> is null or <code>op</code>
3021      *                                   is null.
3022      * @throws java.lang.ClassCastException        (unchecked exception) Thrown if
3023      *                                   <code>buf</code> and <code>op</code> do not use the same item data type.
3024      * @throws java.io.IOException               Thrown if an I/O error occurred.
3025      */
3026     public void reduce(int root,
3027                        int tag,
3028                        Buf buffer,
3029                        Op op)
3030             throws IOException {
3031         // Verify preconditions.
3032         if (0 > root || root >= mySize) {
3033             throw new IndexOutOfBoundsException("Comm.reduce(): root = " + root + " out of bounds");
3034         }
3035 
3036         // Early return if only one process.
3037         if (mySize == 1) {
3038             return;
3039         }
3040 
3041         // A reduction is done as a series of point-to-point messages. The
3042         // messages are organized into rounds. The number of rounds is
3043         // ceil(log_2(mySize)). The message pattern is the reverse of the
3044         // broadcast message pattern. In each round, processes receive messages
3045         // from other processes and reduce the data items into their accumulator
3046         // buffers in parallel. When a process has received all messages, it
3047         // sends the reduced results on. Here is the message pattern for a
3048         // communicator with 8 processes doing a reduction into root process 0:
3049         //
3050         //     Process
3051         //     0     1     2     3     4     5     6     7
3052         //     |     |     |     |     |     |     |     |
3053         //     |<----------------------|     |     |     |  Round 1
3054         //     |     |<----------------------|     |     |
3055         //     |     |     |<----------------------|     |
3056         //     |     |     |     |<----------------------|
3057         //     |     |     |     |     |     |     |     |
3058         //     |<----------|     |     |     |     |     |  Round 2
3059         //     |     |<----------|     |     |     |     |
3060         //     |     |     |     |     |     |     |     |
3061         //     |<----|     |     |     |     |     |     |  Round 3
3062         //     |     |     |     |     |     |     |     |
3063         //
3064         // If a process other than process 0 is the root, the message pattern is
3065         // the same, except the process ranks are circularly rotated.
3066         // Get array of process ranks in the broadcast tree.
3067         int[] broadcasttree = getBroadcastTree(root);
3068         int n = broadcasttree.length;
3069 
3070         // Set up reduction buffer on top of source buffer.
3071         Buf reductionbuf = buffer.getReductionBuf(op);
3072 
3073         // Receive data from children if any, one at a time in reverse order.
3074         for (int i = n - 1; i >= 1; --i) {
3075             int child = broadcasttree[i];
3076             myChannelGroup.receive(getChannel(child), tag, reductionbuf);
3077         }
3078 
3079         // Send data to parent if any.
3080         int parent = broadcasttree[0];
3081         if (parent != -1) {
3082             myChannelGroup.send(getChannel(parent), tag, buffer);
3083         }
3084     }
3085 
3086     /**
3087      * Perform an all-reduce on all processes in this communicator. The
3088      * all-reduce uses a message tag of 0. All processes must call
3089      * <code>allReduce()</code> with a buffer of the same length and the same item
3090      * data type, and with the same binary operation (class {@linkplain
3091      * edu.rit.pj.reduction.Op Op}).
3092      * <p>
3093      * Before calling <code>allReduce()</code>, each process has a buffer filled
3094      * with data items. After <code>allReduce()</code> returns, each data item in
3095      * the calling process's buffer has been set to the <B>reduction</B> of the
3096      * corresponding data items in all the processes' buffers. The reduction is
3097      * calculated by this formula:
3098      * <p>
3099      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
3100      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3101      * <p>
3102      * where <I>op</I> is the binary operation passed in as an argument and
3103      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3104      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3105      * process rank 0, 1, 2, and so on. However, the order in which the data
3106      * items actually are combined is not specified. Therefore, the binary
3107      * operation must be such that the answer will be the same regardless of the
3108      * order in which the data items are combined; that is, the binary operation
3109      * must be commutative and associative.
3110      * <p>
3111      * The <code>allReduce()</code> method is similar to the <code>reduce()</code>
3112      * method, except the results are stored in all the processes' buffers, not
3113      * just the one root process's buffer.
3114      *
3115      * @param buffer Buffer of data items to be reduced.
3116      * @param op     Binary operation.
3117      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3118      *                              <code>buf</code> is null or <code>op</code>
3119      *                              is null.
3120      * @throws java.lang.ClassCastException   (unchecked exception) Thrown if
3121      *                              <code>buf</code> and <code>op</code> do not use the same item data type.
3122      * @throws java.io.IOException          Thrown if an I/O error occurred.
3123      */
3124     public void allReduce(Buf buffer,
3125                           Op op)
3126             throws IOException {
3127         allReduce(0, buffer, op);
3128     }
3129 
3130     /**
3131      * Perform an all-reduce on all processes in this communicator using the
3132      * given message tag. All processes must call <code>allReduce()</code> with a
3133      * buffer of the same length and the same item data type, and with the same
3134      * binary operation (class {@linkplain edu.rit.pj.reduction.Op Op}).
3135      * <p>
3136      * Before calling <code>allReduce()</code>, each process has a buffer filled
3137      * with data items. After <code>allReduce()</code> returns, each data item in
3138      * the calling process's buffer has been set to the <B>reduction</B> of the
3139      * corresponding data items in all the processes' buffers. The reduction is
3140      * calculated by this formula:
3141      * <p>
3142      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
3143      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3144      * <p>
3145      * where <I>op</I> is the binary operation passed in as an argument and
3146      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3147      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3148      * process rank 0, 1, 2, and so on. However, the order in which the data
3149      * items actually are combined is not specified. Therefore, the binary
3150      * operation must be such that the answer will be the same regardless of the
3151      * order in which the data items are combined; that is, the binary operation
3152      * must be commutative and associative.
3153      * <p>
3154      * The <code>allReduce()</code> method is similar to the <code>reduce()</code>
3155      * method, except the results are stored in all the processes' buffers, not
3156      * just the one root process's buffer.
3157      *
3158      * @param tag    Message tag.
3159      * @param buffer Buffer of data items to be reduced.
3160      * @param op     Binary operation.
3161      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3162      *                              <code>buf</code> is null or <code>op</code>
3163      *                              is null.
3164      * @throws java.lang.ClassCastException   (unchecked exception) Thrown if
3165      *                              <code>buf</code> and <code>op</code> do not use the same item data type.
3166      * @throws java.io.IOException          Thrown if an I/O error occurred.
3167      */
3168     public void allReduce(int tag,
3169                           Buf buffer,
3170                           Op op)
3171             throws IOException {
3172         // An all-reduce is done using a "butterfly" message passing pattern.
3173         // Consider the case of K=8 processes. In the first round, processes one
3174         // rank apart exchange data, then each processes accumulates the data
3175         // from the other process using the reduction operator. In the second
3176         // round, processes two ranks apart exchange and accumulate data. In the
3177         // third round, processes four ranks apart exchange and accumulate data.
3178         //
3179         //     Process
3180         //     0     1     2     3     4     5     6     7
3181         //     |     |     |     |     |     |     |     |
3182         //     |<--->|     |<--->|     |<--->|     |<--->|  Round 1
3183         //     |     |     |     |     |     |     |     |
3184         //     |<--------->|     |     |<--------->|     |  Round 2
3185         //     |     |<--------->|     |     |<--------->|
3186         //     |     |     |     |     |     |     |     |
3187         //     |<--------------------->|     |     |     |  Round 3
3188         //     |     |<--------------------->|     |     |
3189         //     |     |     |<--------------------->|     |
3190         //     |     |     |     |<--------------------->|
3191         //     |     |     |     |     |     |     |     |
3192         //
3193         // The butterfly pattern works only if the number of processes is a
3194         // power of two. If this is not the case, there are two extra message
3195         // rounds. Each process outside the butterfly pattern sends its data to
3196         // its counterpart inside the butterfly pattern, which accumulates the
3197         // data using the reduction operator. Then the butterfly pattern takes
3198         // place. Afterwards, each process outside the butterfly pattern
3199         // receives the final result from its counterpart inside the butterfly
3200         // pattern. For the case of K=10 processes:
3201         //
3202         //     Process
3203         //     0     1     2     3     4     5     6     7     8     9
3204         //     |     |     |     |     |     |     |     |     |     |
3205         //     |<----------------------------------------------|     | Pre
3206         //     |     |<----------------------------------------------|
3207         //     |     |     |     |     |     |     |     |     |     |
3208         //     |<--->|     |<--->|     |<--->|     |<--->|     |     | Round 1
3209         //     |     |     |     |     |     |     |     |     |     |
3210         //     |<--------->|     |     |<--------->|     |     |     | Round 2
3211         //     |     |<--------->|     |     |<--------->|     |     |
3212         //     |     |     |     |     |     |     |     |     |     |
3213         //     |<--------------------->|     |     |     |     |     | Round 3
3214         //     |     |<--------------------->|     |     |     |     |
3215         //     |     |     |<--------------------->|     |     |     |
3216         //     |     |     |     |<--------------------->|     |     |
3217         //     |     |     |     |     |     |     |     |     |     |
3218         //     |---------------------------------------------->|     | Post
3219         //     |     |---------------------------------------------->|
3220         //     |     |     |     |     |     |     |     |     |     |
3221         //
3222         // If K is a power of two, the all-reduce takes (log_2 K) rounds. If K
3223         // is not a power of two, the all-reduce takes floor(log_2 K)+2 rounds.
3224 
3225         // Early exit if only one process.
3226         if (mySize == 1) {
3227             return;
3228         }
3229 
3230         // Determine the highest power of 2 less than or equal to this
3231         // communicator's size. Processes at this rank and above will be outside
3232         // the butterfly message passing pattern.
3233         int outside = mySizePowerOf2;
3234 
3235         // For processes outside the butterfly:
3236         if (myRank >= outside) {
3237             int insideRank = myRank - outside;
3238 
3239             // Send initial data to counterpart inside.
3240             send(insideRank, tag, buffer);
3241 
3242             // Receive reduced result from counterpart inside.
3243             receive(insideRank, tag, buffer);
3244         } // For processes inside the butterfly:
3245         else {
3246             // Set up temporary receive buffer.
3247             Buf receiveBuf = buffer.getTemporaryBuf();
3248 
3249             // Set up reduction buffer on top of data buffer.
3250             Buf reductionBuf = buffer.getReductionBuf(op);
3251 
3252             // If there is a counterpart process outside, receive and accumulate
3253             // its initial data.
3254             int outsideRank = myRank + outside;
3255             if (outsideRank < mySize) {
3256                 receive(outsideRank, tag, reductionBuf);
3257             }
3258 
3259             // Perform butterfly message passing rounds with counterpart
3260             // processes inside.
3261             int round = 1;
3262             while (round < outside) {
3263                 int otherRank = myRank ^ round;
3264                 sendReceive(otherRank, tag, buffer, otherRank, tag, receiveBuf);
3265                 reductionBuf.copy(receiveBuf);
3266                 round <<= 1;
3267             }
3268 
3269             // If there is a counterpart process outside, send the reduced
3270             // result.
3271             if (outsideRank < mySize) {
3272                 send(outsideRank, tag, buffer);
3273             }
3274         }
3275     }
3276 
3277     /**
3278      * Do an all-to-all among all processes in this communicator. A message tag
3279      * of 0 is used.
3280      * <p>
3281      * <code>srcarray</code> must be an array of <I>K</I> buffers, where <I>K</I> is
3282      * the size of this communicator. <code>dstarray</code> must be an array of
3283      * <I>K</I> buffers referring to different storage from the source buffers.
3284      * For each process rank <I>k</I>, 0 &lt;= <I>k</I> &lt;= <I>K</I>, and each
3285      * buffer index <I>i</I>, 0 &lt;= <I>i</I> &lt;= <I>K</I>, the contents of
3286      * <code>srcarray[k]</code> in process <I>i</I> are sent to <code>dstarray[i]</code>
3287      * in process <I>k</I>.
3288      *
3289      * @param srcarray Array of source buffers to be sent by this process.
3290      * @param dstarray Array of destination buffers to be received by this
3291      *                 process.
3292      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
3293      *                                   <code>srcarray</code>'s length does not equal the size of this communicator.
3294      *                                   Thrown if <code>dstarray</code>'s length does not equal the size of this
3295      *                                   communicator.
3296      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
3297      *                                   <code>srcarray</code> or any element thereof is null. Thrown if
3298      *                                   <code>dstarray</code> or any element thereof is null.
3299      * @throws java.io.IOException               Thrown if an I/O error occurred.
3300      */
3301     public void allToAll(Buf[] srcarray,
3302                          Buf[] dstarray)
3303             throws IOException {
3304         allToAll(0, srcarray, dstarray);
3305     }
3306 
3307     /**
3308      * Do an all-to-all among all processes in this communicator using the given
3309      * message tag. All processes must call <code>allToAll()</code> with the same
3310      * value for <code>tag</code>.
3311      * <p>
3312      * <code>srcarray</code> must be an array of <I>K</I> buffers, where <I>K</I> is
3313      * the size of this communicator. <code>dstarray</code> must be an array of
3314      * <I>K</I> buffers referring to different storage from the source buffers.
3315      * For each process rank <I>k</I>, 0 &lt;= <I>k</I> &lt;= <I>K</I>, and each
3316      * buffer index <I>i</I>, 0 &lt;= <I>i</I> &lt;= <I>K</I>, the contents of
3317      * <code>srcarray[k]</code> in process <I>i</I> are sent to <code>dstarray[i]</code>
3318      * in process <I>k</I>.
3319      *
3320      * @param tag      Message tag.
3321      * @param srcarray Array of source buffers to be sent by this process.
3322      * @param dstarray Array of destination buffers to be received by this
3323      *                 process.
3324      * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if
3325      *                                   <code>srcarray</code>'s length does not equal the size of this communicator.
3326      *                                   Thrown if <code>dstarray</code>'s length does not equal the size of this
3327      *                                   communicator.
3328      * @throws java.lang.NullPointerException      (unchecked exception) Thrown if
3329      *                                   <code>srcarray</code> or any element thereof is null. Thrown if
3330      *                                   <code>dstarray</code> or any element thereof is null.
3331      * @throws java.io.IOException               Thrown if an I/O error occurred.
3332      */
3333     public void allToAll(int tag,
3334                          Buf[] srcarray,
3335                          Buf[] dstarray)
3336             throws IOException {
3337         // An all-to-all is done as a series of send-receives. Each process
3338         // sends the appropriate buffer to the process one ahead and receives
3339         // the appropriate buffer from the process one behind. Then each process
3340         // sends the appropriate buffer to the process two ahead and receives
3341         // the appropriate buffer from the process two behind. And so on. Here
3342         // is the message pattern for a communicator with 4 processes doing an
3343         // all-to-all:
3344         //
3345         //          Process
3346         //          0     1     2     3
3347         //          |     |     |     |
3348         //          |---->|     |     | Round 1
3349         //          |     |---->|     |
3350         //          |     |     |---->|
3351         //     - -->|     |     |     |--- -
3352         //          |     |     |     |
3353         //          |     |     |     |
3354         //          |---------->|     | Round 2
3355         //          |     |---------->|
3356         //     - -->|     |     |--------- -
3357         //     - -------->|     |     |--- -
3358         //          |     |     |     |
3359         //          |     |     |     |
3360         //          |---------------->| Round 3
3361         //     - -->|     |--------------- -
3362         //     - -------->|     |--------- -
3363         //     - -------------->|     |--- -
3364         //          |     |     |     |
3365 
3366         // Copy source to destination at this process's own rank.
3367         dstarray[myRank].copy(srcarray[myRank]);
3368 
3369         // Initiate K-1 non-blocking send-receives.
3370         CommRequest[] commrequest = new CommRequest[mySize];
3371         for (int i = 1; i < mySize; ++i) {
3372             int toRank = (myRank + i) % mySize;
3373             int fromRank = (myRank - i + mySize) % mySize;
3374             commrequest[i] = sendReceive(toRank, tag, srcarray[toRank],
3375                     fromRank, tag, dstarray[fromRank], null);
3376         }
3377 
3378         // Wait for completion of all send-receives.
3379         for (int i = 1; i < mySize; ++i) {
3380             commrequest[i].waitForFinish();
3381         }
3382     }
3383 
3384     /**
3385      * Perform a scan on all processes in this communicator. A message tag of 0
3386      * is used. All processes must call <code>scan()</code> with a buffer of the
3387      * same length and the same item data type, and with the same binary
3388      * operation (class {@linkplain edu.rit.pj.reduction.Op Op}).
3389      * <p>
3390      * Before calling <code>scan()</code>, each process has a buffer filled with
3391      * data items. After <code>scan()</code> returns, each data item in the buffer
3392      * of process rank <I>i</I> has been set to the <B>reduction</B> of the
3393      * corresponding data items in the buffers of process ranks 0 through
3394      * <I>i</I>. The reduction is calculated by this formula:
3395      * <p>
3396      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
3397      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3398      * <p>
3399      * where <I>op</I> is the binary operation passed in as an argument and
3400      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3401      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3402      * process ranks 0 through <I>i</I>. However, the order in which the data
3403      * items actually are combined is not specified. Therefore, the binary
3404      * operation must be such that the answer will be the same regardless of the
3405      * order in which the data items are combined; that is, the binary operation
3406      * must be commutative and associative.
3407      *
3408      * @param buf Buffer of data items to be scanned.
3409      * @param op  Binary operation.
3410      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3411      *                              <code>buf</code> is null or <code>op</code>
3412      *                              is null.
3413      * @throws java.lang.ClassCastException   (unchecked exception) Thrown if
3414      *                              <code>buf</code> and <code>op</code> do not use the same item data type.
3415      * @throws java.io.IOException          Thrown if an I/O error occurred.
3416      */
3417     public void scan(Buf buf,
3418                      Op op)
3419             throws IOException {
3420         scan(0, buf, op);
3421     }
3422 
3423     /**
3424      * Perform a scan on all processes in this communicator using the given
3425      * message tag. All processes must call <code>scan()</code> with the same value
3426      * for <code>tag</code>, with a buffer of the same length and the same item data
3427      * type, and with the same binary operation (class {@linkplain
3428      * edu.rit.pj.reduction.Op Op}).
3429      * <p>
3430      * Before calling <code>scan()</code>, each process has a buffer filled with
3431      * data items. After <code>scan()</code> returns, each data item in the buffer
3432      * of process rank <I>i</I> has been set to the <B>reduction</B> of the
3433      * corresponding data items in the buffers of process ranks 0 through
3434      * <I>i</I>. The reduction is calculated by this formula:
3435      * <p>
3436      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
3437      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3438      * <p>
3439      * where <I>op</I> is the binary operation passed in as an argument and
3440      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3441      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3442      * process ranks 0 through <I>i</I>. However, the order in which the data
3443      * items actually are combined is not specified. Therefore, the binary
3444      * operation must be such that the answer will be the same regardless of the
3445      * order in which the data items are combined; that is, the binary operation
3446      * must be commutative and associative.
3447      *
3448      * @param tag Message tag.
3449      * @param buf Buffer of data items to be scanned.
3450      * @param op  Binary operation.
3451      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3452      *                              <code>buf</code> is null or <code>op</code>
3453      *                              is null.
3454      * @throws java.lang.ClassCastException   (unchecked exception) Thrown if
3455      *                              <code>buf</code> and <code>op</code> do not use the same item data type.
3456      * @throws java.io.IOException          Thrown if an I/O error occurred.
3457      */
3458     public void scan(int tag,
3459                      Buf buf,
3460                      Op op)
3461             throws IOException {
3462         // Early return if only one process.
3463         if (mySize == 1) {
3464             return;
3465         }
3466 
3467         // A scan is done as a series of point-to-point messages. The messages
3468         // are organized into rounds. The number of rounds is
3469         // ceil(log_2(mySize)). In the first round, each process sends its data
3470         // to the process one rank ahead, and the incoming data is combined with
3471         // the process's data using the reduction operator. In the second round,
3472         // each process sends its data to the process two ranks ahead. In the
3473         // third round, each process sends its data to process four ranks ahead.
3474         // And so on. Here is the message pattern for a communicator with 8
3475         // processes:
3476         //
3477         //     Process
3478         //     0     1     2     3     4     5     6     7
3479         //     |     |     |     |     |     |     |     |
3480         //     |---->|---->|---->|---->|---->|---->|---->|  Round 1
3481         //     |     |     |     |     |     |     |     |
3482         //     |---------->|     |     |     |     |     |  Round 2
3483         //     |     |---------->|     |     |     |     |
3484         //     |     |     |---------->|     |     |     |
3485         //     |     |     |     |---------->|     |     |
3486         //     |     |     |     |     |---------->|     |
3487         //     |     |     |     |     |     |---------->|
3488         //     |     |     |     |     |     |     |     |
3489         //     |---------------------->|     |     |     |  Round 3
3490         //     |     |---------------------->|     |     |
3491         //     |     |     |---------------------->|     |
3492         //     |     |     |     |---------------------->|
3493         //
3494         // Get temporary buffer for holding incoming data items.
3495         Buf tempbuf = buf.getTemporaryBuf();
3496 
3497         // Get reduction buffer for combining data items.
3498         Buf reductionbuf = buf.getReductionBuf(op);
3499 
3500         // Do rounds of message passing and reduction.
3501         int skip = 1;
3502         for (; ; ) {
3503             int toRank = myRank + skip;
3504             int fromRank = myRank - skip;
3505             boolean toExists = 0 <= toRank && toRank < mySize;
3506             boolean fromExists = 0 <= fromRank && fromRank < mySize;
3507             if (toExists && fromExists) {
3508                 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf);
3509                 reductionbuf.copy(tempbuf);
3510             } else if (fromExists) {
3511                 receive(fromRank, tag, reductionbuf);
3512             } else if (toExists) {
3513                 send(toRank, tag, buf);
3514             } else {
3515                 break;
3516             }
3517             skip <<= 1;
3518         }
3519     }
3520 
3521     /**
3522      * Perform an exclusive scan on all processes in this communicator. A
3523      * message tag of 0 is used. All processes must call
3524      * <code>exclusiveScan()</code> with a buffer of the same length and the same
3525      * item data type, with the same binary operation (class {@linkplain
3526      * edu.rit.pj.reduction.Op Op}), and with the same initial data value.
3527      * <p>
3528      * Before calling <code>exclusiveScan()</code>, each process has a buffer filled
3529      * with data items. After <code>exclusiveScan()</code> returns, each data item
3530      * in the buffer of process rank <I>i</I> &gt; 0 has been set to the
3531      * <B>reduction</B> of the corresponding data items in the buffers of
3532      * process ranks 0 through <I>i</I>-1. The reduction is calculated by this
3533      * formula:
3534      * <p>
3535      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
3536      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3537      * <p>
3538      * where <I>op</I> is the binary operation passed in as an argument and
3539      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3540      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3541      * process ranks 0 through <I>i</I>-1. However, the order in which the data
3542      * items actually are combined is not specified. Therefore, the binary
3543      * operation must be such that the answer will be the same regardless of the
3544      * order in which the data items are combined; that is, the binary operation
3545      * must be commutative and associative.
3546      * <p>
3547      * In process 0, each data item in the buffer has been set to the initial
3548      * data value using the buffer's <code>fill()</code> method.
3549      * <p>
3550      * If the buffer's item data type is a primitive type, the <code>item</code>
3551      * must be an instance of the corresponding primitive wrapper class -- class
3552      * Integer for type <code>int</code>, class Double for type <code>double</code>, and
3553      * so on. If the <code>item</code> is null, the item data type's default initial
3554      * value is assigned to each element in the buffer.
3555      * <p>
3556      * If the buffer's item data type is a nonprimitive type, the <code>item</code>
3557      * must be an instance of the item class or a subclass thereof. The
3558      * <code>item</code> may be null. Note that since <code>item</code> is
3559      * <I>assigned</I> to every buffer element, every buffer element ends up
3560      * referring to the same <code>item</code>.
3561      *
3562      * @param buf  Buffer of data items to be scanned.
3563      * @param op   Binary operation.
3564      * @param item Initial data value.
3565      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3566      *                              <code>buf</code> is null or <code>op</code>
3567      *                              is null.
3568      * @throws java.lang.ClassCastException   (unchecked exception) Thrown if
3569      *                              <code>buf</code> and <code>op</code> do not use the same item data type.
3570      * @throws java.io.IOException          Thrown if an I/O error occurred.
3571      */
3572     public void exclusiveScan(Buf buf,
3573                               Op op,
3574                               Object item)
3575             throws IOException {
3576         exclusiveScan(0, buf, op, item);
3577     }
3578 
3579     /**
3580      * Perform an exclusive scan on all processes in this communicator using the
3581      * given message tag. All processes must call <code>exclusiveScan()</code> with
3582      * the same value for <code>tag</code>, with a buffer of the same length and the
3583      * same item data type, with the same binary operation (class {@linkplain
3584      * edu.rit.pj.reduction.Op Op}), and with the same initial data value.
3585      * <p>
3586      * Before calling <code>exclusiveScan()</code>, each process has a buffer filled
3587      * with data items. After <code>exclusiveScan()</code> returns, each data item
3588      * in the buffer of process rank <I>i</I> &gt; 0 has been set to the
3589      * <B>reduction</B> of the corresponding data items in the buffers of
3590      * process ranks 0 through <I>i</I>-1. The reduction is calculated by this
3591      * formula:
3592      * <p>
3593      * &nbsp;&nbsp;&nbsp;&nbsp;<I>item</I><SUB>0</SUB> <I>op</I>
3594      * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . .
3595      * <p>
3596      * where <I>op</I> is the binary operation passed in as an argument and
3597      * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>,
3598      * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of
3599      * process ranks 0 through <I>i</I>-1. However, the order in which the data
3600      * items actually are combined is not specified. Therefore, the binary
3601      * operation must be such that the answer will be the same regardless of the
3602      * order in which the data items are combined; that is, the binary operation
3603      * must be commutative and associative.
3604      * <p>
3605      * In process 0, each data item in the buffer has been set to the initial
3606      * data value using the buffer's <code>fill()</code> method.
3607      * <p>
3608      * If the buffer's item data type is a primitive type, the <code>item</code>
3609      * must be an instance of the corresponding primitive wrapper class -- class
3610      * Integer for type <code>int</code>, class Double for type <code>double</code>, and
3611      * so on. If the <code>item</code> is null, the item data type's default initial
3612      * value is assigned to each element in the buffer.
3613      * <p>
3614      * If the buffer's item data type is a nonprimitive type, the <code>item</code>
3615      * must be an instance of the item class or a subclass thereof. The
3616      * <code>item</code> may be null. Note that since <code>item</code> is
3617      * <I>assigned</I> to every buffer element, every buffer element ends up
3618      * referring to the same <code>item</code>.
3619      *
3620      * @param tag  Message tag.
3621      * @param buf  Buffer of data items to be scanned.
3622      * @param op   Binary operation.
3623      * @param item Initial data value.
3624      * @throws java.lang.NullPointerException (unchecked exception) Thrown if
3625      *                              <code>buf</code> is null or <code>op</code>
3626      *                              is null.
3627      * @throws java.lang.ClassCastException   (unchecked exception) Thrown if
3628      *                              <code>buf</code> and <code>op</code> do not use the same item data type.
3629      * @throws java.io.IOException          Thrown if an I/O error occurred.
3630      */
3631     public void exclusiveScan(int tag,
3632                               Buf buf,
3633                               Op op,
3634                               Object item)
3635             throws IOException {
3636         // An exclusive scan begins with each process sending its buffer to the
3637         // next higher process. Then process 0 fills its buffer with the initial
3638         // data value, while processes 1 and higher do an inclusive scan.
3639 
3640         int toRank;
3641         int fromRank;
3642         boolean toExists;
3643         boolean fromExists;
3644 
3645         // Process 0 does this.
3646         if (myRank == 0) {
3647             // Send buffer to next higher process.
3648             toRank = 1;
3649             toExists = toRank < mySize;
3650             if (toExists) {
3651                 send(toRank, tag, buf);
3652             }
3653 
3654             // Fill buffer with initial data value.
3655             buf.fill(item);
3656         } // Processes 1 and higher do this.
3657         else {
3658             // Get temporary buffer for holding incoming data items.
3659             Buf tempbuf = buf.getTemporaryBuf();
3660 
3661             // Get reduction buffer for combining data items.
3662             Buf reductionbuf = buf.getReductionBuf(op);
3663 
3664             // Send buffer to next higher process.
3665             toRank = myRank + 1;
3666             fromRank = myRank - 1;
3667             toExists = 0 <= toRank && toRank < mySize;
3668             if (toExists) {
3669                 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf);
3670                 buf.copy(tempbuf);
3671             } else {
3672                 receive(fromRank, tag, buf);
3673             }
3674 
3675             // Do rounds of message passing and reduction.
3676             int skip = 1;
3677             for (; ; ) {
3678                 toRank = myRank + skip;
3679                 fromRank = myRank - skip;
3680                 toExists = 1 <= toRank && toRank < mySize;
3681                 fromExists = 1 <= fromRank && fromRank < mySize;
3682                 if (toExists && fromExists) {
3683                     sendReceive(toRank, tag, buf, fromRank, tag, tempbuf);
3684                     reductionbuf.copy(tempbuf);
3685                 } else if (fromExists) {
3686                     receive(fromRank, tag, reductionbuf);
3687                 } else if (toExists) {
3688                     send(toRank, tag, buf);
3689                 } else {
3690                     break;
3691                 }
3692                 skip <<= 1;
3693             }
3694         }
3695     }
3696 
3697     /**
3698      * Cause all processes in this communicator to wait at a barrier. The
3699      * barrier uses a message tag of 0. All processes must call
3700      * <code>barrier()</code>. The calling thread blocks until every process has
3701      * called <code>barrier()</code>, then the calling thread unblocks and returns
3702      * from the <code>barrier()</code> call.
3703      *
3704      * @throws java.io.IOException Thrown if an I/O error occurred.
3705      */
3706     public void barrier()
3707             throws IOException {
3708         barrier(0);
3709     }
3710 
3711     /**
3712      * Cause all processes in this communicator to wait at a barrier, using the
3713      * given message tag. All processes must call <code>barrier()</code> with the
3714      * same tag. The calling thread blocks until every process has called
3715      * <code>barrier()</code>, then the calling thread unblocks and returns from the
3716      * <code>barrier()</code> call.
3717      *
3718      * @param tag Message tag.
3719      * @throws java.io.IOException Thrown if an I/O error occurred.
3720      */
3721     public void barrier(int tag)
3722             throws IOException {
3723         // A barrier is done as an all-reduce of an empty buffer.
3724         allReduce(tag, IntegerBuf.emptyBuffer(), IntegerOp.SUM);
3725     }
3726 
3727     /**
3728      * Returns a string version of this communicator. The string includes the
3729      * communicator's size, the current process's rank, and the host and port of
3730      * each backend process.
3731      *
3732      * @return String version.
3733      */
3734     public String toString() {
3735         StringBuilder buf = new StringBuilder();
3736         buf.append("Comm(size=");
3737         buf.append(mySize);
3738         buf.append(",rank=");
3739         buf.append(myRank);
3740         buf.append(",backend");
3741         for (int i = 0; i < mySize; ++i) {
3742             if (i > 0) {
3743                 buf.append(',');
3744             }
3745             buf.append('[');
3746             buf.append(i);
3747             buf.append("]=");
3748             buf.append(myAddressForRank[i]);
3749         }
3750         buf.append(')');
3751         return buf.toString();
3752     }
3753 
3754     /**
3755      * Dump the state of this communicator on the given print stream. For
3756      * debugging.
3757      *
3758      * @param out    Print stream.
3759      * @param prefix String to print at the beginning of each line.
3760      */
3761     public void dump(PrintStream out,
3762                      String prefix) {
3763         out.println();
3764         out.println(prefix + getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(this)));
3765         out.println(prefix + "mySize = " + mySize);
3766         out.println(prefix + "myRank = " + myRank);
3767         out.println(prefix + "myHost = " + myHost);
3768         out.println(prefix + "mySizePowerOf2 = " + mySizePowerOf2);
3769         out.println(prefix + "myChannelGroup = " + myChannelGroup);
3770         out.println(prefix + "myAddressForRank:");
3771         for (int i = 0; i < myAddressForRank.length; ++i) {
3772             out.println(prefix + "\t[" + i + "] " + myAddressForRank[i]);
3773         }
3774         out.println(prefix + "myChannelForRank:");
3775         for (int i = 0; i < myChannelForRank.length; ++i) {
3776             out.println(prefix + "\t[" + i + "] " + myChannelForRank[i]);
3777         }
3778         out.println(prefix + "myBroadcastTree:");
3779         for (int i = 0; i < myBroadcastTree.length; ++i) {
3780             out.print(prefix + "\t[" + i + "]");
3781             int[] tree = myBroadcastTree[i];
3782             if (tree == null) {
3783                 out.print(" null");
3784             } else {
3785                 for (int j = 0; j < tree.length; ++j) {
3786                     out.print(" " + tree[j]);
3787                 }
3788             }
3789             out.println();
3790         }
3791         out.println();
3792         myChannelGroup.dump(out, prefix);
3793     }
3794 
3795 // Hidden operations.
3796 
3797     /**
3798      * Notify that another process connected a channel to this process.
3799      *
3800      * @param theChannel Channel.
3801      * @throws IOException Thrown if an I/O error occurred.
3802      */
3803     private synchronized void doFarEndConnected(Channel theChannel)
3804             throws IOException {
3805         // Record channel and rank.
3806         myChannelForRank[getFarRank(theChannel)] = theChannel;
3807 
3808         // Notify any threads waiting in getChannel().
3809         notifyAll();
3810     }
3811 
3812     /**
3813      * Ensure that a channel for communicating with the process at the given
3814      * rank is or will be set up.
3815      *
3816      * @param farrank Rank of far end process.
3817      * @throws IOException Thrown if an I/O error occurred.
3818      */
3819     private synchronized void ensureChannel(int farrank)
3820             throws IOException {
3821         // Get channel from channel array.
3822         Channel channel = myChannelForRank[farrank];
3823 
3824         // If the channel does not exist:
3825         if (channel == null) {
3826             // If this is the lower-ranked process, set up the connection.
3827             if (myRank < farrank) {
3828                 myChannelForRank[farrank]
3829                         = myChannelGroup.connect(myAddressForRank[farrank]);
3830             }
3831 
3832             // If this is the higher-ranked process, the lower-ranked process
3833             // will set up the connection.
3834         }
3835     }
3836 
3837     /**
3838      * Get the channel for communicating with the process at the given rank.
3839      *
3840      * @param farrank Rank of far end process.
3841      * @return Channel.
3842      * @throws IOException Thrown if an I/O error occurred.
3843      */
3844     private synchronized Channel getChannel(int farrank)
3845             throws IOException {
3846         // Get channel from channel array.
3847         Channel channel = myChannelForRank[farrank];
3848 
3849         // If the channel does not exist:
3850         if (channel == null) {
3851             // If this is the lower-ranked process, set up the connection.
3852             if (myRank < farrank) {
3853                 channel = myChannelGroup.connect(myAddressForRank[farrank]);
3854                 myChannelForRank[farrank] = channel;
3855             } // If this is the higher-ranked process, wait for the lower-ranked
3856             // process to set up the connection.
3857             else {
3858                 try {
3859                     while (channel == null) {
3860                         wait();
3861                         channel = myChannelForRank[farrank];
3862                     }
3863                 } catch (InterruptedException exc) {
3864                     IOException exc2 = new InterruptedIOException();
3865                     exc2.initCause(exc);
3866                     throw exc2;
3867                 }
3868             }
3869         }
3870 
3871         return channel;
3872     }
3873 
3874     /**
3875      * Get the rank of the process at the far end of the given channel.
3876      *
3877      * @param channel Channel.
3878      * @return Far end process rank.
3879      */
3880     static int getFarRank(Channel channel) {
3881         return channel.farEndChannelGroupId();
3882     }
3883 
3884     /**
3885      * Get an array of process ranks in the broadcast tree for the given root.
3886      * The broadcast tree is cached in the field myBroadcastTree for later use.
3887      *
3888      * @param root Root process's rank.
3889      * @return Broadcast tree.
3890      */
3891     private synchronized int[] getBroadcastTree(int root) {
3892         if (myBroadcastTree == null) {
3893             myBroadcastTree = new int[mySize][];
3894         }
3895         int[] broadcasttree = myBroadcastTree[root];
3896         if (broadcasttree == null) {
3897             broadcasttree = CommPattern.broadcastPattern(mySize, myRank, root);
3898             myBroadcastTree[root] = broadcasttree;
3899         }
3900         return broadcasttree;
3901     }
3902 
3903 }