Package edu.rit.mp

Class ChannelGroup

java.lang.Object
edu.rit.mp.ChannelGroup

public class ChannelGroup extends Object
Class ChannelGroup provides a group of Channels for sending and receiving messages in the Message Protocol (MP).

Creating Channels

A channel group can be used to create channels in two ways: by accepting a connection request from another process, and by requesting a connection to another process.

The channel group can be configured to listen at a certain host and port for connection requests. Configure the host and port by calling the listen() method. To start accepting connection requests, call the startListening() method.

If desired, an application can receive notification of newly created channels by providing a ConnectListener object to the channel group's setConnectListener() method. Specify the connect listener before calling the startListening() method; otherwise, the application may miss notifications for channels created before the listener was registered.

When a connection request arrives, the channel group sets up a new channel object for communicating over the connection. If a connect listener has been registered, the channel group then passes the channel to the connect listener's farEndConnected() method, which does whatever the application needs to record the new channel's presence. The channel does not start sending and receiving messages until after the connect listener's farEndConnected() method (if any) has returned.

The application can also call the channel group's connect() method to request a connection to another host and port. The channel group sets up a new channel object for communicating over the connection. If a connect listener has been registered, the channel group then passes the channel to the connect listener's nearEndConnected() method, which does whatever the application needs to record the new channel's presence. The channel does not start sending and receiving messages until after the connect listener's nearEndConnected() method (if any) has returned. The connect() method also returns the new channel.

Once a connection has been set up and a channel object has been created on each side, the applications can use their respective channels to send and receive messages.

If a channel group does not need to accept incoming connection requests, the channel group need not listen to any host and port. The channel group can still be used to make outgoing connection requests.

Channel Group IDs

Each channel group has a channel group ID. The channel group ID is an integer, initially 0, that can be changed by the setChannelGroupId() method. The channel group attaches no significance to the channel group ID; it is provided for the use of the application using the channel group.

You can query a channel group object to determine its channel group ID. You can also query a channel object to determine the ID of the channel group at the near end of the channel and the ID of the channel group at the far end of the channel.

Sending Messages

To send a message, the application creates a message buffer (class Buf) specifying where to get the items to be sent. The application calls the channel group's send() method, passing the channel on which to send the message, the message buffer, and the message tag. (If the message tag is not specified, it defaults to 0.) The channel group extracts the items from the message buffer and sends a message over the channel's connection. When the send() method returns, the message has been fully sent, but the message may not have been fully received yet.

The far end application must receive the message from the channel at the other end of the connection. If no application is receiving the message, the send() method may block (because of flow control). This in turn may lead to a deadlock.

At most one outgoing message at a time may be in progress on a channel. If a second thread tries to send a message on a channel while a first thread is still sending a message on that channel, the second thread will block until the first thread has finished sending the message.

Receiving Messages

To receive a message, the application creates a message buffer (class Buf) specifying where to put the items to be received. The application calls the channel group's receive() method, passing the channel from which to receive the message, the desired message tag, and the message buffer. The application can specify "any channel" instead of a specific channel. The application can specify a range of tags or "any tag" instead of a specific tag. Any number of threads can have receive requests pending at the same time.

When the channel group receives a message from a channel, the channel group tries to match the message with the pending receive requests. A message matches a receive request if (a) the message's channel is the same as the receive request's channel, or the receive request specified "any channel;" and (b) the message's tag is the same as the receive request's tag, or the message's tag falls within the receive request's range of tags, or the receive request specified "any tag;" and (c) the message's item type is the same as the receive request's item type (as given by the message buffer). The pending receive requests are maintained in FIFO order. If no receive request matches the message, the channel group does not read the message until such time as a matching receive request occurs. If more than one receive request matches the message, the channel group chooses the first matching receive request.
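
The matching rule above can be illustrated with a small standalone model. This is only a sketch, not the library's implementation: the class ReceiveMatchSketch, its Request type, and the use of strings as channel names are hypothetical, "any channel" is modeled as a null channel, and "any tag" is modeled as the full int range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the matching rule described above; not library code.
final class ReceiveMatchSketch {

    // One pending receive request. A null channel stands for "any channel";
    // "any tag" is modeled as the full int range.
    static final class Request {
        final String channel;
        final int tagLo;
        final int tagHi;
        final Class<?> itemType;

        Request(String channel, int tagLo, int tagHi, Class<?> itemType) {
            this.channel = channel;
            this.tagLo = tagLo;
            this.tagHi = tagHi;
            this.itemType = itemType;
        }

        // Conditions (a), (b), and (c) must all hold.
        boolean matches(String msgChannel, int msgTag, Class<?> msgItemType) {
            boolean channelOk = channel == null || channel.equals(msgChannel);
            boolean tagOk = tagLo <= msgTag && msgTag <= tagHi;
            boolean typeOk = itemType.equals(msgItemType);
            return channelOk && tagOk && typeOk;
        }
    }

    // Index of the first matching request in FIFO order, or -1 if none
    // matches (the message then waits until a matching request arrives).
    static int firstMatch(List<Request> pending, String msgChannel,
                          int msgTag, Class<?> msgItemType) {
        for (int i = 0; i < pending.size(); i++) {
            if (pending.get(i).matches(msgChannel, msgTag, msgItemType)) {
                return i;
            }
        }
        return -1;
    }

    // Three pending requests, oldest first.
    static List<Request> demoQueue() {
        List<Request> pending = new ArrayList<>();
        pending.add(new Request("B", 0, 0, int.class));
        pending.add(new Request(null, 0, 9, int.class));
        pending.add(new Request("A", Integer.MIN_VALUE, Integer.MAX_VALUE, int.class));
        return pending;
    }
}
```

With the demo queue, a message on channel "A" with tag 5 and int items matches both the second and third requests, and the older one (index 1) is chosen. A message with double items matches none of them, because the item type differs.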

Once the channel group has matched the incoming message with the receive request, the channel group reads the items from the message and stores them into the message buffer. If there are fewer items in the message than the length of the message buffer, the extra items at the end of the message buffer are not set to anything. If there are more items in the message than the length of the message buffer, the extra items are read from the message and discarded. Once the message has been read, the receive() method returns a Status object reporting the channel on which the message arrived, the message tag, and the actual number of items in the message (which may or may not be the same as the number of items in the message buffer).
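
The length handling described above can be sketched with plain int arrays. This is a hypothetical model, not the library's code: the class ReceiveCopySketch and its deliver() method are illustrative names, an int array stands in for a message buffer, and the return value stands in for the item count reported by the Status object.

```java
// Hypothetical model of how received items fill a buffer; not library code.
final class ReceiveCopySketch {

    // Stores the message's items into the buffer and returns the actual
    // number of items in the message. Buffer elements beyond the message
    // length are left untouched; message items beyond the buffer length
    // are discarded.
    static int deliver(int[] message, int[] buffer) {
        int n = Math.min(message.length, buffer.length);
        System.arraycopy(message, 0, buffer, 0, n);
        return message.length;
    }
}
```

For example, delivering a two-item message into a four-element buffer changes only the first two elements and reports a length of 2, while delivering a six-item message into the same buffer fills all four elements and reports a length of 6. A caller that needs to detect truncation would compare the reported length with the buffer length.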

If the receive requests do not match properly with the incoming messages, a deadlock may occur.

Sending and Receiving Within the Same Process

Each channel group has a "loopback" channel that is used to send messages within the same process. To obtain the loopback channel, call the loopbackChannel() method. Then one thread can send messages using the loopback channel while a different thread receives messages using the loopback channel. If the same thread both sends and receives using the loopback channel, a deadlock may occur.

The loopback channel uses the copy() method of class Buf to transfer data items directly from the source buffer to the destination buffer. The loopback channel does not do any network communication.
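
The two-thread pattern can be illustrated with JDK classes alone. The sketch below is an analogy, not the loopback channel itself: it assumes a rendezvous-style handoff in which a blocking send waits for a matching receive, and models that with java.util.concurrent.SynchronousQueue. The class LoopbackSketch and its transfer() method are hypothetical names, and clone() stands in for the buffer copy.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical analogy for sending and receiving within one process.
final class LoopbackSketch {

    // One thread "sends" while the calling thread "receives".
    static int[] transfer(int[] src) {
        SynchronousQueue<int[]> loopback = new SynchronousQueue<>();

        // Sender thread: put() blocks until the receiver takes the items.
        Thread sender = new Thread(() -> {
            try {
                loopback.put(src.clone());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();

        try {
            // Receiver side runs in the calling thread.
            int[] dst = loopback.take();
            sender.join();
            return dst;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during transfer", e);
        }
    }
}
```

In this model, calling put() and then take() from the same thread would never return, because put() waits for a receiver that cannot run. That mirrors the deadlock warning above.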

Non-Blocking Send and Receive Operations

The send() method described so far does a blocking send operation; the send() method does not return until the message has been fully sent. There is also a non-blocking send operation, sendNoWait(), which includes an IORequest argument. The sendNoWait() method initiates the send operation and returns immediately. This allows the caller to continue processing while the channel group sends the message in a separate thread. To wait for the message to be fully sent, the caller must call the IORequest object's waitForFinish() method.

Likewise, the receive() method described so far does a blocking receive operation; the receive() method does not return until the message has been fully received. There is also a non-blocking receive operation, receiveNoWait(), which includes an IORequest argument. The receiveNoWait() method initiates the receive operation and returns immediately. This allows the caller to continue processing while the channel group receives the message in a separate thread. To wait for the message to be fully received, the caller must call the IORequest object's waitForFinish() method, which returns a Status object giving the results of the receive operation.
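
The initiate-then-wait pattern can be sketched with the JDK's CompletableFuture as a stand-in. This is an analogy under stated assumptions, not the library's API: the class NoWaitSketch is hypothetical, supplyAsync() plays the role of receiveNoWait(), join() plays the role of the IORequest object's waitForFinish(), and the returned Integer stands in for the item count in the Status object.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical analogy for a non-blocking receive; not library code.
final class NoWaitSketch {

    // Starts the "receive" on another thread and returns immediately.
    static CompletableFuture<Integer> receiveNoWait(int[] incoming, int[] dst) {
        return CompletableFuture.supplyAsync(() -> {
            int n = Math.min(incoming.length, dst.length);
            System.arraycopy(incoming, 0, dst, 0, n);
            return incoming.length;
        });
    }

    // Initiates the receive, does unrelated work, then waits for the result.
    static int receiveWhileWorking(int[] incoming, int[] dst) {
        CompletableFuture<Integer> request = receiveNoWait(incoming, dst);
        long unrelated = 0;
        for (int i = 0; i < 1000; i++) {
            unrelated += i; // the caller keeps processing in the meantime
        }
        // Block until the receive has finished, as waitForFinish() would.
        return request.join();
    }
}
```

The destination buffer should be read only after the wait returns; before that point its contents are not yet defined.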

Version:
11-Mar-2009
Author:
Alan Kaminsky
  • Constructor Details

    • ChannelGroup

      public ChannelGroup()
      Construct a new channel group. The channel group ID is initially 0. The channel group will not listen for connection requests. To listen for connection requests at a later time, call the listen() method followed by the startListening() method.

      The channel group will log error messages on the standard error.

    • ChannelGroup

      public ChannelGroup(InetSocketAddress theListenAddress) throws IOException
      Construct a new channel group. The channel group ID is initially 0. The channel group will listen for connection requests on the given host and port. To start actively listening, call the startListening() method.

      The channel group will log error messages on the standard error.

      Parameters:
      theListenAddress - Host and port at which to listen.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theListenAddress is null.
      IOException - Thrown if an I/O error occurred.
    • ChannelGroup

      public ChannelGroup(ServerSocketChannel theServerSocketChannel) throws IOException
      Construct a new channel group. The channel group ID is initially 0. The channel group will listen for connection requests using the given server socket channel. The server socket channel must be bound to a host and port. To start actively listening, call the startListening() method.

      The channel group will log error messages on the standard error.

      Parameters:
      theServerSocketChannel - Server socket channel.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theServerSocketChannel is null.
      IOException - Thrown if an I/O error occurred. Thrown if theServerSocketChannel is not bound.
    • ChannelGroup

      public ChannelGroup(Logger theLogger)
      Construct a new channel group. The channel group ID is initially 0. The channel group will not listen for connection requests. To listen for connection requests at a later time, call the listen() method followed by the startListening() method.

      The channel group will log error messages using the given logger.

      Parameters:
      theLogger - Logger for error messages.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theLogger is null.
    • ChannelGroup

      public ChannelGroup(InetSocketAddress theListenAddress, Logger theLogger) throws IOException
      Construct a new channel group. The channel group ID is initially 0. The channel group will listen for connection requests on the given host and port. To start actively listening, call the startListening() method.

      The channel group will log error messages using the given logger.

      Parameters:
      theListenAddress - Host and port at which to listen.
      theLogger - Logger for error messages.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theListenAddress is null. Thrown if theLogger is null.
      IOException - Thrown if an I/O error occurred.
    • ChannelGroup

      public ChannelGroup(ServerSocketChannel theServerSocketChannel, Logger theLogger) throws IOException
      Construct a new channel group. The channel group ID is initially 0. The channel group will listen for connection requests using the given server socket channel. The server socket channel must be bound to a host and port. To start actively listening, call the startListening() method.

      The channel group will log error messages using the given logger.

      Parameters:
      theServerSocketChannel - Server socket channel.
      theLogger - Logger for error messages.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theServerSocketChannel is null. Thrown if theLogger is null.
      IOException - Thrown if an I/O error occurred. Thrown if theServerSocketChannel is not bound.
  • Method Details

    • setChannelGroupId

      public void setChannelGroupId(int theChannelGroupId)
      Set this channel group's channel group ID.
      Parameters:
      theChannelGroupId - Channel group ID.
    • getChannelGroupId

      public int getChannelGroupId()
      Obtain this channel group's channel group ID.
      Returns:
      Channel group ID.
    • listenAddress

      public InetSocketAddress listenAddress()
      Obtain this channel group's listen address. This is the near end host and port on which this channel group listens for connection requests. If this channel group is not listening for connection requests, null is returned.
      Returns:
      Near end address, or null.
    • listen

      public void listen(InetSocketAddress theListenAddress) throws IOException
      Listen for connection requests on the given host and port. To start actively listening, call the startListening() method.
      Parameters:
      theListenAddress - Host and port at which to listen.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theListenAddress is null.
      IllegalStateException - (unchecked exception) Thrown if listening has already started.
      IOException - Thrown if an I/O error occurred.
    • listen

      public void listen(ServerSocketChannel theServerSocketChannel) throws IOException
      Listen for connection requests using the given server socket channel. The server socket channel must be bound to a host and port. To start actively listening, call the startListening() method.
      Parameters:
      theServerSocketChannel - Server socket channel.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theServerSocketChannel is null.
      IllegalStateException - (unchecked exception) Thrown if listening has already started.
      IOException - Thrown if an I/O error occurred. Thrown if theServerSocketChannel is not bound.
    • setConnectListener

      public void setConnectListener(ConnectListener theConnectListener)
      Register the given connect listener with this channel group. Thereafter, this channel group will report each connected channel by calling theConnectListener's nearEndConnected() method (if the connection request originated in this process) or farEndConnected() method (if the connection request originated in another process). It is assumed that these methods will not do any lengthy processing and will not block the calling thread.

      At most one connect listener may be registered. If a connect listener is already registered, it is replaced with the given connect listener. If theConnectListener is null, any registered connect listener is discarded, and this channel group will not report connected channels.

      Call the setConnectListener() method before calling the startListening() method; otherwise, the application may not receive some connection notifications.

      Parameters:
      theConnectListener - Connect listener, or null.
    • startListening

      public void startListening()
      Start actively listening for connection requests.
      Throws:
      IllegalStateException - (unchecked exception) Thrown if a host and port or a server socket channel upon which to listen has not been specified. Thrown if listening has already started.
    • connect

      public Channel connect(InetSocketAddress theFarEndAddress) throws IOException
      Create a new channel connected to the given far end host and port. On the far end computer, a channel group must be listening at the given host and port. Once the connection is set up, if a connect listener has been registered, the channel group calls the connect listener's nearEndConnected() method to report the new channel.
      Parameters:
      theFarEndAddress - Host and port of far end channel group.
      Returns:
      New channel.
      Throws:
      IOException - Thrown if an I/O error occurred.
    • loopbackChannel

      public Channel loopbackChannel()
      Obtain this channel group's loopback channel. If this channel group is closed, null is returned.
      Returns:
      Loopback channel, or null.
    • send

      public void send(Channel theChannel, Buf theSrc) throws IOException
      Send a message to the given channel. The message uses a tag of 0. The message items come from the given item source buffer.

      The send() method does not return until the message has been fully sent. (The message may not have been fully received yet.)

      The send() method assumes that theChannel was created by this channel group. If not, the send() method's behavior is unspecified.

      Parameters:
      theChannel - Channel.
      theSrc - Item source buffer.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theChannel is null or theSrc is null.
      IOException - Thrown if an I/O error occurred.
    • send

      public void send(Channel theChannel, int theTag, Buf theSrc) throws IOException
      Send a message to the given channel with the given tag. The message items come from the given item source buffer.

      The send() method does not return until the message has been fully sent. (The message may not have been fully received yet.)

      The send() method assumes that theChannel was created by this channel group. If not, the send() method's behavior is unspecified.

      Parameters:
      theChannel - Channel.
      theTag - Message tag.
      theSrc - Item source buffer.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theChannel is null or theSrc is null.
      IOException - Thrown if an I/O error occurred.
    • sendNoWait

      public void sendNoWait(Channel theChannel, Buf theSrc, IORequest theIORequest) throws IOException
      Send (non-blocking) a message to the given channel. The message uses a tag of 0. The message items come from the given item source buffer. theIORequest is the IORequest object to be associated with the send operation.

      The sendNoWait() method returns immediately. To wait for the message to be fully sent, call theIORequest.waitForFinish().

      The sendNoWait() method assumes that theChannel was created by this channel group. If not, the sendNoWait() method's behavior is unspecified.

      Parameters:
      theChannel - Channel.
      theSrc - Item source buffer.
      theIORequest - IORequest object.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theChannel is null, theSrc is null, or theIORequest is null.
      IOException - Thrown if an I/O error occurred.
    • sendNoWait

      public void sendNoWait(Channel theChannel, int theTag, Buf theSrc, IORequest theIORequest) throws IOException
      Send (non-blocking) a message to the given channel with the given tag. The message items come from the given item source buffer. theIORequest is the IORequest object to be associated with the send operation.

      The sendNoWait() method returns immediately. To wait for the message to be fully sent, call theIORequest.waitForFinish().

      The sendNoWait() method assumes that theChannel was created by this channel group. If not, the sendNoWait() method's behavior is unspecified.

      Parameters:
      theChannel - Channel.
      theTag - Message tag.
      theSrc - Item source buffer.
      theIORequest - IORequest object.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theChannel is null, theSrc is null, or theIORequest is null.
      IOException - Thrown if an I/O error occurred.
    • receive

      public Status receive(Channel theChannel, Buf theDst) throws IOException
      Receive a message from the given channel. If theChannel is null, a message will be received from any channel in this channel group. The message must have a tag of 0. The message items are stored in the given item destination buffer.

      The receive() method does not return until the message has been fully received.

      The receive() method assumes that theChannel was created by this channel group. If not, the receive() method's behavior is unspecified.

      Parameters:
      theChannel - Channel, or null to receive from any channel.
      theDst - Item destination buffer.
      Returns:
      Status object giving the outcome of the message reception.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theDst is null.
      IOException - Thrown if an I/O error occurred.
    • receive

      public Status receive(Channel theChannel, int theTag, Buf theDst) throws IOException
      Receive a message from the given channel with the given tag. If theChannel is null, a message will be received from any channel in this channel group. The message items are stored in the given item destination buffer.

      The receive() method does not return until the message has been fully received.

      The receive() method assumes that theChannel was created by this channel group. If not, the receive() method's behavior is unspecified.

      Parameters:
      theChannel - Channel, or null to receive from any channel.
      theTag - Message tag.
      theDst - Item destination buffer.
      Returns:
      Status object giving the outcome of the message reception.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theDst is null.
      IOException - Thrown if an I/O error occurred.
    • receive

      public Status receive(Channel theChannel, Range theTagRange, Buf theDst) throws IOException
      Receive a message from the given channel with the given range of tags. If theChannel is null, a message will be received from any channel in this channel group. If theTagRange is null, a message will be received with any tag. The message items are stored in the given item destination buffer.

      The receive() method does not return until the message has been fully received.

      The receive() method assumes that theChannel was created by this channel group. If not, the receive() method's behavior is unspecified.

      Parameters:
      theChannel - Channel, or null to receive from any channel.
      theTagRange - Message tag range, or null to receive any tag.
      theDst - Item destination buffer.
      Returns:
      Status object giving the outcome of the message reception.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theDst is null.
      IOException - Thrown if an I/O error occurred.
    • receiveNoWait

      public void receiveNoWait(Channel theChannel, Buf theDst, IORequest theIORequest) throws IOException
      Receive (non-blocking) a message from the given channel. If theChannel is null, a message will be received from any channel in this channel group. The message must have a tag of 0. The message items are stored in the given item destination buffer. theIORequest is the IORequest object to be associated with the receive operation.

      The receiveNoWait() method returns immediately. To wait for the message to be fully received, call theIORequest.waitForFinish().

      The receiveNoWait() method assumes that theChannel was created by this channel group. If not, the receiveNoWait() method's behavior is unspecified.

      Parameters:
      theChannel - Channel, or null to receive from any channel.
      theDst - Item destination buffer.
      theIORequest - IORequest object.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theDst is null or theIORequest is null.
      IOException - Thrown if an I/O error occurred.
    • receiveNoWait

      public void receiveNoWait(Channel theChannel, int theTag, Buf theDst, IORequest theIORequest) throws IOException
      Receive (non-blocking) a message from the given channel with the given tag. If theChannel is null, a message will be received from any channel in this channel group. The message items are stored in the given item destination buffer. theIORequest is the IORequest object to be associated with the receive operation.

      The receiveNoWait() method returns immediately. To wait for the message to be fully received, call theIORequest.waitForFinish().

      The receiveNoWait() method assumes that theChannel was created by this channel group. If not, the receiveNoWait() method's behavior is unspecified.

      Parameters:
      theChannel - Channel, or null to receive from any channel.
      theTag - Message tag.
      theDst - Item destination buffer.
      theIORequest - IORequest object.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theDst is null or theIORequest is null.
      IOException - Thrown if an I/O error occurred.
    • receiveNoWait

      public void receiveNoWait(Channel theChannel, Range theTagRange, Buf theDst, IORequest theIORequest) throws IOException
      Receive (non-blocking) a message from the given channel with the given range of tags. If theChannel is null, a message will be received from any channel in this channel group. If theTagRange is null, a message will be received with any tag. The message items are stored in the given item destination buffer. theIORequest is the IORequest object to be associated with the receive operation.

      The receiveNoWait() method returns immediately. To wait for the message to be fully received, call theIORequest.waitForFinish().

      The receiveNoWait() method assumes that theChannel was created by this channel group. If not, the receiveNoWait() method's behavior is unspecified.

      Parameters:
      theChannel - Channel, or null to receive from any channel.
      theTagRange - Message tag range, or null to receive any tag.
      theDst - Item destination buffer.
      theIORequest - IORequest object.
      Throws:
      NullPointerException - (unchecked exception) Thrown if theDst is null or theIORequest is null.
      IOException - Thrown if an I/O error occurred.
    • setAlternateClassLoader

      public void setAlternateClassLoader(ClassLoader theClassLoader)
      Specify an alternate class loader for this channel group. When objects are received in a message via this channel group, the given class loader will be used to load the objects' classes. If setAlternateClassLoader() is never called, or if theClassLoader is null, an alternate class loader will not be used.
      Parameters:
      theClassLoader - Alternate class loader, or null.
    • close

      public void close()
      Close this channel group. Any pending receive requests will fail with a ChannelGroupClosedException.
    • dump

      public void dump(PrintStream out, String prefix)
      Dump the state of this channel group on the given print stream. For debugging.
      Parameters:
      out - Print stream.
      prefix - String to print at the beginning of each line.