1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.mp;
41
42 import java.io.EOFException;
43 import java.io.IOException;
44 import java.io.InterruptedIOException;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.SocketChannel;
47
48
49
50
51
52
53
54
55 class NetworkChannelReceiveThread
56 extends Thread {
57
58
59
60 private NetworkChannel myNetworkChannel;
61 private ChannelGroup myChannelGroup;
62
63
64 private SocketChannel mySocketChannel;
65
66
67 private IORequestList myIORequestList;
68
69
70 private ByteBuffer myByteBuffer;
71
72
73
74
75
76
77
78
79 NetworkChannelReceiveThread(NetworkChannel theNetworkChannel,
80 SocketChannel theSocketChannel) {
81 myNetworkChannel = theNetworkChannel;
82 myChannelGroup = theNetworkChannel.myChannelGroup;
83 mySocketChannel = theSocketChannel;
84 myIORequestList = theNetworkChannel.myIORequestList;
85 myByteBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
86 setDaemon(true);
87 start();
88 }
89
90
91
92
93
94 public void run() {
95 IORequest iorequest = null;
96 Buf buf = null;
97 myByteBuffer.position(0);
98 myByteBuffer.limit(0);
99
100
101 receiveloop:
102 for (;;) {
103 try {
104
105 while (myByteBuffer.remaining() < 13) {
106 myByteBuffer.compact();
107 if (mySocketChannel.read(myByteBuffer) == -1) {
108 break receiveloop;
109 }
110 myByteBuffer.flip();
111 }
112
113
114 int magic = myByteBuffer.getInt();
115 int messagetag = myByteBuffer.getInt();
116 byte messagetype = myByteBuffer.get();
117 int messagelength = myByteBuffer.getInt();
118
119
120
121 if (magic != Constants.MAGIC_NUMBER) {
122 myChannelGroup.myLogger.log("edu.rit.mp.NetworkChannelReceiveThread: Invalid magic number received");
123 myNetworkChannel.close();
124 break receiveloop;
125 }
126
127
128
129 iorequest
130 = myIORequestList.waitForMatch(myNetworkChannel, messagetag, messagetype);
131
132
133 buf = iorequest.myBuf;
134 buf.preReceive(messagelength);
135 int buflength = buf.myMessageLength;
136 int n;
137 int i = 0;
138 int num = Math.min(messagelength, buflength);
139
140
141
142 n = buf.receiveItems(i, num, myByteBuffer);
143 i += n;
144 num -= n;
145 while (num > 0) {
146 myByteBuffer.compact();
147 if (mySocketChannel.read(myByteBuffer) == -1) {
148 throw new EOFException("Unexpected end-of-stream while receiving message");
149 }
150 myByteBuffer.flip();
151 n = buf.receiveItems(i, num, myByteBuffer);
152 i += n;
153 num -= n;
154 }
155
156
157
158 num = messagelength - buflength;
159 if (num > 0) {
160 num -= buf.skipItems(num, myByteBuffer);
161 while (num > 0) {
162 myByteBuffer.compact();
163 if (mySocketChannel.read(myByteBuffer) == -1) {
164 throw new EOFException("Unexpected end-of-stream while receiving message");
165 }
166 myByteBuffer.flip();
167 num -= buf.skipItems(num, myByteBuffer);
168 }
169 }
170
171
172 Status status
173 = new Status(myNetworkChannel, messagetag, messagelength);
174 buf.postReceive(status, myChannelGroup.myClassLoader);
175 iorequest.myStatus = status;
176
177
178 iorequest.reportSuccess();
179
180 iorequest = null;
181 buf = null;
182 } catch (IOException exc) {
183
184 if (iorequest != null) {
185 iorequest.reportFailure(exc);
186 }
187 break receiveloop;
188 } catch (InterruptedException exc) {
189
190 if (iorequest != null) {
191 InterruptedIOException exc2 = new InterruptedIOException();
192 exc2.initCause(exc);
193 iorequest.reportFailure(exc2);
194 }
195 break receiveloop;
196 } catch (RuntimeException exc) {
197
198 if (iorequest != null) {
199 iorequest.reportFailure(exc);
200 }
201 break receiveloop;
202 } catch (Error exc) {
203
204 if (iorequest != null) {
205 iorequest.reportFailure(exc);
206 }
207 break receiveloop;
208 }
209 }
210
211
212
213 myNetworkChannel.shutdownInput();
214 myNetworkChannel = null;
215 myChannelGroup = null;
216 mySocketChannel = null;
217 myIORequestList = null;
218 myByteBuffer = null;
219 }
220
221 }