View Javadoc
1   //******************************************************************************
2   //
3   // File:    NetworkChannelReceiveThread.java
4   // Package: edu.rit.mp
5   // Unit:    Class edu.rit.mp.NetworkChannelReceiveThread
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.mp;
41  
42  import java.io.EOFException;
43  import java.io.IOException;
44  import java.io.InterruptedIOException;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.SocketChannel;
47  
48  /**
49   * Class NetworkChannelReceiveThread provides a thread for receiving incoming
50   * messages for a {@linkplain NetworkChannel}.
51   *
52   * @author Alan Kaminsky
53   * @version 23-Apr-2008
54   */
55  class NetworkChannelReceiveThread
56          extends Thread {
57  
58  // Hidden data members.
59      // Enclosing network channel and channel group.
60      private NetworkChannel myNetworkChannel;
61      private ChannelGroup myChannelGroup;
62  
63      // Underlying socket channel.
64      private SocketChannel mySocketChannel;
65  
66      // Queue of incoming I/O requests.
67      private IORequestList myIORequestList;
68  
69      // Byte buffer.
70      private ByteBuffer myByteBuffer;
71  
72  // Hidden constructors.
73      /**
74       * Construct a new network channel receive thread.
75       *
76       * @param theNetworkChannel Enclosing network channel.
77       * @param theSocketChannel Underlying socket channel.
78       */
79      NetworkChannelReceiveThread(NetworkChannel theNetworkChannel,
80              SocketChannel theSocketChannel) {
81          myNetworkChannel = theNetworkChannel;
82          myChannelGroup = theNetworkChannel.myChannelGroup;
83          mySocketChannel = theSocketChannel;
84          myIORequestList = theNetworkChannel.myIORequestList;
85          myByteBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
86          setDaemon(true);
87          start();
88      }
89  
90  // Exported operations.
91      /**
92       * Run this network channel receive thread.
93       */
94      public void run() {
95          IORequest iorequest = null;
96          Buf buf = null;
97          myByteBuffer.position(0);
98          myByteBuffer.limit(0);
99  
100         // Processing loop.
101         receiveloop:
102         for (;;) {
103             try {
104                 // Read the next 13-byte message header.
105                 while (myByteBuffer.remaining() < 13) {
106                     myByteBuffer.compact();
107                     if (mySocketChannel.read(myByteBuffer) == -1) {
108                         break receiveloop;
109                     }
110                     myByteBuffer.flip();
111                 }
112 
113                 // Extract message header fields.
114                 int magic = myByteBuffer.getInt();
115                 int messagetag = myByteBuffer.getInt();
116                 byte messagetype = myByteBuffer.get();
117                 int messagelength = myByteBuffer.getInt();
118 
119                 // If the magic number is incorrect, bad error. Close the
120                 // channel and terminate this thread.
121                 if (magic != Constants.MAGIC_NUMBER) {
122                     myChannelGroup.myLogger.log("edu.rit.mp.NetworkChannelReceiveThread: Invalid magic number received");
123                     myNetworkChannel.close();
124                     break receiveloop;
125                 }
126 
127                 // Wait for a matching I/O request to show up in the incoming
128                 // queue.
129                 iorequest
130                         = myIORequestList.waitForMatch(myNetworkChannel, messagetag, messagetype);
131 
132                 // Message preprocessing.
133                 buf = iorequest.myBuf;
134                 buf.preReceive(messagelength);
135                 int buflength = buf.myMessageLength;
136                 int n;
137                 int i = 0;
138                 int num = Math.min(messagelength, buflength);
139 
140                 // Repeatedly transfer items from socket channel to byte buffer,
141                 // then from byte buffer to destination buffer.
142                 n = buf.receiveItems(i, num, myByteBuffer);
143                 i += n;
144                 num -= n;
145                 while (num > 0) {
146                     myByteBuffer.compact();
147                     if (mySocketChannel.read(myByteBuffer) == -1) {
148                         throw new EOFException("Unexpected end-of-stream while receiving message");
149                     }
150                     myByteBuffer.flip();
151                     n = buf.receiveItems(i, num, myByteBuffer);
152                     i += n;
153                     num -= n;
154                 }
155 
156                 // If there are more items in the message than in the
157                 // destination buffer, suck out the extra message items.
158                 num = messagelength - buflength;
159                 if (num > 0) {
160                     num -= buf.skipItems(num, myByteBuffer);
161                     while (num > 0) {
162                         myByteBuffer.compact();
163                         if (mySocketChannel.read(myByteBuffer) == -1) {
164                             throw new EOFException("Unexpected end-of-stream while receiving message");
165                         }
166                         myByteBuffer.flip();
167                         num -= buf.skipItems(num, myByteBuffer);
168                     }
169                 }
170 
171                 // Message postprocessing.
172                 Status status
173                         = new Status(myNetworkChannel, messagetag, messagelength);
174                 buf.postReceive(status, myChannelGroup.myClassLoader);
175                 iorequest.myStatus = status;
176 
177                 // Report success to receiving thread.
178                 iorequest.reportSuccess();
179 
180                 iorequest = null;
181                 buf = null;
182             } catch (IOException exc) {
183                 // Report failure to receiving thread. Terminate this thread.
184                 if (iorequest != null) {
185                     iorequest.reportFailure(exc);
186                 }
187                 break receiveloop;
188             } catch (InterruptedException exc) {
189                 // Report failure to sending thread. Terminate this thread.
190                 if (iorequest != null) {
191                     InterruptedIOException exc2 = new InterruptedIOException();
192                     exc2.initCause(exc);
193                     iorequest.reportFailure(exc2);
194                 }
195                 break receiveloop;
196             } catch (RuntimeException exc) {
197                 // Report failure to sending thread. Terminate this thread.
198                 if (iorequest != null) {
199                     iorequest.reportFailure(exc);
200                 }
201                 break receiveloop;
202             } catch (Error exc) {
203                 // Report failure to sending thread. Terminate this thread.
204                 if (iorequest != null) {
205                     iorequest.reportFailure(exc);
206                 }
207                 break receiveloop;
208             }
209         }
210 
211         // This thread is terminating. Enable garbage collection of data
212         // members.
213         myNetworkChannel.shutdownInput();
214         myNetworkChannel = null;
215         myChannelGroup = null;
216         mySocketChannel = null;
217         myIORequestList = null;
218         myByteBuffer = null;
219     }
220 
221 }