1 //******************************************************************************
2 //
3 // File: NetworkChannelSendThread.java
4 // Package: edu.rit.mp
5 // Unit: Class edu.rit.mp.NetworkChannelSendThread
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.mp;
41
42 import java.io.IOException;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.SocketChannel;
45 import java.util.concurrent.LinkedBlockingQueue;
46
47 /**
48 * Class NetworkChannelSendThread provides a thread for sending outgoing
49 * messages for a {@linkplain NetworkChannel}.
50 *
51 * @author Alan Kaminsky
52 * @version 12-Oct-2008
53 */
54 class NetworkChannelSendThread
55 extends Thread {
56
57 // Hidden data members.
58 // Enclosing network channel.
59 private NetworkChannel myNetworkChannel;
60
61 // Underlying socket channel.
62 private SocketChannel mySocketChannel;
63
64 // Queue of outgoing I/O requests.
65 private LinkedBlockingQueue<IORequest> myOutgoingQueue;
66
67 // Byte buffer.
68 private ByteBuffer myByteBuffer;
69
70 // Hidden constructors.
71 /**
72 * Construct a new network channel send thread.
73 *
74 * @param theNetworkChannel Enclosing network channel.
75 * @param theSocketChannel Underlying socket channel.
76 * @param theOutgoingQueue Queue of outgoing I/O requests.
77 */
78 NetworkChannelSendThread(NetworkChannel theNetworkChannel,
79 SocketChannel theSocketChannel,
80 LinkedBlockingQueue<IORequest> theOutgoingQueue) {
81 myNetworkChannel = theNetworkChannel;
82 mySocketChannel = theSocketChannel;
83 myOutgoingQueue = theOutgoingQueue;
84 myByteBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
85 setDaemon(true);
86 start();
87 }
88
89 // Exported operations.
90 /**
91 * Run this network channel send thread.
92 */
93 public void run() {
94 IORequest iorequest = null;
95 Buf buf = null;
96
97 // Processing loop.
98 sendloop:
99 for (;;) {
100 try {
101 // Wait for an I/O request to show up in the outgoing queue.
102 iorequest = myOutgoingQueue.take();
103 buf = iorequest.myBuf;
104
105 // Message preprocessing.
106 buf.preSend();
107 myByteBuffer.clear();
108 int i = 0;
109 int msglength = buf.myMessageLength;
110
111 // Write message header.
112 myByteBuffer.putInt(Constants.MAGIC_NUMBER);
113 myByteBuffer.putInt(iorequest.myTagLb);
114 myByteBuffer.put(buf.myMessageType);
115 myByteBuffer.putInt(msglength);
116
117 // Repeatedly transfer items from source buffer to byte buffer,
118 // then from byte buffer to socket channel.
119 while (i < msglength) {
120 i += buf.sendItems(i, myByteBuffer);
121 myByteBuffer.flip();
122 mySocketChannel.write(myByteBuffer);
123 myByteBuffer.compact();
124 }
125 myByteBuffer.flip();
126 while (myByteBuffer.hasRemaining()) {
127 mySocketChannel.write(myByteBuffer);
128 myByteBuffer.compact();
129 myByteBuffer.flip();
130 }
131
132 // Message postprocessing.
133 buf.postSend();
134
135 // Report success of current I/O request.
136 iorequest.reportSuccess();
137
138 iorequest = null;
139 buf = null;
140 } catch (IOException exc) {
141 // Report failure of current I/O request.
142 if (iorequest != null) {
143 iorequest.reportFailure(exc);
144 }
145 } catch (InterruptedException exc) {
146 ChannelClosedException exc2
147 = new ChannelClosedException("Channel closed");
148
149 // Report failure of current I/O request.
150 if (iorequest != null) {
151 iorequest.reportFailure(exc2);
152 }
153
154 // Report failure of any pending I/O requests.
155 while ((iorequest = myOutgoingQueue.poll()) != null) {
156 iorequest.reportFailure(exc2);
157 }
158
159 // Terminate this thread.
160 break sendloop;
161 } catch (RuntimeException exc) {
162 // Report failure of current I/O request.
163 if (iorequest != null) {
164 iorequest.reportFailure(exc);
165 }
166 } catch (Error exc) {
167 // Report failure of current I/O request.
168 if (iorequest != null) {
169 iorequest.reportFailure(exc);
170 }
171 }
172 }
173
174 // This thread is terminating. Enable garbage collection of data
175 // members.
176 myNetworkChannel = null;
177 mySocketChannel = null;
178 myOutgoingQueue = null;
179 myByteBuffer = null;
180 }
181
182 }