1 //****************************************************************************** 2 // 3 // File: ChannelGroup.java 4 // Package: edu.rit.mp 5 // Unit: Class edu.rit.mp.ChannelGroup 6 // 7 // This Java source file is copyright (C) 2009 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.mp; 41 42 import java.io.IOException; 43 import java.io.PrintStream; 44 import java.net.InetSocketAddress; 45 import java.net.Socket; 46 import java.nio.ByteBuffer; 47 import java.nio.channels.ClosedChannelException; 48 import java.nio.channels.ServerSocketChannel; 49 import java.nio.channels.SocketChannel; 50 import java.util.LinkedList; 51 import java.util.List; 52 53 import edu.rit.util.Logger; 54 import edu.rit.util.PrintStreamLogger; 55 import edu.rit.util.Range; 56 import edu.rit.util.Timer; 57 import edu.rit.util.TimerTask; 58 import edu.rit.util.TimerThread; 59 60 /** 61 * Class ChannelGroup provides a group of {@linkplain Channel}s for sending and 62 * receiving messages in the Message Protocol (MP). 63 * <P> 64 * <B>Creating Channels</B> 65 * <P> 66 * A channel group can be used to create channels in two ways: by accepting a 67 * connection request from another process, and by requesting a connection to 68 * another process. 69 * <P> 70 * The channel group can be configured to listen to a certain host and port for 71 * connection requests. Configure the host and port by by calling the 72 * <code>listen()</code> method. To start accepting connection requests, call the 73 * <code>startListening()</code> method. 74 * <P> 75 * If desired, an application can receive notification of newly created channels 76 * by providing a {@linkplain ConnectListener} object to the channel group's 77 * <code>setConnectListener()</code> method. Specify a connect listener object 78 * before calling the <code>startListening()</code> method, otherwise the 79 * application may not receive some notifications. 80 * <P> 81 * When a connection request arrives, the channel group sets up a new channel 82 * object for communicating over the connection. If a connect listener has been 83 * registered, the channel group then passes the channel to the connect 84 * listener's <code>farEndConnected()</code> method, which does whatever the 85 * application needs to record the new channel's presence. The channel does not 86 * start sending and receiving messages until after the connect listener's 87 * <code>farEndConnected()</code> method (if any) has returned. 88 * <P> 89 * The application can also call the channel group's <code>connect()</code> method 90 * to request a connection to another host and port. The channel group sets up a 91 * new channel object for communicating over the connection. If a connect 92 * listener has been registered, the channel group then passes the channel to 93 * the connect listener's <code>nearEndConnected()</code> method, which does 94 * whatever the application needs to record the new channel's presence. The 95 * channel does not start sending and receiving messages until after the connect 96 * listener's <code>nearEndConnected()</code> method (if any) has returned. The 97 * <code>connect()</code> method also returns the new channel. 98 * <P> 99 * Once a connection has been set up and a channel object has been created on 100 * each side, the applications can use their respective channels to send and 101 * receive messages. 102 * <P> 103 * If a channel group does not need to accept incoming connection requests, the 104 * channel group need not listen to any host and port. The channel group can 105 * still be used to make outgoing connection requests. 106 * <P> 107 * <B>Channel Group IDs</B> 108 * <P> 109 * Each channel group has a channel group ID. The channel group ID is an 110 * integer, initially 0, that can be changed by the <code>setChannelGroupId()</code> 111 * method. The channel group attaches no significance to the channel group ID; 112 * it is provided for the use of the application using the channel group. 113 * <P> 114 * You can query a channel group object to determine its channel group ID. You 115 * can also query a channel object to determine the ID of the channel group at 116 * the near end of the channel and the ID of the channel group at the far end of 117 * the channel. 118 * <P> 119 * <B>Sending Messages</B> 120 * <P> 121 * To send a message, the application creates a message buffer (class 122 * {@linkplain Buf}) specifying where to get the items to be sent. The 123 * application calls the channel group's <code>send()</code> method, passing the 124 * channel on which to send the message, the message buffer, and the message 125 * tag. (If the message tag is not specified, it defaults to 0.) The channel 126 * group extracts the items from the message buffer and sends a message over the 127 * channel's connection. When the <code>send()</code> method returns, the message 128 * has been fully sent, but the message may not have been fully received yet. 129 * <P> 130 * The far end application must receive the message from the channel at the 131 * other end of the connection. If no application is receiving the message, the 132 * <code>send()</code> method may block (because of flow control). This in turn may 133 * lead to a deadlock. 134 * <P> 135 * At most one outgoing message at a time may be in progress on a channel. If a 136 * second thread tries to send a message on a channel while a first thread is 137 * still sending a message on that channel, the second thread will block until 138 * the first thread has finished sending the message. 139 * <P> 140 * <B>Receiving Messages</B> 141 * <P> 142 * To receive a message, the application creates a message buffer (class 143 * {@linkplain Buf}) specifying where to put the items to be received. The 144 * application calls the channel group's <code>receive()</code> method, passing the 145 * channel from which to receive the message, the desired message tag, and the 146 * message buffer. The application can specify "any channel" instead of a 147 * specific channel. The application can specify a range of tags or "any tag" 148 * instead of a specific tag. Any number of threads can have receive requests 149 * pending at the same time. 150 * <P> 151 * When the channel group receives a message from a channel, the channel group 152 * tries to match the message with the pending receive requests. A message 153 * matches a receive request if (a) the message's channel is the same as the 154 * receive request's channel, or the receive request specified "any channel;" 155 * and (b) the message's tag is the same as the receive request's tag, or the 156 * message's tag falls within the receive request's range of tags, or the 157 * receive request specified "any tag;" and (c) the message's item type is the 158 * same as the receive request's item type (as given by the message buffer). The 159 * pending receive requests are maintained in FIFO order. If no receive request 160 * matches the message, the channel group does not read the message until such 161 * time as a matching receive request occurs. If more than one receive request 162 * matches the message, the channel group chooses the first matching receive 163 * request. 164 * <P> 165 * Once the channel group has matched the incoming message with the receive 166 * request, the channel group reads the items from the message and stores them 167 * into the message buffer. If there are fewer items in the message than the 168 * length of the message buffer, the extra items at the end of the message 169 * buffer are not set to anything. If there are more items in the message than 170 * the length of the message buffer, the extra items are read from the message 171 * and discarded. Once the message has been read, the <code>receive()</code> method 172 * returns a {@linkplain Status} object reporting the channel on which the 173 * message arrived, the message tag, and the actual number of items in the 174 * message (which may or may not be the same as the number of items in the 175 * message buffer). 176 * <P> 177 * If the receive requests do not match properly with the incoming messages, a 178 * deadlock may occur. 179 * <P> 180 * <B>Sending and Receiving Within the Same Process</B> 181 * <P> 182 * Each channel group has a "loopback" channel that is used to send messages 183 * within the same process. To obtain the loopback channel, call the 184 * <code>loopbackChannel()</code> method. Then one thread can send messages using 185 * the loopback channel while a different thread receives messages using the 186 * loopback channel. If the same thread both sends and receives using the 187 * loopback channel, a deadlock may occur. 188 * <P> 189 * The loopback channel uses the <code>copy()</code> method of class {@linkplain 190 * Buf} to transfer data items directly from the source buffer to the 191 * destination buffer. The loopback channel does not do any network 192 * communication. 193 * <P> 194 * <B>Non-Blocking Send and Receive Operations</B> 195 * <P> 196 * The <code>send()</code> method described so far does a <B>blocking send</B> 197 * operation; the <code>send()</code> method does not return until the message has 198 * been fully sent. There is also a <B>non-blocking send</B> operation, 199 * <code>sendNoWait()</code>, which includes an {@linkplain IORequest} argument. The 200 * <code>sendNoWait()</code> method initiates the send operation and returns 201 * immediately. This allows the caller to continue processing while the channel 202 * group sends the message in a separate thread. To wait for the message to be 203 * fully sent, the caller must call the IORequest object's 204 * <code>waitForFinish()</code> method. 205 * <P> 206 * Likewise, the <code>receive()</code> method described so far does a <B>blocking 207 * receive</B> operation; the <code>receive()</code> method does not return until 208 * the message has been fully received. There is also a <B>non-blocking 209 * receive</B> operation, <code>receiveNoWait()</code>, which includes an 210 * {@linkplain IORequest} argument. The <code>receiveNoWait()</code> method 211 * initiates the receive operation and returns immediately. This allows the 212 * caller to continue processing while the channel group receives the message in 213 * a separate thread. To wait for the message to be fully received, the caller 214 * must call the IORequest object's <code>waitForFinish()</code> method, which 215 * returns a {@linkplain Status} object giving the results of the receive 216 * operation. 217 * 218 * @author Alan Kaminsky 219 * @version 11-Mar-2009 220 */ 221 public class ChannelGroup { 222 223 // Hidden data members. 224 // Channel group ID. 225 int myChannelGroupId; 226 227 // Server socket channel for accepting incoming connections, or null if not 228 // accepting incoming connections. 229 ServerSocketChannel myServerSocketChannel; 230 231 // I/O request list for matching incoming messages to receive I/O requests. 232 IORequestList myIORequestList; 233 234 // Alternate class loader for use when receiving objects. 235 ClassLoader myClassLoader; 236 237 // Loopback channel. 238 LoopbackChannel myLoopbackChannel; 239 240 // List of open channels. 241 List<Channel> myChannelList; 242 243 // Accepting thread, or null if not accepting. 244 AcceptThread myAcceptThread; 245 246 // Registered connect listener, or null if none. 247 ConnectListener myConnectListener; 248 249 // For logging error messages. 250 Logger myLogger; 251 252 // For timeouts during channel setup. 253 TimerThread myTimerThread; 254 255 // Hidden helper classes. 256 /** 257 * Class AcceptThread provides a thread that accepts incoming connections. 258 * 259 * @author Alan Kaminsky 260 * @version 11-Mar-2009 261 */ 262 private class AcceptThread 263 extends Thread { 264 265 public AcceptThread() { 266 setDaemon(true); 267 start(); 268 } 269 270 public void run() { 271 acceptloop: 272 for (;;) { 273 // Wait for an incoming connection. 274 SocketChannel connection = null; 275 try { 276 connection = myServerSocketChannel.accept(); 277 } catch (ClosedChannelException exc) { 278 myLogger.log("ChannelGroup: Channel closed", 279 exc); 280 break acceptloop; 281 } catch (IOException exc) { 282 myLogger.log("ChannelGroup: I/O error while accepting connection", 283 exc); 284 break acceptloop; 285 } 286 287 // Set up channel over connection. 288 if (connection != null) { 289 try { 290 farEndConnect(connection); 291 } catch (IOException exc) { 292 // Clear thread's interrupted status, otherwise accept() 293 // above will throw an exception. 294 Thread.interrupted(); 295 myLogger.log("ChannelGroup: I/O error while setting up channel", 296 exc); 297 try { 298 connection.close(); 299 } catch (IOException ignored) { 300 } 301 } 302 } 303 } 304 } 305 } 306 307 // Exported constructors. 308 /** 309 * Construct a new channel group. The channel group ID is initially 0. The 310 * channel group will not listen for connection requests. To listen for 311 * connection requests at a later time, call the <code>listen()</code> method 312 * followed by the <code>startListening</code> method. 313 * <P> 314 * The channel group will log error messages on the standard error. 315 */ 316 public ChannelGroup() { 317 this(new PrintStreamLogger()); 318 } 319 320 /** 321 * Construct a new channel group. The channel group ID is initially 0. The 322 * channel group will listen for connection requests on the given host and 323 * port. To start actively listening, call the <code>startListening()</code> 324 * method. 325 * <P> 326 * The channel group will log error messages on the standard error. 327 * 328 * @param theListenAddress Host and port at which to listen. 329 * @exception NullPointerException (unchecked exception) Thrown if 330 * <code>theListenAddress</code> is null. 331 * @exception IOException Thrown if an I/O error occurred. 332 * @throws java.io.IOException if any. 333 */ 334 public ChannelGroup(InetSocketAddress theListenAddress) 335 throws IOException { 336 this(theListenAddress, new PrintStreamLogger()); 337 } 338 339 /** 340 * Construct a new channel group. The channel group ID is initially 0. The 341 * channel group will listen for connection requests using the given server 342 * socket channel. The server socket channel must be bound to a host and 343 * port. To start actively listening, call the <code>startListening()</code> 344 * method. 345 * <P> 346 * The channel group will log error messages on the standard error. 347 * 348 * @param theServerSocketChannel Server socket channel. 349 * @exception NullPointerException (unchecked exception) Thrown if 350 * <code>theServerSocketChannel</code> is null. 351 * @exception IOException Thrown if an I/O error occurred. Thrown if 352 * <code>theServerSocketChannel</code> is not bound. 353 * @throws java.io.IOException if any. 354 */ 355 public ChannelGroup(ServerSocketChannel theServerSocketChannel) 356 throws IOException { 357 this(theServerSocketChannel, new PrintStreamLogger()); 358 } 359 360 /** 361 * Construct a new channel group. The channel group ID is initially 0. The 362 * channel group will not listen for connection requests. To listen for 363 * connection requests at a later time, call the <code>listen()</code> method 364 * followed by the <code>startListening</code> method. 365 * <P> 366 * The channel group will log error messages using the given logger. 367 * 368 * @param theLogger Logger for error messages. 369 * @exception NullPointerException (unchecked exception) Thrown if 370 * <code>theLogger</code> is null. 371 */ 372 public ChannelGroup(Logger theLogger) { 373 if (theLogger == null) { 374 throw new NullPointerException("ChannelGroup(): theLogger is null"); 375 } 376 myIORequestList = new IORequestList(); 377 myLoopbackChannel = new LoopbackChannel(this); 378 myChannelList = new LinkedList<>(); 379 myChannelList.add(myLoopbackChannel); 380 myLogger = theLogger; 381 myTimerThread = new TimerThread(); 382 myTimerThread.setDaemon(true); 383 myTimerThread.start(); 384 } 385 386 /** 387 * Construct a new channel group. The channel group ID is initially 0. The 388 * channel group will listen for connection requests on the given host and 389 * port. To start actively listening, call the <code>startListening()</code> 390 * method. 391 * <P> 392 * The channel group will log error messages using the given logger. 393 * 394 * @param theListenAddress Host and port at which to listen. 395 * @param theLogger Logger for error messages. 396 * @exception NullPointerException (unchecked exception) Thrown if 397 * <code>theListenAddress</code> is null. Thrown if <code>theLogger</code> is null. 398 * @exception IOException Thrown if an I/O error occurred. 399 * @throws java.io.IOException if any. 400 */ 401 public ChannelGroup(InetSocketAddress theListenAddress, 402 Logger theLogger) 403 throws IOException { 404 this(theLogger); 405 listen(theListenAddress); 406 } 407 408 /** 409 * Construct a new channel group. The channel group ID is initially 0. The 410 * channel group will listen for connection requests using the given server 411 * socket channel. The server socket channel must be bound to a host and 412 * port. To start actively listening, call the <code>startListening()</code> 413 * method. 414 * <P> 415 * The channel group will log error messages using the given logger. 416 * 417 * @param theServerSocketChannel Server socket channel. 418 * @param theLogger Logger for error messages. 419 * @exception NullPointerException (unchecked exception) Thrown if 420 * <code>theServerSocketChannel</code> is null. Thrown if <code>theLogger</code> is 421 * null. 422 * @exception IOException Thrown if an I/O error occurred. Thrown if 423 * <code>theServerSocketChannel</code> is not bound. 424 * @throws java.io.IOException if any. 425 */ 426 public ChannelGroup(ServerSocketChannel theServerSocketChannel, 427 Logger theLogger) 428 throws IOException { 429 this(theLogger); 430 listen(theServerSocketChannel); 431 } 432 433 // Exported operations. 434 /** 435 * Set this channel group's channel group ID. 436 * 437 * @param theChannelGroupId Channel group ID. 438 */ 439 public void setChannelGroupId(int theChannelGroupId) { 440 myChannelGroupId = theChannelGroupId; 441 } 442 443 /** 444 * Obtain this channel group's channel group ID. 445 * 446 * @return Channel group ID. 447 */ 448 public int getChannelGroupId() { 449 return myChannelGroupId; 450 } 451 452 /** 453 * Obtain this channel group's listen address. This is the near end host and 454 * port to which this channel group is listening for connection requests. If 455 * this channel group is not listening for connection requests, null is 456 * returned. 457 * 458 * @return Near end address, or null. 459 */ 460 public synchronized InetSocketAddress listenAddress() { 461 return myServerSocketChannel == null 462 ? null 463 : (InetSocketAddress) myServerSocketChannel.socket().getLocalSocketAddress(); 464 } 465 466 /** 467 * Listen for connection requests on the given host and port. To start 468 * actively listening, call the <code>startListening()</code> method. 469 * 470 * @param theListenAddress Host and port at which to listen. 471 * @exception NullPointerException (unchecked exception) Thrown if 472 * <code>theListenAddress</code> is null. 473 * @exception IllegalStateException (unchecked exception) Thrown if 474 * listening has already started. 475 * @exception IOException Thrown if an I/O error occurred. 476 * @throws java.io.IOException if any. 477 */ 478 public synchronized void listen(InetSocketAddress theListenAddress) 479 throws IOException { 480 if (theListenAddress == null) { 481 throw new NullPointerException("ChannelGroup.listen(): theListenAddress is null"); 482 } 483 ServerSocketChannel channel = ServerSocketChannel.open(); 484 channel.socket().bind(theListenAddress); 485 listen(channel); 486 } 487 488 /** 489 * Listen for connection requests using the given server socket channel. The 490 * server socket channel must be bound to a host and port. To start actively 491 * listening, call the <code>startListening()</code> method. 492 * 493 * @param theServerSocketChannel Server socket channel. 494 * @exception NullPointerException (unchecked exception) Thrown if 495 * <code>theServerSocketChannel</code> is null. 496 * @exception IllegalStateException (unchecked exception) Thrown if 497 * listening has already started. 498 * @exception IOException Thrown if an I/O error occurred. Thrown if 499 * <code>theServerSocketChannel</code> is not bound. 500 * @throws java.io.IOException if any. 501 */ 502 public synchronized void listen(ServerSocketChannel theServerSocketChannel) 503 throws IOException { 504 if (theServerSocketChannel == null) { 505 throw new NullPointerException("ChannelGroup.listen(): theServerSocketChannel is null"); 506 } 507 if (!theServerSocketChannel.socket().isBound()) { 508 throw new IOException("ChannelGroup.listen(): theServerSocketChannel is not bound"); 509 } 510 if (myAcceptThread != null) { 511 throw new IllegalStateException("ChannelGroup.listen(): Listening has already started"); 512 } 513 if (myIORequestList == null) { 514 throw new IOException("ChannelGroup.listen(): Channel group closed"); 515 } 516 517 myServerSocketChannel = theServerSocketChannel; 518 } 519 520 /** 521 * Register the given connect listener with this channel group. Thereafter, 522 * this channel group will report each connected channel by calling 523 * <code>theConnectListener</code>'s <code>nearEndConnected()</code> method (if the 524 * connection request originated in this process) or 525 * <code>farEndConnected()</code> method (if the connection request originated 526 * in another process). It is assumed that these methods will not do any 527 * lengthy processing and will not block the calling thread. 528 * <P> 529 * At most one connect listener may be registered. If a connect listener is 530 * already registered, it is replaced with the given connect listener. If 531 * <code>theConnectListener</code> is null, any registered connect listener is 532 * discarded, and this channel group will not report connected channels. 533 * <P> 534 * Call the <code>setConnectListener()</code> method before calling the 535 * <code>startListening()</code> method, otherwise the application may not 536 * receive some connection notifications. 537 * 538 * @param theConnectListener Connect listener, or null. 539 */ 540 public synchronized void setConnectListener(ConnectListener theConnectListener) { 541 myConnectListener = theConnectListener; 542 } 543 544 /** 545 * Start actively listening for connection requests. 546 * 547 * @exception IllegalStateException (unchecked exception) Thrown if a host 548 * and port or a server socket channel upon which to listen has not been 549 * specified. Thrown if listening has already started. 550 */ 551 public synchronized void startListening() { 552 if (myServerSocketChannel == null) { 553 throw new IllegalStateException("ChannelGroup.startListening(): No server socket channel"); 554 } 555 if (myAcceptThread != null) { 556 throw new IllegalStateException("ChannelGroup.listen(): Listening has already started"); 557 } 558 559 myAcceptThread = new AcceptThread(); 560 } 561 562 /** 563 * Create a new channel connected to the given far end host and port. In the 564 * far end computer, there must be a channel group listening to the given 565 * host and port. Once the connection is set up, if a connect listener has 566 * been registered, the channel group calls the connect listener's 567 * <code>nearEndConnected()</code> method to report the new channel. 568 * 569 * @param theFarEndAddress Host and port of far end channel group. 570 * @return New channel. 571 * @exception IOException Thrown if an I/O error occurred. 572 * @throws java.io.IOException if any. 573 */ 574 public Channel connect(InetSocketAddress theFarEndAddress) 575 throws IOException { 576 synchronized (this) { 577 if (myIORequestList == null) { 578 throw new IOException("ChannelGroup.connect(): Channel group closed"); 579 } 580 } 581 582 SocketChannel connection = null; 583 try { 584 connection = SocketChannel.open(theFarEndAddress); 585 return nearEndConnect(connection); 586 } catch (IOException exc) { 587 // Clear thread's interrupted status. 588 Thread.interrupted(); 589 if (connection != null) { 590 try { 591 connection.close(); 592 } catch (IOException ignored) { 593 } 594 } 595 throw exc; 596 } 597 } 598 599 /** 600 * Obtain this channel group's loopback channel. If this channel group is 601 * closed, null is returned. 602 * 603 * @return Loopback channel, or null. 604 */ 605 public synchronized Channel loopbackChannel() { 606 return myLoopbackChannel; 607 } 608 609 /** 610 * Send a message to the given channel. The message uses a tag of 0. The 611 * message items come from the given item source buffer. 612 * <P> 613 * The <code>send()</code> method does not return until the message has been 614 * fully sent. (The message may not have been fully received yet.) 615 * <P> 616 * The <code>send()</code> method assumes that <code>theChannel</code> was created 617 * by this channel group. If not, the <code>send()</code> method's behavior is 618 * unspecified. 619 * 620 * @param theChannel Channel. 621 * @param theSrc Item source buffer. 622 * @exception NullPointerException (unchecked exception) Thrown if 623 * <code>theChannel</code> is null or 624 * <code>theSrc</code> is null. 625 * @exception IOException Thrown if an I/O error occurred. 626 * @throws java.io.IOException if any. 627 */ 628 public void send(Channel theChannel, 629 Buf theSrc) 630 throws IOException { 631 IORequest req = new IORequest(); 632 sendNoWait(theChannel, 0, theSrc, req); 633 req.waitForFinish(); 634 } 635 636 /** 637 * Send a message to the given channel with the given tag. The message items 638 * come from the given item source buffer. 639 * <P> 640 * The <code>send()</code> method does not return until the message has been 641 * fully sent. (The message may not have been fully received yet.) 642 * <P> 643 * The <code>send()</code> method assumes that <code>theChannel</code> was created 644 * by this channel group. If not, the <code>send()</code> method's behavior is 645 * unspecified. 646 * 647 * @param theChannel Channel. 648 * @param theTag Message tag. 649 * @param theSrc Item source buffer. 650 * @exception NullPointerException (unchecked exception) Thrown if 651 * <code>theChannel</code> is null or 652 * <code>theSrc</code> is null. 653 * @exception IOException Thrown if an I/O error occurred. 654 * @throws java.io.IOException if any. 655 */ 656 public void send(Channel theChannel, 657 int theTag, 658 Buf theSrc) 659 throws IOException { 660 IORequest req = new IORequest(); 661 sendNoWait(theChannel, theTag, theSrc, req); 662 req.waitForFinish(); 663 } 664 665 /** 666 * Send (non-blocking) a message to the given channel. The message uses a 667 * tag of 0. The message items come from the given item source buffer. 668 * <code>theIORequest</code> is the IORequest object to be associated with the 669 * send operation. 670 * <P> 671 * The <code>sendNoWait()</code> method returns immediately. To wait for the 672 * message to be fully sent, call <code>theIORequest.waitForFinish()</code>. 673 * <P> 674 * The <code>sendNoWait()</code> method assumes that <code>theChannel</code> was 675 * created by this channel group. If not, the <code>sendNoWait()</code> method's 676 * behavior is unspecified. 677 * 678 * @param theChannel Channel. 679 * @param theSrc Item source buffer. 680 * @param theIORequest IORequest object. 681 * @exception NullPointerException (unchecked exception) Thrown if 682 * <code>theChannel</code> is null, 683 * <code>theSrc</code> is null, or <code>theIORequest</code> is null. 684 * @exception IOException Thrown if an I/O error occurred. 685 * @throws java.io.IOException if any. 686 */ 687 public void sendNoWait(Channel theChannel, 688 Buf theSrc, 689 IORequest theIORequest) 690 throws IOException { 691 sendNoWait(theChannel, 0, theSrc, theIORequest); 692 } 693 694 /** 695 * Send (non-blocking) a message to the given channel with the given tag. 696 * The message items come from the given item source buffer. 697 * <code>theIORequest</code> is the IORequest object to be associated with the 698 * send operation. 699 * <P> 700 * The <code>sendNoWait()</code> method returns immediately. To wait for the 701 * message to be fully sent, call <code>theIORequest.waitForFinish()</code>. 702 * <P> 703 * The <code>sendNoWait()</code> method assumes that <code>theChannel</code> was 704 * created by this channel group. If not, the <code>sendNoWait()</code> method's 705 * behavior is unspecified. 706 * 707 * @param theChannel Channel. 708 * @param theTag Message tag. 709 * @param theSrc Item source buffer. 710 * @param theIORequest IORequest object. 711 * @exception NullPointerException (unchecked exception) Thrown if 712 * <code>theChannel</code> is null, 713 * <code>theSrc</code> is null, or <code>theIORequest</code> is null. 714 * @exception IOException Thrown if an I/O error occurred. 715 * @throws java.io.IOException if any. 716 */ 717 public void sendNoWait(Channel theChannel, 718 int theTag, 719 Buf theSrc, 720 IORequest theIORequest) 721 throws IOException { 722 // Note: This method is not synchronized. Synchronization happens inside 723 // theChannel.send(). 724 725 // Verify preconditions. 726 if (myIORequestList == null) { 727 throw new IOException("ChannelGroup.sendNoWait(): Channel group closed"); 728 } 729 if (theSrc == null) { 730 throw new NullPointerException("ChannelGroup.sendNoWait(): Source buffer is null"); 731 } 732 733 theIORequest.initialize(theChannel, theTag, theTag, theSrc); 734 theChannel.send(theIORequest); 735 } 736 737 /** 738 * Receive a message from the given channel. If <code>theChannel</code> is null, 739 * a message will be received from any channel in this channel group. The 740 * message must have a tag of 0. The message items are stored in the given 741 * item destination buffer. 742 * <P> 743 * The <code>receive()</code> method does not return until the message has been 744 * fully received. 745 * <P> 746 * The <code>receive()</code> method assumes that <code>theChannel</code> was 747 * created by this channel group. If not, the <code>receive()</code> method's 748 * behavior is unspecified. 749 * 750 * @param theChannel Channel, or null to receive from any channel. 751 * @param theDst Item destination buffer. 752 * @return Status object giving the outcome of the message reception. 753 * @exception NullPointerException (unchecked exception) Thrown if 754 * <code>theDst</code> is null. 755 * @exception IOException Thrown if an I/O error occurred. 756 * @throws java.io.IOException if any. 757 */ 758 public Status receive(Channel theChannel, 759 Buf theDst) 760 throws IOException { 761 IORequest req = new IORequest(); 762 receiveNoWait(theChannel, 0, 0, theDst, req); 763 return req.waitForFinish(); 764 } 765 766 /** 767 * Receive a message from the given channel with the given tag. If 768 * <code>theChannel</code> is null, a message will be received from any channel 769 * in this channel group. The message items are stored in the given item 770 * destination buffer. 771 * <P> 772 * The <code>receive()</code> method does not return until the message has been 773 * fully received. 774 * <P> 775 * The <code>receive()</code> method assumes that <code>theChannel</code> was 776 * created by this channel group. If not, the <code>receive()</code> method's 777 * behavior is unspecified. 778 * 779 * @param theChannel Channel, or null to receive from any channel. 780 * @param theTag Message tag. 781 * @param theDst Item destination buffer. 782 * @return Status object giving the outcome of the message reception. 783 * @exception NullPointerException (unchecked exception) Thrown if 784 * <code>theDst</code> is null. 785 * @exception IOException Thrown if an I/O error occurred. 786 * @throws java.io.IOException if any. 787 */ 788 public Status receive(Channel theChannel, 789 int theTag, 790 Buf theDst) 791 throws IOException { 792 IORequest req = new IORequest(); 793 receiveNoWait(theChannel, theTag, theTag, theDst, req); 794 return req.waitForFinish(); 795 } 796 797 /** 798 * Receive a message from the given channel with the given range of tags. If 799 * <code>theChannel</code> is null, a message will be received from any channel 800 * in this channel group. If <code>theTagRange</code> is null, a message will be 801 * received with any tag. The message items are stored in the given item 802 * destination buffer. 803 * <P> 804 * The <code>receive()</code> method does not return until the message has been 805 * fully received. 806 * <P> 807 * The <code>receive()</code> method assumes that <code>theChannel</code> was 808 * created by this channel group. If not, the <code>receive()</code> method's 809 * behavior is unspecified. 810 * 811 * @param theChannel Channel, or null to receive from any channel. 812 * @param theTagRange Message tag range, or null to receive any tag. 813 * @param theDst Item destination buffer. 814 * @return Status object giving the outcome of the message reception. 815 * @exception NullPointerException (unchecked exception) Thrown if 816 * <code>theDst</code> is null. 817 * @exception IOException Thrown if an I/O error occurred. 818 * @throws java.io.IOException if any. 819 */ 820 public Status receive(Channel theChannel, 821 Range theTagRange, 822 Buf theDst) 823 throws IOException { 824 IORequest req = new IORequest(); 825 if (theTagRange == null) { 826 receiveNoWait(theChannel, Integer.MIN_VALUE, Integer.MAX_VALUE, theDst, req); 827 } else { 828 receiveNoWait(theChannel, theTagRange.lb(), theTagRange.ub(), theDst, req); 829 } 830 return req.waitForFinish(); 831 } 832 833 /** 834 * Receive (non-blocking) a message from the given channel. If 835 * <code>theChannel</code> is null, a message will be received from any channel 836 * in this channel group. The message must have a tag of 0. The message 837 * items are stored in the given item destination buffer. 838 * <code>theIORequest</code> is the IORequest object to be associated with the 839 * receive operation. 840 * <P> 841 * The <code>receiveNoWait()</code> method returns immediately. To wait for the 842 * message to be fully received, call <code>theIORequest.waitForFinish()</code>. 843 * <P> 844 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was 845 * created by this channel group. If not, the <code>receiveNoWait()</code> 846 * method's behavior is unspecified. 847 * 848 * @param theChannel Channel, or null to receive from any channel. 849 * @param theDst Item destination buffer. 850 * @param theIORequest IORequest object. 851 * @exception NullPointerException (unchecked exception) Thrown if 852 * <code>theDst</code> is null or 853 * <code>theIORequest</code> is null. 854 * @exception IOException Thrown if an I/O error occurred. 855 * @throws java.io.IOException if any. 856 */ 857 public void receiveNoWait(Channel theChannel, 858 Buf theDst, 859 IORequest theIORequest) 860 throws IOException { 861 receiveNoWait(theChannel, 0, 0, theDst, theIORequest); 862 } 863 864 /** 865 * Receive (non-blocking) a message from the given channel with the given 866 * tag. If <code>theChannel</code> is null, a message will be received from any 867 * channel in this channel group. The message items are stored in the given 868 * item destination buffer. <code>theIORequest</code> is the IORequest object to 869 * be associated with the receive operation. 870 * <P> 871 * The <code>receiveNoWait()</code> method returns immediately. To wait for the 872 * message to be fully received, call <code>theIORequest.waitForFinish()</code>. 873 * <P> 874 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was 875 * created by this channel group. If not, the <code>receiveNoWait()</code> 876 * method's behavior is unspecified. 877 * 878 * @param theChannel Channel, or null to receive from any channel. 879 * @param theTag Message tag. 880 * @param theDst Item destination buffer. 881 * @param theIORequest IORequest object. 882 * @exception NullPointerException (unchecked exception) Thrown if 883 * <code>theDst</code> is null or 884 * <code>theIORequest</code> is null. 885 * @exception IOException Thrown if an I/O error occurred. 886 * @throws java.io.IOException if any. 887 */ 888 public void receiveNoWait(Channel theChannel, 889 int theTag, 890 Buf theDst, 891 IORequest theIORequest) 892 throws IOException { 893 receiveNoWait(theChannel, theTag, theTag, theDst, theIORequest); 894 } 895 896 /** 897 * Receive (non-blocking) a message from the given channel with the given 898 * range of tags. If <code>theChannel</code> is null, a message will be received 899 * from any channel in this channel group. If <code>theTagRange</code> is null, 900 * a message will be received with any tag. The message items are stored in 901 * the given item destination buffer. <code>theIORequest</code> is the IORequest 902 * object to be associated with the receive operation. 903 * <P> 904 * The <code>receiveNoWait()</code> method returns immediately. To wait for the 905 * message to be fully received, call <code>theIORequest.waitForFinish()</code>. 906 * <P> 907 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was 908 * created by this channel group. If not, the <code>receiveNoWait()</code> 909 * method's behavior is unspecified. 910 * 911 * @param theChannel Channel, or null to receive from any channel. 912 * @param theTagRange Message tag range, or null to receive any tag. 913 * @param theDst Item destination buffer. 914 * @param theIORequest IORequest object. 915 * @exception NullPointerException (unchecked exception) Thrown if 916 * <code>theDst</code> is null or 917 * <code>theIORequest</code> is null. 918 * @exception IOException Thrown if an I/O error occurred. 919 * @throws java.io.IOException if any. 920 */ 921 public void receiveNoWait(Channel theChannel, 922 Range theTagRange, 923 Buf theDst, 924 IORequest theIORequest) 925 throws IOException { 926 if (theTagRange == null) { 927 receiveNoWait(theChannel, 928 Integer.MIN_VALUE, 929 Integer.MAX_VALUE, 930 theDst, 931 theIORequest); 932 } else { 933 receiveNoWait(theChannel, 934 theTagRange.lb(), 935 theTagRange.ub(), 936 theDst, 937 theIORequest); 938 } 939 } 940 941 /** 942 * Receive (non-blocking) a message from the given channel with the given 943 * tag range. If <code>theChannel</code> is null, a message will be received 944 * from any channel in this channel group. The message items are stored in 945 * the given item destination buffer. <code>theIORequest</code> is the IORequest 946 * object to be associated with the receive operation. 947 * <P> 948 * The <code>receiveNoWait()</code> method returns immediately. To wait for the 949 * message to be fully received, call <code>theIORequest.waitForFinish()</code>. 950 * <P> 951 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was 952 * created by this channel group. If not, the <code>receiveNoWait()</code> 953 * method's behavior is unspecified. 954 * 955 * @param theChannel Channel, or null to receive from any channel. 956 * @param theTagLb Message tag range lower bound. 957 * @param theTagUb Message tag range upper bound. 958 * @param theDst Item destination buffer. 959 * @param theIORequest IORequest object. 960 * 961 * @exception NullPointerException (unchecked exception) Thrown if 962 * <code>theDst</code> is null or 963 * <code>theIORequest</code> is null. 964 * @exception IOException Thrown if an I/O error occurred. 965 */ 966 private void receiveNoWait(Channel theChannel, 967 int theTagLb, 968 int theTagUb, 969 Buf theDst, 970 IORequest theIORequest) 971 throws IOException { 972 // Note: This method is not synchronized. Synchronization happens inside 973 // myIORequestList.add(). 974 975 // Verify preconditions. 976 if (myIORequestList == null) { 977 throw new IOException("ChannelGroup.receiveNoWait(): Channel group closed"); 978 } 979 if (theDst == null) { 980 throw new NullPointerException("ChannelGroup.receiveNoWait(): Destination buffer is null"); 981 } 982 if (theChannel != null) { 983 synchronized (theChannel) { 984 // Check whether channel is closed. 985 if (theChannel.myReadState == Channel.READ_CLOSED) { 986 throw new IOException("ChannelGroup.receiveNoWait(): Channel closed"); 987 } 988 } 989 } 990 991 theIORequest.initialize(theChannel, theTagLb, theTagUb, theDst); 992 myIORequestList.add(theIORequest); 993 } 994 995 /** 996 * Specify an alternate class loader for this channel group. When objects 997 * are received in a message via this channel group, the given class loader 998 * will be used to load the objects' classes. If 999 * <code>setAlternateClassLoader()</code> is never called, or if 1000 * <code>theClassLoader</code> is null, an alternate class loader will not be 1001 * used. 1002 * 1003 * @param theClassLoader Alternate class loader, or null. 1004 */ 1005 public synchronized void setAlternateClassLoader(ClassLoader theClassLoader) { 1006 myClassLoader = theClassLoader; 1007 } 1008 1009 /** 1010 * Close this channel group. Any pending receive requests will fail with a 1011 * {@linkplain ChannelGroupClosedException}. 1012 */ 1013 public synchronized void close() { 1014 // Stop listening for connections. 1015 if (myServerSocketChannel != null) { 1016 try { 1017 myServerSocketChannel.close(); 1018 } catch (IOException ignored) { 1019 } 1020 } 1021 1022 // Close all channels. 1023 if (myChannelList != null) { 1024 while (!myChannelList.isEmpty()) { 1025 myChannelList.get(0).close(); 1026 } 1027 } 1028 1029 // Report failure to all pending receive requests. 1030 if (myIORequestList != null) { 1031 myIORequestList.reportFailure(new ChannelGroupClosedException("Channel group closed")); 1032 } 1033 1034 // Enable garbage collection of fields. 1035 myServerSocketChannel = null; 1036 myIORequestList = null; 1037 myClassLoader = null; 1038 myLoopbackChannel = null; 1039 myChannelList = null; 1040 myAcceptThread = null; 1041 } 1042 1043 /** 1044 * Dump the state of this channel group on the given print stream. For 1045 * debugging. 1046 * 1047 * @param out Print stream. 1048 * @param prefix String to print at the beginning of each line. 1049 */ 1050 public void dump(PrintStream out, 1051 String prefix) { 1052 out.println(prefix + getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(this))); 1053 out.println(prefix + "myChannelGroupId = " + myChannelGroupId); 1054 out.println(prefix + "myServerSocketChannel = " + myServerSocketChannel); 1055 out.println(prefix + "myIORequestList:"); 1056 myIORequestList.dump(out, prefix + "\t"); 1057 out.println(prefix + "myClassLoader = " + myClassLoader); 1058 out.println(prefix + "myLoopbackChannel = " + myLoopbackChannel); 1059 out.println(prefix + "myChannelList:"); 1060 out.println(prefix + "\t" + myChannelList.size() + " entries"); 1061 for (Channel c : myChannelList) { 1062 c.dump(out, prefix + "\t"); 1063 } 1064 out.println(prefix + "myAcceptThread = " + myAcceptThread); 1065 out.println(prefix + "myConnectListener = " + myConnectListener); 1066 out.println(prefix + "myLogger = " + myLogger); 1067 out.println(prefix + "myTimerThread = " + myTimerThread); 1068 } 1069 1070 // Hidden operations. 1071 /** 1072 * Create a new network channel using the given socket channel. The 1073 * connection request originated from the near end. If this channel group is 1074 * closed, null is returned. 1075 * 1076 * @param theSocketChannel Socket channel. 1077 * 1078 * @return New channel. 1079 * 1080 * @exception IOException Thrown if an I/O error occurred. 1081 */ 1082 Channel nearEndConnect(SocketChannel theSocketChannel) 1083 throws IOException { 1084 // Note: This method is not synchronized. Synchronization happens inside 1085 // createNetworkChannel(). 1086 1087 // Turn on socket's TCP no-delay option. 1088 Socket socket = theSocketChannel.socket(); 1089 socket.setTcpNoDelay(true); 1090 1091 // Send channel group ID to far end. 1092 ByteBuffer buf = ByteBuffer.allocate(4); 1093 buf.putInt(myChannelGroupId); 1094 buf.flip(); 1095 if (theSocketChannel.write(buf) != 4) { 1096 throw new IOException("ChannelGroup.nearEndConnect(): Cannot send channel group ID"); 1097 } 1098 1099 // Receive channel group ID from far end with a 30-second timeout. 1100 buf.clear(); 1101 final Thread thread = Thread.currentThread(); 1102 Timer timer = myTimerThread.createTimer(new TimerTask() { 1103 public void action(Timer theTimer) { 1104 thread.interrupt(); 1105 } 1106 }); 1107 timer.start(30000L); 1108 if (theSocketChannel.read(buf) != 4) { 1109 throw new IOException("ChannelGroup.nearEndConnect(): Cannot receive channel group ID"); 1110 } 1111 timer.stop(); 1112 buf.flip(); 1113 int farChannelGroupId = buf.getInt(); 1114 1115 // Set up channel. 1116 Channel channel 1117 = createNetworkChannel(theSocketChannel, farChannelGroupId); 1118 1119 // Inform listener if any. 1120 if (myConnectListener != null) { 1121 myConnectListener.nearEndConnected(this, channel); 1122 } 1123 1124 // Start the channel sending and receiving messages. 1125 channel.start(); 1126 1127 return channel; 1128 } 1129 1130 /** 1131 * Create a new network channel using the given socket channel. The 1132 * connection request originated from the far end. If this channel group is 1133 * closed, null is returned. 1134 * 1135 * @param theSocketChannel Socket channel. 1136 * 1137 * @return New channel. 1138 * 1139 * @exception IOException Thrown if an I/O error occurred. 1140 */ 1141 Channel farEndConnect(SocketChannel theSocketChannel) 1142 throws IOException { 1143 // Note: This method is not synchronized. Synchronization happens inside 1144 // createNetworkChannel(). 1145 1146 // Turn on socket's TCP no-delay option. 1147 Socket socket = theSocketChannel.socket(); 1148 socket.setTcpNoDelay(true); 1149 1150 // Start a 30-second timeout for receiving channel group ID. 1151 final Thread thread = Thread.currentThread(); 1152 Timer timer = myTimerThread.createTimer(new TimerTask() { 1153 public void action(Timer theTimer) { 1154 thread.interrupt(); 1155 } 1156 }); 1157 timer.start(30000L); 1158 1159 try { 1160 // Receive channel group ID from far end. 1161 ByteBuffer buf = ByteBuffer.allocate(4); 1162 if (theSocketChannel.read(buf) != 4) { 1163 throw new IOException("ChannelGroup.farEndConnect(): Cannot receive channel group ID"); 1164 } 1165 timer.stop(); 1166 buf.flip(); 1167 int farChannelGroupId = buf.getInt(); 1168 1169 // Send channel group ID to far end. 1170 buf.clear(); 1171 buf.putInt(myChannelGroupId); 1172 buf.flip(); 1173 if (theSocketChannel.write(buf) != 4) { 1174 throw new IOException("ChannelGroup.farEndConnect(): Cannot send channel group ID"); 1175 } 1176 1177 // Set up channel. 1178 Channel channel 1179 = createNetworkChannel(theSocketChannel, farChannelGroupId); 1180 1181 // Inform listener if any. 1182 if (myConnectListener != null) { 1183 myConnectListener.farEndConnected(this, channel); 1184 } 1185 1186 // Start the channel sending and receiving messages. 1187 channel.start(); 1188 1189 return channel; 1190 } // Stop timer when an IOException is thrown. 1191 catch (IOException exc) { 1192 timer.stop(); 1193 throw exc; 1194 } 1195 } 1196 1197 /** 1198 * Create a new network channel using the given socket channel. If this 1199 * channel group is closed, null is returned. 1200 * 1201 * @param theSocketChannel Socket channel. 1202 * @param theFarChannelGroupId Far end channel group ID. 1203 * 1204 * @return New channel, or null. 1205 * 1206 * @exception IOException Thrown if an I/O error occurred. 1207 */ 1208 synchronized Channel createNetworkChannel(SocketChannel theSocketChannel, 1209 int theFarChannelGroupId) 1210 throws IOException { 1211 Channel channel = null; 1212 if (myIORequestList != null) { 1213 channel 1214 = new NetworkChannel(this, theSocketChannel, theFarChannelGroupId); 1215 myChannelList.add(channel); 1216 } 1217 return channel; 1218 } 1219 1220 /** 1221 * Remove the given channel from this channel group. 1222 * 1223 * @param Channel. 1224 */ 1225 synchronized void removeChannel(Channel channel) { 1226 if (myChannelList != null) { 1227 myChannelList.remove(channel); 1228 } 1229 } 1230 1231 }