1 //****************************************************************************** 2 // 3 // File: Comm.java 4 // Package: edu.rit.pj 5 // Unit: Class edu.rit.pj.Comm 6 // 7 // This Java source file is copyright (C) 2009 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 
34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj; 41 42 import java.io.IOException; 43 import java.io.InterruptedIOException; 44 import java.io.PrintStream; 45 import java.net.InetSocketAddress; 46 import java.util.LinkedList; 47 48 import edu.rit.mp.Buf; 49 import edu.rit.mp.Channel; 50 import edu.rit.mp.ChannelGroup; 51 import edu.rit.mp.ConnectListener; 52 import edu.rit.mp.IORequest; 53 import edu.rit.mp.IntegerBuf; 54 import edu.rit.mp.ObjectBuf; 55 import edu.rit.mp.Status; 56 import edu.rit.pj.cluster.CommPattern; 57 import edu.rit.pj.cluster.JobBackend; 58 import edu.rit.pj.cluster.JobFrontend; 59 import edu.rit.pj.cluster.JobSchedulerException; 60 import edu.rit.pj.reduction.IntegerOp; 61 import edu.rit.pj.reduction.Op; 62 import edu.rit.util.Range; 63 64 /** 65 * Class Comm provides a communicator for a PJ cluster parallel program. Class 66 * Comm provides a method to initialize the PJ message passing middleware and 67 * run the parallel program on multiple processors of a cluster parallel 68 * computer. Class Comm also provides methods for passing messages between the 69 * processes of the parallel program. 70 * <HR> 71 * <p> 72 * <B>BASIC CONCEPTS</B> 73 * <p> 74 * A <B>cluster parallel computer</B> typically consists of a <B>frontend 75 * processor</B> and a number of <B>backend processors</B> connected via a 76 * dedicated high-speed network. A user logs into the frontend processor and 77 * runs a PJ program there. The PJ message passing middleware causes copies of 78 * the PJ program to run in a number of separate processes, each process on a 79 * different backend processor. 
The backend processes run the PJ program, using 80 * the PJ middleware to send messages amongst themselves. The PJ middleware 81 * redirects the backend processes' standard output and standard error streams 82 * to the frontend process. The frontend process does not actually execute the 83 * PJ program, but merely displays all the backend processes' standard output 84 * and standard error streams on the frontend process's own standard output and 85 * standard error. 86 * <p> 87 * For the PJ message passing middleware to work, certain server processes must 88 * be running. See package {@linkplain edu.rit.pj.cluster} for further 89 * information. 90 * 91 * <p> 92 * To initialize the PJ message passing middleware, the program must first call 93 * the static <code>Comm.init()</code> method, passing in the command line 94 * arguments. 95 * <p> 96 * A <B>communicator</B> is associated with a group of backend processes. The 97 * communicator's <B>size</B> is the number of processes in the group. Each 98 * process in the communicator has a different <B>rank</B> in the range 0 .. 99 * <I>size</I>-1. A process may obtain the size and rank by calling the 100 * communicator's <code>size()</code> and <code>rank()</code> methods. 101 * <p> 102 * There is one predefined communicator, the <B>world communicator,</B> 103 * consisting of all the backend processes in the parallel program. A process 104 * may obtain a reference to the world communicator by calling the static 105 * <code>Comm.world()</code> method. Typically, the first few lines in a PJ cluster 106 * parallel program look like this: 107 * <PRE> 108 * public class AParallelProgram 109 * { 110 * public static void main 111 * (String[] args) 112 * throws Exception 113 * { 114 * Comm.init (args); 115 * Comm world = Comm.world(); 116 * int size = world.size(); 117 * int rank = world.rank(); 118 * . . 
.</PRE> 119 * <p> 120 * The number of processes in the parallel program -- that is, the size of the 121 * world communicator -- is specified by the <code>"pj.np"</code> property, which 122 * must be an integer greater than or equal to 1. You can specify the number of 123 * processes on the Java command line like this: 124 * <PRE> 125 * java -Dpj.np=4 . . .</PRE> 126 * <p> 127 * If the <code>"pj.np"</code> property is not specified, the default is 1. 128 * <p> 129 * The PJ program will run on the specified number of backend processors as 130 * described above. To determine which backend processors to use, the PJ program 131 * interacts with a <B>Job Scheduler</B> server process running on the frontend 132 * processor. When the PJ program starts and calls the <code>Comm.init()</code> 133 * method, the middleware first prints the job number on the standard error. The 134 * middleware then waits until the required number of backend processors are 135 * ready to run a job. As each backend processor becomes ready, the middleware 136 * prints on the standard error the name of each backend processor assigned to 137 * the job. Once all are ready, the PJ program starts running on those backend 138 * processors, and all further output comes from the PJ program. Since each PJ 139 * program interacts with the Job Scheduler, the Job Scheduler can ensure that 140 * each backend processor is running a backend process for only one job at a 141 * time. 142 * <p> 143 * Depending on the system load, your PJ program may have to wait in the Job 144 * Scheduler's queue for a while until enough backend processors become ready. 145 * If you get tired of waiting, you can kill your PJ program (e.g., by typing 146 * CTRL-C), which will remove your PJ program from the Job Scheduler's queue. 147 * <p> 148 * The Job Scheduler has a web interface that lets you examine the cluster 149 * status. 
Just point your web browser at this URL: 150 * 151 * <code> http://&lt;hostname&gt;:8080/</code> 152 * <p> 153 * where <code>&lt;hostname&gt;</code> is replaced by the host name of the frontend 154 * processor. The default port for the cluster status web interface is port 155 * 8080. The Job Scheduler can be configured to use a different port. For 156 * further information, see package {@linkplain edu.rit.pj.cluster}. 157 * <p> 158 * If the PJ program is executed on a host where no Job Scheduler is running, 159 * the PJ program will run in <I>one</I> process on that host (i.e., the machine 160 * you're logged into), rather than on the backend processors. The message 161 * passing methods in class Comm will still work, though. This option can be 162 * useful for debugging a PJ program's logic on a non-parallel machine before 163 * running the PJ program on a cluster. 164 * <HR> 165 * <p> 166 * <B>MESSAGE PASSING</B> 167 * <p> 168 * PJ provides two categories of communication, <B>point-to-point 169 * communication</B> and <B>collective communication.</B> The following methods 170 * of class Comm are used for point-to-point communication: 171 * <UL> 172 * <LI><code>send()</code> 173 * <LI><code>receive()</code> 174 * <LI><code>sendReceive()</code> 175 * <LI><code>floodSend()</code> 176 * <LI><code>floodReceive()</code> 177 * </UL> 178 * The following methods are used for collective communication: 179 * <UL> 180 * <LI><code>broadcast()</code> 181 * <LI><code>scatter()</code> 182 * <LI><code>gather()</code> 183 * <LI><code>allGather()</code> 184 * <LI><code>reduce()</code> 185 * <LI><code>allReduce()</code> 186 * <LI><code>allToAll()</code> 187 * <LI><code>scan()</code> 188 * <LI><code>exclusiveScan()</code> 189 * <LI><code>barrier()</code> 190 * </UL> 191 * These methods are described further in the sections below. 192 * <p> 193 * In addition, you can create a new communicator consisting of all, or a subset 194 * of, the processes in an existing communicator. 
Message passing in the new 195 * communicator is completely independent of message passing in the original 196 * communicator. The following method creates a new communicator: 197 * <UL> 198 * <LI><code>createComm()</code> 199 * </UL> 200 * <HR> 201 * <p> 202 * <B>POINT-TO-POINT COMMUNICATION</B> 203 * <p> 204 * One process in a PJ cluster parallel program, the <B>source process</B>, may 205 * use a communicator to send a message to another process in the program, the 206 * <B>destination process</B>. This is called a <B>point-to-point 207 * communication</B> because just the two processes are involved (as opposed to 208 * a collective communication, which involves all the processes). Five 209 * point-to-point communication methods are available in this release: send, 210 * receive, send-receive, flood-send, and flood-receive. 211 * <p> 212 * <B>Send and Receive</B> 213 * <p> 214 * To do a point-to-point communication, the source process calls the 215 * <code>send()</code> method on a certain communicator, such as the world 216 * communicator. The source process specifies the destination process's rank, 217 * the <B>tag</B> for the message, and a <B>buffer</B> containing the data items 218 * to be sent (type {@linkplain edu.rit.mp.Buf}). Likewise, the destination 219 * process calls the <code>receive()</code> method on the same communicator as the 220 * source process. The destination process specifies the source process's rank, 221 * the message tag which must be the same as in the source process, and the 222 * buffer for the data items to be received. 
223 * <p> 224 * A <code>send()</code> method call and a <code>receive()</code> method call are said 225 * to <B>match</B> if (a) the rank passed to the <code>send()</code> method equals 226 * the rank of the process calling <code>receive()</code>, (b) the rank passed to 227 * the <code>receive()</code> method equals the rank of the process calling 228 * <code>send()</code>, (c) the item data type in the source buffer is the same as 229 * the item data type in the destination buffer, and (d) the send message tag 230 * equals the receive message tag. A <code>receive()</code> method call will block 231 * until a matching <code>send()</code> method call occurs. If more than one 232 * <code>send()</code> method call matches a <code>receive()</code> method call, one of 233 * the matching <code>send()</code> method calls is picked in an unspecified manner. 234 * A <code>send()</code> method call <I>may</I> block until a matching 235 * <code>receive()</code> method call occurs due to flow control in the underlying 236 * network communication. 237 * <p> 238 * The message tag can be used to distinguish different kinds of messages. A 239 * <code>receive()</code> method call will only match a <code>send()</code> method call 240 * with the same tag. If there is no need to distinguish different kinds of 241 * messages, omit the tag (it will default to 0). 242 * <p> 243 * Once a <code>send()</code> method call and a <code>receive()</code> method call have 244 * been matched together, the actual message data transfer takes place. Each 245 * item in the source buffer, starting at index 0 and continuing for the entire 246 * length of the source buffer, is written to the message. At the other end, 247 * each item in the destination buffer, starting at index 0, is read from the 248 * message. 249 * <p> 250 * The <code>receive()</code> method returns a {@linkplain CommStatus} object. 
The 251 * status object gives the actual rank of the process that sent the message, the 252 * actual message tag that was received, and the actual number of data items in 253 * the message. If the actual number of data items in the message is less than 254 * the length of the destination buffer, nothing is stored into the extra data 255 * items at the end of the destination buffer. If the actual number of data 256 * items in the message is greater than the length of the destination buffer, 257 * the extra data items at the end of the message are discarded. 258 * <p> 259 * The <code>send()</code> method does not return until all the message elements 260 * have been written from the source buffer. Likewise, the <code>receive()</code> 261 * method does not return until all the message elements have been read into the 262 * destination buffer. However, you cannot assume that because the 263 * <code>send()</code> method has returned, the matching <code>receive()</code> method 264 * has also returned. Because of buffering in the underlying network 265 * communication, not all the destination items might have been received even 266 * though all the source items have been sent. 267 * <p> 268 * The destination process, instead of specifying a particular source process, 269 * can declare that it will receive a message from any source process by 270 * specifying null for the source process rank in the <code>receive()</code> method 271 * call. This is called a <B>wildcard source</B>. In this case the 272 * <code>receive()</code> method call's returned status object will indicate the 273 * actual source process that sent the message. 274 * <p> 275 * The destination process, instead of specifying a particular message tag, can 276 * declare that it will receive a message with any tag by specifying null for 277 * the tag in the <code>receive()</code> method call. This is called a <B>wildcard 278 * tag</B>. 
Alternatively, the destination process can specify a range of 279 * message tags, and it will receive a message with any tag in the given range. 280 * In these cases the <code>receive()</code> method call's returned status object 281 * will indicate the actual message tag that was sent. 282 * <p> 283 * A process can send a message to itself. In this case one thread must call 284 * <code>send()</code> (specifying the process's own rank as the destination) and a 285 * different thread must call <code>receive()</code> (specifying the process's own 286 * rank as the source), otherwise a deadlock will ensue. 287 * <p> 288 * <B>Send-Receive</B> 289 * <p> 290 * By calling the <code>sendReceive()</code> method, a process can send a buffer of 291 * outgoing message items to a destination process while simultaneously 292 * receiving a buffer of incoming message items from a source process. The 293 * destination process may be the same as the source process, or different from 294 * the source process. The outgoing message items must come from a different 295 * place than where the incoming message items will be stored, otherwise the 296 * incoming message items may overwrite the outgoing message items before they 297 * can be sent. When the <code>sendReceive()</code> method returns, the outgoing 298 * message items have been fully sent, but they may not yet have been fully 299 * received; and the incoming message items have been fully received. 300 * <p> 301 * With the <code>sendReceive()</code> method, a process cannot receive a message 302 * from a wildcard source, and a process cannot receive a message with a 303 * wildcard tag or a range of tags. The process calling <code>sendReceive()</code> 304 * must know the rank of the source process and the message tag (if not 0). The 305 * <code>sendReceive()</code> method does return a status object giving the outcome 306 * of the receive half of the send-receive operation, just as the 307 * <code>receive()</code> method does. 
308 * <p> 309 * A process can send-receive messages with itself. In this case one thread must 310 * call <code>sendReceive()</code> (specifying the process's own rank as the source 311 * and destination) and a different thread must also call <code>sendReceive()</code> 312 * (specifying the process's own rank as the source and destination), otherwise 313 * a deadlock will ensue. 314 * <p> 315 * <B>Non-Blocking Communication</B> 316 * <p> 317 * The <code>send()</code>, <code>receive()</code>, and <code>sendReceive()</code> methods 318 * each have a non-blocking version. A non-blocking communication method 319 * initiates the communication operation and immediately returns, storing the 320 * state of the communication operation in a {@linkplain CommRequest} object. 321 * The communicator then performs the communication operation in a separate 322 * thread. This allows the calling thread to do other work while the 323 * communication operation is in progress. To wait for the send and receive 324 * operations to finish, call the CommRequest object's <code>waitForFinish()</code> 325 * method. 326 * <p> 327 * <B>Flood-Send and Flood-Receive</B> 328 * <p> 329 * Any process can send a message to all processes in the communicator. This is 330 * called "flooding" the message. First, all processes must start a 331 * flood-receive operation, either by calling the non-blocking 332 * <code>floodReceive()</code> method, or by having a separate thread call the 333 * blocking <code>floodReceive()</code> method. Then, one process (any process) must 334 * call the <code>floodSend()</code> method. The data items in the flood-send 335 * operation's outgoing buffer are copied into the flood-receive operation's 336 * incoming buffer in all processes. 337 * <p> 338 * Message flooding is similar to the "broadcast" collective communication 339 * operation (see below). 
The differences are these: Broadcasting combines 340 * sending and receiving in a single operation; flooding uses separate send and 341 * receive operations. For broadcasting, every process must know which process 342 * is sending the outgoing data items; for flooding, the receiving processes do 343 * not need to know which process is sending (any process can send). 344 * <HR> 345 * <p> 346 * <B>COLLECTIVE COMMUNICATION</B> 347 * <p> 348 * A PJ cluster parallel program may use a communicator to send a message among 349 * all the processes in the program at the same time. This is called a 350 * <B>collective communication</B> because all the processes in the communicator 351 * are involved (as opposed to a point-to-point communication). Ten collective 352 * communication methods are available in this release: broadcast, scatter, 353 * gather, all-gather, reduce, all-reduce, all-to-all, scan, exclusive-scan, and 354 * barrier. Further collective communication methods will be added to class Comm 355 * in a later release. 356 * <p> 357 * <B>Broadcast</B> 358 * <p> 359 * One process in the communicator, the <B>root</B> process, has a source buffer 360 * (type {@linkplain edu.rit.mp.Buf Buf}) filled with data. The other processes 361 * in the communicator each have a destination buffer with the same length and 362 * the same item data type as the source buffer. Each process calls the 363 * communicator's <code>broadcast()</code> method. Afterwards, all the destination 364 * buffers contain the same data as the source buffer. 
365 * <TABLE> 366 * <CAPTION>Before and After Broadcast.</CAPTION> 367 * <TR> 368 * <TD style="vertical-align:top;"> 369 * Before: 370 * <pre> 371 * Process Process Process Process 372 * 0 (root) 1 2 3 373 * +----+ +----+ +----+ +----+ 374 * | 1 | | | | | | | 375 * | 2 | | | | | | | 376 * | 3 | | | | | | | 377 * | 4 | | | | | | | 378 * | 5 | | | | | | | 379 * | 6 | | | | | | | 380 * | 7 | | | | | | | 381 * | 8 | | | | | | | 382 * +----+ +----+ +----+ +----+</pre> 383 * </TD> 384 * <TD> 385 * <pre> 386 * ... </pre> 387 * </TD> 388 * <TD style="vertical-align:top;"> 389 * After: 390 * <pre> 391 * Process Process Process Process 392 * 0 (root) 1 2 3 393 * +----+ +----+ +----+ +----+ 394 * | 1 | | 1 | | 1 | | 1 | 395 * | 2 | | 2 | | 2 | | 2 | 396 * | 3 | | 3 | | 3 | | 3 | 397 * | 4 | | 4 | | 4 | | 4 | 398 * | 5 | | 5 | | 5 | | 5 | 399 * | 6 | | 6 | | 6 | | 6 | 400 * | 7 | | 7 | | 7 | | 7 | 401 * | 8 | | 8 | | 8 | | 8 | 402 * +----+ +----+ +----+ +----+</pre> 403 * </TD> 404 * </TR> 405 * </TABLE> 406 * <I>Note:</I> Any process can be the root of the broadcast. The above is only 407 * one example with process 0 as the root. 408 * <p> 409 * <B>Scatter</B> 410 * <p> 411 * One process in the communicator, the root process, has <I>K</I> source 412 * buffers (type {@linkplain edu.rit.mp.Buf Buf}) filled with data, where 413 * <I>K</I> is the size of the communicator. For example, the source buffers 414 * could be different portions of an array. Each process in the communicator 415 * (including the root process) has a destination buffer with the same length 416 * and the same item data type as the corresponding source buffer. Each process 417 * calls the communicator's <code>scatter()</code> method. Afterwards, each 418 * process's destination buffer contains the same data as the corresponding 419 * source buffer in the root process. 
420 * <TABLE> 421 * <CAPTION>Scatter.</CAPTION> 422 * <TR> 423 * <TD style="vertical-align:top;"> 424 * Before: 425 * <pre> 426 * Process Process Process Process 427 * 0 (root) 1 2 3 428 * +----+ 429 * | 1 | 430 * | 2 | 431 * +----+ 432 * | 3 | 433 * | 4 | 434 * +----+ 435 * | 5 | 436 * | 6 | 437 * +----+ 438 * | 7 | 439 * | 8 | 440 * +----+ 441 * 442 * +----+ +----+ +----+ +----+ 443 * | | | | | | | | 444 * | | | | | | | | 445 * +----+ +----+ +----+ +----+</pre> 446 * </TD> 447 * <TD> 448 * <pre> 449 * ... </pre> 450 * </TD> 451 * <TD style="vertical-align:top;"> 452 * After: 453 * <pre> 454 * Process Process Process Process 455 * 0 (root) 1 2 3 456 * +----+ 457 * | 1 | 458 * | 2 | 459 * +----+ 460 * | 3 | 461 * | 4 | 462 * +----+ 463 * | 5 | 464 * | 6 | 465 * +----+ 466 * | 7 | 467 * | 8 | 468 * +----+ 469 * 470 * +----+ +----+ +----+ +----+ 471 * | 1 | | 3 | | 5 | | 7 | 472 * | 2 | | 4 | | 6 | | 8 | 473 * +----+ +----+ +----+ +----+</pre> 474 * </TD> 475 * </TR> 476 * </TABLE> 477 * In the root process, the destination buffer can be the same as the source 478 * buffer: 479 * <TABLE> 480 * <CAPTION>Scatter</CAPTION> 481 * <TR> 482 * <TD style="vertical-align:top;"> 483 * Before: 484 * <pre> 485 * Process Process Process Process 486 * 0 (root) 1 2 3 487 * +----+ 488 * | 1 | 489 * | 2 | 490 * +----+ +----+ 491 * | 3 | | | 492 * | 4 | | | 493 * +----+ +----+ +----+ 494 * | 5 | | | 495 * | 6 | | | 496 * +----+ +----+ +----+ 497 * | 7 | | | 498 * | 8 | | | 499 * +----+ +----+</pre> 500 * </TD> 501 * <TD> 502 * <pre> 503 * ... 
</pre> 504 * </TD> 505 * <TD style="vertical-align:top;"> 506 * After: 507 * <pre> 508 * Process Process Process Process 509 * 0 (root) 1 2 3 510 * +----+ 511 * | 1 | 512 * | 2 | 513 * +----+ +----+ 514 * | 3 | | 3 | 515 * | 4 | | 4 | 516 * +----+ +----+ +----+ 517 * | 5 | | 5 | 518 * | 6 | | 6 | 519 * +----+ +----+ +----+ 520 * | 7 | | 7 | 521 * | 8 | | 8 | 522 * +----+ +----+</pre> 523 * </TD> 524 * </TR> 525 * </TABLE> 526 * <I>Note:</I> Any process can be the root of the scatter. The above is only 527 * one example with process 0 as the root. 528 * <p> 529 * <B>Gather</B> 530 * <p> 531 * Gather is the opposite of scatter. One process in the communicator, the root 532 * process, has <I>K</I> destination buffers (type {@linkplain edu.rit.mp.Buf 533 * Buf}), where <I>K</I> is the size of the communicator. For example, the 534 * destination buffers could be different portions of an array. Each process in 535 * the communicator (including the root process) has a source buffer with the 536 * same length and the same item data type as the corresponding destination 537 * buffer, filled with data. Each process calls the communicator's 538 * <code>gather()</code> method. Afterwards, each destination buffer in the root 539 * process contains the same data as the corresponding source buffer. 540 * <TABLE> 541 * <CAPTION>Gather</CAPTION> 542 * <TR> 543 * <TD style="vertical-align:top;"> 544 * Before: 545 * <pre> 546 * Process Process Process Process 547 * 0 (root) 1 2 3 548 * +----+ +----+ +----+ +----+ 549 * | 1 | | 3 | | 5 | | 7 | 550 * | 2 | | 4 | | 6 | | 8 | 551 * +----+ +----+ +----+ +----+ 552 * 553 * +----+ 554 * | | 555 * | | 556 * +----+ 557 * | | 558 * | | 559 * +----+ 560 * | | 561 * | | 562 * +----+ 563 * | | 564 * | | 565 * +----+</pre> 566 * </TD> 567 * <TD> 568 * <pre> 569 * ... 
</pre> 570 * </TD> 571 * <TD style="vertical-align:top;"> 572 * After: 573 * <pre> 574 * Process Process Process Process 575 * 0 (root) 1 2 3 576 * +----+ +----+ +----+ +----+ 577 * | 1 | | 3 | | 5 | | 7 | 578 * | 2 | | 4 | | 6 | | 8 | 579 * +----+ +----+ +----+ +----+ 580 * 581 * +----+ 582 * | 1 | 583 * | 2 | 584 * +----+ 585 * | 3 | 586 * | 4 | 587 * +----+ 588 * | 5 | 589 * | 6 | 590 * +----+ 591 * | 7 | 592 * | 8 | 593 * +----+</pre> 594 * </TD> 595 * </TR> 596 * </TABLE> 597 * In the root process, the destination buffer can be the same as the source 598 * buffer: 599 * <TABLE> 600 * <CAPTION>Gather</CAPTION> 601 * <TR> 602 * <TD style="vertical-align:top;"> 603 * Before: 604 * <pre> 605 * Process Process Process Process 606 * 0 (root) 1 2 3 607 * +----+ 608 * | 1 | 609 * | 2 | 610 * +----+ +----+ 611 * | | | 3 | 612 * | | | 4 | 613 * +----+ +----+ +----+ 614 * | | | 5 | 615 * | | | 6 | 616 * +----+ +----+ +----+ 617 * | | | 7 | 618 * | | | 8 | 619 * +----+ +----+</pre> 620 * </TD> 621 * <TD> 622 * <pre> 623 * ... </pre> 624 * </TD> 625 * <TD style="vertical-align:top;"> 626 * After: 627 * <pre> 628 * Process Process Process Process 629 * 0 (root) 1 2 3 630 * +----+ 631 * | 1 | 632 * | 2 | 633 * +----+ +----+ 634 * | 3 | | 3 | 635 * | 4 | | 4 | 636 * +----+ +----+ +----+ 637 * | 5 | | 5 | 638 * | 6 | | 6 | 639 * +----+ +----+ +----+ 640 * | 7 | | 7 | 641 * | 8 | | 8 | 642 * +----+ +----+</pre> 643 * </TD> 644 * </TR> 645 * </TABLE> 646 * <I>Note:</I> Any process can be the root of the gather. The above is only one 647 * example with process 0 as the root. 648 * <p> 649 * <B>All-Gather</B> 650 * <p> 651 * All-gather is the same as gather, except that every process has an array of 652 * destination buffers, and every process receives the results of the gather. 653 * Each process in the communicator has a source buffer (type {@linkplain 654 * edu.rit.mp.Buf Buf}) filled with data. 
Each process in the communicator has 655 * <I>K</I> destination buffers, where <I>K</I> is the size of the communicator. 656 * For example, the destination buffers could be different portions of an array. 657 * Each destination buffer has the same length and the same item data type as 658 * the corresponding source buffer. Each process calls the communicator's 659 * <code>allGather()</code> method. Afterwards, each destination buffer in every 660 * process contains the same data as the corresponding source buffer. 661 * <TABLE> 662 * <CAPTION>All-Gather</CAPTION> 663 * <TR> 664 * <TD style="vertical-align:top;"> 665 * Before: 666 * <pre> 667 * Process Process Process Process 668 * 0 1 2 3 669 * +----+ +----+ +----+ +----+ 670 * | 1 | | 3 | | 5 | | 7 | 671 * | 2 | | 4 | | 6 | | 8 | 672 * +----+ +----+ +----+ +----+ 673 * 674 * +----+ +----+ +----+ +----+ 675 * | | | | | | | | 676 * | | | | | | | | 677 * +----+ +----+ +----+ +----+ 678 * | | | | | | | | 679 * | | | | | | | | 680 * +----+ +----+ +----+ +----+ 681 * | | | | | | | | 682 * | | | | | | | | 683 * +----+ +----+ +----+ +----+ 684 * | | | | | | | | 685 * | | | | | | | | 686 * +----+ +----+ +----+ +----+</pre> 687 * </TD> 688 * <TD> 689 * <pre> 690 * ... 
</pre> 691 * </TD> 692 * <TD style="vertical-align:top;"> 693 * After: 694 * <pre> 695 * Process Process Process Process 696 * 0 1 2 3 697 * +----+ +----+ +----+ +----+ 698 * | 1 | | 3 | | 5 | | 7 | 699 * | 2 | | 4 | | 6 | | 8 | 700 * +----+ +----+ +----+ +----+ 701 * 702 * +----+ +----+ +----+ +----+ 703 * | 1 | | 1 | | 1 | | 1 | 704 * | 2 | | 2 | | 2 | | 2 | 705 * +----+ +----+ +----+ +----+ 706 * | 3 | | 3 | | 3 | | 3 | 707 * | 4 | | 4 | | 4 | | 4 | 708 * +----+ +----+ +----+ +----+ 709 * | 5 | | 5 | | 5 | | 5 | 710 * | 6 | | 6 | | 6 | | 6 | 711 * +----+ +----+ +----+ +----+ 712 * | 7 | | 7 | | 7 | | 7 | 713 * | 8 | | 8 | | 8 | | 8 | 714 * +----+ +----+ +----+ +----+</pre> 715 * </TD> 716 * </TR> 717 * </TABLE> 718 * The destination buffer can be the same as the source buffer in each process: 719 * <TABLE> 720 * <CAPTION>All-Gather</CAPTION> 721 * <TR> 722 * <TD style="vertical-align:top;"> 723 * Before: 724 * <pre> 725 * Process Process Process Process 726 * 0 1 2 3 727 * +----+ +----+ +----+ +----+ 728 * | 1 | | | | | | | 729 * | 2 | | | | | | | 730 * +----+ +----+ +----+ +----+ 731 * | | | 3 | | | | | 732 * | | | 4 | | | | | 733 * +----+ +----+ +----+ +----+ 734 * | | | | | 5 | | | 735 * | | | | | 6 | | | 736 * +----+ +----+ +----+ +----+ 737 * | | | | | | | 7 | 738 * | | | | | | | 8 | 739 * +----+ +----+ +----+ +----+</pre> 740 * </TD> 741 * <TD> 742 * <pre> 743 * ... 
</pre> 744 * </TD> 745 * <TD style="vertical-align:top;"> 746 * After: 747 * <pre> 748 * Process Process Process Process 749 * 0 1 2 3 750 * +----+ +----+ +----+ +----+ 751 * | 1 | | 1 | | 1 | | 1 | 752 * | 2 | | 2 | | 2 | | 2 | 753 * +----+ +----+ +----+ +----+ 754 * | 3 | | 3 | | 3 | | 3 | 755 * | 4 | | 4 | | 4 | | 4 | 756 * +----+ +----+ +----+ +----+ 757 * | 5 | | 5 | | 5 | | 5 | 758 * | 6 | | 6 | | 6 | | 6 | 759 * +----+ +----+ +----+ +----+ 760 * | 7 | | 7 | | 7 | | 7 | 761 * | 8 | | 8 | | 8 | | 8 | 762 * +----+ +----+ +----+ +----+</pre> 763 * </TD> 764 * </TR> 765 * </TABLE> 766 * <p> 767 * <B>Reduce</B> 768 * <p> 769 * Reduce is like gather, except the buffers' contents are combined together 770 * instead of just copied. Each process in the communicator has a buffer (type 771 * {@linkplain edu.rit.mp.Buf Buf}) filled with data. Each process calls the 772 * communicator's <code>reduce()</code> method, specifying some binary operation 773 * (type {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data. 774 * Afterwards, each element of the buffer in the root process contains the 775 * result of combining all the corresponding elements in all the buffers using 776 * the specified binary operation. For example, if the operation is addition, 777 * each buffer element in the root process ends up being the sum of the 778 * corresponding buffer elements in all the processes. In the non-root 779 * processes, the buffers' contents may be changed from their original contents. 780 * <TABLE> 781 * <CAPTION>Reduce</CAPTION> 782 * <TR> 783 * <TD style="vertical-align:top;"> 784 * Before: 785 * <pre> 786 * Process Process Process Process 787 * 0 (root) 1 2 3 788 * +----+ +----+ +----+ +----+ 789 * | 1 | | 3 | | 5 | | 7 | 790 * | 2 | | 4 | | 6 | | 8 | 791 * +----+ +----+ +----+ +----+</pre> 792 * </TD> 793 * <TD> 794 * <pre> 795 * ... 
</pre> 796 * </TD> 797 * <TD style="vertical-align:top;"> 798 * After: 799 * <pre> 800 * Process Process Process Process 801 * 0 (root) 1 2 3 802 * +----+ +----+ +----+ +----+ 803 * | 16 | | ?? | | ?? | | ?? | 804 * | 20 | | ?? | | ?? | | ?? | 805 * +----+ +----+ +----+ +----+</pre> 806 * </TD> 807 * </TR> 808 * </TABLE> 809 * <I>Note:</I> Any process can be the root of the reduction. The above is only 810 * one example with process 0 as the root. 811 * <p> 812 * <B>All-Reduce</B> 813 * <p> 814 * All-reduce is the same as reduce, except that every process receives the 815 * results of the reduction. Each process in the communicator has a buffer (type 816 * {@linkplain edu.rit.mp.Buf Buf}) filled with data. Each process calls the 817 * communicator's <code>allReduce()</code> method, specifying some binary operation 818 * (type {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data. 819 * Afterwards, each element of the buffer in each process contains the result of 820 * combining all the corresponding elements in all the buffers using the 821 * specified binary operation. For example, if the operation is addition, each 822 * buffer element ends up being the sum of the corresponding buffer elements. 823 * <TABLE> 824 * <CAPTION>All-Reduce</CAPTION> 825 * <TR> 826 * <TD style="vertical-align:top;"> 827 * Before: 828 * <pre> 829 * Process Process Process Process 830 * 0 1 2 3 831 * +----+ +----+ +----+ +----+ 832 * | 1 | | 3 | | 5 | | 7 | 833 * | 2 | | 4 | | 6 | | 8 | 834 * +----+ +----+ +----+ +----+</pre> 835 * </TD> 836 * <TD> 837 * <pre> 838 * ... 
</pre> 839 * </TD> 840 * <TD style="vertical-align:top;"> 841 * After: 842 * <pre> 843 * Process Process Process Process 844 * 0 1 2 3 845 * +----+ +----+ +----+ +----+ 846 * | 16 | | 16 | | 16 | | 16 | 847 * | 20 | | 20 | | 20 | | 20 | 848 * +----+ +----+ +----+ +----+</pre> 849 * </TD> 850 * </TR> 851 * </TABLE> 852 * <p> 853 * <B>All-to-All</B> 854 * <p> 855 * Every process in the communicator has <I>K</I> source buffers (type 856 * {@linkplain edu.rit.mp.Buf Buf}) filled with data, where <I>K</I> is the size 857 * of the communicator. Every process in the communicator also has <I>K</I> 858 * destination buffers (type {@linkplain edu.rit.mp.Buf Buf}). The source 859 * buffers and the destination buffers must refer to different storage. For 860 * example, the source buffers could be portions of an array, and the 861 * destination buffers could be portions of a different array. Each process 862 * calls the communicator's <code>allToAll()</code> method. Afterwards, for each 863 * process rank <I>k</I>, 0 &lt;= <I>k</I> &lt;= <I>K</I>-1, and each buffer 864 * index <I>i</I>, 0 &lt;= <I>i</I> &lt;= <I>K</I>-1, destination buffer 865 * <I>i</I> in process <I>k</I> contains the same data as source buffer <I>k</I> 866 * in process <I>i</I>. 
867 * <TABLE> 868 * <CAPTION>All-to-All</CAPTION> 869 * <TR> 870 * <TD style="vertical-align:top;"> 871 * Before: 872 * <pre> 873 * Process Process Process Process 874 * 0 1 2 3 875 * +----+ +----+ +----+ +----+ 876 * | 1 | | 9 | | 17 | | 25 | 877 * | 2 | | 10 | | 18 | | 26 | 878 * +----+ +----+ +----+ +----+ 879 * | 3 | | 11 | | 19 | | 27 | 880 * | 4 | | 12 | | 20 | | 28 | 881 * +----+ +----+ +----+ +----+ 882 * | 5 | | 13 | | 21 | | 29 | 883 * | 6 | | 14 | | 22 | | 30 | 884 * +----+ +----+ +----+ +----+ 885 * | 7 | | 15 | | 23 | | 31 | 886 * | 8 | | 16 | | 24 | | 32 | 887 * +----+ +----+ +----+ +----+ 888 * 889 * +----+ +----+ +----+ +----+ 890 * | | | | | | | | 891 * | | | | | | | | 892 * +----+ +----+ +----+ +----+ 893 * | | | | | | | | 894 * | | | | | | | | 895 * +----+ +----+ +----+ +----+ 896 * | | | | | | | | 897 * | | | | | | | | 898 * +----+ +----+ +----+ +----+ 899 * | | | | | | | | 900 * | | | | | | | | 901 * +----+ +----+ +----+ +----+</pre> 902 * </TD> 903 * <TD> 904 * <pre> 905 * ... 
</pre> 906 * </TD> 907 * <TD style="vertical-align:top;"> 908 * After: 909 * <pre> 910 * Process Process Process Process 911 * 0 1 2 3 912 * +----+ +----+ +----+ +----+ 913 * | 1 | | 9 | | 17 | | 25 | 914 * | 2 | | 10 | | 18 | | 26 | 915 * +----+ +----+ +----+ +----+ 916 * | 3 | | 11 | | 19 | | 27 | 917 * | 4 | | 12 | | 20 | | 28 | 918 * +----+ +----+ +----+ +----+ 919 * | 5 | | 13 | | 21 | | 29 | 920 * | 6 | | 14 | | 22 | | 30 | 921 * +----+ +----+ +----+ +----+ 922 * | 7 | | 15 | | 23 | | 31 | 923 * | 8 | | 16 | | 24 | | 32 | 924 * +----+ +----+ +----+ +----+ 925 * 926 * +----+ +----+ +----+ +----+ 927 * | 1 | | 3 | | 5 | | 7 | 928 * | 2 | | 4 | | 6 | | 8 | 929 * +----+ +----+ +----+ +----+ 930 * | 9 | | 11 | | 13 | | 15 | 931 * | 10 | | 12 | | 14 | | 16 | 932 * +----+ +----+ +----+ +----+ 933 * | 17 | | 19 | | 21 | | 23 | 934 * | 18 | | 20 | | 22 | | 24 | 935 * +----+ +----+ +----+ +----+ 936 * | 25 | | 27 | | 29 | | 31 | 937 * | 26 | | 28 | | 30 | | 32 | 938 * +----+ +----+ +----+ +----+</pre> 939 * </TD> 940 * </TR> 941 * </TABLE> 942 * <p> 943 * <B>Scan</B> 944 * <p> 945 * Each process in the communicator has a buffer (type {@linkplain 946 * edu.rit.mp.Buf Buf}) filled with data. Each process calls the communicator's 947 * <code>scan()</code> method, specifying some binary operation (type {@linkplain 948 * edu.rit.pj.reduction.Op Op}) for combining the data. Afterwards, each element 949 * of the buffer in a particular process contains the result of combining all 950 * the corresponding elements in its own and all lower-ranked processes' buffers 951 * using the specified binary operation. For example, if the operation is 952 * addition, each buffer element ends up being the sum of its own and all 953 * lower-ranked processes' buffer elements. 
954 * <TABLE> 955 * <CAPTION>Scan</CAPTION> 956 * <TR> 957 * <TD style="vertical-align:top;"> 958 * Before: 959 * <pre> 960 * Process Process Process Process 961 * 0 1 2 3 962 * +----+ +----+ +----+ +----+ 963 * | 1 | | 3 | | 5 | | 7 | 964 * | 2 | | 4 | | 6 | | 8 | 965 * +----+ +----+ +----+ +----+</pre> 966 * </TD> 967 * <TD> 968 * <pre> 969 * ... </pre> 970 * </TD> 971 * <TD style="vertical-align:top;"> 972 * After: 973 * <pre> 974 * Process Process Process Process 975 * 0 1 2 3 976 * +----+ +----+ +----+ +----+ 977 * | 1 | | 4 | | 9 | | 16 | 978 * | 2 | | 6 | | 12 | | 20 | 979 * +----+ +----+ +----+ +----+</pre> 980 * </TD> 981 * </TR> 982 * </TABLE> 983 * The scan operation is also known as "prefix scan" or "inclusive prefix scan" 984 * -- "inclusive" because the process's own element is included in the result. 985 * <p> 986 * <B>Exclusive-Scan</B> 987 * <p> 988 * The exclusive-scan operation is a variation of the scan operation. Each 989 * process in the communicator has a buffer (type {@linkplain edu.rit.mp.Buf 990 * Buf}) filled with data. Each process calls the communicator's 991 * <code>exclusiveScan()</code> method, specifying some binary operation (type 992 * {@linkplain edu.rit.pj.reduction.Op Op}) for combining the data, and 993 * specifying an initial data value. Afterwards, each element of the buffer in a 994 * particular process contains the result of combining all the corresponding 995 * elements in all lower-ranked processes' buffers using the specified binary 996 * operation, except in process 0 each element of the buffer contains the 997 * initial data value. For example, if the operation is addition and the initial 998 * data value is 0, each buffer element ends up being the sum of all 999 * lower-ranked processes' buffer elements. 
1000 * <TABLE> 1001 * <CAPTION>Exclusive-Scan</CAPTION> 1002 * <TR> 1003 * <TD style="vertical-align:top;"> 1004 * Before: 1005 * <pre> 1006 * Process Process Process Process 1007 * 0 1 2 3 1008 * +----+ +----+ +----+ +----+ 1009 * | 1 | | 3 | | 5 | | 7 | 1010 * | 2 | | 4 | | 6 | | 8 | 1011 * +----+ +----+ +----+ +----+</pre> 1012 * </TD> 1013 * <TD> 1014 * <pre> 1015 * ... </pre> 1016 * </TD> 1017 * <TD style="vertical-align:top;"> 1018 * After: 1019 * <pre> 1020 * Process Process Process Process 1021 * 0 1 2 3 1022 * +----+ +----+ +----+ +----+ 1023 * | 0 | | 1 | | 4 | | 9 | 1024 * | 0 | | 2 | | 6 | | 12 | 1025 * +----+ +----+ +----+ +----+</pre> 1026 * </TD> 1027 * </TR> 1028 * </TABLE> 1029 * This version of the scan operation is also known as "exclusive prefix scan" 1030 * -- "exclusive" because the process's own element is excluded from the result. 1031 * <p> 1032 * <B>Barrier</B> 1033 * <p> 1034 * The barrier operation causes all the processes to synchronize with each 1035 * other. Each process calls the communicator's <code>barrier()</code> method. The 1036 * calling thread blocks until all processes in the communicator have called the 1037 * <code>barrier()</code> method. Then the calling thread unblocks and returns from 1038 * the <code>barrier()</code> method call. 1039 * 1040 * @author Alan Kaminsky 1041 * @version 21-Jan-2009 1042 */ 1043 public class Comm { 1044 1045 // Hidden data members. 1046 // Predefined communicators. 1047 private static Comm theWorldCommunicator; 1048 private static Comm theFrontendCommunicator; 1049 1050 // This communicator's size, rank, and host. 1051 private int mySize; 1052 private int myRank; 1053 private String myHost; 1054 1055 // The largest power of 2 less than or equal to this communicator's size. 1056 private int mySizePowerOf2; 1057 1058 // Channel group for message passing in this communicator. 
1059 private ChannelGroup myChannelGroup; 1060 1061 // Map from rank (array index) to channel group address (array element). 1062 private InetSocketAddress[] myAddressForRank; 1063 1064 // Map from rank (array index) to channel for communicating with the process 1065 // at that rank (array element). 1066 private Channel[] myChannelForRank; 1067 1068 // Broadcast trees for flood-send, flood-receive, broadcast, and reduce 1069 // operations, indexed by root. 1070 private int[][] myBroadcastTree; 1071 1072 // Hidden constructors. 1073 1074 /** 1075 * Construct a new communicator. 1076 * 1077 * @param size Communicator's size. 1078 * @param rank Current process's rank in the communicator. 1079 * @param host Host name. 1080 * @param channelgroup Channel group for message passing in this 1081 * communicator. 1082 * @param address Map from rank (array index) to channel group address 1083 * (array element). 1084 */ 1085 private Comm(int size, 1086 int rank, 1087 String host, 1088 ChannelGroup channelgroup, 1089 InetSocketAddress[] address) { 1090 // Record size, rank, channel group. 1091 mySize = size; 1092 myRank = rank; 1093 myHost = host; 1094 myChannelGroup = channelgroup; 1095 1096 // Determine the largest power of 2 less than or equal to this 1097 // communicator's size. 1098 int p2 = 1; 1099 while (p2 <= size) { 1100 p2 <<= 1; 1101 } 1102 mySizePowerOf2 = p2 >>> 1; 1103 1104 // Set channel group ID equal to rank. 1105 myChannelGroup.setChannelGroupId(rank); 1106 1107 // Set up connect listener. 1108 myChannelGroup.setConnectListener(new ConnectListener() { 1109 public void nearEndConnected(ChannelGroup theChannelGroup, 1110 Channel theChannel) 1111 throws IOException { 1112 } 1113 1114 public void farEndConnected(ChannelGroup theChannelGroup, 1115 Channel theChannel) 1116 throws IOException { 1117 doFarEndConnected(theChannel); 1118 } 1119 }); 1120 1121 // Record socket address for each process rank. 
1122 myAddressForRank = address; 1123 1124 // Set up channel for each process rank. 1125 myChannelForRank = new Channel[size]; 1126 1127 // Populate channel at my own rank with the loopback channel. 1128 myChannelForRank[myRank] = channelgroup.loopbackChannel(); 1129 1130 // If there's more than one process, start listening for incoming 1131 // connections. 1132 if (mySize > 1) { 1133 myChannelGroup.startListening(); 1134 } 1135 } 1136 1137 // Exported operations. 1138 1139 /** 1140 * Initialize the PJ message passing middleware. Certain Java system 1141 * properties specify the middleware's behavior; these properties are 1142 * typically given on the Java command line with the <code>"-D"</code> flag. For 1143 * further information, see class {@linkplain PJProperties}. 1144 * 1145 * @param args Command line arguments. 1146 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1147 * <code>args</code> is null. 1148 * @throws java.lang.IllegalArgumentException (unchecked exception) Thrown if the 1149 * value of one of the Java system properties is illegal. 1150 * @throws java.io.IOException Thrown if an I/O error occurred. 1151 */ 1152 public static void init(String[] args) 1153 throws IOException { 1154 // Verify preconditions. 1155 if (args == null) { 1156 throw new NullPointerException("Comm.init(): args is null"); 1157 } 1158 1159 // Get the job backend object. 1160 JobBackend backend = JobBackend.getJobBackend(); 1161 1162 if (backend == null) { 1163 // We're running on the frontend processor. 1164 1165 // Prepare constructor arguments for the Job Frontend object. 1166 String username = System.getProperty("user.name"); 1167 int Nn = PJProperties.getPjNn(); 1168 int Np = PJProperties.getPjNp(); 1169 int Nt = PJProperties.getPjNt(); 1170 boolean hasFrontendComm = false; 1171 1172 // Examine the call stack to find the main program class name. 
1173 StackTraceElement[] stack 1174 = Thread.currentThread().getStackTrace(); 1175 StackTraceElement bottom = stack[stack.length - 1]; 1176 1177 if (!bottom.getMethodName().equals("main")) { 1178 // throw new IllegalStateException("Comm.init(): Not called from main program, but from " + bottom.getMethodName()); 1179 // Set up world communicator in this process. 1180 theWorldCommunicator 1181 = new Comm(/*size */1, 1182 /*rank */ 0, 1183 /*host */ "<unknown>", 1184 /*channelgroup*/ new ChannelGroup(), 1185 /*address */ 1186 new InetSocketAddress[]{new InetSocketAddress(0)}); 1187 return; 1188 } 1189 1190 String mainClassName = bottom.getClassName(); 1191 1192 // Set up the Job Frontend object. 1193 JobFrontend frontend = null; 1194 try { 1195 frontend 1196 = new JobFrontend(username, Nn, Np, Nt, hasFrontendComm, mainClassName, 1197 args); 1198 1199 // We were able to contact the Job Scheduler. 1200 // Run the job frontend in this process, then exit. 1201 frontend.run(); 1202 System.exit(0); 1203 } catch (JobSchedulerException exc) { 1204 // We were not able to contact the Job Scheduler. 1205 // System.err.println(" No Job Scheduler at " + PJProperties.getPjHost() + ":" + PJProperties.getPjPort() + ", running in this (one) process"); 1206 1207 // Set up world communicator. 1208 theWorldCommunicator 1209 = new Comm(/*size */1, 1210 /*rank */ 0, 1211 /*host */ "<unknown>", 1212 /*channelgroup*/ new ChannelGroup(), 1213 /*address */ 1214 new InetSocketAddress[]{new InetSocketAddress(0)}); 1215 } 1216 } else { 1217 // We're running on a backend processor. 1218 1219 // Set up world communicator. 1220 theWorldCommunicator 1221 = new Comm(/*size */backend.getK(), 1222 /*rank */ backend.getRank(), 1223 /*host */ backend.getBackendHost(), 1224 /*channelgroup*/ backend.getWorldChannelGroup(), 1225 /*address */ backend.getWorldAddress()); 1226 } 1227 } 1228 1229 /** 1230 * Obtain a reference to the world communicator. 1231 * 1232 * @return World communicator. 
1233 * @throws java.lang.IllegalStateException (unchecked exception) Thrown if 1234 * <code>Comm.init()</code> has not been called. Thrown if <code>world()</code> is 1235 * called in the job frontend process; the world communicator does not exist 1236 * in the job frontend process. 1237 */ 1238 public static Comm world() { 1239 if (theWorldCommunicator != null) { 1240 return theWorldCommunicator; 1241 } else if (JobBackend.getJobBackend() != null) { 1242 throw new IllegalStateException("Comm.world(): Didn't call Comm.init()"); 1243 } else { 1244 throw new IllegalStateException("Comm.world(): World communicator doesn't exist in job frontend process"); 1245 } 1246 } 1247 1248 /** 1249 * Obtain the number of processes in this communicator. 1250 * 1251 * @return Size. 1252 */ 1253 public int size() { 1254 return mySize; 1255 } 1256 1257 /** 1258 * Obtain the current process's rank in this communicator. 1259 * 1260 * @return Rank. 1261 */ 1262 public int rank() { 1263 return myRank; 1264 } 1265 1266 /** 1267 * Obtain the host name of this communicator's backend processor. If this 1268 * communicator is not running on a cluster backend processor, the host name 1269 * is <code>"<unknown>"</code>. 1270 * 1271 * @return Host name. 1272 */ 1273 public String host() { 1274 return myHost; 1275 } 1276 1277 /** 1278 * Create a new communicator. <I>Every</I> process in this communicator must 1279 * call the <code>createComm()</code> method. Each process passes true or false 1280 * for the <code>participate</code> argument to state whether the process will 1281 * participate in the new communicator. At least one process must 1282 * participate in the new communicator. Messages to set up the new 1283 * communicator are sent to all processes in this communicator, using a 1284 * message tag of 0. 1285 * <p> 1286 * In processes participating in the new communicator, the new communicator 1287 * is returned. 
The participating processes appear in the same order by rank 1288 * in the new communicator as in this communicator. The process can call the 1289 * new communicator's <code>rank()</code> method to determine the process's rank 1290 * in the new communicator. 1291 * <p> 1292 * In processes not participating in the new communicator, null is returned. 1293 * 1294 * @param participate True if this process will participate in the new 1295 * communicator; false otherwise. 1296 * @return New communicator if this process will participate in the new 1297 * communicator; null otherwise. 1298 * @throws java.io.IOException Thrown if an I/O error occurred. 1299 */ 1300 public Comm createComm(boolean participate) 1301 throws IOException { 1302 return createComm(participate, 0); 1303 } 1304 1305 /** 1306 * Create a new communicator. <I>Every</I> process in this communicator must 1307 * call the <code>createComm()</code> method. Each process passes true or false 1308 * for the <code>participate</code> argument to state whether the process will 1309 * participate in the new communicator. At least one process must 1310 * participate in the new communicator. Messages to set up the new 1311 * communicator are sent to all processes in this communicator, using the 1312 * given message tag. 1313 * <p> 1314 * In processes participating in the new communicator, the new communicator 1315 * is returned. The participating processes appear in the same order by rank 1316 * in the new communicator as in this communicator. The process can call the 1317 * new communicator's <code>rank()</code> method to determine the process's rank 1318 * in the new communicator. 1319 * <p> 1320 * In processes not participating in the new communicator, null is returned. 1321 * 1322 * @param participate True if this process will participate in the new 1323 * communicator; false otherwise. 1324 * @param tag Message tag. 
1325 * @return New communicator if this process will participate in the new 1326 * communicator; null otherwise. 1327 * @throws java.io.IOException Thrown if an I/O error occurred. 1328 */ 1329 public Comm createComm(boolean participate, 1330 int tag) 1331 throws IOException { 1332 // Set up array of socket addresses for all processes. 1333 InetSocketAddress[] address = new InetSocketAddress[mySize]; 1334 ObjectBuf<InetSocketAddress>[] addressbuf 1335 = ObjectBuf.sliceBuffers(address, 1336 new Range(0, mySize - 1).subranges(mySize)); 1337 1338 // Create channel group for new communicator, if participating. 1339 ChannelGroup channelgroup = null; 1340 InetSocketAddress myaddress = null; 1341 if (participate) { 1342 channelgroup 1343 = new ChannelGroup(new InetSocketAddress(myChannelGroup.listenAddress().getAddress(), 0)); 1344 myaddress = channelgroup.listenAddress(); 1345 address[myRank] = myaddress; 1346 } 1347 1348 // All-gather channel group socket addresses into every process. 1349 allGather(tag, addressbuf[myRank], addressbuf); 1350 1351 // Close up gaps in the socket address array if any. 1352 int off = 0; 1353 int newsize = 0; 1354 int newrank = -1; 1355 for (int i = 0; i < mySize; ++i) { 1356 if (address[i] == null) { 1357 ++off; 1358 } else { 1359 if (i == myRank) { 1360 newrank = i - off; 1361 } 1362 address[i - off] = address[i]; 1363 ++newsize; 1364 } 1365 } 1366 1367 // Verify size of new communicator. 1368 if (newsize == 0) { 1369 throw new IOException("Comm.createComm(): No processes in communicator"); 1370 } 1371 1372 // Return new communicator if participating. 1373 if (participate) { 1374 return new Comm(newsize, newrank, myHost, channelgroup, address); 1375 } // Return null if not participating. 1376 else { 1377 return null; 1378 } 1379 } 1380 1381 /** 1382 * Send a message to the process at the given rank in this communicator. The 1383 * message uses a tag of 0. The message items come from the given buffer. 
To 1384 * receive the message, the destination process must call the 1385 * <code>receive()</code> method. When the <code>send()</code> method returns, the 1386 * message has been fully sent, but it may not yet have been fully received. 1387 * <p> 1388 * A process can send a message to itself; in this case a different thread 1389 * must call the <code>receive()</code> method on this communicator. 1390 * 1391 * @param toRank Destination process's rank in this communicator. 1392 * @param buffer Buffer of data items to be sent. 1393 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1394 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1. 1395 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1396 * <code>buffer</code> is null. 1397 * @throws java.io.IOException Thrown if an I/O error occurred. 1398 */ 1399 public void send(int toRank, 1400 Buf buffer) 1401 throws IOException { 1402 send(toRank, 0, buffer); 1403 } 1404 1405 /** 1406 * Send a message to the process at the given rank in this communicator with 1407 * the given message tag. The message items come from the given buffer. To 1408 * receive the message, the destination process must call the 1409 * <code>receive()</code> method. When the <code>send()</code> method returns, the 1410 * message has been fully sent, but it may not yet have been fully received. 1411 * <p> 1412 * A process can send a message to itself; in this case a different thread 1413 * must call the <code>receive()</code> method on this communicator. 1414 * 1415 * @param toRank Destination process's rank in this communicator. 1416 * @param tag Message tag. 1417 * @param buffer Buffer of data items to be sent. 1418 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1419 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1. 1420 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1421 * <code>buffer</code> is null. 
1422 * @throws java.io.IOException Thrown if an I/O error occurred. 1423 */ 1424 public void send(int toRank, 1425 int tag, 1426 Buf buffer) 1427 throws IOException { 1428 myChannelGroup.send(getChannel(toRank), tag, buffer); 1429 } 1430 1431 /** 1432 * Send a message to the process at the given rank in this communicator 1433 * (non-blocking). A message tag of 0 is used. The message items come from 1434 * the given buffer. To receive the message, the destination process must 1435 * call the <code>receive()</code> method. 1436 * <p> 1437 * The <code>send()</code> method initiates the send operation and immediately 1438 * returns a {@linkplain CommRequest} object. The send operation is 1439 * performed by a separate thread. To wait for the send operation to finish, 1440 * call the returned {@linkplain CommRequest} object's 1441 * <code>waitForFinish()</code> method. When that method returns, the message 1442 * has been fully sent, but it may not yet have been fully received. 1443 * <p> 1444 * A process can send a message to itself; in this case a different thread 1445 * must call the <code>receive()</code> method on this communicator. 1446 * 1447 * @param toRank Destination process's rank in this communicator. 1448 * @param buffer Buffer of data items to be sent. 1449 * @param request CommRequest object to use to wait for the operation to 1450 * finish; in this case <code>request</code> is returned. If 1451 * <code>request</code> is null, a new CommRequest object is created and 1452 * returned. 1453 * @return CommRequest object to use to wait for the operation to finish. 1454 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1455 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1. 1456 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1457 * <code>buffer</code> is null. 1458 * @throws java.io.IOException Thrown if an I/O error occurred. 
1459 */ 1460 public CommRequest send(int toRank, 1461 Buf buffer, 1462 CommRequest request) 1463 throws IOException { 1464 return send(toRank, 0, buffer, request); 1465 } 1466 1467 /** 1468 * Send a message to the process at the given rank in this communicator with 1469 * the given message tag (non-blocking). The message items come from the 1470 * given buffer. To receive the message, the destination process must call 1471 * the <code>receive()</code> method. 1472 * <p> 1473 * The <code>send()</code> method initiates the send operation and immediately 1474 * returns a {@linkplain CommRequest} object. The send operation is 1475 * performed by a separate thread. To wait for the send operation to finish, 1476 * call the returned {@linkplain CommRequest} object's 1477 * <code>waitForFinish()</code> method. When that method returns, the message 1478 * has been fully sent, but it may not yet have been fully received. 1479 * <p> 1480 * A process can send a message to itself; in this case a different thread 1481 * must call the <code>receive()</code> method on this communicator. 1482 * 1483 * @param toRank Destination process's rank in this communicator. 1484 * @param tag Message tag. 1485 * @param buffer Buffer of data items to be sent. 1486 * @param request CommRequest object to use to wait for the operation to 1487 * finish; in this case <code>request</code> is returned. If 1488 * <code>request</code> is null, a new CommRequest object is created and 1489 * returned. 1490 * @return CommRequest object to use to wait for the operation to finish. 1491 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1492 * <code>toRank</code> is not in the range 0 .. <code>size()</code>-1. 1493 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1494 * <code>buffer</code> is null. 1495 * @throws java.io.IOException Thrown if an I/O error occurred. 
1496 */ 1497 public CommRequest send(int toRank, 1498 int tag, 1499 Buf buffer, 1500 CommRequest request) 1501 throws IOException { 1502 // Set up CommRequest object. 1503 CommRequest req = request == null ? new CommRequest() : request; 1504 1505 // Send message (non-blocking). 1506 req.mySendRequest = new IORequest(); 1507 req.myRecvRequest = null; 1508 myChannelGroup.sendNoWait(getChannel(toRank), tag, buffer, req.mySendRequest); 1509 1510 // Return CommRequest object. 1511 return req; 1512 } 1513 1514 /** 1515 * Receive a message from the process at the given rank in this 1516 * communicator. If <code>rank</code> is null, a message will be received from 1517 * any process in this communicator. The message must have a tag of 0. The 1518 * received message items are stored in the given buffer. To send the 1519 * message, the source process must call the <code>send()</code> method. When 1520 * the <code>receive()</code> method returns, the message has been fully 1521 * received. 1522 * <p> 1523 * A {@linkplain CommStatus} object is returned. The status object gives the 1524 * actual rank of the process that sent the message, the actual message tag 1525 * that was received, and the actual number of data items in the message. If 1526 * the actual number of data items in the message is less than the length of 1527 * the buffer, nothing is stored into the extra data items at the end of the 1528 * buffer. If the actual number of data items in the message is greater than 1529 * the length of the buffer, the extra data items at the end of the message 1530 * are discarded. 1531 * <p> 1532 * A process can receive a message from itself; in this case a different 1533 * thread must call the <code>send()</code> method on this communicator. 1534 * 1535 * @param fromRank Source process's rank in this communicator, or null to 1536 * receive from any process. 1537 * @param buffer Buffer of data items to be received. 
1538 * @return Status object giving the outcome of the message reception. 1539 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1540 * <code>fromRank</code> is not null and is not in the range 0 .. 1541 * <code>size()</code>-1. 1542 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1543 * <code>buffer</code> is null. 1544 * @throws java.io.IOException Thrown if an I/O error occurred. 1545 */ 1546 public CommStatus receive(Integer fromRank, 1547 Buf buffer) 1548 throws IOException { 1549 return receive(fromRank, 0, buffer); 1550 } 1551 1552 /** 1553 * Receive a message from the process at the given rank in this communicator 1554 * with the given message tag. If <code>rank</code> is null, a message will be 1555 * received from any process in this communicator. The received message 1556 * items are stored in the given buffer. To send the message, the source 1557 * process must call the <code>send()</code> method. When the <code>receive()</code> 1558 * method returns, the message has been fully received. 1559 * <p> 1560 * A {@linkplain CommStatus} object is returned. The status object gives the 1561 * actual rank of the process that sent the message, the actual message tag 1562 * that was received, and the actual number of data items in the message. If 1563 * the actual number of data items in the message is less than the length of 1564 * the buffer, nothing is stored into the extra data items at the end of the 1565 * buffer. If the actual number of data items in the message is greater than 1566 * the length of the buffer, the extra data items at the end of the message 1567 * are discarded. 1568 * <p> 1569 * A process can receive a message from itself; in this case a different 1570 * thread must call the <code>send()</code> method on this communicator. 1571 * 1572 * @param fromRank Source process's rank in this communicator, or null to 1573 * receive from any process. 1574 * @param tag Message tag. 
1575 * @param buffer Buffer of data items to be received. 1576 * @return Status object giving the outcome of the message reception. 1577 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1578 * <code>fromRank</code> is not null and is not in the range 0 .. 1579 * <code>size()</code>-1. 1580 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1581 * <code>buffer</code> is null. 1582 * @throws java.io.IOException Thrown if an I/O error occurred. 1583 */ 1584 public CommStatus receive(Integer fromRank, 1585 int tag, 1586 Buf buffer) 1587 throws IOException { 1588 Status status; 1589 1590 // If source is a wildcard, ensure a channel to every process, then 1591 // receive from any process. 1592 if (fromRank == null) { 1593 for (int src = 0; src < mySize; ++src) { 1594 ensureChannel(src); 1595 } 1596 status = myChannelGroup.receive(null, tag, buffer); 1597 } // If source is not a wildcard, receive from that process. 1598 else { 1599 status 1600 = myChannelGroup.receive(getChannel(fromRank), tag, buffer); 1601 } 1602 1603 // Return CommStatus object. 1604 return new CommStatus(getFarRank(status.channel), 1605 status.tag, 1606 status.length); 1607 } 1608 1609 /** 1610 * Receive a message from the process at the given rank in this communicator 1611 * with the given message tag range. If <code>rank</code> is null, a message 1612 * will be received from any process in this communicator. If 1613 * <code>tagRange</code> is null, a message will be received with any tag. If 1614 * <code>tagRange</code> is not null, a message will be received with any tag in 1615 * the given range. The received message items are stored in the given 1616 * buffer. To send the message, the source process must call the 1617 * <code>send()</code> method. When the <code>receive()</code> method returns, the 1618 * message has been fully received. 1619 * <p> 1620 * A {@linkplain CommStatus} object is returned. 
The status object gives the 1621 * actual rank of the process that sent the message, the actual message tag 1622 * that was received, and the actual number of data items in the message. If 1623 * the actual number of data items in the message is less than the length of 1624 * the buffer, nothing is stored into the extra data items at the end of the 1625 * buffer. If the actual number of data items in the message is greater than 1626 * the length of the buffer, the extra data items at the end of the message 1627 * are discarded. 1628 * <p> 1629 * A process can receive a message from itself; in this case a different 1630 * thread must call the <code>send()</code> method on this communicator. 1631 * 1632 * @param fromRank Source process's rank in this communicator, or null to 1633 * receive from any process. 1634 * @param tagRange Message tag range, or null to receive any tag. 1635 * @param buffer Buffer of data items to be received. 1636 * @return Status object giving the outcome of the message reception. 1637 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1638 * <code>fromRank</code> is not null and is not in the range 0 .. 1639 * <code>size()</code>-1. 1640 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1641 * <code>buffer</code> is null. 1642 * @throws java.io.IOException Thrown if an I/O error occurred. 1643 */ 1644 public CommStatus receive(Integer fromRank, 1645 Range tagRange, 1646 Buf buffer) 1647 throws IOException { 1648 Status status; 1649 1650 // If source is a wildcard, ensure a channel to every process, then 1651 // receive from any process. 1652 if (fromRank == null) { 1653 for (int src = 0; src < mySize; ++src) { 1654 ensureChannel(src); 1655 } 1656 status = myChannelGroup.receive(null, tagRange, buffer); 1657 } // If source is not a wildcard, receive from that process. 
1658 else { 1659 status = myChannelGroup.receive(getChannel(fromRank), tagRange, buffer); 1660 } 1661 1662 // Return CommStatus object. 1663 return new CommStatus(getFarRank(status.channel), 1664 status.tag, 1665 status.length); 1666 } 1667 1668 /** 1669 * Receive a message from the process at the given rank in this communicator 1670 * (non-blocking). If <code>rank</code> is null, a message will be received from 1671 * any process in this communicator. The message must have a tag of 0. The 1672 * received message items are stored in the given buffer. To send the 1673 * message, the source process must call the <code>send()</code> method. 1674 * <p> 1675 * The <code>receive()</code> method initiates the receive operation and 1676 * immediately returns a {@linkplain CommRequest} object. The receive 1677 * operation is performed by a separate thread. To wait for the receive 1678 * operation to finish, call the returned {@linkplain CommRequest} object's 1679 * <code>waitForFinish()</code> method. When that method returns, the incoming 1680 * message items have been fully received. 1681 * <p> 1682 * A process can receive a message from itself; in this case a different 1683 * thread must call the <code>send()</code> method on this communicator. 1684 * 1685 * @param fromRank Source process's rank in this communicator, or null to 1686 * receive from any process. 1687 * @param buffer Buffer of data items to be received. 1688 * @param request CommRequest object to use to wait for the operation to 1689 * finish; in this case <code>request</code> is returned. If 1690 * <code>request</code> is null, a new CommRequest object is created and 1691 * returned. 1692 * @return CommRequest object to use to wait for the operation to finish. 1693 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1694 * <code>fromRank</code> is not null and is not in the range 0 .. 1695 * <code>size()</code>-1. 
1696 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1697 * <code>buffer</code> is null. 1698 * @throws java.io.IOException Thrown if an I/O error occurred. 1699 */ 1700 public CommRequest receive(Integer fromRank, 1701 Buf buffer, 1702 CommRequest request) 1703 throws IOException { 1704 return receive(fromRank, 0, buffer, request); 1705 } 1706 1707 /** 1708 * Receive a message from the process at the given rank in this communicator 1709 * with the given message tag (non-blocking). If <code>rank</code> is null, a 1710 * message will be received from any process in this communicator. If 1711 * <code>tag</code> is null, a message will be received with any tag. The 1712 * received message items are stored in the given buffer. To send the 1713 * message, the source process must call the <code>send()</code> method. 1714 * <p> 1715 * The <code>receive()</code> method initiates the receive operation and 1716 * immediately returns a {@linkplain CommRequest} object. The receive 1717 * operation is performed by a separate thread. To wait for the receive 1718 * operation to finish, call the returned {@linkplain CommRequest} object's 1719 * <code>waitForFinish()</code> method. When that method returns, the incoming 1720 * message items have been fully received. 1721 * <p> 1722 * A process can receive a message from itself; in this case a different 1723 * thread must call the <code>send()</code> method on this communicator. 1724 * 1725 * @param fromRank Source process's rank in this communicator, or null to 1726 * receive from any process. 1727 * @param tag Message tag. 1728 * @param buffer Buffer of data items to be received. 1729 * @param request CommRequest object to use to wait for the operation to 1730 * finish; in this case <code>request</code> is returned. If 1731 * <code>request</code> is null, a new CommRequest object is created and 1732 * returned. 1733 * @return CommRequest object to use to wait for the operation to finish. 
1734 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1735 * <code>fromRank</code> is not null and is not in the range 0 .. 1736 * <code>size()</code>-1. 1737 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1738 * <code>buffer</code> is null. 1739 * @throws java.io.IOException Thrown if an I/O error occurred. 1740 */ 1741 public CommRequest receive(Integer fromRank, 1742 int tag, 1743 Buf buffer, 1744 CommRequest request) 1745 throws IOException { 1746 // Set up CommRequest object. 1747 CommRequest req = request == null ? new CommRequest() : request; 1748 req.mySendRequest = null; 1749 req.myRecvRequest = new IORequest(); 1750 1751 // If source is a wildcard, ensure a channel to every process, then 1752 // receive (non-blocking) from any process. 1753 if (fromRank == null) { 1754 for (int src = 0; src < mySize; ++src) { 1755 ensureChannel(src); 1756 } 1757 myChannelGroup.receiveNoWait(null, tag, buffer, req.myRecvRequest); 1758 } // If source is not a wildcard, receive (non-blocking) from that 1759 // process. 1760 else { 1761 myChannelGroup.receiveNoWait(getChannel(fromRank), tag, buffer, req.myRecvRequest); 1762 } 1763 1764 // Return CommRequest object. 1765 return req; 1766 } 1767 1768 /** 1769 * Receive a message from the process at the given rank in this communicator 1770 * with the given message tag range (non-blocking). If <code>rank</code> is 1771 * null, a message will be received from any process in this communicator. 1772 * If <code>tagRange</code> is null, a message will be received with any tag. If 1773 * <code>tagRange</code> is not null, a message will be received with any tag in 1774 * the given range. The received message items are stored in the given 1775 * buffer. To send the message, the source process must call the 1776 * <code>send()</code> method. 
1777 * <p> 1778 * The <code>receive()</code> method initiates the receive operation and 1779 * immediately returns a {@linkplain CommRequest} object. The receive 1780 * operation is performed by a separate thread. To wait for the receive 1781 * operation to finish, call the returned {@linkplain CommRequest} object's 1782 * <code>waitForFinish()</code> method. When that method returns, the incoming 1783 * message items have been fully received. 1784 * <p> 1785 * A process can receive a message from itself; in this case a different 1786 * thread must call the <code>send()</code> method on this communicator. 1787 * 1788 * @param fromRank Source process's rank in this communicator, or null to 1789 * receive from any process. 1790 * @param tagRange Message tag range, or null to receive any tag. 1791 * @param buffer Buffer of data items to be received. 1792 * @param request CommRequest object to use to wait for the operation to 1793 * finish; in this case <code>request</code> is returned. If 1794 * <code>request</code> is null, a new CommRequest object is created and 1795 * returned. 1796 * @return CommRequest object to use to wait for the operation to finish. 1797 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1798 * <code>fromRank</code> is not null and is not in the range 0 .. 1799 * <code>size()</code>-1. 1800 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1801 * <code>buffer</code> is null. 1802 * @throws java.io.IOException Thrown if an I/O error occurred. 1803 */ 1804 public CommRequest receive(Integer fromRank, 1805 Range tagRange, 1806 Buf buffer, 1807 CommRequest request) 1808 throws IOException { 1809 // Set up CommRequest object. 1810 CommRequest req = request == null ? new CommRequest() : request; 1811 req.mySendRequest = null; 1812 req.myRecvRequest = new IORequest(); 1813 1814 // If source is a wildcard, ensure a channel to every process, then 1815 // receive (non-blocking) from any process. 
1816 if (fromRank == null) { 1817 for (int src = 0; src < mySize; ++src) { 1818 ensureChannel(src); 1819 } 1820 myChannelGroup.receiveNoWait(null, tagRange, buffer, req.myRecvRequest); 1821 } // If source is not a wildcard, receive (non-blocking) from that 1822 // process. 1823 else { 1824 myChannelGroup.receiveNoWait(getChannel(fromRank), tagRange, buffer, req.myRecvRequest); 1825 } 1826 1827 // Return CommRequest object. 1828 return req; 1829 } 1830 1831 /** 1832 * Send a message to the process at the given rank in this communicator, and 1833 * receive a message from the process at the given rank in this 1834 * communicator. A message tag of 0 is used. The outgoing message items come 1835 * from the buffer <code>sendbuf</code>. The incoming message items go into the 1836 * buffer <code>recvbuf</code>. The outgoing message items must come from a 1837 * different place than where the incoming message items will be stored. The 1838 * destination process (process <code>toRank</code>) must call a method to 1839 * receive this process's outgoing message items. The source process 1840 * (process <code>fromRank</code>) must call a method to send this process's 1841 * incoming message items. When the <code>sendReceive()</code> method returns, 1842 * the outgoing message items have been fully sent, but they may not yet 1843 * have been fully received; and the incoming message items have been fully 1844 * received. 1845 * <p> 1846 * A {@linkplain CommStatus} object is returned giving the results of the 1847 * receive half of the operation. The status object gives the rank of the 1848 * process that sent the incoming message, the message tag that was 1849 * received, and the actual number of data items in the message. If the 1850 * actual number of data items in the message is less than the length of the 1851 * receive buffer, nothing is stored into the extra data items at the end of 1852 * the receive buffer. 
If the actual number of data items in the message is 1853 * greater than the length of the receive buffer, the extra data items at 1854 * the end of the message are discarded. 1855 * <p> 1856 * A process can send-receive messages with itself; in this case a different 1857 * thread must call the <code>sendReceive()</code> method on this communicator. 1858 * 1859 * @param toRank Destination process's rank in this communicator. 1860 * @param sendBuf Buffer of data items to be sent. 1861 * @param fromRank Source process's rank in this communicator. 1862 * @param recvBuf Buffer of data items to be received. 1863 * @return Status object giving the outcome of the message reception. 1864 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1865 * <code>toRank</code> or <code>fromRank</code> 1866 * is not in the range 0 .. <code>size()</code>-1. 1867 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1868 * <code>sendBuf</code> or <code>recvBuf</code> 1869 * is null. 1870 * @throws java.io.IOException Thrown if an I/O error occurred. 1871 */ 1872 public CommStatus sendReceive(int toRank, 1873 Buf sendBuf, 1874 int fromRank, 1875 Buf recvBuf) 1876 throws IOException { 1877 return sendReceive(toRank, 0, sendBuf, fromRank, 0, recvBuf); 1878 } 1879 1880 /** 1881 * Send a message to the process at the given rank in this communicator with 1882 * the given message tag, and receive a message from the process at the 1883 * given rank in this communicator with the given message tag. The outgoing 1884 * message items come from the buffer <code>sendbuf</code>. The incoming message 1885 * items go into the buffer <code>recvbuf</code>. The outgoing message items 1886 * must come from a different place than where the incoming message items 1887 * will be stored. The destination process (process <code>toRank</code>) must 1888 * call a method to receive this process's outgoing message items. 
The 1889 * source process (process <code>fromRank</code>) must call a method to send 1890 * this process's incoming message items. When the <code>sendReceive()</code> 1891 * method returns, the outgoing message items have been fully sent, but they 1892 * may not yet have been fully received; and the incoming message items have 1893 * been fully received. 1894 * <p> 1895 * A {@linkplain CommStatus} object is returned giving the results of the 1896 * receive half of the operation. The status object gives the rank of the 1897 * process that sent the incoming message, the message tag that was 1898 * received, and the actual number of data items in the message. If the 1899 * actual number of data items in the message is less than the length of the 1900 * receive buffer, nothing is stored into the extra data items at the end of 1901 * the receive buffer. If the actual number of data items in the message is 1902 * greater than the length of the receive buffer, the extra data items at 1903 * the end of the message are discarded. 1904 * <p> 1905 * A process can send-receive messages with itself; in this case a different 1906 * thread must call the <code>sendReceive()</code> method on this communicator. 1907 * 1908 * @param toRank Destination process's rank in this communicator. 1909 * @param sendTag Message tag for outgoing message. 1910 * @param sendBuf Buffer of data items to be sent. 1911 * @param fromRank Source process's rank in this communicator. 1912 * @param recvTag Message tag for incoming message. 1913 * @param recvBuf Buffer of data items to be received. 1914 * @return Status object giving the outcome of the message reception. 1915 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1916 * <code>toRank</code> or <code>fromRank</code> 1917 * is not in the range 0 .. <code>size()</code>-1. 1918 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1919 * <code>sendBuf</code> or <code>recvBuf</code> 1920 * is null. 
1921 * @throws java.io.IOException Thrown if an I/O error occurred. 1922 */ 1923 public CommStatus sendReceive(int toRank, 1924 int sendTag, 1925 Buf sendBuf, 1926 int fromRank, 1927 int recvTag, 1928 Buf recvBuf) 1929 throws IOException { 1930 // Send the outgoing message (non-blocking). 1931 IORequest sendRequest = new IORequest(); 1932 myChannelGroup.sendNoWait(getChannel(toRank), sendTag, sendBuf, sendRequest); 1933 1934 // Receive the outgoing message (non-blocking). 1935 IORequest recvRequest = new IORequest(); 1936 myChannelGroup.receiveNoWait(getChannel(fromRank), recvTag, recvBuf, recvRequest); 1937 1938 // Wait for both messages to finish. 1939 sendRequest.waitForFinish(); 1940 Status status = recvRequest.waitForFinish(); 1941 1942 return new CommStatus(getFarRank(status.channel), 1943 status.tag, 1944 status.length); 1945 } 1946 1947 /** 1948 * Send a message to the process at the given rank in this communicator, and 1949 * receive a message from the process at the given rank in this communicator 1950 * (non-blocking). A message tag of 0 is used. The outgoing message items 1951 * come from the buffer <code>sendbuf</code>. The incoming message items go into 1952 * the buffer <code>recvbuf</code>. The outgoing message items must come from a 1953 * different place than where the incoming message items will be stored. The 1954 * destination process (process <code>toRank</code>) must call a method to 1955 * receive this process's outgoing message items. The source process 1956 * (process <code>fromRank</code>) must call a method to send this process's 1957 * incoming message items. 1958 * <p> 1959 * The <code>sendReceive()</code> method initiates the send and receive 1960 * operations and immediately returns a {@linkplain CommRequest} object. The 1961 * send and receive operations are performed by a separate thread. 
To wait 1962 * for the send and receive operations to finish, call the returned 1963 * {@linkplain CommRequest} object's <code>waitForFinish()</code> method. When 1964 * that method returns, the outgoing message items have been fully sent, but 1965 * they may not yet have been fully received; and the incoming message items 1966 * have been fully received. 1967 * <p> 1968 * A process can send-receive messages with itself; in this case a different 1969 * thread must call the <code>sendReceive()</code> method on this communicator. 1970 * 1971 * @param toRank Destination process's rank in this communicator. 1972 * @param sendBuf Buffer of data items to be sent. 1973 * @param fromRank Source process's rank in this communicator. 1974 * @param recvBuf Buffer of data items to be received. 1975 * @param request CommRequest object to use to wait for the operation to 1976 * finish; in this case <code>request</code> is returned. If 1977 * <code>request</code> is null, a new CommRequest object is created and 1978 * returned. 1979 * @return CommRequest object to use to wait for the operation to finish. 1980 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 1981 * <code>toRank</code> or <code>fromRank</code> 1982 * is not in the range 0 .. <code>size()</code>-1. 1983 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 1984 * <code>sendBuf</code> or <code>recvBuf</code> 1985 * is null. 1986 * @throws java.io.IOException Thrown if an I/O error occurred. 
1987 */ 1988 public CommRequest sendReceive(int toRank, 1989 Buf sendBuf, 1990 int fromRank, 1991 Buf recvBuf, 1992 CommRequest request) 1993 throws IOException { 1994 return sendReceive(toRank, 0, sendBuf, fromRank, 0, recvBuf, request); 1995 } 1996 1997 /** 1998 * Send a message to the process at the given rank in this communicator with 1999 * the given message tag, and receive a message from the process at the 2000 * given rank in this communicator with the given message tag 2001 * (non-blocking). The outgoing message items come from the buffer 2002 * <code>sendbuf</code>. The incoming message items go into the buffer 2003 * <code>recvbuf</code>. The outgoing message items must come from a different 2004 * place than where the incoming message items will be stored. The 2005 * destination process (process <code>toRank</code>) must call a method to 2006 * receive this process's outgoing message items. The source process 2007 * (process <code>fromRank</code>) must call a method to send this process's 2008 * incoming message items. 2009 * <p> 2010 * The <code>sendReceive()</code> method initiates the send and receive 2011 * operations and immediately returns a {@linkplain CommRequest} object. The 2012 * send and receive operations are performed by a separate thread. To wait 2013 * for the send and receive operations to finish, call the returned 2014 * {@linkplain CommRequest} object's <code>waitForFinish()</code> method. When 2015 * that method returns, the outgoing message items have been fully sent, but 2016 * they may not yet have been fully received; and the incoming message items 2017 * have been fully received. 2018 * <p> 2019 * A process can send-receive messages with itself; in this case a different 2020 * thread must call the <code>sendReceive()</code> method on this communicator. 2021 * 2022 * @param toRank Destination process's rank in this communicator. 2023 * @param sendTag Message tag for outgoing message. 
2024 * @param sendBuf Buffer of data items to be sent. 2025 * @param fromRank Source process's rank in this communicator. 2026 * @param recvTag Message tag for incoming message. 2027 * @param recvBuf Buffer of data items to be received. 2028 * @param request CommRequest object to use to wait for the operation to 2029 * finish; in this case <code>request</code> is returned. If 2030 * <code>request</code> is null, a new CommRequest object is created and 2031 * returned. 2032 * @return CommRequest object to use to wait for the operation to finish. 2033 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2034 * <code>toRank</code> or <code>fromRank</code> 2035 * is not in the range 0 .. <code>size()</code>-1. 2036 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2037 * <code>sendBuf</code> or <code>recvBuf</code> 2038 * is null. 2039 * @throws java.io.IOException Thrown if an I/O error occurred. 2040 */ 2041 public CommRequest sendReceive(int toRank, 2042 int sendTag, 2043 Buf sendBuf, 2044 int fromRank, 2045 int recvTag, 2046 Buf recvBuf, 2047 CommRequest request) 2048 throws IOException { 2049 // Set up CommRequest object. 2050 CommRequest req = request == null ? new CommRequest() : request; 2051 2052 // Send the outgoing message (non-blocking). 2053 req.mySendRequest = new IORequest(); 2054 myChannelGroup.sendNoWait(getChannel(toRank), sendTag, sendBuf, req.mySendRequest); 2055 2056 // Receive the outgoing message (non-blocking). 2057 req.myRecvRequest = new IORequest(); 2058 myChannelGroup.receiveNoWait(getChannel(fromRank), recvTag, recvBuf, req.myRecvRequest); 2059 2060 // Return CommRequest object. 2061 return req; 2062 } 2063 2064 /** 2065 * Flood-send a message to all processes in this communicator. The message 2066 * uses a tag of 0. The message items come from the given buffer. To receive 2067 * the message, every process (including the sending process) must call the 2068 * <code>floodReceive()</code> method. 
When the <code>floodSend()</code> method 2069 * returns, the message has been fully sent, but it may not yet have been 2070 * fully received in all processes. 2071 * <p> 2072 * <I>Note:</I> The length of the incoming buffer in the 2073 * <code>floodReceive()</code> method call must be the same as the length of the 2074 * outgoing buffer in the <code>floodSend()</code> method call. 2075 * 2076 * @param buffer Buffer of data items to be sent. 2077 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2078 * <code>buffer</code> is null. 2079 * @throws java.io.IOException Thrown if an I/O error occurred. 2080 */ 2081 public void floodSend(Buf buffer) 2082 throws IOException { 2083 floodSend(0, buffer, null).waitForFinish(); 2084 } 2085 2086 /** 2087 * Flood-send a message to all processes in this communicator with the given 2088 * message tag. The message items come from the given buffer. To receive the 2089 * message, every process (including the sending process) must call the 2090 * <code>floodReceive()</code> method. When the <code>floodSend()</code> method 2091 * returns, the message has been fully sent, but it may not yet have been 2092 * fully received in all processes. 2093 * <p> 2094 * <I>Note:</I> The length of the incoming buffer in the 2095 * <code>floodReceive()</code> method call must be the same as the length of the 2096 * outgoing buffer in the <code>floodSend()</code> method call. 2097 * 2098 * @param tag Message tag. 2099 * @param buffer Buffer of data items to be sent. 2100 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2101 * <code>buffer</code> is null. 2102 * @throws java.io.IOException Thrown if an I/O error occurred. 2103 */ 2104 public void floodSend(int tag, 2105 Buf buffer) 2106 throws IOException { 2107 floodSend(tag, buffer, null).waitForFinish(); 2108 } 2109 2110 /** 2111 * Flood-send a message to all processes in this communicator 2112 * (non-blocking). A message tag of 0 is used. 
The message items come from 2113 * the given buffer. To receive the message, every process (including the 2114 * sending process) must call the <code>floodReceive()</code> method. 2115 * <p> 2116 * The <code>floodSend()</code> method initiates the flood-send operation and 2117 * immediately returns a {@linkplain CommRequest} object. The flood-send 2118 * operation is performed by a separate thread. To wait for the flood-send 2119 * operation to finish, call the returned {@linkplain CommRequest} object's 2120 * <code>waitForFinish()</code> method. When that method returns, the message 2121 * has been fully sent, but it may not yet have been fully received in all 2122 * processes. 2123 * <p> 2124 * <I>Note:</I> The length of the incoming buffer in the 2125 * <code>floodReceive()</code> method call must be the same as the length of the 2126 * outgoing buffer in the <code>floodSend()</code> method call. 2127 * 2128 * @param buffer Buffer of data items to be sent. 2129 * @param request CommRequest object to use to wait for the operation to 2130 * finish; in this case <code>request</code> is returned. If 2131 * <code>request</code> is null, a new CommRequest object is created and 2132 * returned. 2133 * @return CommRequest object to use to wait for the operation to finish. 2134 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2135 * <code>buffer</code> is null. 2136 * @throws java.io.IOException Thrown if an I/O error occurred. 2137 */ 2138 public CommRequest floodSend(Buf buffer, 2139 CommRequest request) 2140 throws IOException { 2141 return floodSend(0, buffer, request); 2142 } 2143 2144 /** 2145 * Flood-send a message to all processes in this communicator with the given 2146 * message tag (non-blocking). The message items come from the given buffer. 2147 * To receive the message, every process (including the sending process) 2148 * must call the <code>floodReceive()</code> method. 
2149 * <p> 2150 * The <code>floodSend()</code> method initiates the flood-send operation and 2151 * immediately returns a {@linkplain CommRequest} object. The flood-send 2152 * operation is performed by a separate thread. To wait for the flood-send 2153 * operation to finish, call the returned {@linkplain CommRequest} object's 2154 * <code>waitForFinish()</code> method. When that method returns, the message 2155 * has been fully sent, but it may not yet have been fully received in all 2156 * processes. 2157 * <p> 2158 * <I>Note:</I> The length of the incoming buffer in the 2159 * <code>floodReceive()</code> method call must be the same as the length of the 2160 * outgoing buffer in the <code>floodSend()</code> method call. 2161 * 2162 * @param tag Message tag. 2163 * @param buffer Buffer of data items to be sent. 2164 * @param request CommRequest object to use to wait for the operation to 2165 * finish; in this case <code>request</code> is returned. If 2166 * <code>request</code> is null, a new CommRequest object is created and 2167 * returned. 2168 * @return CommRequest object to use to wait for the operation to finish. 2169 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2170 * <code>buffer</code> is null. 2171 * @throws java.io.IOException Thrown if an I/O error occurred. 2172 */ 2173 public CommRequest floodSend(int tag, 2174 Buf buffer, 2175 CommRequest request) 2176 throws IOException { 2177 // Set up CommRequest object. 2178 CommRequest req = request == null ? new CommRequest() : request; 2179 req.mySendRequest = new IORequest(); 2180 req.myRecvRequest = null; 2181 2182 // Send data to process 0. Process 0's flood-receive I/O request object 2183 // will forward the data to all the processes. 2184 myChannelGroup.sendNoWait(getChannel(0), tag, buffer, req.mySendRequest); 2185 2186 // Return CommRequest object. 2187 return req; 2188 } 2189 2190 /** 2191 * Flood-receive a message from any process in this communicator. 
The 2192 * message must have a tag of 0. The received message items are stored in 2193 * the given buffer. To send the message, the source process must call the 2194 * <code>floodSend()</code> method. When the <code>floodReceive()</code> method 2195 * returns, the message has been fully received. 2196 * <p> 2197 * A {@linkplain CommStatus} object is returned. The status object gives the 2198 * actual rank of the process that sent the message, the actual message tag 2199 * that was received, and the actual number of data items in the message. 2200 * <p> 2201 * <I>Note:</I> The length of the incoming buffer in the 2202 * <code>floodReceive()</code> method call must be the same as the length of the 2203 * outgoing buffer in the <code>floodSend()</code> method call. 2204 * 2205 * @param buffer Buffer of data items to be received. 2206 * @return Status object giving the outcome of the message reception. 2207 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2208 * <code>buffer</code> is null. 2209 * @throws java.io.IOException Thrown if an I/O error occurred. 2210 */ 2211 public CommStatus floodReceive(Buf buffer) 2212 throws IOException { 2213 return floodReceive(0, buffer, null).waitForFinish(); 2214 } 2215 2216 /** 2217 * Flood-receive a message from any process in this communicator with the 2218 * given message tag. If <code>tag</code> is null, a message will be received 2219 * with any tag. The received message items are stored in the given buffer. 2220 * To send the message, the source process must call the 2221 * <code>floodSend()</code> method. When the <code>floodReceive()</code> method 2222 * returns, the message has been fully received. 2223 * <p> 2224 * A {@linkplain CommStatus} object is returned. The status object gives the 2225 * actual rank of the process that sent the message, the actual message tag 2226 * that was received, and the actual number of data items in the message. 
2227 * <p> 2228 * <I>Note:</I> The length of the incoming buffer in the 2229 * <code>floodReceive()</code> method call must be the same as the length of the 2230 * outgoing buffer in the <code>floodSend()</code> method call. 2231 * 2232 * @param tag Message tag, or null to receive any tag. 2233 * @param buffer Buffer of data items to be received. 2234 * @return Status object giving the outcome of the message reception. 2235 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2236 * <code>buffer</code> is null. 2237 * @throws java.io.IOException Thrown if an I/O error occurred. 2238 */ 2239 public CommStatus floodReceive(Integer tag, 2240 Buf buffer) 2241 throws IOException { 2242 return floodReceive(tag, buffer, null).waitForFinish(); 2243 } 2244 2245 /** 2246 * Flood-receive a message from any process in this communicator 2247 * (non-blocking). A message tag of 0 is used. If <code>tag</code> is null, a 2248 * message will be received with any tag. The received message items are 2249 * stored in the given buffer. To send the message, the source process must 2250 * call the <code>floodSend()</code> method. 2251 * <p> 2252 * The <code>floodReceive()</code> method initiates the flood-receive operation 2253 * and immediately returns a {@linkplain CommRequest} object. The 2254 * flood-receive operation is performed by a separate thread. To wait for 2255 * the flood-receive operation to finish, call the returned {@linkplain 2256 * CommRequest} object's <code>waitForFinish()</code> method. When that method 2257 * returns, the incoming message items have been fully received. 2258 * <p> 2259 * <I>Note:</I> The length of the incoming buffer in the 2260 * <code>floodReceive()</code> method call must be the same as the length of the 2261 * outgoing buffer in the <code>floodSend()</code> method call. 2262 * 2263 * @param buffer Buffer of data items to be received. 
2264 * @param request CommRequest object to use to wait for the operation to 2265 * finish; in this case <code>request</code> is returned. If 2266 * <code>request</code> is null, a new CommRequest object is created and 2267 * returned. 2268 * @return CommRequest object to use to wait for the operation to finish. 2269 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2270 * <code>buffer</code> is null. 2271 * @throws java.io.IOException Thrown if an I/O error occurred. 2272 */ 2273 public CommRequest floodReceive(Buf buffer, 2274 CommRequest request) 2275 throws IOException { 2276 return floodReceive(0, buffer, request); 2277 } 2278 2279 /** 2280 * Flood-receive a message from any process in this communicator with the 2281 * given message tag (non-blocking). If <code>tag</code> is null, a message will 2282 * be received with any tag. The received message items are stored in the 2283 * given buffer. To send the message, the source process must call the 2284 * <code>floodSend()</code> method. 2285 * <p> 2286 * The <code>floodReceive()</code> method initiates the flood-receive operation 2287 * and immediately returns a {@linkplain CommRequest} object. The 2288 * flood-receive operation is performed by a separate thread. To wait for 2289 * the flood-receive operation to finish, call the returned {@linkplain 2290 * CommRequest} object's <code>waitForFinish()</code> method. When that method 2291 * returns, the incoming message items have been fully received. 2292 * <p> 2293 * <I>Note:</I> The length of the incoming buffer in the 2294 * <code>floodReceive()</code> method call must be the same as the length of the 2295 * outgoing buffer in the <code>floodSend()</code> method call. 2296 * 2297 * @param tag Message tag, or null to receive any tag. 2298 * @param buffer Buffer of data items to be received. 2299 * @param request CommRequest object to use to wait for the operation to 2300 * finish; in this case <code>request</code> is returned. 
If 2301 * <code>request</code> is null, a new CommRequest object is created and 2302 * returned. 2303 * @return CommRequest object to use to wait for the operation to finish. 2304 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2305 * <code>buffer</code> is null. 2306 * @throws java.io.IOException Thrown if an I/O error occurred. 2307 */ 2308 public CommRequest floodReceive(Integer tag, 2309 Buf buffer, 2310 CommRequest request) 2311 throws IOException { 2312 // Get broadcast tree for root=0. 2313 int[] tree = getBroadcastTree(0); 2314 2315 // Set up CommRequest object with a special I/O request object that 2316 // forwards the message down the broadcast tree. 2317 CommRequest req = request == null ? new CommRequest() : request; 2318 req.mySendRequest = null; 2319 req.myRecvRequest = new FloodReceiveIORequest(tree); 2320 2321 // In process 0, ensure a channel to every process, then receive 2322 // (non-blocking) a message from any process. 2323 if (myRank == 0) { 2324 for (int src = 0; src < mySize; ++src) { 2325 ensureChannel(src); 2326 } 2327 myChannelGroup.receiveNoWait(null, tag, buffer, req.myRecvRequest); 2328 } // In other processes, ensure a channel to the child processes in the 2329 // broadcast tree, then receive (non-blocking) a message from the parent 2330 // process in the broadcast tree. 2331 else { 2332 for (int i = 1; i < tree.length; ++i) { 2333 ensureChannel(tree[i]); 2334 } 2335 myChannelGroup.receiveNoWait(getChannel(tree[0]), tag, buffer, req.myRecvRequest); 2336 } 2337 2338 // Return CommRequest object. 2339 return req; 2340 } 2341 2342 /** 2343 * Class FloodReceiveIORequest overrides the methods of class IORequest with 2344 * additional processing to forward the message when a message is received. 2345 */ 2346 private class FloodReceiveIORequest 2347 extends IORequest { 2348 2349 // Broadcast tree. 
2350 private int[] tree; 2351 2352 // List of zero or more additional I/O requests to forward copies of the 2353 // received message. 2354 private LinkedList<IORequest> myForwardedIORequests 2355 = new LinkedList<IORequest>(); 2356 2357 /** 2358 * Construct a new I/O request object. 2359 * 2360 * @param tree Broadcast tree. 2361 */ 2362 public FloodReceiveIORequest(int[] tree) { 2363 super(); 2364 this.tree = tree; 2365 } 2366 2367 /** 2368 * Determine if this I/O request has finished. 2369 * 2370 * @return False if this I/O request has not finished, true if this I/O 2371 * request has finished successfully. 2372 * @throws IOException Thrown if this I/O request has finished and an 2373 * I/O error occurred. 2374 */ 2375 public synchronized boolean isFinished() 2376 throws IOException { 2377 if (!super.isFinished()) { 2378 return false; 2379 } 2380 for (IORequest req : myForwardedIORequests) { 2381 if (!req.isFinished()) { 2382 return false; 2383 } 2384 } 2385 return true; 2386 } 2387 2388 /** 2389 * Wait until the send or receive operation corresponding to this I/O 2390 * request has finished. For a receive operation, a {@linkplain Status} 2391 * object containing the results of the receive operation is returned; 2392 * for a send operation, null is returned. 2393 * 2394 * @return Receive status for a receive operation, or null for a send 2395 * operation. 2396 * @throws IOException Thrown if an I/O error occurred. 2397 */ 2398 public synchronized Status waitForFinish() 2399 throws IOException { 2400 Status status = super.waitForFinish(); 2401 for (IORequest req : myForwardedIORequests) { 2402 req.waitForFinish(); 2403 } 2404 return status; 2405 } 2406 2407 /** 2408 * Report that this I/O request succeeded. 2409 */ 2410 protected synchronized void reportSuccess() { 2411 try { 2412 super.reportSuccess(); 2413 2414 // Get message tag. 2415 int msgtag = myStatus.tag; 2416 2417 // Flood the message to every child process in the broadcast 2418 // tree. 
2419 int n = tree.length; 2420 for (int i = 1; i < n; ++i) { 2421 IORequest req = new IORequest(); 2422 myForwardedIORequests.add(req); 2423 myChannelGroup.sendNoWait(getChannel(tree[i]), msgtag, myBuf, req); 2424 } 2425 } catch (IOException exc) { 2426 reportFailure(exc); 2427 } 2428 } 2429 } 2430 2431 /** 2432 * Broadcast a message to all processes in this communicator. The broadcast 2433 * uses a message tag of 0. All processes must call <code>broadcast()</code> 2434 * with the same value for <code>root</code> and with a buffer of the same 2435 * length and the same item data type. 2436 * <p> 2437 * The root process (the process whose rank in this communicator is 2438 * <code>root</code>) sends the message items. The message items come from the 2439 * given buffer. When the <code>broadcast()</code> method returns, the message 2440 * has been fully sent, but it may not yet have been fully received by all 2441 * processes. 2442 * <p> 2443 * Each non-root process receives the message items. The message items are 2444 * stored in the given buffer. When the <code>broadcast()</code> method returns, 2445 * the message has been fully received. 2446 * 2447 * @param root Root process's rank in this communicator. 2448 * @param buffer Buffer of data items to be sent (root process) or received 2449 * (non-root processes). 2450 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2451 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. 2452 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2453 * <code>buffer</code> is null. 2454 * @throws java.io.IOException Thrown if an I/O error occurred. 2455 */ 2456 public void broadcast(int root, 2457 Buf buffer) 2458 throws IOException { 2459 broadcast(root, 0, buffer); 2460 } 2461 2462 /** 2463 * Broadcast a message to all processes in this communicator using the given 2464 * message tag. 
All processes must call <code>broadcast()</code> with the same 2465 * values for <code>root</code> and <code>tag</code> and with a buffer of the same 2466 * length and the same item data type. 2467 * <p> 2468 * The root process (the process whose rank in this communicator is 2469 * <code>root</code>) sends the message items. The message items come from the 2470 * given buffer. When the <code>broadcast()</code> method returns, the message 2471 * has been fully sent, but it may not yet have been fully received by all 2472 * processes. 2473 * <p> 2474 * Each non-root process receives the message items. The message items are 2475 * stored in the given buffer. When the <code>broadcast()</code> method returns, 2476 * the message has been fully received. 2477 * 2478 * @param root Root process's rank in this communicator. 2479 * @param tag Message tag. 2480 * @param buffer Buffer of data items to be sent (root process) or received 2481 * (non-root processes). 2482 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2483 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. 2484 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2485 * <code>buffer</code> is null. 2486 * @throws java.io.IOException Thrown if an I/O error occurred. 2487 */ 2488 public void broadcast(int root, 2489 int tag, 2490 Buf buffer) 2491 throws IOException { 2492 // Verify preconditions. 2493 if (0 > root || root >= mySize) { 2494 throw new IndexOutOfBoundsException("Comm.broadcast(): root = " + root + " out of bounds"); 2495 } 2496 2497 // Early return if only one process. 2498 if (mySize == 1) { 2499 return; 2500 } 2501 2502 // A broadcast is done as a series of point-to-point messages. The 2503 // messages are organized into rounds. The number of rounds is 2504 // ceil(log_2(mySize)). In each round, processes send messages to other 2505 // processes in parallel. 
Here is the message pattern for a communicator 2506 // with 8 processes doing a broadcast from root process 0: 2507 // 2508 // Process 2509 // 0 1 2 3 4 5 6 7 2510 // | | | | | | | | 2511 // |---->| | | | | | | Round 1 2512 // | | | | | | | | 2513 // |---------->| | | | | | Round 2 2514 // | |---------->| | | | | 2515 // | | | | | | | | 2516 // |---------------------->| | | | Round 3 2517 // | |---------------------->| | | 2518 // | | |---------------------->| | 2519 // | | | |---------------------->| 2520 // | | | | | | | | 2521 // 2522 // If a process other than process 0 is the root, the message pattern is 2523 // the same, except the process ranks are circularly rotated. 2524 // Get array of process ranks in the broadcast tree. 2525 int[] broadcasttree = getBroadcastTree(root); 2526 int n = broadcasttree.length; 2527 2528 // Receive data from parent if any (blocking). 2529 int parent = broadcasttree[0]; 2530 if (parent != -1) { 2531 myChannelGroup.receive(getChannel(parent), tag, buffer); 2532 } 2533 2534 // Send data to children if any (non-blocking). 2535 IORequest[] iorequest = new IORequest[n]; 2536 for (int i = 1; i < n; ++i) { 2537 int child = broadcasttree[i]; 2538 iorequest[i] = new IORequest(); 2539 myChannelGroup.sendNoWait(getChannel(child), tag, buffer, iorequest[i]); 2540 } 2541 2542 // Wait for sends to finish if any. 2543 for (int i = 1; i < n; ++i) { 2544 iorequest[i].waitForFinish(); 2545 } 2546 } 2547 2548 /** 2549 * Scatter messages to all processes in this communicator. The scatter uses 2550 * a message tag of 0. All processes must call <code>scatter()</code> 2551 * with the same value for <code>root</code>. 2552 * <p> 2553 * The root process (the process whose rank in this communicator is 2554 * <code>root</code>) sends the message items. The message items sent to process 2555 * <I>i</I> come from the source buffer at index <I>i</I> in the given array 2556 * of source buffers. 
When the <code>scatter()</code> method returns, the 2557 * messages have been fully sent, but they may not yet have been fully 2558 * received by all processes. 2559 * <p> 2560 * Each process, including the root process, receives the message items. The 2561 * message items are stored in the given destination buffer. This must have 2562 * the same length and the same item data type as the corresponding source 2563 * buffer. When the <code>scatter()</code> method returns, the message has been 2564 * fully received. 2565 * <p> 2566 * In the non-root processes, the source buffer array is ignored and may be 2567 * null. 2568 * 2569 * @param root Root process's rank in this communicator. 2570 * @param srcarray Array of source buffers to be sent by the root process. 2571 * Ignored in the non-root processes. 2572 * @param dst Destination buffer to be received. 2573 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2574 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if 2575 * <code>srcarray</code>'s length does not equal the size of this communicator. 2576 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2577 * <code>srcarray</code> or any element thereof is null. Thrown if <code>dst</code> 2578 * is null. 2579 * @throws java.io.IOException Thrown if an I/O error occurred. 2580 */ 2581 public void scatter(int root, 2582 Buf[] srcarray, 2583 Buf dst) 2584 throws IOException { 2585 scatter(root, 0, srcarray, dst); 2586 } 2587 2588 /** 2589 * Scatter messages to all processes in this communicator using the given 2590 * message tag. All processes must call <code>scatter()</code> with the same 2591 * values for <code>root</code> and <code>tag</code>. 2592 * <p> 2593 * The root process (the process whose rank in this communicator is 2594 * <code>root</code>) sends the message items. 
The message items sent to process 2595 * <I>i</I> come from the source buffer at index <I>i</I> in the given array 2596 * of source buffers. When the <code>scatter()</code> method returns, the 2597 * messages have been fully sent, but they may not yet have been fully 2598 * received by all processes. 2599 * <p> 2600 * Each process, including the root process, receives the message items. The 2601 * message items are stored in the given destination buffer. This must have 2602 * the same length and the same item data type as the corresponding source 2603 * buffer. When the <code>scatter()</code> method returns, the message has been 2604 * fully received. 2605 * <p> 2606 * In the non-root processes, the source buffer array is ignored and may be 2607 * null. 2608 * 2609 * @param root Root process's rank in this communicator. 2610 * @param tag Message tag. 2611 * @param srcarray Array of source buffers to be sent by the root process. 2612 * Ignored in the non-root processes. 2613 * @param dst Destination buffer to be received. 2614 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2615 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if 2616 * <code>srcarray</code>'s length does not equal the size of this communicator. 2617 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2618 * <code>srcarray</code> or any element thereof is null. Thrown if <code>dst</code> 2619 * is null. 2620 * @throws java.io.IOException Thrown if an I/O error occurred. 2621 */ 2622 public void scatter(int root, 2623 int tag, 2624 Buf[] srcarray, 2625 Buf dst) 2626 throws IOException { 2627 // Verify preconditions. 2628 if (0 > root || root >= mySize) { 2629 throw new IndexOutOfBoundsException("Comm.scatter(): root = " + root + " out of bounds"); 2630 } 2631 2632 // A scatter is done as a series of point-to-point messages. The root 2633 // process sends a separate message to every other process. 
Here is the 2634 // message pattern for a communicator with 8 processes scattering from 2635 // root process 0: 2636 // 2637 // Process 2638 // 0 1 2 3 4 5 6 7 2639 // | | | | | | | | 2640 // |---->| | | | | | | 2641 // | | | | | | | | 2642 // |---------->| | | | | | 2643 // | | | | | | | | 2644 // |---------------->| | | | | 2645 // | | | | | | | | 2646 // |---------------------->| | | | 2647 // | | | | | | | | 2648 // |---------------------------->| | | 2649 // | | | | | | | | 2650 // |---------------------------------->| | 2651 // | | | | | | | | 2652 // |---------------------------------------->| 2653 // | | | | | | | | 2654 // Root process sends all messages. 2655 if (myRank == root) { 2656 // Array of IORequest objects for non-blocking sends. 2657 IORequest[] iorequest = new IORequest[mySize]; 2658 2659 // Initiate sends to lower-ranked processes. 2660 for (int rank = 0; rank < myRank; ++rank) { 2661 iorequest[rank] = new IORequest(); 2662 myChannelGroup.sendNoWait(getChannel(rank), tag, srcarray[rank], iorequest[rank]); 2663 } 2664 2665 // Initiate sends to higher-ranked processes. 2666 for (int rank = myRank + 1; rank < mySize; ++rank) { 2667 iorequest[rank] = new IORequest(); 2668 myChannelGroup.sendNoWait(getChannel(rank), tag, srcarray[rank], iorequest[rank]); 2669 } 2670 2671 // Copy to itself. 2672 dst.copy(srcarray[myRank]); 2673 2674 // Wait for completion of sends to lower-ranked processes. 2675 for (int rank = 0; rank < myRank; ++rank) { 2676 iorequest[rank].waitForFinish(); 2677 } 2678 2679 // Wait for completion of sends to higher-ranked processes. 2680 for (int rank = myRank + 1; rank < mySize; ++rank) { 2681 iorequest[rank].waitForFinish(); 2682 } 2683 } // Non-root process receives one message. 2684 else { 2685 myChannelGroup.receive(getChannel(root), tag, dst); 2686 } 2687 } 2688 2689 /** 2690 * Gather messages from all processes in this communicator. The gather uses 2691 * a message tag of 0. 
All processes must call <code>gather()</code> with the 2692 * same value for <code>root</code>. 2693 * <p> 2694 * The root process (the process whose rank in this communicator is 2695 * <code>root</code>) receives the message items. The message items received 2696 * from process <I>i</I> are stored in the destination buffer at index 2697 * <I>i</I> in the given array of destination buffers. When the 2698 * <code>gather()</code> method returns, all the messages have been fully 2699 * received. 2700 * <p> 2701 * Each process, including the root process, sends the message items. The 2702 * message items come from the given source buffer. This must have the same 2703 * length and the same item data type as the corresponding destination 2704 * buffer. When the <code>gather()</code> method returns, the message has been 2705 * fully sent, but it may not yet have been fully received by the root 2706 * process. 2707 * <p> 2708 * In the non-root processes, the destination buffer array is ignored and 2709 * may be null. 2710 * 2711 * @param root Root process's rank in this communicator. 2712 * @param src Source buffer to be sent. 2713 * @param dstarray Array of destination buffers to be received by the root 2714 * process. Ignored in the non-root processes. 2715 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2716 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if 2717 * <code>dstarray</code>'s length does not equal the size of this communicator. 2718 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2719 * <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code> 2720 * is null. 2721 * @throws java.io.IOException Thrown if an I/O error occurred. 
2722 */ 2723 public void gather(int root, 2724 Buf src, 2725 Buf[] dstarray) 2726 throws IOException { 2727 gather(root, 0, src, dstarray); 2728 } 2729 2730 /** 2731 * Gather messages from all processes in this communicator using the given 2732 * message tag. All processes must call <code>gather()</code> with the same 2733 * values for <code>root</code> and <code>tag</code>. 2734 * <p> 2735 * The root process (the process whose rank in this communicator is 2736 * <code>root</code>) receives the message items. The message items received 2737 * from process <I>i</I> are stored in the destination buffer at index 2738 * <I>i</I> in the given array of destination buffers. When the 2739 * <code>gather()</code> method returns, all the messages have been fully 2740 * received. 2741 * <p> 2742 * Each process, including the root process, sends the message items. The 2743 * message items come from the given source buffer. This must have the same 2744 * length and the same item data type as the corresponding destination 2745 * buffer. When the <code>gather()</code> method returns, the message has been 2746 * fully sent, but it may not yet have been fully received by the root 2747 * process. 2748 * <p> 2749 * In the non-root processes, the destination buffer array is ignored and 2750 * may be null. 2751 * 2752 * @param root Root process's rank in this communicator. 2753 * @param tag Message tag. 2754 * @param src Source buffer to be sent. 2755 * @param dstarray Array of destination buffers to be received by the root 2756 * process. Ignored in the non-root processes. 2757 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2758 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. Thrown if 2759 * <code>dstarray</code>'s length does not equal the size of this communicator. 2760 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2761 * <code>dstarray</code> or any element thereof is null. 
Thrown if <code>src</code> 2762 * is null. 2763 * @throws java.io.IOException Thrown if an I/O error occurred. 2764 */ 2765 public void gather(int root, 2766 int tag, 2767 Buf src, 2768 Buf[] dstarray) 2769 throws IOException { 2770 // Verify preconditions. 2771 if (0 > root || root >= mySize) { 2772 throw new IndexOutOfBoundsException("Comm.gather(): root = " + root + " out of bounds"); 2773 } 2774 2775 // A gather is done as a series of point-to-point messages. The root 2776 // process receives a separate message from every other process. Here is 2777 // the message pattern for a communicator with 8 processes gathering 2778 // into root process 0: 2779 // 2780 // Process 2781 // 0 1 2 3 4 5 6 7 2782 // | | | | | | | | 2783 // |<----| | | | | | | 2784 // | | | | | | | | 2785 // |<----------| | | | | | 2786 // | | | | | | | | 2787 // |<----------------| | | | | 2788 // | | | | | | | | 2789 // |<----------------------| | | | 2790 // | | | | | | | | 2791 // |<----------------------------| | | 2792 // | | | | | | | | 2793 // |<----------------------------------| | 2794 // | | | | | | | | 2795 // |<----------------------------------------| 2796 // | | | | | | | | 2797 // Root process receives all messages. 2798 if (myRank == root) { 2799 // Array of IORequest objects for non-blocking receives. 2800 IORequest[] iorequest = new IORequest[mySize]; 2801 2802 // Initiate receives from lower-ranked processes. 2803 for (int rank = 0; rank < myRank; ++rank) { 2804 iorequest[rank] = new IORequest(); 2805 myChannelGroup.receiveNoWait(getChannel(rank), tag, dstarray[rank], iorequest[rank]); 2806 } 2807 2808 // Initiate receives from higher-ranked processes. 2809 for (int rank = myRank + 1; rank < mySize; ++rank) { 2810 iorequest[rank] = new IORequest(); 2811 myChannelGroup.receiveNoWait(getChannel(rank), tag, dstarray[rank], iorequest[rank]); 2812 } 2813 2814 // Copy to itself. 
2815 dstarray[myRank].copy(src); 2816 2817 // Wait for completion of receives from lower-ranked processes. 2818 for (int rank = 0; rank < myRank; ++rank) { 2819 iorequest[rank].waitForFinish(); 2820 } 2821 2822 // Wait for completion of receives from higher-ranked processes. 2823 for (int rank = myRank + 1; rank < mySize; ++rank) { 2824 iorequest[rank].waitForFinish(); 2825 } 2826 } // Non-root process sends one message. 2827 else { 2828 myChannelGroup.send(getChannel(root), tag, src); 2829 } 2830 } 2831 2832 /** 2833 * All-gather messages from each process to all processes in this 2834 * communicator. A message tag of 0 is used. All processes must call 2835 * <code>allGather()</code>. 2836 * <p> 2837 * Each process sends the message items in the given source buffer. When the 2838 * <code>allGather()</code> method returns, the source buffer has been fully 2839 * sent. 2840 * <p> 2841 * Each process receives message items from the other processes. The message 2842 * items received from process <I>i</I> are stored in the destination buffer 2843 * at index <I>i</I> in the given array of destination buffers. This 2844 * destination buffer must have the same length and the same item data type 2845 * as the source buffer in process <I>i</I>. When the <code>allGather()</code> 2846 * method returns, all the destination buffers have been fully received. 2847 * <p> 2848 * All-gather is the same as gather, except that every process has an array 2849 * of destination buffers, and every process receives the results of the 2850 * gather. 2851 * 2852 * @param src Source buffer to be sent. 2853 * @param dstarray Array of destination buffers to be received. 2854 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2855 * <code>dstarray</code>'s length does not equal the size of this communicator. 2856 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2857 * <code>dstarray</code> or any element thereof is null. 
Thrown if <code>src</code> 2858 * is null. 2859 * @throws java.io.IOException Thrown if an I/O error occurred. 2860 */ 2861 public void allGather(Buf src, 2862 Buf[] dstarray) 2863 throws IOException { 2864 allGather(0, src, dstarray); 2865 } 2866 2867 /** 2868 * All-gather messages from each process to all processes in this 2869 * communicator using the given message tag. All processes must call 2870 * <code>allGather()</code> with the same value for <code>tag</code>. 2871 * <p> 2872 * Each process sends the message items in the given source buffer. When the 2873 * <code>allGather()</code> method returns, the source buffer has been fully 2874 * sent. 2875 * <p> 2876 * Each process receives message items from the other processes. The message 2877 * items received from process <I>i</I> are stored in the destination buffer 2878 * at index <I>i</I> in the given array of destination buffers. This 2879 * destination buffer must have the same length and the same item data type 2880 * as the source buffer in process <I>i</I>. When the <code>allGather()</code> 2881 * method returns, all the destination buffers have been fully received. 2882 * <p> 2883 * All-gather is the same as gather, except that every process has an array 2884 * of destination buffers, and every process receives the results of the 2885 * gather. 2886 * 2887 * @param tag Message tag. 2888 * @param src Source buffer to be sent. 2889 * @param dstarray Array of destination buffers to be received. 2890 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2891 * <code>dstarray</code>'s length does not equal the size of this communicator. 2892 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2893 * <code>dstarray</code> or any element thereof is null. Thrown if <code>src</code> 2894 * is null. 2895 * @throws java.io.IOException Thrown if an I/O error occurred. 
2896 */ 2897 public void allGather(int tag, 2898 Buf src, 2899 Buf[] dstarray) 2900 throws IOException { 2901 // Get ranks of predecessor and successor processes. 2902 int pred = (myRank - 1 + mySize) % mySize; 2903 int succ = (myRank + 1) % mySize; 2904 2905 // Copy source buffer into destination buffer at my own rank. 2906 dstarray[myRank].copy(src); 2907 2908 // Do (mySize-1) message rounds. Messages are sent in a pipelined 2909 // fashion from each process to its predecessor until each process's 2910 // source data has arrived in every process. Each outgoing message is 2911 // overlapped with an incoming message. 2912 for (int i = 1; i < mySize; ++i) { 2913 sendReceive(/*toRank */pred, 2914 /*sendTag */ tag, 2915 /*sendBuf */ dstarray[(myRank + i - 1) % mySize], 2916 /*fromRank*/ succ, 2917 /*recvTag */ tag, 2918 /*recvBuf */ dstarray[(myRank + i) % mySize]); 2919 } 2920 } 2921 2922 /** 2923 * Perform a reduction on all processes in this communicator. The reduction 2924 * uses a message tag of 0. All processes must call <code>reduce()</code> with 2925 * the same value for <code>root</code>, with a buffer of the same length and 2926 * the same item data type, and with the same binary operation (class 2927 * {@linkplain edu.rit.pj.reduction.Op Op}). 2928 * <p> 2929 * Before calling <code>reduce()</code>, each process has a buffer filled with 2930 * data items. After <code>reduce()</code> returns, each data item in the root 2931 * process's buffer has been set to the <B>reduction</B> of the 2932 * corresponding data items in all the processes' buffers. The reduction is 2933 * calculated by this formula: 2934 * <p> 2935 * <I>item</I><SUB>0</SUB> <I>op</I> 2936 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 
2937 * <p> 2938 * where <I>op</I> is the binary operation passed in as an argument and 2939 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 2940 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 2941 * process rank 0, 1, 2, and so on. However, the order in which the data 2942 * items actually are combined is not specified. Therefore, the binary 2943 * operation must be such that the answer will be the same regardless of the 2944 * order in which the data items are combined; that is, the binary operation 2945 * must be commutative and associative. 2946 * <p> 2947 * In the root process, the reduce operation always changes the buffer's 2948 * contents as described above. In the non-root processes, the reduce 2949 * operation may or may not change the buffer's contents; the final contents 2950 * of the buffer in the non-root processes is not specified. 2951 * <p> 2952 * When the <code>reduce()</code> method returns in the root process, the 2953 * reduction has been fully performed as described above. When the 2954 * <code>reduce()</code> method returns in a non-root process, the non-root 2955 * process has sent all its data items into the reduction, but the reduction 2956 * may not be fully complete in the root process yet. 2957 * 2958 * @param root Root process's rank in this communicator. 2959 * @param buffer Buffer of data items to be reduced. 2960 * @param op Binary operation. 2961 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 2962 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. 2963 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 2964 * <code>buf</code> is null or <code>op</code> 2965 * is null. 2966 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 2967 * <code>buf</code> and <code>op</code> do not use the same item data type. 2968 * @throws java.io.IOException Thrown if an I/O error occurred. 
2969 */ 2970 public void reduce(int root, 2971 Buf buffer, 2972 Op op) 2973 throws IOException { 2974 reduce(root, 0, buffer, op); 2975 } 2976 2977 /** 2978 * Perform a reduction on all processes in this communicator using the given 2979 * message tag. All processes must call <code>reduce()</code> with the same 2980 * value for <code>root</code>, with a buffer of the same length and the same 2981 * item data type, and with the same binary operation (class {@linkplain 2982 * edu.rit.pj.reduction.Op Op}). 2983 * <p> 2984 * Before calling <code>reduce()</code>, each process has a buffer filled with 2985 * data items. After <code>reduce()</code> returns, each data item in the root 2986 * process's buffer has been set to the <B>reduction</B> of the 2987 * corresponding data items in all the processes' buffers. The reduction is 2988 * calculated by this formula: 2989 * <p> 2990 * <I>item</I><SUB>0</SUB> <I>op</I> 2991 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 2992 * <p> 2993 * where <I>op</I> is the binary operation passed in as an argument and 2994 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 2995 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 2996 * process rank 0, 1, 2, and so on. However, the order in which the data 2997 * items actually are combined is not specified. Therefore, the binary 2998 * operation must be such that the answer will be the same regardless of the 2999 * order in which the data items are combined; that is, the binary operation 3000 * must be commutative and associative. 3001 * <p> 3002 * In the root process, the reduce operation always changes the buffer's 3003 * contents as described above. In the non-root processes, the reduce 3004 * operation may or may not change the buffer's contents; the final contents 3005 * of the buffer in the non-root processes is not specified. 
3006 * <p> 3007 * When the <code>reduce()</code> method returns in the root process, the 3008 * reduction has been fully performed as described above. When the 3009 * <code>reduce()</code> method returns in a non-root process, the non-root 3010 * process has sent all its data items into the reduction, but the reduction 3011 * may not be fully complete in the root process yet. 3012 * 3013 * @param root Root process's rank in this communicator. 3014 * @param tag Message tag. 3015 * @param buffer Buffer of data items to be reduced. 3016 * @param op Binary operation. 3017 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 3018 * <code>root</code> is not in the range 0 .. <code>size()</code>-1. 3019 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3020 * <code>buf</code> is null or <code>op</code> 3021 * is null. 3022 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3023 * <code>buf</code> and <code>op</code> do not use the same item data type. 3024 * @throws java.io.IOException Thrown if an I/O error occurred. 3025 */ 3026 public void reduce(int root, 3027 int tag, 3028 Buf buffer, 3029 Op op) 3030 throws IOException { 3031 // Verify preconditions. 3032 if (0 > root || root >= mySize) { 3033 throw new IndexOutOfBoundsException("Comm.reduce(): root = " + root + " out of bounds"); 3034 } 3035 3036 // Early return if only one process. 3037 if (mySize == 1) { 3038 return; 3039 } 3040 3041 // A reduction is done as a series of point-to-point messages. The 3042 // messages are organized into rounds. The number of rounds is 3043 // ceil(log_2(mySize)). The message pattern is the reverse of the 3044 // broadcast message pattern. In each round, processes receive messages 3045 // from other processes and reduce the data items into their accumulator 3046 // buffers in parallel. When a process has received all messages, it 3047 // sends the reduced results on. 
Here is the message pattern for a 3048 // communicator with 8 processes doing a reduction into root process 0: 3049 // 3050 // Process 3051 // 0 1 2 3 4 5 6 7 3052 // | | | | | | | | 3053 // |<----------------------| | | | Round 1 3054 // | |<----------------------| | | 3055 // | | |<----------------------| | 3056 // | | | |<----------------------| 3057 // | | | | | | | | 3058 // |<----------| | | | | | Round 2 3059 // | |<----------| | | | | 3060 // | | | | | | | | 3061 // |<----| | | | | | | Round 3 3062 // | | | | | | | | 3063 // 3064 // If a process other than process 0 is the root, the message pattern is 3065 // the same, except the process ranks are circularly rotated. 3066 // Get array of process ranks in the broadcast tree. 3067 int[] broadcasttree = getBroadcastTree(root); 3068 int n = broadcasttree.length; 3069 3070 // Set up reduction buffer on top of source buffer. 3071 Buf reductionbuf = buffer.getReductionBuf(op); 3072 3073 // Receive data from children if any, one at a time in reverse order. 3074 for (int i = n - 1; i >= 1; --i) { 3075 int child = broadcasttree[i]; 3076 myChannelGroup.receive(getChannel(child), tag, reductionbuf); 3077 } 3078 3079 // Send data to parent if any. 3080 int parent = broadcasttree[0]; 3081 if (parent != -1) { 3082 myChannelGroup.send(getChannel(parent), tag, buffer); 3083 } 3084 } 3085 3086 /** 3087 * Perform an all-reduce on all processes in this communicator. The 3088 * all-reduce uses a message tag of 0. All processes must call 3089 * <code>allReduce()</code> with a buffer of the same length and the same item 3090 * data type, and with the same binary operation (class {@linkplain 3091 * edu.rit.pj.reduction.Op Op}). 3092 * <p> 3093 * Before calling <code>allReduce()</code>, each process has a buffer filled 3094 * with data items. 
After <code>allReduce()</code> returns, each data item in 3095 * the calling process's buffer has been set to the <B>reduction</B> of the 3096 * corresponding data items in all the processes' buffers. The reduction is 3097 * calculated by this formula: 3098 * <p> 3099 * <I>item</I><SUB>0</SUB> <I>op</I> 3100 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 3101 * <p> 3102 * where <I>op</I> is the binary operation passed in as an argument and 3103 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 3104 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 3105 * process rank 0, 1, 2, and so on. However, the order in which the data 3106 * items actually are combined is not specified. Therefore, the binary 3107 * operation must be such that the answer will be the same regardless of the 3108 * order in which the data items are combined; that is, the binary operation 3109 * must be commutative and associative. 3110 * <p> 3111 * The <code>allReduce()</code> method is similar to the <code>reduce()</code> 3112 * method, except the results are stored in all the processes' buffers, not 3113 * just the one root process's buffer. 3114 * 3115 * @param buffer Buffer of data items to be reduced. 3116 * @param op Binary operation. 3117 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3118 * <code>buf</code> is null or <code>op</code> 3119 * is null. 3120 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3121 * <code>buf</code> and <code>op</code> do not use the same item data type. 3122 * @throws java.io.IOException Thrown if an I/O error occurred. 3123 */ 3124 public void allReduce(Buf buffer, 3125 Op op) 3126 throws IOException { 3127 allReduce(0, buffer, op); 3128 } 3129 3130 /** 3131 * Perform an all-reduce on all processes in this communicator using the 3132 * given message tag. 
All processes must call <code>allReduce()</code> with a 3133 * buffer of the same length and the same item data type, and with the same 3134 * binary operation (class {@linkplain edu.rit.pj.reduction.Op Op}). 3135 * <p> 3136 * Before calling <code>allReduce()</code>, each process has a buffer filled 3137 * with data items. After <code>allReduce()</code> returns, each data item in 3138 * the calling process's buffer has been set to the <B>reduction</B> of the 3139 * corresponding data items in all the processes' buffers. The reduction is 3140 * calculated by this formula: 3141 * <p> 3142 * <I>item</I><SUB>0</SUB> <I>op</I> 3143 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 3144 * <p> 3145 * where <I>op</I> is the binary operation passed in as an argument and 3146 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 3147 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 3148 * process rank 0, 1, 2, and so on. However, the order in which the data 3149 * items actually are combined is not specified. Therefore, the binary 3150 * operation must be such that the answer will be the same regardless of the 3151 * order in which the data items are combined; that is, the binary operation 3152 * must be commutative and associative. 3153 * <p> 3154 * The <code>allReduce()</code> method is similar to the <code>reduce()</code> 3155 * method, except the results are stored in all the processes' buffers, not 3156 * just the one root process's buffer. 3157 * 3158 * @param tag Message tag. 3159 * @param buffer Buffer of data items to be reduced. 3160 * @param op Binary operation. 3161 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3162 * <code>buf</code> is null or <code>op</code> 3163 * is null. 3164 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3165 * <code>buf</code> and <code>op</code> do not use the same item data type. 
3166 * @throws java.io.IOException Thrown if an I/O error occurred. 3167 */ 3168 public void allReduce(int tag, 3169 Buf buffer, 3170 Op op) 3171 throws IOException { 3172 // An all-reduce is done using a "butterfly" message passing pattern. 3173 // Consider the case of K=8 processes. In the first round, processes one 3174 // rank apart exchange data, then each processes accumulates the data 3175 // from the other process using the reduction operator. In the second 3176 // round, processes two ranks apart exchange and accumulate data. In the 3177 // third round, processes four ranks apart exchange and accumulate data. 3178 // 3179 // Process 3180 // 0 1 2 3 4 5 6 7 3181 // | | | | | | | | 3182 // |<--->| |<--->| |<--->| |<--->| Round 1 3183 // | | | | | | | | 3184 // |<--------->| | |<--------->| | Round 2 3185 // | |<--------->| | |<--------->| 3186 // | | | | | | | | 3187 // |<--------------------->| | | | Round 3 3188 // | |<--------------------->| | | 3189 // | | |<--------------------->| | 3190 // | | | |<--------------------->| 3191 // | | | | | | | | 3192 // 3193 // The butterfly pattern works only if the number of processes is a 3194 // power of two. If this is not the case, there are two extra message 3195 // rounds. Each process outside the butterfly pattern sends its data to 3196 // its counterpart inside the butterfly pattern, which accumulates the 3197 // data using the reduction operator. Then the butterfly pattern takes 3198 // place. Afterwards, each process outside the butterfly pattern 3199 // receives the final result from its counterpart inside the butterfly 3200 // pattern. 
For the case of K=10 processes: 3201 // 3202 // Process 3203 // 0 1 2 3 4 5 6 7 8 9 3204 // | | | | | | | | | | 3205 // |<----------------------------------------------| | Pre 3206 // | |<----------------------------------------------| 3207 // | | | | | | | | | | 3208 // |<--->| |<--->| |<--->| |<--->| | | Round 1 3209 // | | | | | | | | | | 3210 // |<--------->| | |<--------->| | | | Round 2 3211 // | |<--------->| | |<--------->| | | 3212 // | | | | | | | | | | 3213 // |<--------------------->| | | | | | Round 3 3214 // | |<--------------------->| | | | | 3215 // | | |<--------------------->| | | | 3216 // | | | |<--------------------->| | | 3217 // | | | | | | | | | | 3218 // |---------------------------------------------->| | Post 3219 // | |---------------------------------------------->| 3220 // | | | | | | | | | | 3221 // 3222 // If K is a power of two, the all-reduce takes (log_2 K) rounds. If K 3223 // is not a power of two, the all-reduce takes floor(log_2 K)+2 rounds. 3224 3225 // Early exit if only one process. 3226 if (mySize == 1) { 3227 return; 3228 } 3229 3230 // Determine the highest power of 2 less than or equal to this 3231 // communicator's size. Processes at this rank and above will be outside 3232 // the butterfly message passing pattern. 3233 int outside = mySizePowerOf2; 3234 3235 // For processes outside the butterfly: 3236 if (myRank >= outside) { 3237 int insideRank = myRank - outside; 3238 3239 // Send initial data to counterpart inside. 3240 send(insideRank, tag, buffer); 3241 3242 // Receive reduced result from counterpart inside. 3243 receive(insideRank, tag, buffer); 3244 } // For processes inside the butterfly: 3245 else { 3246 // Set up temporary receive buffer. 3247 Buf receiveBuf = buffer.getTemporaryBuf(); 3248 3249 // Set up reduction buffer on top of data buffer. 3250 Buf reductionBuf = buffer.getReductionBuf(op); 3251 3252 // If there is a counterpart process outside, receive and accumulate 3253 // its initial data. 
3254 int outsideRank = myRank + outside; 3255 if (outsideRank < mySize) { 3256 receive(outsideRank, tag, reductionBuf); 3257 } 3258 3259 // Perform butterfly message passing rounds with counterpart 3260 // processes inside. 3261 int round = 1; 3262 while (round < outside) { 3263 int otherRank = myRank ^ round; 3264 sendReceive(otherRank, tag, buffer, otherRank, tag, receiveBuf); 3265 reductionBuf.copy(receiveBuf); 3266 round <<= 1; 3267 } 3268 3269 // If there is a counterpart process outside, send the reduced 3270 // result. 3271 if (outsideRank < mySize) { 3272 send(outsideRank, tag, buffer); 3273 } 3274 } 3275 } 3276 3277 /** 3278 * Do an all-to-all among all processes in this communicator. A message tag 3279 * of 0 is used. 3280 * <p> 3281 * <code>srcarray</code> must be an array of <I>K</I> buffers, where <I>K</I> is 3282 * the size of this communicator. <code>dstarray</code> must be an array of 3283 * <I>K</I> buffers referring to different storage from the source buffers. 3284 * For each process rank <I>k</I>, 0 <= <I>k</I> <= <I>K</I>, and each 3285 * buffer index <I>i</I>, 0 <= <I>i</I> <= <I>K</I>, the contents of 3286 * <code>srcarray[k]</code> in process <I>i</I> are sent to <code>dstarray[i]</code> 3287 * in process <I>k</I>. 3288 * 3289 * @param srcarray Array of source buffers to be sent by this process. 3290 * @param dstarray Array of destination buffers to be received by this 3291 * process. 3292 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 3293 * <code>srcarray</code>'s length does not equal the size of this communicator. 3294 * Thrown if <code>dstarray</code>'s length does not equal the size of this 3295 * communicator. 3296 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3297 * <code>srcarray</code> or any element thereof is null. Thrown if 3298 * <code>dstarray</code> or any element thereof is null. 3299 * @throws java.io.IOException Thrown if an I/O error occurred. 
3300 */ 3301 public void allToAll(Buf[] srcarray, 3302 Buf[] dstarray) 3303 throws IOException { 3304 allToAll(0, srcarray, dstarray); 3305 } 3306 3307 /** 3308 * Do an all-to-all among all processes in this communicator using the given 3309 * message tag. All processes must call <code>allToAll()</code> with the same 3310 * value for <code>tag</code>. 3311 * <p> 3312 * <code>srcarray</code> must be an array of <I>K</I> buffers, where <I>K</I> is 3313 * the size of this communicator. <code>dstarray</code> must be an array of 3314 * <I>K</I> buffers referring to different storage from the source buffers. 3315 * For each process rank <I>k</I>, 0 <= <I>k</I> <= <I>K</I>, and each 3316 * buffer index <I>i</I>, 0 <= <I>i</I> <= <I>K</I>, the contents of 3317 * <code>srcarray[k]</code> in process <I>i</I> are sent to <code>dstarray[i]</code> 3318 * in process <I>k</I>. 3319 * 3320 * @param tag Message tag. 3321 * @param srcarray Array of source buffers to be sent by this process. 3322 * @param dstarray Array of destination buffers to be received by this 3323 * process. 3324 * @throws java.lang.IndexOutOfBoundsException (unchecked exception) Thrown if 3325 * <code>srcarray</code>'s length does not equal the size of this communicator. 3326 * Thrown if <code>dstarray</code>'s length does not equal the size of this 3327 * communicator. 3328 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3329 * <code>srcarray</code> or any element thereof is null. Thrown if 3330 * <code>dstarray</code> or any element thereof is null. 3331 * @throws java.io.IOException Thrown if an I/O error occurred. 3332 */ 3333 public void allToAll(int tag, 3334 Buf[] srcarray, 3335 Buf[] dstarray) 3336 throws IOException { 3337 // An all-to-all is done as a series of send-receives. Each process 3338 // sends the appropriate buffer to the process one ahead and receives 3339 // the appropriate buffer from the process one behind. 
Then each process 3340 // sends the appropriate buffer to the process two ahead and receives 3341 // the appropriate buffer from the process two behind. And so on. Here 3342 // is the message pattern for a communicator with 4 processes doing an 3343 // all-to-all: 3344 // 3345 // Process 3346 // 0 1 2 3 3347 // | | | | 3348 // |---->| | | Round 1 3349 // | |---->| | 3350 // | | |---->| 3351 // - -->| | | |--- - 3352 // | | | | 3353 // | | | | 3354 // |---------->| | Round 2 3355 // | |---------->| 3356 // - -->| | |--------- - 3357 // - -------->| | |--- - 3358 // | | | | 3359 // | | | | 3360 // |---------------->| Round 3 3361 // - -->| |--------------- - 3362 // - -------->| |--------- - 3363 // - -------------->| |--- - 3364 // | | | | 3365 3366 // Copy source to destination at this process's own rank. 3367 dstarray[myRank].copy(srcarray[myRank]); 3368 3369 // Initiate K-1 non-blocking send-receives. 3370 CommRequest[] commrequest = new CommRequest[mySize]; 3371 for (int i = 1; i < mySize; ++i) { 3372 int toRank = (myRank + i) % mySize; 3373 int fromRank = (myRank - i + mySize) % mySize; 3374 commrequest[i] = sendReceive(toRank, tag, srcarray[toRank], 3375 fromRank, tag, dstarray[fromRank], null); 3376 } 3377 3378 // Wait for completion of all send-receives. 3379 for (int i = 1; i < mySize; ++i) { 3380 commrequest[i].waitForFinish(); 3381 } 3382 } 3383 3384 /** 3385 * Perform a scan on all processes in this communicator. A message tag of 0 3386 * is used. All processes must call <code>scan()</code> with a buffer of the 3387 * same length and the same item data type, and with the same binary 3388 * operation (class {@linkplain edu.rit.pj.reduction.Op Op}). 3389 * <p> 3390 * Before calling <code>scan()</code>, each process has a buffer filled with 3391 * data items. 
After <code>scan()</code> returns, each data item in the buffer 3392 * of process rank <I>i</I> has been set to the <B>reduction</B> of the 3393 * corresponding data items in the buffers of process ranks 0 through 3394 * <I>i</I>. The reduction is calculated by this formula: 3395 * <p> 3396 * <I>item</I><SUB>0</SUB> <I>op</I> 3397 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 3398 * <p> 3399 * where <I>op</I> is the binary operation passed in as an argument and 3400 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 3401 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 3402 * process ranks 0 through <I>i</I>. However, the order in which the data 3403 * items actually are combined is not specified. Therefore, the binary 3404 * operation must be such that the answer will be the same regardless of the 3405 * order in which the data items are combined; that is, the binary operation 3406 * must be commutative and associative. 3407 * 3408 * @param buf Buffer of data items to be scanned. 3409 * @param op Binary operation. 3410 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3411 * <code>buf</code> is null or <code>op</code> 3412 * is null. 3413 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3414 * <code>buf</code> and <code>op</code> do not use the same item data type. 3415 * @throws java.io.IOException Thrown if an I/O error occurred. 3416 */ 3417 public void scan(Buf buf, 3418 Op op) 3419 throws IOException { 3420 scan(0, buf, op); 3421 } 3422 3423 /** 3424 * Perform a scan on all processes in this communicator using the given 3425 * message tag. All processes must call <code>scan()</code> with the same value 3426 * for <code>tag</code>, with a buffer of the same length and the same item data 3427 * type, and with the same binary operation (class {@linkplain 3428 * edu.rit.pj.reduction.Op Op}). 
3429 * <p> 3430 * Before calling <code>scan()</code>, each process has a buffer filled with 3431 * data items. After <code>scan()</code> returns, each data item in the buffer 3432 * of process rank <I>i</I> has been set to the <B>reduction</B> of the 3433 * corresponding data items in the buffers of process ranks 0 through 3434 * <I>i</I>. The reduction is calculated by this formula: 3435 * <p> 3436 * <I>item</I><SUB>0</SUB> <I>op</I> 3437 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 3438 * <p> 3439 * where <I>op</I> is the binary operation passed in as an argument and 3440 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 3441 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 3442 * process ranks 0 through <I>i</I>. However, the order in which the data 3443 * items actually are combined is not specified. Therefore, the binary 3444 * operation must be such that the answer will be the same regardless of the 3445 * order in which the data items are combined; that is, the binary operation 3446 * must be commutative and associative. 3447 * 3448 * @param tag Message tag. 3449 * @param buf Buffer of data items to be scanned. 3450 * @param op Binary operation. 3451 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3452 * <code>buf</code> is null or <code>op</code> 3453 * is null. 3454 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3455 * <code>buf</code> and <code>op</code> do not use the same item data type. 3456 * @throws java.io.IOException Thrown if an I/O error occurred. 3457 */ 3458 public void scan(int tag, 3459 Buf buf, 3460 Op op) 3461 throws IOException { 3462 // Early return if only one process. 3463 if (mySize == 1) { 3464 return; 3465 } 3466 3467 // A scan is done as a series of point-to-point messages. The messages 3468 // are organized into rounds. The number of rounds is 3469 // ceil(log_2(mySize)). 
In the first round, each process sends its data 3470 // to the process one rank ahead, and the incoming data is combined with 3471 // the process's data using the reduction operator. In the second round, 3472 // each process sends its data to the process two ranks ahead. In the 3473 // third round, each process sends its data to process four ranks ahead. 3474 // And so on. Here is the message pattern for a communicator with 8 3475 // processes: 3476 // 3477 // Process 3478 // 0 1 2 3 4 5 6 7 3479 // | | | | | | | | 3480 // |---->|---->|---->|---->|---->|---->|---->| Round 1 3481 // | | | | | | | | 3482 // |---------->| | | | | | Round 2 3483 // | |---------->| | | | | 3484 // | | |---------->| | | | 3485 // | | | |---------->| | | 3486 // | | | | |---------->| | 3487 // | | | | | |---------->| 3488 // | | | | | | | | 3489 // |---------------------->| | | | Round 3 3490 // | |---------------------->| | | 3491 // | | |---------------------->| | 3492 // | | | |---------------------->| 3493 // 3494 // Get temporary buffer for holding incoming data items. 3495 Buf tempbuf = buf.getTemporaryBuf(); 3496 3497 // Get reduction buffer for combining data items. 3498 Buf reductionbuf = buf.getReductionBuf(op); 3499 3500 // Do rounds of message passing and reduction. 3501 int skip = 1; 3502 for (; ; ) { 3503 int toRank = myRank + skip; 3504 int fromRank = myRank - skip; 3505 boolean toExists = 0 <= toRank && toRank < mySize; 3506 boolean fromExists = 0 <= fromRank && fromRank < mySize; 3507 if (toExists && fromExists) { 3508 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf); 3509 reductionbuf.copy(tempbuf); 3510 } else if (fromExists) { 3511 receive(fromRank, tag, reductionbuf); 3512 } else if (toExists) { 3513 send(toRank, tag, buf); 3514 } else { 3515 break; 3516 } 3517 skip <<= 1; 3518 } 3519 } 3520 3521 /** 3522 * Perform an exclusive scan on all processes in this communicator. A 3523 * message tag of 0 is used. 
All processes must call 3524 * <code>exclusiveScan()</code> with a buffer of the same length and the same 3525 * item data type, with the same binary operation (class {@linkplain 3526 * edu.rit.pj.reduction.Op Op}), and with the same initial data value. 3527 * <p> 3528 * Before calling <code>exclusiveScan()</code>, each process has a buffer filled 3529 * with data items. After <code>exclusiveScan()</code> returns, each data item 3530 * in the buffer of process rank <I>i</I> > 0 has been set to the 3531 * <B>reduction</B> of the corresponding data items in the buffers of 3532 * process ranks 0 through <I>i</I>-1. The reduction is calculated by this 3533 * formula: 3534 * <p> 3535 * <I>item</I><SUB>0</SUB> <I>op</I> 3536 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 3537 * <p> 3538 * where <I>op</I> is the binary operation passed in as an argument and 3539 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 3540 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 3541 * process ranks 0 through <I>i</I>-1. However, the order in which the data 3542 * items actually are combined is not specified. Therefore, the binary 3543 * operation must be such that the answer will be the same regardless of the 3544 * order in which the data items are combined; that is, the binary operation 3545 * must be commutative and associative. 3546 * <p> 3547 * In process 0, each data item in the buffer has been set to the initial 3548 * data value using the buffer's <code>fill()</code> method. 3549 * <p> 3550 * If the buffer's item data type is a primitive type, the <code>item</code> 3551 * must be an instance of the corresponding primitive wrapper class -- class 3552 * Integer for type <code>int</code>, class Double for type <code>double</code>, and 3553 * so on. If the <code>item</code> is null, the item data type's default initial 3554 * value is assigned to each element in the buffer. 
3555 * <p> 3556 * If the buffer's item data type is a nonprimitive type, the <code>item</code> 3557 * must be an instance of the item class or a subclass thereof. The 3558 * <code>item</code> may be null. Note that since <code>item</code> is 3559 * <I>assigned</I> to every buffer element, every buffer element ends up 3560 * referring to the same <code>item</code>. 3561 * 3562 * @param buf Buffer of data items to be scanned. 3563 * @param op Binary operation. 3564 * @param item Initial data value. 3565 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3566 * <code>buf</code> is null or <code>op</code> 3567 * is null. 3568 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3569 * <code>buf</code> and <code>op</code> do not use the same item data type. 3570 * @throws java.io.IOException Thrown if an I/O error occurred. 3571 */ 3572 public void exclusiveScan(Buf buf, 3573 Op op, 3574 Object item) 3575 throws IOException { 3576 exclusiveScan(0, buf, op, item); 3577 } 3578 3579 /** 3580 * Perform an exclusive scan on all processes in this communicator using the 3581 * given message tag. All processes must call <code>exclusiveScan()</code> with 3582 * the same value for <code>tag</code>, with a buffer of the same length and the 3583 * same item data type, with the same binary operation (class {@linkplain 3584 * edu.rit.pj.reduction.Op Op}), and with the same initial data value. 3585 * <p> 3586 * Before calling <code>exclusiveScan()</code>, each process has a buffer filled 3587 * with data items. After <code>exclusiveScan()</code> returns, each data item 3588 * in the buffer of process rank <I>i</I> > 0 has been set to the 3589 * <B>reduction</B> of the corresponding data items in the buffers of 3590 * process ranks 0 through <I>i</I>-1. The reduction is calculated by this 3591 * formula: 3592 * <p> 3593 * <I>item</I><SUB>0</SUB> <I>op</I> 3594 * <I>item</I><SUB>1</SUB> <I>op</I> <I>item</I><SUB>2</SUB> <I>op</I> . . . 
3595 * <p> 3596 * where <I>op</I> is the binary operation passed in as an argument and 3597 * <I>item</I><SUB>0</SUB>, <I>item</I><SUB>1</SUB>, 3598 * <I>item</I><SUB>2</SUB>, and so on are the data items in the buffers of 3599 * process ranks 0 through <I>i</I>-1. However, the order in which the data 3600 * items actually are combined is not specified. Therefore, the binary 3601 * operation must be such that the answer will be the same regardless of the 3602 * order in which the data items are combined; that is, the binary operation 3603 * must be commutative and associative. 3604 * <p> 3605 * In process 0, each data item in the buffer has been set to the initial 3606 * data value using the buffer's <code>fill()</code> method. 3607 * <p> 3608 * If the buffer's item data type is a primitive type, the <code>item</code> 3609 * must be an instance of the corresponding primitive wrapper class -- class 3610 * Integer for type <code>int</code>, class Double for type <code>double</code>, and 3611 * so on. If the <code>item</code> is null, the item data type's default initial 3612 * value is assigned to each element in the buffer. 3613 * <p> 3614 * If the buffer's item data type is a nonprimitive type, the <code>item</code> 3615 * must be an instance of the item class or a subclass thereof. The 3616 * <code>item</code> may be null. Note that since <code>item</code> is 3617 * <I>assigned</I> to every buffer element, every buffer element ends up 3618 * referring to the same <code>item</code>. 3619 * 3620 * @param tag Message tag. 3621 * @param buf Buffer of data items to be scanned. 3622 * @param op Binary operation. 3623 * @param item Initial data value. 3624 * @throws java.lang.NullPointerException (unchecked exception) Thrown if 3625 * <code>buf</code> is null or <code>op</code> 3626 * is null. 3627 * @throws java.lang.ClassCastException (unchecked exception) Thrown if 3628 * <code>buf</code> and <code>op</code> do not use the same item data type. 
3629 * @throws java.io.IOException Thrown if an I/O error occurred. 3630 */ 3631 public void exclusiveScan(int tag, 3632 Buf buf, 3633 Op op, 3634 Object item) 3635 throws IOException { 3636 // An exclusive scan begins with each process sending its buffer to the 3637 // next higher process. Then process 0 fills its buffer with the initial 3638 // data value, while processes 1 and higher do an inclusive scan. 3639 3640 int toRank; 3641 int fromRank; 3642 boolean toExists; 3643 boolean fromExists; 3644 3645 // Process 0 does this. 3646 if (myRank == 0) { 3647 // Send buffer to next higher process. 3648 toRank = 1; 3649 toExists = toRank < mySize; 3650 if (toExists) { 3651 send(toRank, tag, buf); 3652 } 3653 3654 // Fill buffer with initial data value. 3655 buf.fill(item); 3656 } // Processes 1 and higher do this. 3657 else { 3658 // Get temporary buffer for holding incoming data items. 3659 Buf tempbuf = buf.getTemporaryBuf(); 3660 3661 // Get reduction buffer for combining data items. 3662 Buf reductionbuf = buf.getReductionBuf(op); 3663 3664 // Send buffer to next higher process. 3665 toRank = myRank + 1; 3666 fromRank = myRank - 1; 3667 toExists = 0 <= toRank && toRank < mySize; 3668 if (toExists) { 3669 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf); 3670 buf.copy(tempbuf); 3671 } else { 3672 receive(fromRank, tag, buf); 3673 } 3674 3675 // Do rounds of message passing and reduction. 
3676 int skip = 1; 3677 for (; ; ) { 3678 toRank = myRank + skip; 3679 fromRank = myRank - skip; 3680 toExists = 1 <= toRank && toRank < mySize; 3681 fromExists = 1 <= fromRank && fromRank < mySize; 3682 if (toExists && fromExists) { 3683 sendReceive(toRank, tag, buf, fromRank, tag, tempbuf); 3684 reductionbuf.copy(tempbuf); 3685 } else if (fromExists) { 3686 receive(fromRank, tag, reductionbuf); 3687 } else if (toExists) { 3688 send(toRank, tag, buf); 3689 } else { 3690 break; 3691 } 3692 skip <<= 1; 3693 } 3694 } 3695 } 3696 3697 /** 3698 * Cause all processes in this communicator to wait at a barrier. The 3699 * barrier uses a message tag of 0. All processes must call 3700 * <code>barrier()</code>. The calling thread blocks until every process has 3701 * called <code>barrier()</code>, then the calling thread unblocks and returns 3702 * from the <code>barrier()</code> call. 3703 * 3704 * @throws java.io.IOException Thrown if an I/O error occurred. 3705 */ 3706 public void barrier() 3707 throws IOException { 3708 barrier(0); 3709 } 3710 3711 /** 3712 * Cause all processes in this communicator to wait at a barrier, using the 3713 * given message tag. All processes must call <code>barrier()</code> with the 3714 * same tag. The calling thread blocks until every process has called 3715 * <code>barrier()</code>, then the calling thread unblocks and returns from the 3716 * <code>barrier()</code> call. 3717 * 3718 * @param tag Message tag. 3719 * @throws java.io.IOException Thrown if an I/O error occurred. 3720 */ 3721 public void barrier(int tag) 3722 throws IOException { 3723 // A barrier is done as an all-reduce of an empty buffer. 3724 allReduce(tag, IntegerBuf.emptyBuffer(), IntegerOp.SUM); 3725 } 3726 3727 /** 3728 * Returns a string version of this communicator. The string includes the 3729 * communicator's size, the current process's rank, and the host and port of 3730 * each backend process. 3731 * 3732 * @return String version. 
3733 */ 3734 public String toString() { 3735 StringBuilder buf = new StringBuilder(); 3736 buf.append("Comm(size="); 3737 buf.append(mySize); 3738 buf.append(",rank="); 3739 buf.append(myRank); 3740 buf.append(",backend"); 3741 for (int i = 0; i < mySize; ++i) { 3742 if (i > 0) { 3743 buf.append(','); 3744 } 3745 buf.append('['); 3746 buf.append(i); 3747 buf.append("]="); 3748 buf.append(myAddressForRank[i]); 3749 } 3750 buf.append(')'); 3751 return buf.toString(); 3752 } 3753 3754 /** 3755 * Dump the state of this communicator on the given print stream. For 3756 * debugging. 3757 * 3758 * @param out Print stream. 3759 * @param prefix String to print at the beginning of each line. 3760 */ 3761 public void dump(PrintStream out, 3762 String prefix) { 3763 out.println(); 3764 out.println(prefix + getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(this))); 3765 out.println(prefix + "mySize = " + mySize); 3766 out.println(prefix + "myRank = " + myRank); 3767 out.println(prefix + "myHost = " + myHost); 3768 out.println(prefix + "mySizePowerOf2 = " + mySizePowerOf2); 3769 out.println(prefix + "myChannelGroup = " + myChannelGroup); 3770 out.println(prefix + "myAddressForRank:"); 3771 for (int i = 0; i < myAddressForRank.length; ++i) { 3772 out.println(prefix + "\t[" + i + "] " + myAddressForRank[i]); 3773 } 3774 out.println(prefix + "myChannelForRank:"); 3775 for (int i = 0; i < myChannelForRank.length; ++i) { 3776 out.println(prefix + "\t[" + i + "] " + myChannelForRank[i]); 3777 } 3778 out.println(prefix + "myBroadcastTree:"); 3779 for (int i = 0; i < myBroadcastTree.length; ++i) { 3780 out.print(prefix + "\t[" + i + "]"); 3781 int[] tree = myBroadcastTree[i]; 3782 if (tree == null) { 3783 out.print(" null"); 3784 } else { 3785 for (int j = 0; j < tree.length; ++j) { 3786 out.print(" " + tree[j]); 3787 } 3788 } 3789 out.println(); 3790 } 3791 out.println(); 3792 myChannelGroup.dump(out, prefix); 3793 } 3794 3795 // Hidden operations. 
3796 3797 /** 3798 * Notify that another process connected a channel to this process. 3799 * 3800 * @param theChannel Channel. 3801 * @throws IOException Thrown if an I/O error occurred. 3802 */ 3803 private synchronized void doFarEndConnected(Channel theChannel) 3804 throws IOException { 3805 // Record channel and rank. 3806 myChannelForRank[getFarRank(theChannel)] = theChannel; 3807 3808 // Notify any threads waiting in getChannel(). 3809 notifyAll(); 3810 } 3811 3812 /** 3813 * Ensure that a channel for communicating with the process at the given 3814 * rank is or will be set up. 3815 * 3816 * @param farrank Rank of far end process. 3817 * @throws IOException Thrown if an I/O error occurred. 3818 */ 3819 private synchronized void ensureChannel(int farrank) 3820 throws IOException { 3821 // Get channel from channel array. 3822 Channel channel = myChannelForRank[farrank]; 3823 3824 // If the channel does not exist: 3825 if (channel == null) { 3826 // If this is the lower-ranked process, set up the connection. 3827 if (myRank < farrank) { 3828 myChannelForRank[farrank] 3829 = myChannelGroup.connect(myAddressForRank[farrank]); 3830 } 3831 3832 // If this is the higher-ranked process, the lower-ranked process 3833 // will set up the connection. 3834 } 3835 } 3836 3837 /** 3838 * Get the channel for communicating with the process at the given rank. 3839 * 3840 * @param farrank Rank of far end process. 3841 * @return Channel. 3842 * @throws IOException Thrown if an I/O error occurred. 3843 */ 3844 private synchronized Channel getChannel(int farrank) 3845 throws IOException { 3846 // Get channel from channel array. 3847 Channel channel = myChannelForRank[farrank]; 3848 3849 // If the channel does not exist: 3850 if (channel == null) { 3851 // If this is the lower-ranked process, set up the connection. 
3852 if (myRank < farrank) { 3853 channel = myChannelGroup.connect(myAddressForRank[farrank]); 3854 myChannelForRank[farrank] = channel; 3855 } // If this is the higher-ranked process, wait for the lower-ranked 3856 // process to set up the connection. 3857 else { 3858 try { 3859 while (channel == null) { 3860 wait(); 3861 channel = myChannelForRank[farrank]; 3862 } 3863 } catch (InterruptedException exc) { 3864 IOException exc2 = new InterruptedIOException(); 3865 exc2.initCause(exc); 3866 throw exc2; 3867 } 3868 } 3869 } 3870 3871 return channel; 3872 } 3873 3874 /** 3875 * Get the rank of the process at the far end of the given channel. 3876 * 3877 * @param channel Channel. 3878 * @return Far end process rank. 3879 */ 3880 static int getFarRank(Channel channel) { 3881 return channel.farEndChannelGroupId(); 3882 } 3883 3884 /** 3885 * Get an array of process ranks in the broadcast tree for the given root. 3886 * The broadcast tree is cached in the field myBroadcastTree for later use. 3887 * 3888 * @param root Root process's rank. 3889 * @return Broadcast tree. 3890 */ 3891 private synchronized int[] getBroadcastTree(int root) { 3892 if (myBroadcastTree == null) { 3893 myBroadcastTree = new int[mySize][]; 3894 } 3895 int[] broadcasttree = myBroadcastTree[root]; 3896 if (broadcasttree == null) { 3897 broadcasttree = CommPattern.broadcastPattern(mySize, myRank, root); 3898 myBroadcastTree[root] = broadcasttree; 3899 } 3900 return broadcasttree; 3901 } 3902 3903 }