View Javadoc
1   //******************************************************************************
2   //
3   // File:    ChannelGroup.java
4   // Package: edu.rit.mp
5   // Unit:    Class edu.rit.mp.ChannelGroup
6   //
7   // This Java source file is copyright (C) 2009 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.mp;
41  
42  import java.io.IOException;
43  import java.io.PrintStream;
44  import java.net.InetSocketAddress;
45  import java.net.Socket;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.ClosedChannelException;
48  import java.nio.channels.ServerSocketChannel;
49  import java.nio.channels.SocketChannel;
50  import java.util.LinkedList;
51  import java.util.List;
52  
53  import edu.rit.util.Logger;
54  import edu.rit.util.PrintStreamLogger;
55  import edu.rit.util.Range;
56  import edu.rit.util.Timer;
57  import edu.rit.util.TimerTask;
58  import edu.rit.util.TimerThread;
59  
60  /**
61   * Class ChannelGroup provides a group of {@linkplain Channel}s for sending and
62   * receiving messages in the Message Protocol (MP).
63   * <P>
64   * <B>Creating Channels</B>
65   * <P>
66   * A channel group can be used to create channels in two ways: by accepting a
67   * connection request from another process, and by requesting a connection to
68   * another process.
69   * <P>
70   * The channel group can be configured to listen to a certain host and port for
71   * connection requests. Configure the host and port by by calling the
72   * <code>listen()</code> method. To start accepting connection requests, call the
73   * <code>startListening()</code> method.
74   * <P>
75   * If desired, an application can receive notification of newly created channels
76   * by providing a {@linkplain ConnectListener} object to the channel group's
77   * <code>setConnectListener()</code> method. Specify a connect listener object
78   * before calling the <code>startListening()</code> method, otherwise the
79   * application may not receive some notifications.
80   * <P>
81   * When a connection request arrives, the channel group sets up a new channel
82   * object for communicating over the connection. If a connect listener has been
83   * registered, the channel group then passes the channel to the connect
84   * listener's <code>farEndConnected()</code> method, which does whatever the
85   * application needs to record the new channel's presence. The channel does not
86   * start sending and receiving messages until after the connect listener's
87   * <code>farEndConnected()</code> method (if any) has returned.
88   * <P>
89   * The application can also call the channel group's <code>connect()</code> method
90   * to request a connection to another host and port. The channel group sets up a
91   * new channel object for communicating over the connection. If a connect
92   * listener has been registered, the channel group then passes the channel to
93   * the connect listener's <code>nearEndConnected()</code> method, which does
94   * whatever the application needs to record the new channel's presence. The
95   * channel does not start sending and receiving messages until after the connect
96   * listener's <code>nearEndConnected()</code> method (if any) has returned. The
97   * <code>connect()</code> method also returns the new channel.
98   * <P>
99   * Once a connection has been set up and a channel object has been created on
100  * each side, the applications can use their respective channels to send and
101  * receive messages.
102  * <P>
103  * If a channel group does not need to accept incoming connection requests, the
104  * channel group need not listen to any host and port. The channel group can
105  * still be used to make outgoing connection requests.
106  * <P>
107  * <B>Channel Group IDs</B>
108  * <P>
109  * Each channel group has a channel group ID. The channel group ID is an
110  * integer, initially 0, that can be changed by the <code>setChannelGroupId()</code>
111  * method. The channel group attaches no significance to the channel group ID;
112  * it is provided for the use of the application using the channel group.
113  * <P>
114  * You can query a channel group object to determine its channel group ID. You
115  * can also query a channel object to determine the ID of the channel group at
116  * the near end of the channel and the ID of the channel group at the far end of
117  * the channel.
118  * <P>
119  * <B>Sending Messages</B>
120  * <P>
121  * To send a message, the application creates a message buffer (class
122  * {@linkplain Buf}) specifying where to get the items to be sent. The
123  * application calls the channel group's <code>send()</code> method, passing the
124  * channel on which to send the message, the message buffer, and the message
125  * tag. (If the message tag is not specified, it defaults to 0.) The channel
126  * group extracts the items from the message buffer and sends a message over the
127  * channel's connection. When the <code>send()</code> method returns, the message
128  * has been fully sent, but the message may not have been fully received yet.
129  * <P>
130  * The far end application must receive the message from the channel at the
131  * other end of the connection. If no application is receiving the message, the
132  * <code>send()</code> method may block (because of flow control). This in turn may
133  * lead to a deadlock.
134  * <P>
135  * At most one outgoing message at a time may be in progress on a channel. If a
136  * second thread tries to send a message on a channel while a first thread is
137  * still sending a message on that channel, the second thread will block until
138  * the first thread has finished sending the message.
139  * <P>
140  * <B>Receiving Messages</B>
141  * <P>
142  * To receive a message, the application creates a message buffer (class
143  * {@linkplain Buf}) specifying where to put the items to be received. The
144  * application calls the channel group's <code>receive()</code> method, passing the
145  * channel from which to receive the message, the desired message tag, and the
146  * message buffer. The application can specify "any channel" instead of a
147  * specific channel. The application can specify a range of tags or "any tag"
148  * instead of a specific tag. Any number of threads can have receive requests
149  * pending at the same time.
150  * <P>
151  * When the channel group receives a message from a channel, the channel group
152  * tries to match the message with the pending receive requests. A message
153  * matches a receive request if (a) the message's channel is the same as the
154  * receive request's channel, or the receive request specified "any channel;"
155  * and (b) the message's tag is the same as the receive request's tag, or the
156  * message's tag falls within the receive request's range of tags, or the
157  * receive request specified "any tag;" and (c) the message's item type is the
158  * same as the receive request's item type (as given by the message buffer). The
159  * pending receive requests are maintained in FIFO order. If no receive request
160  * matches the message, the channel group does not read the message until such
161  * time as a matching receive request occurs. If more than one receive request
162  * matches the message, the channel group chooses the first matching receive
163  * request.
164  * <P>
165  * Once the channel group has matched the incoming message with the receive
166  * request, the channel group reads the items from the message and stores them
167  * into the message buffer. If there are fewer items in the message than the
168  * length of the message buffer, the extra items at the end of the message
169  * buffer are not set to anything. If there are more items in the message than
170  * the length of the message buffer, the extra items are read from the message
171  * and discarded. Once the message has been read, the <code>receive()</code> method
172  * returns a {@linkplain Status} object reporting the channel on which the
173  * message arrived, the message tag, and the actual number of items in the
174  * message (which may or may not be the same as the number of items in the
175  * message buffer).
176  * <P>
177  * If the receive requests do not match properly with the incoming messages, a
178  * deadlock may occur.
179  * <P>
180  * <B>Sending and Receiving Within the Same Process</B>
181  * <P>
182  * Each channel group has a "loopback" channel that is used to send messages
183  * within the same process. To obtain the loopback channel, call the
184  * <code>loopbackChannel()</code> method. Then one thread can send messages using
185  * the loopback channel while a different thread receives messages using the
186  * loopback channel. If the same thread both sends and receives using the
187  * loopback channel, a deadlock may occur.
188  * <P>
189  * The loopback channel uses the <code>copy()</code> method of class {@linkplain
190  * Buf} to transfer data items directly from the source buffer to the
191  * destination buffer. The loopback channel does not do any network
192  * communication.
193  * <P>
194  * <B>Non-Blocking Send and Receive Operations</B>
195  * <P>
196  * The <code>send()</code> method described so far does a <B>blocking send</B>
197  * operation; the <code>send()</code> method does not return until the message has
198  * been fully sent. There is also a <B>non-blocking send</B> operation,
199  * <code>sendNoWait()</code>, which includes an {@linkplain IORequest} argument. The
200  * <code>sendNoWait()</code> method initiates the send operation and returns
201  * immediately. This allows the caller to continue processing while the channel
202  * group sends the message in a separate thread. To wait for the message to be
203  * fully sent, the caller must call the IORequest object's
204  * <code>waitForFinish()</code> method.
205  * <P>
206  * Likewise, the <code>receive()</code> method described so far does a <B>blocking
207  * receive</B> operation; the <code>receive()</code> method does not return until
208  * the message has been fully received. There is also a <B>non-blocking
209  * receive</B> operation, <code>receiveNoWait()</code>, which includes an
210  * {@linkplain IORequest} argument. The <code>receiveNoWait()</code> method
211  * initiates the receive operation and returns immediately. This allows the
212  * caller to continue processing while the channel group receives the message in
213  * a separate thread. To wait for the message to be fully received, the caller
214  * must call the IORequest object's <code>waitForFinish()</code> method, which
215  * returns a {@linkplain Status} object giving the results of the receive
216  * operation.
217  *
218  * @author Alan Kaminsky
219  * @version 11-Mar-2009
220  */
221 public class ChannelGroup {
222 
223 // Hidden data members.
224     // Channel group ID.
225     int myChannelGroupId;
226 
227     // Server socket channel for accepting incoming connections, or null if not
228     // accepting incoming connections.
229     ServerSocketChannel myServerSocketChannel;
230 
231     // I/O request list for matching incoming messages to receive I/O requests.
232     IORequestList myIORequestList;
233 
234     // Alternate class loader for use when receiving objects.
235     ClassLoader myClassLoader;
236 
237     // Loopback channel.
238     LoopbackChannel myLoopbackChannel;
239 
240     // List of open channels.
241     List<Channel> myChannelList;
242 
243     // Accepting thread, or null if not accepting.
244     AcceptThread myAcceptThread;
245 
246     // Registered connect listener, or null if none.
247     ConnectListener myConnectListener;
248 
249     // For logging error messages.
250     Logger myLogger;
251 
252     // For timeouts during channel setup.
253     TimerThread myTimerThread;
254 
255 // Hidden helper classes.
256     /**
257      * Class AcceptThread provides a thread that accepts incoming connections.
258      *
259      * @author Alan Kaminsky
260      * @version 11-Mar-2009
261      */
262     private class AcceptThread
263             extends Thread {
264 
265         public AcceptThread() {
266             setDaemon(true);
267             start();
268         }
269 
270         public void run() {
271             acceptloop:
272             for (;;) {
273                 // Wait for an incoming connection.
274                 SocketChannel connection = null;
275                 try {
276                     connection = myServerSocketChannel.accept();
277                 } catch (ClosedChannelException exc) {
278                     myLogger.log("ChannelGroup: Channel closed",
279                             exc);
280                     break acceptloop;
281                 } catch (IOException exc) {
282                     myLogger.log("ChannelGroup: I/O error while accepting connection",
283                             exc);
284                     break acceptloop;
285                 }
286 
287                 // Set up channel over connection.
288                 if (connection != null) {
289                     try {
290                         farEndConnect(connection);
291                     } catch (IOException exc) {
292                         // Clear thread's interrupted status, otherwise accept()
293                         // above will throw an exception.
294                         Thread.interrupted();
295                         myLogger.log("ChannelGroup: I/O error while setting up channel",
296                                 exc);
297                         try {
298                             connection.close();
299                         } catch (IOException ignored) {
300                         }
301                     }
302                 }
303             }
304         }
305     }
306 
307 // Exported constructors.
308     /**
309      * Construct a new channel group. The channel group ID is initially 0. The
310      * channel group will not listen for connection requests. To listen for
311      * connection requests at a later time, call the <code>listen()</code> method
312      * followed by the <code>startListening</code> method.
313      * <P>
314      * The channel group will log error messages on the standard error.
315      */
316     public ChannelGroup() {
317         this(new PrintStreamLogger());
318     }
319 
320     /**
321      * Construct a new channel group. The channel group ID is initially 0. The
322      * channel group will listen for connection requests on the given host and
323      * port. To start actively listening, call the <code>startListening()</code>
324      * method.
325      * <P>
326      * The channel group will log error messages on the standard error.
327      *
328      * @param theListenAddress Host and port at which to listen.
329      * @exception NullPointerException (unchecked exception) Thrown if
330      * <code>theListenAddress</code> is null.
331      * @exception IOException Thrown if an I/O error occurred.
332      * @throws java.io.IOException if any.
333      */
334     public ChannelGroup(InetSocketAddress theListenAddress)
335             throws IOException {
336         this(theListenAddress, new PrintStreamLogger());
337     }
338 
339     /**
340      * Construct a new channel group. The channel group ID is initially 0. The
341      * channel group will listen for connection requests using the given server
342      * socket channel. The server socket channel must be bound to a host and
343      * port. To start actively listening, call the <code>startListening()</code>
344      * method.
345      * <P>
346      * The channel group will log error messages on the standard error.
347      *
348      * @param theServerSocketChannel Server socket channel.
349      * @exception NullPointerException (unchecked exception) Thrown if
350      * <code>theServerSocketChannel</code> is null.
351      * @exception IOException Thrown if an I/O error occurred. Thrown if
352      * <code>theServerSocketChannel</code> is not bound.
353      * @throws java.io.IOException if any.
354      */
355     public ChannelGroup(ServerSocketChannel theServerSocketChannel)
356             throws IOException {
357         this(theServerSocketChannel, new PrintStreamLogger());
358     }
359 
360     /**
361      * Construct a new channel group. The channel group ID is initially 0. The
362      * channel group will not listen for connection requests. To listen for
363      * connection requests at a later time, call the <code>listen()</code> method
364      * followed by the <code>startListening</code> method.
365      * <P>
366      * The channel group will log error messages using the given logger.
367      *
368      * @param theLogger Logger for error messages.
369      * @exception NullPointerException (unchecked exception) Thrown if
370      * <code>theLogger</code> is null.
371      */
372     public ChannelGroup(Logger theLogger) {
373         if (theLogger == null) {
374             throw new NullPointerException("ChannelGroup(): theLogger is null");
375         }
376         myIORequestList = new IORequestList();
377         myLoopbackChannel = new LoopbackChannel(this);
378         myChannelList = new LinkedList<>();
379         myChannelList.add(myLoopbackChannel);
380         myLogger = theLogger;
381         myTimerThread = new TimerThread();
382         myTimerThread.setDaemon(true);
383         myTimerThread.start();
384     }
385 
386     /**
387      * Construct a new channel group. The channel group ID is initially 0. The
388      * channel group will listen for connection requests on the given host and
389      * port. To start actively listening, call the <code>startListening()</code>
390      * method.
391      * <P>
392      * The channel group will log error messages using the given logger.
393      *
394      * @param theListenAddress Host and port at which to listen.
395      * @param theLogger Logger for error messages.
396      * @exception NullPointerException (unchecked exception) Thrown if
397      * <code>theListenAddress</code> is null. Thrown if <code>theLogger</code> is null.
398      * @exception IOException Thrown if an I/O error occurred.
399      * @throws java.io.IOException if any.
400      */
401     public ChannelGroup(InetSocketAddress theListenAddress,
402             Logger theLogger)
403             throws IOException {
404         this(theLogger);
405         listen(theListenAddress);
406     }
407 
408     /**
409      * Construct a new channel group. The channel group ID is initially 0. The
410      * channel group will listen for connection requests using the given server
411      * socket channel. The server socket channel must be bound to a host and
412      * port. To start actively listening, call the <code>startListening()</code>
413      * method.
414      * <P>
415      * The channel group will log error messages using the given logger.
416      *
417      * @param theServerSocketChannel Server socket channel.
418      * @param theLogger Logger for error messages.
419      * @exception NullPointerException (unchecked exception) Thrown if
420      * <code>theServerSocketChannel</code> is null. Thrown if <code>theLogger</code> is
421      * null.
422      * @exception IOException Thrown if an I/O error occurred. Thrown if
423      * <code>theServerSocketChannel</code> is not bound.
424      * @throws java.io.IOException if any.
425      */
426     public ChannelGroup(ServerSocketChannel theServerSocketChannel,
427             Logger theLogger)
428             throws IOException {
429         this(theLogger);
430         listen(theServerSocketChannel);
431     }
432 
433 // Exported operations.
434     /**
435      * Set this channel group's channel group ID.
436      *
437      * @param theChannelGroupId Channel group ID.
438      */
439     public void setChannelGroupId(int theChannelGroupId) {
440         myChannelGroupId = theChannelGroupId;
441     }
442 
443     /**
444      * Obtain this channel group's channel group ID.
445      *
446      * @return Channel group ID.
447      */
448     public int getChannelGroupId() {
449         return myChannelGroupId;
450     }
451 
452     /**
453      * Obtain this channel group's listen address. This is the near end host and
454      * port to which this channel group is listening for connection requests. If
455      * this channel group is not listening for connection requests, null is
456      * returned.
457      *
458      * @return Near end address, or null.
459      */
460     public synchronized InetSocketAddress listenAddress() {
461         return myServerSocketChannel == null
462                 ? null
463                 : (InetSocketAddress) myServerSocketChannel.socket().getLocalSocketAddress();
464     }
465 
466     /**
467      * Listen for connection requests on the given host and port. To start
468      * actively listening, call the <code>startListening()</code> method.
469      *
470      * @param theListenAddress Host and port at which to listen.
471      * @exception NullPointerException (unchecked exception) Thrown if
472      * <code>theListenAddress</code> is null.
473      * @exception IllegalStateException (unchecked exception) Thrown if
474      * listening has already started.
475      * @exception IOException Thrown if an I/O error occurred.
476      * @throws java.io.IOException if any.
477      */
478     public synchronized void listen(InetSocketAddress theListenAddress)
479             throws IOException {
480         if (theListenAddress == null) {
481             throw new NullPointerException("ChannelGroup.listen(): theListenAddress is null");
482         }
483         ServerSocketChannel channel = ServerSocketChannel.open();
484         channel.socket().bind(theListenAddress);
485         listen(channel);
486     }
487 
488     /**
489      * Listen for connection requests using the given server socket channel. The
490      * server socket channel must be bound to a host and port. To start actively
491      * listening, call the <code>startListening()</code> method.
492      *
493      * @param theServerSocketChannel Server socket channel.
494      * @exception NullPointerException (unchecked exception) Thrown if
495      * <code>theServerSocketChannel</code> is null.
496      * @exception IllegalStateException (unchecked exception) Thrown if
497      * listening has already started.
498      * @exception IOException Thrown if an I/O error occurred. Thrown if
499      * <code>theServerSocketChannel</code> is not bound.
500      * @throws java.io.IOException if any.
501      */
502     public synchronized void listen(ServerSocketChannel theServerSocketChannel)
503             throws IOException {
504         if (theServerSocketChannel == null) {
505             throw new NullPointerException("ChannelGroup.listen(): theServerSocketChannel is null");
506         }
507         if (!theServerSocketChannel.socket().isBound()) {
508             throw new IOException("ChannelGroup.listen(): theServerSocketChannel is not bound");
509         }
510         if (myAcceptThread != null) {
511             throw new IllegalStateException("ChannelGroup.listen(): Listening has already started");
512         }
513         if (myIORequestList == null) {
514             throw new IOException("ChannelGroup.listen(): Channel group closed");
515         }
516 
517         myServerSocketChannel = theServerSocketChannel;
518     }
519 
520     /**
521      * Register the given connect listener with this channel group. Thereafter,
522      * this channel group will report each connected channel by calling
523      * <code>theConnectListener</code>'s <code>nearEndConnected()</code> method (if the
524      * connection request originated in this process) or
525      * <code>farEndConnected()</code> method (if the connection request originated
526      * in another process). It is assumed that these methods will not do any
527      * lengthy processing and will not block the calling thread.
528      * <P>
529      * At most one connect listener may be registered. If a connect listener is
530      * already registered, it is replaced with the given connect listener. If
531      * <code>theConnectListener</code> is null, any registered connect listener is
532      * discarded, and this channel group will not report connected channels.
533      * <P>
534      * Call the <code>setConnectListener()</code> method before calling the
535      * <code>startListening()</code> method, otherwise the application may not
536      * receive some connection notifications.
537      *
538      * @param theConnectListener Connect listener, or null.
539      */
540     public synchronized void setConnectListener(ConnectListener theConnectListener) {
541         myConnectListener = theConnectListener;
542     }
543 
544     /**
545      * Start actively listening for connection requests.
546      *
547      * @exception IllegalStateException (unchecked exception) Thrown if a host
548      * and port or a server socket channel upon which to listen has not been
549      * specified. Thrown if listening has already started.
550      */
551     public synchronized void startListening() {
552         if (myServerSocketChannel == null) {
553             throw new IllegalStateException("ChannelGroup.startListening(): No server socket channel");
554         }
555         if (myAcceptThread != null) {
556             throw new IllegalStateException("ChannelGroup.listen(): Listening has already started");
557         }
558 
559         myAcceptThread = new AcceptThread();
560     }
561 
562     /**
563      * Create a new channel connected to the given far end host and port. In the
564      * far end computer, there must be a channel group listening to the given
565      * host and port. Once the connection is set up, if a connect listener has
566      * been registered, the channel group calls the connect listener's
567      * <code>nearEndConnected()</code> method to report the new channel.
568      *
569      * @param theFarEndAddress Host and port of far end channel group.
570      * @return New channel.
571      * @exception IOException Thrown if an I/O error occurred.
572      * @throws java.io.IOException if any.
573      */
574     public Channel connect(InetSocketAddress theFarEndAddress)
575             throws IOException {
576         synchronized (this) {
577             if (myIORequestList == null) {
578                 throw new IOException("ChannelGroup.connect(): Channel group closed");
579             }
580         }
581 
582         SocketChannel connection = null;
583         try {
584             connection = SocketChannel.open(theFarEndAddress);
585             return nearEndConnect(connection);
586         } catch (IOException exc) {
587             // Clear thread's interrupted status.
588             Thread.interrupted();
589             if (connection != null) {
590                 try {
591                     connection.close();
592                 } catch (IOException ignored) {
593                 }
594             }
595             throw exc;
596         }
597     }
598 
599     /**
600      * Obtain this channel group's loopback channel. If this channel group is
601      * closed, null is returned.
602      *
603      * @return Loopback channel, or null.
604      */
605     public synchronized Channel loopbackChannel() {
606         return myLoopbackChannel;
607     }
608 
609     /**
610      * Send a message to the given channel. The message uses a tag of 0. The
611      * message items come from the given item source buffer.
612      * <P>
613      * The <code>send()</code> method does not return until the message has been
614      * fully sent. (The message may not have been fully received yet.)
615      * <P>
616      * The <code>send()</code> method assumes that <code>theChannel</code> was created
617      * by this channel group. If not, the <code>send()</code> method's behavior is
618      * unspecified.
619      *
620      * @param theChannel Channel.
621      * @param theSrc Item source buffer.
622      * @exception NullPointerException (unchecked exception) Thrown if
623      * <code>theChannel</code> is null or
624      * <code>theSrc</code> is null.
625      * @exception IOException Thrown if an I/O error occurred.
626      * @throws java.io.IOException if any.
627      */
628     public void send(Channel theChannel,
629             Buf theSrc)
630             throws IOException {
631         IORequest req = new IORequest();
632         sendNoWait(theChannel, 0, theSrc, req);
633         req.waitForFinish();
634     }
635 
636     /**
637      * Send a message to the given channel with the given tag. The message items
638      * come from the given item source buffer.
639      * <P>
640      * The <code>send()</code> method does not return until the message has been
641      * fully sent. (The message may not have been fully received yet.)
642      * <P>
643      * The <code>send()</code> method assumes that <code>theChannel</code> was created
644      * by this channel group. If not, the <code>send()</code> method's behavior is
645      * unspecified.
646      *
647      * @param theChannel Channel.
648      * @param theTag Message tag.
649      * @param theSrc Item source buffer.
650      * @exception NullPointerException (unchecked exception) Thrown if
651      * <code>theChannel</code> is null or
652      * <code>theSrc</code> is null.
653      * @exception IOException Thrown if an I/O error occurred.
654      * @throws java.io.IOException if any.
655      */
656     public void send(Channel theChannel,
657             int theTag,
658             Buf theSrc)
659             throws IOException {
660         IORequest req = new IORequest();
661         sendNoWait(theChannel, theTag, theSrc, req);
662         req.waitForFinish();
663     }
664 
665     /**
666      * Send (non-blocking) a message to the given channel. The message uses a
667      * tag of 0. The message items come from the given item source buffer.
668      * <code>theIORequest</code> is the IORequest object to be associated with the
669      * send operation.
670      * <P>
671      * The <code>sendNoWait()</code> method returns immediately. To wait for the
672      * message to be fully sent, call <code>theIORequest.waitForFinish()</code>.
673      * <P>
674      * The <code>sendNoWait()</code> method assumes that <code>theChannel</code> was
675      * created by this channel group. If not, the <code>sendNoWait()</code> method's
676      * behavior is unspecified.
677      *
678      * @param theChannel Channel.
679      * @param theSrc Item source buffer.
680      * @param theIORequest IORequest object.
681      * @exception NullPointerException (unchecked exception) Thrown if
682      * <code>theChannel</code> is null,
683      * <code>theSrc</code> is null, or <code>theIORequest</code> is null.
684      * @exception IOException Thrown if an I/O error occurred.
685      * @throws java.io.IOException if any.
686      */
687     public void sendNoWait(Channel theChannel,
688             Buf theSrc,
689             IORequest theIORequest)
690             throws IOException {
691         sendNoWait(theChannel, 0, theSrc, theIORequest);
692     }
693 
694     /**
695      * Send (non-blocking) a message to the given channel with the given tag.
696      * The message items come from the given item source buffer.
697      * <code>theIORequest</code> is the IORequest object to be associated with the
698      * send operation.
699      * <P>
700      * The <code>sendNoWait()</code> method returns immediately. To wait for the
701      * message to be fully sent, call <code>theIORequest.waitForFinish()</code>.
702      * <P>
703      * The <code>sendNoWait()</code> method assumes that <code>theChannel</code> was
704      * created by this channel group. If not, the <code>sendNoWait()</code> method's
705      * behavior is unspecified.
706      *
707      * @param theChannel Channel.
708      * @param theTag Message tag.
709      * @param theSrc Item source buffer.
710      * @param theIORequest IORequest object.
711      * @exception NullPointerException (unchecked exception) Thrown if
712      * <code>theChannel</code> is null,
713      * <code>theSrc</code> is null, or <code>theIORequest</code> is null.
714      * @exception IOException Thrown if an I/O error occurred.
715      * @throws java.io.IOException if any.
716      */
717     public void sendNoWait(Channel theChannel,
718             int theTag,
719             Buf theSrc,
720             IORequest theIORequest)
721             throws IOException {
722 		// Note: This method is not synchronized. Synchronization happens inside
723         // theChannel.send().
724 
725         // Verify preconditions.
726         if (myIORequestList == null) {
727             throw new IOException("ChannelGroup.sendNoWait(): Channel group closed");
728         }
729         if (theSrc == null) {
730             throw new NullPointerException("ChannelGroup.sendNoWait(): Source buffer is null");
731         }
732 
733         theIORequest.initialize(theChannel, theTag, theTag, theSrc);
734         theChannel.send(theIORequest);
735     }
736 
737     /**
738      * Receive a message from the given channel. If <code>theChannel</code> is null,
739      * a message will be received from any channel in this channel group. The
740      * message must have a tag of 0. The message items are stored in the given
741      * item destination buffer.
742      * <P>
743      * The <code>receive()</code> method does not return until the message has been
744      * fully received.
745      * <P>
746      * The <code>receive()</code> method assumes that <code>theChannel</code> was
747      * created by this channel group. If not, the <code>receive()</code> method's
748      * behavior is unspecified.
749      *
750      * @param theChannel Channel, or null to receive from any channel.
751      * @param theDst Item destination buffer.
752      * @return Status object giving the outcome of the message reception.
753      * @exception NullPointerException (unchecked exception) Thrown if
754      * <code>theDst</code> is null.
755      * @exception IOException Thrown if an I/O error occurred.
756      * @throws java.io.IOException if any.
757      */
758     public Status receive(Channel theChannel,
759             Buf theDst)
760             throws IOException {
761         IORequest req = new IORequest();
762         receiveNoWait(theChannel, 0, 0, theDst, req);
763         return req.waitForFinish();
764     }
765 
766     /**
767      * Receive a message from the given channel with the given tag. If
768      * <code>theChannel</code> is null, a message will be received from any channel
769      * in this channel group. The message items are stored in the given item
770      * destination buffer.
771      * <P>
772      * The <code>receive()</code> method does not return until the message has been
773      * fully received.
774      * <P>
775      * The <code>receive()</code> method assumes that <code>theChannel</code> was
776      * created by this channel group. If not, the <code>receive()</code> method's
777      * behavior is unspecified.
778      *
779      * @param theChannel Channel, or null to receive from any channel.
780      * @param theTag Message tag.
781      * @param theDst Item destination buffer.
782      * @return Status object giving the outcome of the message reception.
783      * @exception NullPointerException (unchecked exception) Thrown if
784      * <code>theDst</code> is null.
785      * @exception IOException Thrown if an I/O error occurred.
786      * @throws java.io.IOException if any.
787      */
788     public Status receive(Channel theChannel,
789             int theTag,
790             Buf theDst)
791             throws IOException {
792         IORequest req = new IORequest();
793         receiveNoWait(theChannel, theTag, theTag, theDst, req);
794         return req.waitForFinish();
795     }
796 
797     /**
798      * Receive a message from the given channel with the given range of tags. If
799      * <code>theChannel</code> is null, a message will be received from any channel
800      * in this channel group. If <code>theTagRange</code> is null, a message will be
801      * received with any tag. The message items are stored in the given item
802      * destination buffer.
803      * <P>
804      * The <code>receive()</code> method does not return until the message has been
805      * fully received.
806      * <P>
807      * The <code>receive()</code> method assumes that <code>theChannel</code> was
808      * created by this channel group. If not, the <code>receive()</code> method's
809      * behavior is unspecified.
810      *
811      * @param theChannel Channel, or null to receive from any channel.
812      * @param theTagRange Message tag range, or null to receive any tag.
813      * @param theDst Item destination buffer.
814      * @return Status object giving the outcome of the message reception.
815      * @exception NullPointerException (unchecked exception) Thrown if
816      * <code>theDst</code> is null.
817      * @exception IOException Thrown if an I/O error occurred.
818      * @throws java.io.IOException if any.
819      */
820     public Status receive(Channel theChannel,
821             Range theTagRange,
822             Buf theDst)
823             throws IOException {
824         IORequest req = new IORequest();
825         if (theTagRange == null) {
826             receiveNoWait(theChannel, Integer.MIN_VALUE, Integer.MAX_VALUE, theDst, req);
827         } else {
828             receiveNoWait(theChannel, theTagRange.lb(), theTagRange.ub(), theDst, req);
829         }
830         return req.waitForFinish();
831     }
832 
833     /**
834      * Receive (non-blocking) a message from the given channel. If
835      * <code>theChannel</code> is null, a message will be received from any channel
836      * in this channel group. The message must have a tag of 0. The message
837      * items are stored in the given item destination buffer.
838      * <code>theIORequest</code> is the IORequest object to be associated with the
839      * receive operation.
840      * <P>
841      * The <code>receiveNoWait()</code> method returns immediately. To wait for the
842      * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
843      * <P>
844      * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
845      * created by this channel group. If not, the <code>receiveNoWait()</code>
846      * method's behavior is unspecified.
847      *
848      * @param theChannel Channel, or null to receive from any channel.
849      * @param theDst Item destination buffer.
850      * @param theIORequest IORequest object.
851      * @exception NullPointerException (unchecked exception) Thrown if
852      * <code>theDst</code> is null or
853      * <code>theIORequest</code> is null.
854      * @exception IOException Thrown if an I/O error occurred.
855      * @throws java.io.IOException if any.
856      */
857     public void receiveNoWait(Channel theChannel,
858             Buf theDst,
859             IORequest theIORequest)
860             throws IOException {
861         receiveNoWait(theChannel, 0, 0, theDst, theIORequest);
862     }
863 
864     /**
865      * Receive (non-blocking) a message from the given channel with the given
866      * tag. If <code>theChannel</code> is null, a message will be received from any
867      * channel in this channel group. The message items are stored in the given
868      * item destination buffer. <code>theIORequest</code> is the IORequest object to
869      * be associated with the receive operation.
870      * <P>
871      * The <code>receiveNoWait()</code> method returns immediately. To wait for the
872      * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
873      * <P>
874      * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
875      * created by this channel group. If not, the <code>receiveNoWait()</code>
876      * method's behavior is unspecified.
877      *
878      * @param theChannel Channel, or null to receive from any channel.
879      * @param theTag Message tag.
880      * @param theDst Item destination buffer.
881      * @param theIORequest IORequest object.
882      * @exception NullPointerException (unchecked exception) Thrown if
883      * <code>theDst</code> is null or
884      * <code>theIORequest</code> is null.
885      * @exception IOException Thrown if an I/O error occurred.
886      * @throws java.io.IOException if any.
887      */
888     public void receiveNoWait(Channel theChannel,
889             int theTag,
890             Buf theDst,
891             IORequest theIORequest)
892             throws IOException {
893         receiveNoWait(theChannel, theTag, theTag, theDst, theIORequest);
894     }
895 
896     /**
897      * Receive (non-blocking) a message from the given channel with the given
898      * range of tags. If <code>theChannel</code> is null, a message will be received
899      * from any channel in this channel group. If <code>theTagRange</code> is null,
900      * a message will be received with any tag. The message items are stored in
901      * the given item destination buffer. <code>theIORequest</code> is the IORequest
902      * object to be associated with the receive operation.
903      * <P>
904      * The <code>receiveNoWait()</code> method returns immediately. To wait for the
905      * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
906      * <P>
907      * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
908      * created by this channel group. If not, the <code>receiveNoWait()</code>
909      * method's behavior is unspecified.
910      *
911      * @param theChannel Channel, or null to receive from any channel.
912      * @param theTagRange Message tag range, or null to receive any tag.
913      * @param theDst Item destination buffer.
914      * @param theIORequest IORequest object.
915      * @exception NullPointerException (unchecked exception) Thrown if
916      * <code>theDst</code> is null or
917      * <code>theIORequest</code> is null.
918      * @exception IOException Thrown if an I/O error occurred.
919      * @throws java.io.IOException if any.
920      */
921     public void receiveNoWait(Channel theChannel,
922             Range theTagRange,
923             Buf theDst,
924             IORequest theIORequest)
925             throws IOException {
926         if (theTagRange == null) {
927             receiveNoWait(theChannel,
928                     Integer.MIN_VALUE,
929                     Integer.MAX_VALUE,
930                     theDst,
931                     theIORequest);
932         } else {
933             receiveNoWait(theChannel,
934                     theTagRange.lb(),
935                     theTagRange.ub(),
936                     theDst,
937                     theIORequest);
938         }
939     }
940 
941     /**
942      * Receive (non-blocking) a message from the given channel with the given
943      * tag range. If <code>theChannel</code> is null, a message will be received
944      * from any channel in this channel group. The message items are stored in
945      * the given item destination buffer. <code>theIORequest</code> is the IORequest
946      * object to be associated with the receive operation.
947      * <P>
948      * The <code>receiveNoWait()</code> method returns immediately. To wait for the
949      * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
950      * <P>
951      * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
952      * created by this channel group. If not, the <code>receiveNoWait()</code>
953      * method's behavior is unspecified.
954      *
955      * @param theChannel Channel, or null to receive from any channel.
956      * @param theTagLb Message tag range lower bound.
957      * @param theTagUb Message tag range upper bound.
958      * @param theDst Item destination buffer.
959      * @param theIORequest IORequest object.
960      *
961      * @exception NullPointerException (unchecked exception) Thrown if
962      * <code>theDst</code> is null or
963      * <code>theIORequest</code> is null.
964      * @exception IOException Thrown if an I/O error occurred.
965      */
966     private void receiveNoWait(Channel theChannel,
967             int theTagLb,
968             int theTagUb,
969             Buf theDst,
970             IORequest theIORequest)
971             throws IOException {
972 		// Note: This method is not synchronized. Synchronization happens inside
973         // myIORequestList.add().
974 
975         // Verify preconditions.
976         if (myIORequestList == null) {
977             throw new IOException("ChannelGroup.receiveNoWait(): Channel group closed");
978         }
979         if (theDst == null) {
980             throw new NullPointerException("ChannelGroup.receiveNoWait(): Destination buffer is null");
981         }
982         if (theChannel != null) {
983             synchronized (theChannel) {
984                 // Check whether channel is closed.
985                 if (theChannel.myReadState == Channel.READ_CLOSED) {
986                     throw new IOException("ChannelGroup.receiveNoWait(): Channel closed");
987                 }
988             }
989         }
990 
991         theIORequest.initialize(theChannel, theTagLb, theTagUb, theDst);
992         myIORequestList.add(theIORequest);
993     }
994 
995     /**
996      * Specify an alternate class loader for this channel group. When objects
997      * are received in a message via this channel group, the given class loader
998      * will be used to load the objects' classes. If
999      * <code>setAlternateClassLoader()</code> is never called, or if
1000      * <code>theClassLoader</code> is null, an alternate class loader will not be
1001      * used.
1002      *
1003      * @param theClassLoader Alternate class loader, or null.
1004      */
1005     public synchronized void setAlternateClassLoader(ClassLoader theClassLoader) {
1006         myClassLoader = theClassLoader;
1007     }
1008 
1009     /**
1010      * Close this channel group. Any pending receive requests will fail with a
1011      * {@linkplain ChannelGroupClosedException}.
1012      */
1013     public synchronized void close() {
1014         // Stop listening for connections.
1015         if (myServerSocketChannel != null) {
1016             try {
1017                 myServerSocketChannel.close();
1018             } catch (IOException ignored) {
1019             }
1020         }
1021 
1022         // Close all channels.
1023         if (myChannelList != null) {
1024             while (!myChannelList.isEmpty()) {
1025                 myChannelList.get(0).close();
1026             }
1027         }
1028 
1029         // Report failure to all pending receive requests.
1030         if (myIORequestList != null) {
1031             myIORequestList.reportFailure(new ChannelGroupClosedException("Channel group closed"));
1032         }
1033 
1034         // Enable garbage collection of fields.
1035         myServerSocketChannel = null;
1036         myIORequestList = null;
1037         myClassLoader = null;
1038         myLoopbackChannel = null;
1039         myChannelList = null;
1040         myAcceptThread = null;
1041     }
1042 
1043     /**
1044      * Dump the state of this channel group on the given print stream. For
1045      * debugging.
1046      *
1047      * @param out Print stream.
1048      * @param prefix String to print at the beginning of each line.
1049      */
1050     public void dump(PrintStream out,
1051             String prefix) {
1052         out.println(prefix + getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(this)));
1053         out.println(prefix + "myChannelGroupId = " + myChannelGroupId);
1054         out.println(prefix + "myServerSocketChannel = " + myServerSocketChannel);
1055         out.println(prefix + "myIORequestList:");
1056         myIORequestList.dump(out, prefix + "\t");
1057         out.println(prefix + "myClassLoader = " + myClassLoader);
1058         out.println(prefix + "myLoopbackChannel = " + myLoopbackChannel);
1059         out.println(prefix + "myChannelList:");
1060         out.println(prefix + "\t" + myChannelList.size() + " entries");
1061         for (Channel c : myChannelList) {
1062             c.dump(out, prefix + "\t");
1063         }
1064         out.println(prefix + "myAcceptThread = " + myAcceptThread);
1065         out.println(prefix + "myConnectListener = " + myConnectListener);
1066         out.println(prefix + "myLogger = " + myLogger);
1067         out.println(prefix + "myTimerThread = " + myTimerThread);
1068     }
1069 
1070 // Hidden operations.
1071     /**
1072      * Create a new network channel using the given socket channel. The
1073      * connection request originated from the near end. If this channel group is
1074      * closed, null is returned.
1075      *
1076      * @param theSocketChannel Socket channel.
1077      *
1078      * @return New channel.
1079      *
1080      * @exception IOException Thrown if an I/O error occurred.
1081      */
1082     Channel nearEndConnect(SocketChannel theSocketChannel)
1083             throws IOException {
1084 		// Note: This method is not synchronized. Synchronization happens inside
1085         // createNetworkChannel().
1086 
1087         // Turn on socket's TCP no-delay option.
1088         Socket socket = theSocketChannel.socket();
1089         socket.setTcpNoDelay(true);
1090 
1091         // Send channel group ID to far end.
1092         ByteBuffer buf = ByteBuffer.allocate(4);
1093         buf.putInt(myChannelGroupId);
1094         buf.flip();
1095         if (theSocketChannel.write(buf) != 4) {
1096             throw new IOException("ChannelGroup.nearEndConnect(): Cannot send channel group ID");
1097         }
1098 
1099         // Receive channel group ID from far end with a 30-second timeout.
1100         buf.clear();
1101         final Thread thread = Thread.currentThread();
1102         Timer timer = myTimerThread.createTimer(new TimerTask() {
1103             public void action(Timer theTimer) {
1104                 thread.interrupt();
1105             }
1106         });
1107         timer.start(30000L);
1108         if (theSocketChannel.read(buf) != 4) {
1109             throw new IOException("ChannelGroup.nearEndConnect(): Cannot receive channel group ID");
1110         }
1111         timer.stop();
1112         buf.flip();
1113         int farChannelGroupId = buf.getInt();
1114 
1115         // Set up channel.
1116         Channel channel
1117                 = createNetworkChannel(theSocketChannel, farChannelGroupId);
1118 
1119         // Inform listener if any.
1120         if (myConnectListener != null) {
1121             myConnectListener.nearEndConnected(this, channel);
1122         }
1123 
1124         // Start the channel sending and receiving messages.
1125         channel.start();
1126 
1127         return channel;
1128     }
1129 
1130     /**
1131      * Create a new network channel using the given socket channel. The
1132      * connection request originated from the far end. If this channel group is
1133      * closed, null is returned.
1134      *
1135      * @param theSocketChannel Socket channel.
1136      *
1137      * @return New channel.
1138      *
1139      * @exception IOException Thrown if an I/O error occurred.
1140      */
1141     Channel farEndConnect(SocketChannel theSocketChannel)
1142             throws IOException {
1143 		// Note: This method is not synchronized. Synchronization happens inside
1144         // createNetworkChannel().
1145 
1146         // Turn on socket's TCP no-delay option.
1147         Socket socket = theSocketChannel.socket();
1148         socket.setTcpNoDelay(true);
1149 
1150         // Start a 30-second timeout for receiving channel group ID.
1151         final Thread thread = Thread.currentThread();
1152         Timer timer = myTimerThread.createTimer(new TimerTask() {
1153             public void action(Timer theTimer) {
1154                 thread.interrupt();
1155             }
1156         });
1157         timer.start(30000L);
1158 
1159         try {
1160             // Receive channel group ID from far end.
1161             ByteBuffer buf = ByteBuffer.allocate(4);
1162             if (theSocketChannel.read(buf) != 4) {
1163                 throw new IOException("ChannelGroup.farEndConnect(): Cannot receive channel group ID");
1164             }
1165             timer.stop();
1166             buf.flip();
1167             int farChannelGroupId = buf.getInt();
1168 
1169             // Send channel group ID to far end.
1170             buf.clear();
1171             buf.putInt(myChannelGroupId);
1172             buf.flip();
1173             if (theSocketChannel.write(buf) != 4) {
1174                 throw new IOException("ChannelGroup.farEndConnect(): Cannot send channel group ID");
1175             }
1176 
1177             // Set up channel.
1178             Channel channel
1179                     = createNetworkChannel(theSocketChannel, farChannelGroupId);
1180 
1181             // Inform listener if any.
1182             if (myConnectListener != null) {
1183                 myConnectListener.farEndConnected(this, channel);
1184             }
1185 
1186             // Start the channel sending and receiving messages.
1187             channel.start();
1188 
1189             return channel;
1190         } // Stop timer when an IOException is thrown.
1191         catch (IOException exc) {
1192             timer.stop();
1193             throw exc;
1194         }
1195     }
1196 
1197     /**
1198      * Create a new network channel using the given socket channel. If this
1199      * channel group is closed, null is returned.
1200      *
1201      * @param theSocketChannel Socket channel.
1202      * @param theFarChannelGroupId Far end channel group ID.
1203      *
1204      * @return New channel, or null.
1205      *
1206      * @exception IOException Thrown if an I/O error occurred.
1207      */
1208     synchronized Channel createNetworkChannel(SocketChannel theSocketChannel,
1209             int theFarChannelGroupId)
1210             throws IOException {
1211         Channel channel = null;
1212         if (myIORequestList != null) {
1213             channel
1214                     = new NetworkChannel(this, theSocketChannel, theFarChannelGroupId);
1215             myChannelList.add(channel);
1216         }
1217         return channel;
1218     }
1219 
1220     /**
1221      * Remove the given channel from this channel group.
1222      *
1223      * @param Channel.
1224      */
1225     synchronized void removeChannel(Channel channel) {
1226         if (myChannelList != null) {
1227             myChannelList.remove(channel);
1228         }
1229     }
1230 
1231 }