1 //******************************************************************************
2 //
3 // File: NetworkChannelReceiveThread.java
4 // Package: edu.rit.mp
5 // Unit: Class edu.rit.mp.NetworkChannelReceiveThread
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.mp;
41
42 import java.io.EOFException;
43 import java.io.IOException;
44 import java.io.InterruptedIOException;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.SocketChannel;
47
48 /**
49 * Class NetworkChannelReceiveThread provides a thread for receiving incoming
50 * messages for a {@linkplain NetworkChannel}.
51 *
52 * @author Alan Kaminsky
53 * @version 23-Apr-2008
54 */
55 class NetworkChannelReceiveThread
56 extends Thread {
57
58 // Hidden data members.
59 // Enclosing network channel and channel group.
60 private NetworkChannel myNetworkChannel;
61 private ChannelGroup myChannelGroup;
62
63 // Underlying socket channel.
64 private SocketChannel mySocketChannel;
65
66 // Queue of incoming I/O requests.
67 private IORequestList myIORequestList;
68
69 // Byte buffer.
70 private ByteBuffer myByteBuffer;
71
72 // Hidden constructors.
73 /**
74 * Construct a new network channel receive thread.
75 *
76 * @param theNetworkChannel Enclosing network channel.
77 * @param theSocketChannel Underlying socket channel.
78 */
79 NetworkChannelReceiveThread(NetworkChannel theNetworkChannel,
80 SocketChannel theSocketChannel) {
81 myNetworkChannel = theNetworkChannel;
82 myChannelGroup = theNetworkChannel.myChannelGroup;
83 mySocketChannel = theSocketChannel;
84 myIORequestList = theNetworkChannel.myIORequestList;
85 myByteBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
86 setDaemon(true);
87 start();
88 }
89
90 // Exported operations.
91 /**
92 * Run this network channel receive thread.
93 */
94 public void run() {
95 IORequest iorequest = null;
96 Buf buf = null;
97 myByteBuffer.position(0);
98 myByteBuffer.limit(0);
99
100 // Processing loop.
101 receiveloop:
102 for (;;) {
103 try {
104 // Read the next 13-byte message header.
105 while (myByteBuffer.remaining() < 13) {
106 myByteBuffer.compact();
107 if (mySocketChannel.read(myByteBuffer) == -1) {
108 break receiveloop;
109 }
110 myByteBuffer.flip();
111 }
112
113 // Extract message header fields.
114 int magic = myByteBuffer.getInt();
115 int messagetag = myByteBuffer.getInt();
116 byte messagetype = myByteBuffer.get();
117 int messagelength = myByteBuffer.getInt();
118
119 // If the magic number is incorrect, bad error. Close the
120 // channel and terminate this thread.
121 if (magic != Constants.MAGIC_NUMBER) {
122 myChannelGroup.myLogger.log("edu.rit.mp.NetworkChannelReceiveThread: Invalid magic number received");
123 myNetworkChannel.close();
124 break receiveloop;
125 }
126
127 // Wait for a matching I/O request to show up in the incoming
128 // queue.
129 iorequest
130 = myIORequestList.waitForMatch(myNetworkChannel, messagetag, messagetype);
131
132 // Message preprocessing.
133 buf = iorequest.myBuf;
134 buf.preReceive(messagelength);
135 int buflength = buf.myMessageLength;
136 int n;
137 int i = 0;
138 int num = Math.min(messagelength, buflength);
139
140 // Repeatedly transfer items from socket channel to byte buffer,
141 // then from byte buffer to destination buffer.
142 n = buf.receiveItems(i, num, myByteBuffer);
143 i += n;
144 num -= n;
145 while (num > 0) {
146 myByteBuffer.compact();
147 if (mySocketChannel.read(myByteBuffer) == -1) {
148 throw new EOFException("Unexpected end-of-stream while receiving message");
149 }
150 myByteBuffer.flip();
151 n = buf.receiveItems(i, num, myByteBuffer);
152 i += n;
153 num -= n;
154 }
155
156 // If there are more items in the message than in the
157 // destination buffer, suck out the extra message items.
158 num = messagelength - buflength;
159 if (num > 0) {
160 num -= buf.skipItems(num, myByteBuffer);
161 while (num > 0) {
162 myByteBuffer.compact();
163 if (mySocketChannel.read(myByteBuffer) == -1) {
164 throw new EOFException("Unexpected end-of-stream while receiving message");
165 }
166 myByteBuffer.flip();
167 num -= buf.skipItems(num, myByteBuffer);
168 }
169 }
170
171 // Message postprocessing.
172 Status status
173 = new Status(myNetworkChannel, messagetag, messagelength);
174 buf.postReceive(status, myChannelGroup.myClassLoader);
175 iorequest.myStatus = status;
176
177 // Report success to receiving thread.
178 iorequest.reportSuccess();
179
180 iorequest = null;
181 buf = null;
182 } catch (IOException exc) {
183 // Report failure to receiving thread. Terminate this thread.
184 if (iorequest != null) {
185 iorequest.reportFailure(exc);
186 }
187 break receiveloop;
188 } catch (InterruptedException exc) {
189 // Report failure to sending thread. Terminate this thread.
190 if (iorequest != null) {
191 InterruptedIOException exc2 = new InterruptedIOException();
192 exc2.initCause(exc);
193 iorequest.reportFailure(exc2);
194 }
195 break receiveloop;
196 } catch (RuntimeException exc) {
197 // Report failure to sending thread. Terminate this thread.
198 if (iorequest != null) {
199 iorequest.reportFailure(exc);
200 }
201 break receiveloop;
202 } catch (Error exc) {
203 // Report failure to sending thread. Terminate this thread.
204 if (iorequest != null) {
205 iorequest.reportFailure(exc);
206 }
207 break receiveloop;
208 }
209 }
210
211 // This thread is terminating. Enable garbage collection of data
212 // members.
213 myNetworkChannel.shutdownInput();
214 myNetworkChannel = null;
215 myChannelGroup = null;
216 mySocketChannel = null;
217 myIORequestList = null;
218 myByteBuffer = null;
219 }
220
221 }