1 //******************************************************************************
2 //
3 // File: ChannelGroup.java
4 // Package: edu.rit.mp
5 // Unit: Class edu.rit.mp.ChannelGroup
6 //
7 // This Java source file is copyright (C) 2009 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.mp;
41
42 import java.io.IOException;
43 import java.io.PrintStream;
44 import java.net.InetSocketAddress;
45 import java.net.Socket;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.ServerSocketChannel;
49 import java.nio.channels.SocketChannel;
50 import java.util.LinkedList;
51 import java.util.List;
52
53 import edu.rit.util.Logger;
54 import edu.rit.util.PrintStreamLogger;
55 import edu.rit.util.Range;
56 import edu.rit.util.Timer;
57 import edu.rit.util.TimerTask;
58 import edu.rit.util.TimerThread;
59
60 /**
61 * Class ChannelGroup provides a group of {@linkplain Channel}s for sending and
62 * receiving messages in the Message Protocol (MP).
63 * <P>
64 * <B>Creating Channels</B>
65 * <P>
66 * A channel group can be used to create channels in two ways: by accepting a
67 * connection request from another process, and by requesting a connection to
68 * another process.
69 * <P>
70 * The channel group can be configured to listen to a certain host and port for
71 * connection requests. Configure the host and port by by calling the
72 * <code>listen()</code> method. To start accepting connection requests, call the
73 * <code>startListening()</code> method.
74 * <P>
75 * If desired, an application can receive notification of newly created channels
76 * by providing a {@linkplain ConnectListener} object to the channel group's
77 * <code>setConnectListener()</code> method. Specify a connect listener object
78 * before calling the <code>startListening()</code> method, otherwise the
79 * application may not receive some notifications.
80 * <P>
81 * When a connection request arrives, the channel group sets up a new channel
82 * object for communicating over the connection. If a connect listener has been
83 * registered, the channel group then passes the channel to the connect
84 * listener's <code>farEndConnected()</code> method, which does whatever the
85 * application needs to record the new channel's presence. The channel does not
86 * start sending and receiving messages until after the connect listener's
87 * <code>farEndConnected()</code> method (if any) has returned.
88 * <P>
89 * The application can also call the channel group's <code>connect()</code> method
90 * to request a connection to another host and port. The channel group sets up a
91 * new channel object for communicating over the connection. If a connect
92 * listener has been registered, the channel group then passes the channel to
93 * the connect listener's <code>nearEndConnected()</code> method, which does
94 * whatever the application needs to record the new channel's presence. The
95 * channel does not start sending and receiving messages until after the connect
96 * listener's <code>nearEndConnected()</code> method (if any) has returned. The
97 * <code>connect()</code> method also returns the new channel.
98 * <P>
99 * Once a connection has been set up and a channel object has been created on
100 * each side, the applications can use their respective channels to send and
101 * receive messages.
102 * <P>
103 * If a channel group does not need to accept incoming connection requests, the
104 * channel group need not listen to any host and port. The channel group can
105 * still be used to make outgoing connection requests.
106 * <P>
107 * <B>Channel Group IDs</B>
108 * <P>
109 * Each channel group has a channel group ID. The channel group ID is an
110 * integer, initially 0, that can be changed by the <code>setChannelGroupId()</code>
111 * method. The channel group attaches no significance to the channel group ID;
112 * it is provided for the use of the application using the channel group.
113 * <P>
114 * You can query a channel group object to determine its channel group ID. You
115 * can also query a channel object to determine the ID of the channel group at
116 * the near end of the channel and the ID of the channel group at the far end of
117 * the channel.
118 * <P>
119 * <B>Sending Messages</B>
120 * <P>
121 * To send a message, the application creates a message buffer (class
122 * {@linkplain Buf}) specifying where to get the items to be sent. The
123 * application calls the channel group's <code>send()</code> method, passing the
124 * channel on which to send the message, the message buffer, and the message
125 * tag. (If the message tag is not specified, it defaults to 0.) The channel
126 * group extracts the items from the message buffer and sends a message over the
127 * channel's connection. When the <code>send()</code> method returns, the message
128 * has been fully sent, but the message may not have been fully received yet.
129 * <P>
130 * The far end application must receive the message from the channel at the
131 * other end of the connection. If no application is receiving the message, the
132 * <code>send()</code> method may block (because of flow control). This in turn may
133 * lead to a deadlock.
134 * <P>
135 * At most one outgoing message at a time may be in progress on a channel. If a
136 * second thread tries to send a message on a channel while a first thread is
137 * still sending a message on that channel, the second thread will block until
138 * the first thread has finished sending the message.
139 * <P>
140 * <B>Receiving Messages</B>
141 * <P>
142 * To receive a message, the application creates a message buffer (class
143 * {@linkplain Buf}) specifying where to put the items to be received. The
144 * application calls the channel group's <code>receive()</code> method, passing the
145 * channel from which to receive the message, the desired message tag, and the
146 * message buffer. The application can specify "any channel" instead of a
147 * specific channel. The application can specify a range of tags or "any tag"
148 * instead of a specific tag. Any number of threads can have receive requests
149 * pending at the same time.
150 * <P>
151 * When the channel group receives a message from a channel, the channel group
152 * tries to match the message with the pending receive requests. A message
153 * matches a receive request if (a) the message's channel is the same as the
154 * receive request's channel, or the receive request specified "any channel;"
155 * and (b) the message's tag is the same as the receive request's tag, or the
156 * message's tag falls within the receive request's range of tags, or the
157 * receive request specified "any tag;" and (c) the message's item type is the
158 * same as the receive request's item type (as given by the message buffer). The
159 * pending receive requests are maintained in FIFO order. If no receive request
160 * matches the message, the channel group does not read the message until such
161 * time as a matching receive request occurs. If more than one receive request
162 * matches the message, the channel group chooses the first matching receive
163 * request.
164 * <P>
165 * Once the channel group has matched the incoming message with the receive
166 * request, the channel group reads the items from the message and stores them
167 * into the message buffer. If there are fewer items in the message than the
168 * length of the message buffer, the extra items at the end of the message
169 * buffer are not set to anything. If there are more items in the message than
170 * the length of the message buffer, the extra items are read from the message
171 * and discarded. Once the message has been read, the <code>receive()</code> method
172 * returns a {@linkplain Status} object reporting the channel on which the
173 * message arrived, the message tag, and the actual number of items in the
174 * message (which may or may not be the same as the number of items in the
175 * message buffer).
176 * <P>
177 * If the receive requests do not match properly with the incoming messages, a
178 * deadlock may occur.
179 * <P>
180 * <B>Sending and Receiving Within the Same Process</B>
181 * <P>
182 * Each channel group has a "loopback" channel that is used to send messages
183 * within the same process. To obtain the loopback channel, call the
184 * <code>loopbackChannel()</code> method. Then one thread can send messages using
185 * the loopback channel while a different thread receives messages using the
186 * loopback channel. If the same thread both sends and receives using the
187 * loopback channel, a deadlock may occur.
188 * <P>
189 * The loopback channel uses the <code>copy()</code> method of class {@linkplain
190 * Buf} to transfer data items directly from the source buffer to the
191 * destination buffer. The loopback channel does not do any network
192 * communication.
193 * <P>
194 * <B>Non-Blocking Send and Receive Operations</B>
195 * <P>
196 * The <code>send()</code> method described so far does a <B>blocking send</B>
197 * operation; the <code>send()</code> method does not return until the message has
198 * been fully sent. There is also a <B>non-blocking send</B> operation,
199 * <code>sendNoWait()</code>, which includes an {@linkplain IORequest} argument. The
200 * <code>sendNoWait()</code> method initiates the send operation and returns
201 * immediately. This allows the caller to continue processing while the channel
202 * group sends the message in a separate thread. To wait for the message to be
203 * fully sent, the caller must call the IORequest object's
204 * <code>waitForFinish()</code> method.
205 * <P>
206 * Likewise, the <code>receive()</code> method described so far does a <B>blocking
207 * receive</B> operation; the <code>receive()</code> method does not return until
208 * the message has been fully received. There is also a <B>non-blocking
209 * receive</B> operation, <code>receiveNoWait()</code>, which includes an
210 * {@linkplain IORequest} argument. The <code>receiveNoWait()</code> method
211 * initiates the receive operation and returns immediately. This allows the
212 * caller to continue processing while the channel group receives the message in
213 * a separate thread. To wait for the message to be fully received, the caller
214 * must call the IORequest object's <code>waitForFinish()</code> method, which
215 * returns a {@linkplain Status} object giving the results of the receive
216 * operation.
217 *
218 * @author Alan Kaminsky
219 * @version 11-Mar-2009
220 */
221 public class ChannelGroup {
222
223 // Hidden data members.
224 // Channel group ID.
225 int myChannelGroupId;
226
227 // Server socket channel for accepting incoming connections, or null if not
228 // accepting incoming connections.
229 ServerSocketChannel myServerSocketChannel;
230
231 // I/O request list for matching incoming messages to receive I/O requests.
232 IORequestList myIORequestList;
233
234 // Alternate class loader for use when receiving objects.
235 ClassLoader myClassLoader;
236
237 // Loopback channel.
238 LoopbackChannel myLoopbackChannel;
239
240 // List of open channels.
241 List<Channel> myChannelList;
242
243 // Accepting thread, or null if not accepting.
244 AcceptThread myAcceptThread;
245
246 // Registered connect listener, or null if none.
247 ConnectListener myConnectListener;
248
249 // For logging error messages.
250 Logger myLogger;
251
252 // For timeouts during channel setup.
253 TimerThread myTimerThread;
254
255 // Hidden helper classes.
256 /**
257 * Class AcceptThread provides a thread that accepts incoming connections.
258 *
259 * @author Alan Kaminsky
260 * @version 11-Mar-2009
261 */
262 private class AcceptThread
263 extends Thread {
264
265 public AcceptThread() {
266 setDaemon(true);
267 start();
268 }
269
270 public void run() {
271 acceptloop:
272 for (;;) {
273 // Wait for an incoming connection.
274 SocketChannel connection = null;
275 try {
276 connection = myServerSocketChannel.accept();
277 } catch (ClosedChannelException exc) {
278 myLogger.log("ChannelGroup: Channel closed",
279 exc);
280 break acceptloop;
281 } catch (IOException exc) {
282 myLogger.log("ChannelGroup: I/O error while accepting connection",
283 exc);
284 break acceptloop;
285 }
286
287 // Set up channel over connection.
288 if (connection != null) {
289 try {
290 farEndConnect(connection);
291 } catch (IOException exc) {
292 // Clear thread's interrupted status, otherwise accept()
293 // above will throw an exception.
294 Thread.interrupted();
295 myLogger.log("ChannelGroup: I/O error while setting up channel",
296 exc);
297 try {
298 connection.close();
299 } catch (IOException ignored) {
300 }
301 }
302 }
303 }
304 }
305 }
306
307 // Exported constructors.
308 /**
309 * Construct a new channel group. The channel group ID is initially 0. The
310 * channel group will not listen for connection requests. To listen for
311 * connection requests at a later time, call the <code>listen()</code> method
312 * followed by the <code>startListening</code> method.
313 * <P>
314 * The channel group will log error messages on the standard error.
315 */
316 public ChannelGroup() {
317 this(new PrintStreamLogger());
318 }
319
320 /**
321 * Construct a new channel group. The channel group ID is initially 0. The
322 * channel group will listen for connection requests on the given host and
323 * port. To start actively listening, call the <code>startListening()</code>
324 * method.
325 * <P>
326 * The channel group will log error messages on the standard error.
327 *
328 * @param theListenAddress Host and port at which to listen.
329 * @exception NullPointerException (unchecked exception) Thrown if
330 * <code>theListenAddress</code> is null.
331 * @exception IOException Thrown if an I/O error occurred.
332 * @throws java.io.IOException if any.
333 */
334 public ChannelGroup(InetSocketAddress theListenAddress)
335 throws IOException {
336 this(theListenAddress, new PrintStreamLogger());
337 }
338
339 /**
340 * Construct a new channel group. The channel group ID is initially 0. The
341 * channel group will listen for connection requests using the given server
342 * socket channel. The server socket channel must be bound to a host and
343 * port. To start actively listening, call the <code>startListening()</code>
344 * method.
345 * <P>
346 * The channel group will log error messages on the standard error.
347 *
348 * @param theServerSocketChannel Server socket channel.
349 * @exception NullPointerException (unchecked exception) Thrown if
350 * <code>theServerSocketChannel</code> is null.
351 * @exception IOException Thrown if an I/O error occurred. Thrown if
352 * <code>theServerSocketChannel</code> is not bound.
353 * @throws java.io.IOException if any.
354 */
355 public ChannelGroup(ServerSocketChannel theServerSocketChannel)
356 throws IOException {
357 this(theServerSocketChannel, new PrintStreamLogger());
358 }
359
360 /**
361 * Construct a new channel group. The channel group ID is initially 0. The
362 * channel group will not listen for connection requests. To listen for
363 * connection requests at a later time, call the <code>listen()</code> method
364 * followed by the <code>startListening</code> method.
365 * <P>
366 * The channel group will log error messages using the given logger.
367 *
368 * @param theLogger Logger for error messages.
369 * @exception NullPointerException (unchecked exception) Thrown if
370 * <code>theLogger</code> is null.
371 */
372 public ChannelGroup(Logger theLogger) {
373 if (theLogger == null) {
374 throw new NullPointerException("ChannelGroup(): theLogger is null");
375 }
376 myIORequestList = new IORequestList();
377 myLoopbackChannel = new LoopbackChannel(this);
378 myChannelList = new LinkedList<>();
379 myChannelList.add(myLoopbackChannel);
380 myLogger = theLogger;
381 myTimerThread = new TimerThread();
382 myTimerThread.setDaemon(true);
383 myTimerThread.start();
384 }
385
386 /**
387 * Construct a new channel group. The channel group ID is initially 0. The
388 * channel group will listen for connection requests on the given host and
389 * port. To start actively listening, call the <code>startListening()</code>
390 * method.
391 * <P>
392 * The channel group will log error messages using the given logger.
393 *
394 * @param theListenAddress Host and port at which to listen.
395 * @param theLogger Logger for error messages.
396 * @exception NullPointerException (unchecked exception) Thrown if
397 * <code>theListenAddress</code> is null. Thrown if <code>theLogger</code> is null.
398 * @exception IOException Thrown if an I/O error occurred.
399 * @throws java.io.IOException if any.
400 */
401 public ChannelGroup(InetSocketAddress theListenAddress,
402 Logger theLogger)
403 throws IOException {
404 this(theLogger);
405 listen(theListenAddress);
406 }
407
408 /**
409 * Construct a new channel group. The channel group ID is initially 0. The
410 * channel group will listen for connection requests using the given server
411 * socket channel. The server socket channel must be bound to a host and
412 * port. To start actively listening, call the <code>startListening()</code>
413 * method.
414 * <P>
415 * The channel group will log error messages using the given logger.
416 *
417 * @param theServerSocketChannel Server socket channel.
418 * @param theLogger Logger for error messages.
419 * @exception NullPointerException (unchecked exception) Thrown if
420 * <code>theServerSocketChannel</code> is null. Thrown if <code>theLogger</code> is
421 * null.
422 * @exception IOException Thrown if an I/O error occurred. Thrown if
423 * <code>theServerSocketChannel</code> is not bound.
424 * @throws java.io.IOException if any.
425 */
426 public ChannelGroup(ServerSocketChannel theServerSocketChannel,
427 Logger theLogger)
428 throws IOException {
429 this(theLogger);
430 listen(theServerSocketChannel);
431 }
432
433 // Exported operations.
434 /**
435 * Set this channel group's channel group ID.
436 *
437 * @param theChannelGroupId Channel group ID.
438 */
439 public void setChannelGroupId(int theChannelGroupId) {
440 myChannelGroupId = theChannelGroupId;
441 }
442
443 /**
444 * Obtain this channel group's channel group ID.
445 *
446 * @return Channel group ID.
447 */
448 public int getChannelGroupId() {
449 return myChannelGroupId;
450 }
451
452 /**
453 * Obtain this channel group's listen address. This is the near end host and
454 * port to which this channel group is listening for connection requests. If
455 * this channel group is not listening for connection requests, null is
456 * returned.
457 *
458 * @return Near end address, or null.
459 */
460 public synchronized InetSocketAddress listenAddress() {
461 return myServerSocketChannel == null
462 ? null
463 : (InetSocketAddress) myServerSocketChannel.socket().getLocalSocketAddress();
464 }
465
466 /**
467 * Listen for connection requests on the given host and port. To start
468 * actively listening, call the <code>startListening()</code> method.
469 *
470 * @param theListenAddress Host and port at which to listen.
471 * @exception NullPointerException (unchecked exception) Thrown if
472 * <code>theListenAddress</code> is null.
473 * @exception IllegalStateException (unchecked exception) Thrown if
474 * listening has already started.
475 * @exception IOException Thrown if an I/O error occurred.
476 * @throws java.io.IOException if any.
477 */
478 public synchronized void listen(InetSocketAddress theListenAddress)
479 throws IOException {
480 if (theListenAddress == null) {
481 throw new NullPointerException("ChannelGroup.listen(): theListenAddress is null");
482 }
483 ServerSocketChannel channel = ServerSocketChannel.open();
484 channel.socket().bind(theListenAddress);
485 listen(channel);
486 }
487
488 /**
489 * Listen for connection requests using the given server socket channel. The
490 * server socket channel must be bound to a host and port. To start actively
491 * listening, call the <code>startListening()</code> method.
492 *
493 * @param theServerSocketChannel Server socket channel.
494 * @exception NullPointerException (unchecked exception) Thrown if
495 * <code>theServerSocketChannel</code> is null.
496 * @exception IllegalStateException (unchecked exception) Thrown if
497 * listening has already started.
498 * @exception IOException Thrown if an I/O error occurred. Thrown if
499 * <code>theServerSocketChannel</code> is not bound.
500 * @throws java.io.IOException if any.
501 */
502 public synchronized void listen(ServerSocketChannel theServerSocketChannel)
503 throws IOException {
504 if (theServerSocketChannel == null) {
505 throw new NullPointerException("ChannelGroup.listen(): theServerSocketChannel is null");
506 }
507 if (!theServerSocketChannel.socket().isBound()) {
508 throw new IOException("ChannelGroup.listen(): theServerSocketChannel is not bound");
509 }
510 if (myAcceptThread != null) {
511 throw new IllegalStateException("ChannelGroup.listen(): Listening has already started");
512 }
513 if (myIORequestList == null) {
514 throw new IOException("ChannelGroup.listen(): Channel group closed");
515 }
516
517 myServerSocketChannel = theServerSocketChannel;
518 }
519
520 /**
521 * Register the given connect listener with this channel group. Thereafter,
522 * this channel group will report each connected channel by calling
523 * <code>theConnectListener</code>'s <code>nearEndConnected()</code> method (if the
524 * connection request originated in this process) or
525 * <code>farEndConnected()</code> method (if the connection request originated
526 * in another process). It is assumed that these methods will not do any
527 * lengthy processing and will not block the calling thread.
528 * <P>
529 * At most one connect listener may be registered. If a connect listener is
530 * already registered, it is replaced with the given connect listener. If
531 * <code>theConnectListener</code> is null, any registered connect listener is
532 * discarded, and this channel group will not report connected channels.
533 * <P>
534 * Call the <code>setConnectListener()</code> method before calling the
535 * <code>startListening()</code> method, otherwise the application may not
536 * receive some connection notifications.
537 *
538 * @param theConnectListener Connect listener, or null.
539 */
540 public synchronized void setConnectListener(ConnectListener theConnectListener) {
541 myConnectListener = theConnectListener;
542 }
543
544 /**
545 * Start actively listening for connection requests.
546 *
547 * @exception IllegalStateException (unchecked exception) Thrown if a host
548 * and port or a server socket channel upon which to listen has not been
549 * specified. Thrown if listening has already started.
550 */
551 public synchronized void startListening() {
552 if (myServerSocketChannel == null) {
553 throw new IllegalStateException("ChannelGroup.startListening(): No server socket channel");
554 }
555 if (myAcceptThread != null) {
556 throw new IllegalStateException("ChannelGroup.listen(): Listening has already started");
557 }
558
559 myAcceptThread = new AcceptThread();
560 }
561
562 /**
563 * Create a new channel connected to the given far end host and port. In the
564 * far end computer, there must be a channel group listening to the given
565 * host and port. Once the connection is set up, if a connect listener has
566 * been registered, the channel group calls the connect listener's
567 * <code>nearEndConnected()</code> method to report the new channel.
568 *
569 * @param theFarEndAddress Host and port of far end channel group.
570 * @return New channel.
571 * @exception IOException Thrown if an I/O error occurred.
572 * @throws java.io.IOException if any.
573 */
574 public Channel connect(InetSocketAddress theFarEndAddress)
575 throws IOException {
576 synchronized (this) {
577 if (myIORequestList == null) {
578 throw new IOException("ChannelGroup.connect(): Channel group closed");
579 }
580 }
581
582 SocketChannel connection = null;
583 try {
584 connection = SocketChannel.open(theFarEndAddress);
585 return nearEndConnect(connection);
586 } catch (IOException exc) {
587 // Clear thread's interrupted status.
588 Thread.interrupted();
589 if (connection != null) {
590 try {
591 connection.close();
592 } catch (IOException ignored) {
593 }
594 }
595 throw exc;
596 }
597 }
598
599 /**
600 * Obtain this channel group's loopback channel. If this channel group is
601 * closed, null is returned.
602 *
603 * @return Loopback channel, or null.
604 */
605 public synchronized Channel loopbackChannel() {
606 return myLoopbackChannel;
607 }
608
609 /**
610 * Send a message to the given channel. The message uses a tag of 0. The
611 * message items come from the given item source buffer.
612 * <P>
613 * The <code>send()</code> method does not return until the message has been
614 * fully sent. (The message may not have been fully received yet.)
615 * <P>
616 * The <code>send()</code> method assumes that <code>theChannel</code> was created
617 * by this channel group. If not, the <code>send()</code> method's behavior is
618 * unspecified.
619 *
620 * @param theChannel Channel.
621 * @param theSrc Item source buffer.
622 * @exception NullPointerException (unchecked exception) Thrown if
623 * <code>theChannel</code> is null or
624 * <code>theSrc</code> is null.
625 * @exception IOException Thrown if an I/O error occurred.
626 * @throws java.io.IOException if any.
627 */
628 public void send(Channel theChannel,
629 Buf theSrc)
630 throws IOException {
631 IORequest req = new IORequest();
632 sendNoWait(theChannel, 0, theSrc, req);
633 req.waitForFinish();
634 }
635
636 /**
637 * Send a message to the given channel with the given tag. The message items
638 * come from the given item source buffer.
639 * <P>
640 * The <code>send()</code> method does not return until the message has been
641 * fully sent. (The message may not have been fully received yet.)
642 * <P>
643 * The <code>send()</code> method assumes that <code>theChannel</code> was created
644 * by this channel group. If not, the <code>send()</code> method's behavior is
645 * unspecified.
646 *
647 * @param theChannel Channel.
648 * @param theTag Message tag.
649 * @param theSrc Item source buffer.
650 * @exception NullPointerException (unchecked exception) Thrown if
651 * <code>theChannel</code> is null or
652 * <code>theSrc</code> is null.
653 * @exception IOException Thrown if an I/O error occurred.
654 * @throws java.io.IOException if any.
655 */
656 public void send(Channel theChannel,
657 int theTag,
658 Buf theSrc)
659 throws IOException {
660 IORequest req = new IORequest();
661 sendNoWait(theChannel, theTag, theSrc, req);
662 req.waitForFinish();
663 }
664
665 /**
666 * Send (non-blocking) a message to the given channel. The message uses a
667 * tag of 0. The message items come from the given item source buffer.
668 * <code>theIORequest</code> is the IORequest object to be associated with the
669 * send operation.
670 * <P>
671 * The <code>sendNoWait()</code> method returns immediately. To wait for the
672 * message to be fully sent, call <code>theIORequest.waitForFinish()</code>.
673 * <P>
674 * The <code>sendNoWait()</code> method assumes that <code>theChannel</code> was
675 * created by this channel group. If not, the <code>sendNoWait()</code> method's
676 * behavior is unspecified.
677 *
678 * @param theChannel Channel.
679 * @param theSrc Item source buffer.
680 * @param theIORequest IORequest object.
681 * @exception NullPointerException (unchecked exception) Thrown if
682 * <code>theChannel</code> is null,
683 * <code>theSrc</code> is null, or <code>theIORequest</code> is null.
684 * @exception IOException Thrown if an I/O error occurred.
685 * @throws java.io.IOException if any.
686 */
687 public void sendNoWait(Channel theChannel,
688 Buf theSrc,
689 IORequest theIORequest)
690 throws IOException {
691 sendNoWait(theChannel, 0, theSrc, theIORequest);
692 }
693
694 /**
695 * Send (non-blocking) a message to the given channel with the given tag.
696 * The message items come from the given item source buffer.
697 * <code>theIORequest</code> is the IORequest object to be associated with the
698 * send operation.
699 * <P>
700 * The <code>sendNoWait()</code> method returns immediately. To wait for the
701 * message to be fully sent, call <code>theIORequest.waitForFinish()</code>.
702 * <P>
703 * The <code>sendNoWait()</code> method assumes that <code>theChannel</code> was
704 * created by this channel group. If not, the <code>sendNoWait()</code> method's
705 * behavior is unspecified.
706 *
707 * @param theChannel Channel.
708 * @param theTag Message tag.
709 * @param theSrc Item source buffer.
710 * @param theIORequest IORequest object.
711 * @exception NullPointerException (unchecked exception) Thrown if
712 * <code>theChannel</code> is null,
713 * <code>theSrc</code> is null, or <code>theIORequest</code> is null.
714 * @exception IOException Thrown if an I/O error occurred.
715 * @throws java.io.IOException if any.
716 */
717 public void sendNoWait(Channel theChannel,
718 int theTag,
719 Buf theSrc,
720 IORequest theIORequest)
721 throws IOException {
722 // Note: This method is not synchronized. Synchronization happens inside
723 // theChannel.send().
724
725 // Verify preconditions.
726 if (myIORequestList == null) {
727 throw new IOException("ChannelGroup.sendNoWait(): Channel group closed");
728 }
729 if (theSrc == null) {
730 throw new NullPointerException("ChannelGroup.sendNoWait(): Source buffer is null");
731 }
732
733 theIORequest.initialize(theChannel, theTag, theTag, theSrc);
734 theChannel.send(theIORequest);
735 }
736
737 /**
738 * Receive a message from the given channel. If <code>theChannel</code> is null,
739 * a message will be received from any channel in this channel group. The
740 * message must have a tag of 0. The message items are stored in the given
741 * item destination buffer.
742 * <P>
743 * The <code>receive()</code> method does not return until the message has been
744 * fully received.
745 * <P>
746 * The <code>receive()</code> method assumes that <code>theChannel</code> was
747 * created by this channel group. If not, the <code>receive()</code> method's
748 * behavior is unspecified.
749 *
750 * @param theChannel Channel, or null to receive from any channel.
751 * @param theDst Item destination buffer.
752 * @return Status object giving the outcome of the message reception.
753 * @exception NullPointerException (unchecked exception) Thrown if
754 * <code>theDst</code> is null.
755 * @exception IOException Thrown if an I/O error occurred.
756 * @throws java.io.IOException if any.
757 */
758 public Status receive(Channel theChannel,
759 Buf theDst)
760 throws IOException {
761 IORequest req = new IORequest();
762 receiveNoWait(theChannel, 0, 0, theDst, req);
763 return req.waitForFinish();
764 }
765
766 /**
767 * Receive a message from the given channel with the given tag. If
768 * <code>theChannel</code> is null, a message will be received from any channel
769 * in this channel group. The message items are stored in the given item
770 * destination buffer.
771 * <P>
772 * The <code>receive()</code> method does not return until the message has been
773 * fully received.
774 * <P>
775 * The <code>receive()</code> method assumes that <code>theChannel</code> was
776 * created by this channel group. If not, the <code>receive()</code> method's
777 * behavior is unspecified.
778 *
779 * @param theChannel Channel, or null to receive from any channel.
780 * @param theTag Message tag.
781 * @param theDst Item destination buffer.
782 * @return Status object giving the outcome of the message reception.
783 * @exception NullPointerException (unchecked exception) Thrown if
784 * <code>theDst</code> is null.
785 * @exception IOException Thrown if an I/O error occurred.
786 * @throws java.io.IOException if any.
787 */
788 public Status receive(Channel theChannel,
789 int theTag,
790 Buf theDst)
791 throws IOException {
792 IORequest req = new IORequest();
793 receiveNoWait(theChannel, theTag, theTag, theDst, req);
794 return req.waitForFinish();
795 }
796
797 /**
798 * Receive a message from the given channel with the given range of tags. If
799 * <code>theChannel</code> is null, a message will be received from any channel
800 * in this channel group. If <code>theTagRange</code> is null, a message will be
801 * received with any tag. The message items are stored in the given item
802 * destination buffer.
803 * <P>
804 * The <code>receive()</code> method does not return until the message has been
805 * fully received.
806 * <P>
807 * The <code>receive()</code> method assumes that <code>theChannel</code> was
808 * created by this channel group. If not, the <code>receive()</code> method's
809 * behavior is unspecified.
810 *
811 * @param theChannel Channel, or null to receive from any channel.
812 * @param theTagRange Message tag range, or null to receive any tag.
813 * @param theDst Item destination buffer.
814 * @return Status object giving the outcome of the message reception.
815 * @exception NullPointerException (unchecked exception) Thrown if
816 * <code>theDst</code> is null.
817 * @exception IOException Thrown if an I/O error occurred.
818 * @throws java.io.IOException if any.
819 */
820 public Status receive(Channel theChannel,
821 Range theTagRange,
822 Buf theDst)
823 throws IOException {
824 IORequest req = new IORequest();
825 if (theTagRange == null) {
826 receiveNoWait(theChannel, Integer.MIN_VALUE, Integer.MAX_VALUE, theDst, req);
827 } else {
828 receiveNoWait(theChannel, theTagRange.lb(), theTagRange.ub(), theDst, req);
829 }
830 return req.waitForFinish();
831 }
832
833 /**
834 * Receive (non-blocking) a message from the given channel. If
835 * <code>theChannel</code> is null, a message will be received from any channel
836 * in this channel group. The message must have a tag of 0. The message
837 * items are stored in the given item destination buffer.
838 * <code>theIORequest</code> is the IORequest object to be associated with the
839 * receive operation.
840 * <P>
841 * The <code>receiveNoWait()</code> method returns immediately. To wait for the
842 * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
843 * <P>
844 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
845 * created by this channel group. If not, the <code>receiveNoWait()</code>
846 * method's behavior is unspecified.
847 *
848 * @param theChannel Channel, or null to receive from any channel.
849 * @param theDst Item destination buffer.
850 * @param theIORequest IORequest object.
851 * @exception NullPointerException (unchecked exception) Thrown if
852 * <code>theDst</code> is null or
853 * <code>theIORequest</code> is null.
854 * @exception IOException Thrown if an I/O error occurred.
855 * @throws java.io.IOException if any.
856 */
857 public void receiveNoWait(Channel theChannel,
858 Buf theDst,
859 IORequest theIORequest)
860 throws IOException {
861 receiveNoWait(theChannel, 0, 0, theDst, theIORequest);
862 }
863
864 /**
865 * Receive (non-blocking) a message from the given channel with the given
866 * tag. If <code>theChannel</code> is null, a message will be received from any
867 * channel in this channel group. The message items are stored in the given
868 * item destination buffer. <code>theIORequest</code> is the IORequest object to
869 * be associated with the receive operation.
870 * <P>
871 * The <code>receiveNoWait()</code> method returns immediately. To wait for the
872 * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
873 * <P>
874 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
875 * created by this channel group. If not, the <code>receiveNoWait()</code>
876 * method's behavior is unspecified.
877 *
878 * @param theChannel Channel, or null to receive from any channel.
879 * @param theTag Message tag.
880 * @param theDst Item destination buffer.
881 * @param theIORequest IORequest object.
882 * @exception NullPointerException (unchecked exception) Thrown if
883 * <code>theDst</code> is null or
884 * <code>theIORequest</code> is null.
885 * @exception IOException Thrown if an I/O error occurred.
886 * @throws java.io.IOException if any.
887 */
888 public void receiveNoWait(Channel theChannel,
889 int theTag,
890 Buf theDst,
891 IORequest theIORequest)
892 throws IOException {
893 receiveNoWait(theChannel, theTag, theTag, theDst, theIORequest);
894 }
895
896 /**
897 * Receive (non-blocking) a message from the given channel with the given
898 * range of tags. If <code>theChannel</code> is null, a message will be received
899 * from any channel in this channel group. If <code>theTagRange</code> is null,
900 * a message will be received with any tag. The message items are stored in
901 * the given item destination buffer. <code>theIORequest</code> is the IORequest
902 * object to be associated with the receive operation.
903 * <P>
904 * The <code>receiveNoWait()</code> method returns immediately. To wait for the
905 * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
906 * <P>
907 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
908 * created by this channel group. If not, the <code>receiveNoWait()</code>
909 * method's behavior is unspecified.
910 *
911 * @param theChannel Channel, or null to receive from any channel.
912 * @param theTagRange Message tag range, or null to receive any tag.
913 * @param theDst Item destination buffer.
914 * @param theIORequest IORequest object.
915 * @exception NullPointerException (unchecked exception) Thrown if
916 * <code>theDst</code> is null or
917 * <code>theIORequest</code> is null.
918 * @exception IOException Thrown if an I/O error occurred.
919 * @throws java.io.IOException if any.
920 */
921 public void receiveNoWait(Channel theChannel,
922 Range theTagRange,
923 Buf theDst,
924 IORequest theIORequest)
925 throws IOException {
926 if (theTagRange == null) {
927 receiveNoWait(theChannel,
928 Integer.MIN_VALUE,
929 Integer.MAX_VALUE,
930 theDst,
931 theIORequest);
932 } else {
933 receiveNoWait(theChannel,
934 theTagRange.lb(),
935 theTagRange.ub(),
936 theDst,
937 theIORequest);
938 }
939 }
940
941 /**
942 * Receive (non-blocking) a message from the given channel with the given
943 * tag range. If <code>theChannel</code> is null, a message will be received
944 * from any channel in this channel group. The message items are stored in
945 * the given item destination buffer. <code>theIORequest</code> is the IORequest
946 * object to be associated with the receive operation.
947 * <P>
948 * The <code>receiveNoWait()</code> method returns immediately. To wait for the
949 * message to be fully received, call <code>theIORequest.waitForFinish()</code>.
950 * <P>
951 * The <code>receiveNoWait()</code> method assumes that <code>theChannel</code> was
952 * created by this channel group. If not, the <code>receiveNoWait()</code>
953 * method's behavior is unspecified.
954 *
955 * @param theChannel Channel, or null to receive from any channel.
956 * @param theTagLb Message tag range lower bound.
957 * @param theTagUb Message tag range upper bound.
958 * @param theDst Item destination buffer.
959 * @param theIORequest IORequest object.
960 *
961 * @exception NullPointerException (unchecked exception) Thrown if
962 * <code>theDst</code> is null or
963 * <code>theIORequest</code> is null.
964 * @exception IOException Thrown if an I/O error occurred.
965 */
966 private void receiveNoWait(Channel theChannel,
967 int theTagLb,
968 int theTagUb,
969 Buf theDst,
970 IORequest theIORequest)
971 throws IOException {
972 // Note: This method is not synchronized. Synchronization happens inside
973 // myIORequestList.add().
974
975 // Verify preconditions.
976 if (myIORequestList == null) {
977 throw new IOException("ChannelGroup.receiveNoWait(): Channel group closed");
978 }
979 if (theDst == null) {
980 throw new NullPointerException("ChannelGroup.receiveNoWait(): Destination buffer is null");
981 }
982 if (theChannel != null) {
983 synchronized (theChannel) {
984 // Check whether channel is closed.
985 if (theChannel.myReadState == Channel.READ_CLOSED) {
986 throw new IOException("ChannelGroup.receiveNoWait(): Channel closed");
987 }
988 }
989 }
990
991 theIORequest.initialize(theChannel, theTagLb, theTagUb, theDst);
992 myIORequestList.add(theIORequest);
993 }
994
995 /**
996 * Specify an alternate class loader for this channel group. When objects
997 * are received in a message via this channel group, the given class loader
998 * will be used to load the objects' classes. If
999 * <code>setAlternateClassLoader()</code> is never called, or if
1000 * <code>theClassLoader</code> is null, an alternate class loader will not be
1001 * used.
1002 *
1003 * @param theClassLoader Alternate class loader, or null.
1004 */
1005 public synchronized void setAlternateClassLoader(ClassLoader theClassLoader) {
1006 myClassLoader = theClassLoader;
1007 }
1008
1009 /**
1010 * Close this channel group. Any pending receive requests will fail with a
1011 * {@linkplain ChannelGroupClosedException}.
1012 */
1013 public synchronized void close() {
1014 // Stop listening for connections.
1015 if (myServerSocketChannel != null) {
1016 try {
1017 myServerSocketChannel.close();
1018 } catch (IOException ignored) {
1019 }
1020 }
1021
1022 // Close all channels.
1023 if (myChannelList != null) {
1024 while (!myChannelList.isEmpty()) {
1025 myChannelList.get(0).close();
1026 }
1027 }
1028
1029 // Report failure to all pending receive requests.
1030 if (myIORequestList != null) {
1031 myIORequestList.reportFailure(new ChannelGroupClosedException("Channel group closed"));
1032 }
1033
1034 // Enable garbage collection of fields.
1035 myServerSocketChannel = null;
1036 myIORequestList = null;
1037 myClassLoader = null;
1038 myLoopbackChannel = null;
1039 myChannelList = null;
1040 myAcceptThread = null;
1041 }
1042
1043 /**
1044 * Dump the state of this channel group on the given print stream. For
1045 * debugging.
1046 *
1047 * @param out Print stream.
1048 * @param prefix String to print at the beginning of each line.
1049 */
1050 public void dump(PrintStream out,
1051 String prefix) {
1052 out.println(prefix + getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(this)));
1053 out.println(prefix + "myChannelGroupId = " + myChannelGroupId);
1054 out.println(prefix + "myServerSocketChannel = " + myServerSocketChannel);
1055 out.println(prefix + "myIORequestList:");
1056 myIORequestList.dump(out, prefix + "\t");
1057 out.println(prefix + "myClassLoader = " + myClassLoader);
1058 out.println(prefix + "myLoopbackChannel = " + myLoopbackChannel);
1059 out.println(prefix + "myChannelList:");
1060 out.println(prefix + "\t" + myChannelList.size() + " entries");
1061 for (Channel c : myChannelList) {
1062 c.dump(out, prefix + "\t");
1063 }
1064 out.println(prefix + "myAcceptThread = " + myAcceptThread);
1065 out.println(prefix + "myConnectListener = " + myConnectListener);
1066 out.println(prefix + "myLogger = " + myLogger);
1067 out.println(prefix + "myTimerThread = " + myTimerThread);
1068 }
1069
1070 // Hidden operations.
1071 /**
1072 * Create a new network channel using the given socket channel. The
1073 * connection request originated from the near end. If this channel group is
1074 * closed, null is returned.
1075 *
1076 * @param theSocketChannel Socket channel.
1077 *
1078 * @return New channel.
1079 *
1080 * @exception IOException Thrown if an I/O error occurred.
1081 */
1082 Channel nearEndConnect(SocketChannel theSocketChannel)
1083 throws IOException {
1084 // Note: This method is not synchronized. Synchronization happens inside
1085 // createNetworkChannel().
1086
1087 // Turn on socket's TCP no-delay option.
1088 Socket socket = theSocketChannel.socket();
1089 socket.setTcpNoDelay(true);
1090
1091 // Send channel group ID to far end.
1092 ByteBuffer buf = ByteBuffer.allocate(4);
1093 buf.putInt(myChannelGroupId);
1094 buf.flip();
1095 if (theSocketChannel.write(buf) != 4) {
1096 throw new IOException("ChannelGroup.nearEndConnect(): Cannot send channel group ID");
1097 }
1098
1099 // Receive channel group ID from far end with a 30-second timeout.
1100 buf.clear();
1101 final Thread thread = Thread.currentThread();
1102 Timer timer = myTimerThread.createTimer(new TimerTask() {
1103 public void action(Timer theTimer) {
1104 thread.interrupt();
1105 }
1106 });
1107 timer.start(30000L);
1108 if (theSocketChannel.read(buf) != 4) {
1109 throw new IOException("ChannelGroup.nearEndConnect(): Cannot receive channel group ID");
1110 }
1111 timer.stop();
1112 buf.flip();
1113 int farChannelGroupId = buf.getInt();
1114
1115 // Set up channel.
1116 Channel channel
1117 = createNetworkChannel(theSocketChannel, farChannelGroupId);
1118
1119 // Inform listener if any.
1120 if (myConnectListener != null) {
1121 myConnectListener.nearEndConnected(this, channel);
1122 }
1123
1124 // Start the channel sending and receiving messages.
1125 channel.start();
1126
1127 return channel;
1128 }
1129
1130 /**
1131 * Create a new network channel using the given socket channel. The
1132 * connection request originated from the far end. If this channel group is
1133 * closed, null is returned.
1134 *
1135 * @param theSocketChannel Socket channel.
1136 *
1137 * @return New channel.
1138 *
1139 * @exception IOException Thrown if an I/O error occurred.
1140 */
1141 Channel farEndConnect(SocketChannel theSocketChannel)
1142 throws IOException {
1143 // Note: This method is not synchronized. Synchronization happens inside
1144 // createNetworkChannel().
1145
1146 // Turn on socket's TCP no-delay option.
1147 Socket socket = theSocketChannel.socket();
1148 socket.setTcpNoDelay(true);
1149
1150 // Start a 30-second timeout for receiving channel group ID.
1151 final Thread thread = Thread.currentThread();
1152 Timer timer = myTimerThread.createTimer(new TimerTask() {
1153 public void action(Timer theTimer) {
1154 thread.interrupt();
1155 }
1156 });
1157 timer.start(30000L);
1158
1159 try {
1160 // Receive channel group ID from far end.
1161 ByteBuffer buf = ByteBuffer.allocate(4);
1162 if (theSocketChannel.read(buf) != 4) {
1163 throw new IOException("ChannelGroup.farEndConnect(): Cannot receive channel group ID");
1164 }
1165 timer.stop();
1166 buf.flip();
1167 int farChannelGroupId = buf.getInt();
1168
1169 // Send channel group ID to far end.
1170 buf.clear();
1171 buf.putInt(myChannelGroupId);
1172 buf.flip();
1173 if (theSocketChannel.write(buf) != 4) {
1174 throw new IOException("ChannelGroup.farEndConnect(): Cannot send channel group ID");
1175 }
1176
1177 // Set up channel.
1178 Channel channel
1179 = createNetworkChannel(theSocketChannel, farChannelGroupId);
1180
1181 // Inform listener if any.
1182 if (myConnectListener != null) {
1183 myConnectListener.farEndConnected(this, channel);
1184 }
1185
1186 // Start the channel sending and receiving messages.
1187 channel.start();
1188
1189 return channel;
1190 } // Stop timer when an IOException is thrown.
1191 catch (IOException exc) {
1192 timer.stop();
1193 throw exc;
1194 }
1195 }
1196
1197 /**
1198 * Create a new network channel using the given socket channel. If this
1199 * channel group is closed, null is returned.
1200 *
1201 * @param theSocketChannel Socket channel.
1202 * @param theFarChannelGroupId Far end channel group ID.
1203 *
1204 * @return New channel, or null.
1205 *
1206 * @exception IOException Thrown if an I/O error occurred.
1207 */
1208 synchronized Channel createNetworkChannel(SocketChannel theSocketChannel,
1209 int theFarChannelGroupId)
1210 throws IOException {
1211 Channel channel = null;
1212 if (myIORequestList != null) {
1213 channel
1214 = new NetworkChannel(this, theSocketChannel, theFarChannelGroupId);
1215 myChannelList.add(channel);
1216 }
1217 return channel;
1218 }
1219
1220 /**
1221 * Remove the given channel from this channel group.
1222 *
1223 * @param Channel.
1224 */
1225 synchronized void removeChannel(Channel channel) {
1226 if (myChannelList != null) {
1227 myChannelList.remove(channel);
1228 }
1229 }
1230
1231 }