View Javadoc
1   //******************************************************************************
2   //
3   // File:    NetworkChannelSendThread.java
4   // Package: edu.rit.mp
5   // Unit:    Class edu.rit.mp.NetworkChannelSendThread
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.mp;
41  
42  import java.io.IOException;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.SocketChannel;
45  import java.util.concurrent.LinkedBlockingQueue;
46  
47  /**
48   * Class NetworkChannelSendThread provides a thread for sending outgoing
49   * messages for a {@linkplain NetworkChannel}.
50   *
51   * @author Alan Kaminsky
52   * @version 12-Oct-2008
53   */
54  class NetworkChannelSendThread
55          extends Thread {
56  
57  // Hidden data members.
58      // Enclosing network channel.
59      private NetworkChannel myNetworkChannel;
60  
61      // Underlying socket channel.
62      private SocketChannel mySocketChannel;
63  
64      // Queue of outgoing I/O requests.
65      private LinkedBlockingQueue<IORequest> myOutgoingQueue;
66  
67      // Byte buffer.
68      private ByteBuffer myByteBuffer;
69  
70  // Hidden constructors.
71      /**
72       * Construct a new network channel send thread.
73       *
74       * @param theNetworkChannel Enclosing network channel.
75       * @param theSocketChannel Underlying socket channel.
76       * @param theOutgoingQueue Queue of outgoing I/O requests.
77       */
78      NetworkChannelSendThread(NetworkChannel theNetworkChannel,
79              SocketChannel theSocketChannel,
80              LinkedBlockingQueue<IORequest> theOutgoingQueue) {
81          myNetworkChannel = theNetworkChannel;
82          mySocketChannel = theSocketChannel;
83          myOutgoingQueue = theOutgoingQueue;
84          myByteBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_SIZE);
85          setDaemon(true);
86          start();
87      }
88  
89  // Exported operations.
90      /**
91       * Run this network channel send thread.
92       */
93      public void run() {
94          IORequest iorequest = null;
95          Buf buf = null;
96  
97          // Processing loop.
98          sendloop:
99          for (;;) {
100             try {
101                 // Wait for an I/O request to show up in the outgoing queue.
102                 iorequest = myOutgoingQueue.take();
103                 buf = iorequest.myBuf;
104 
105                 // Message preprocessing.
106                 buf.preSend();
107                 myByteBuffer.clear();
108                 int i = 0;
109                 int msglength = buf.myMessageLength;
110 
111                 // Write message header.
112                 myByteBuffer.putInt(Constants.MAGIC_NUMBER);
113                 myByteBuffer.putInt(iorequest.myTagLb);
114                 myByteBuffer.put(buf.myMessageType);
115                 myByteBuffer.putInt(msglength);
116 
117                 // Repeatedly transfer items from source buffer to byte buffer,
118                 // then from byte buffer to socket channel.
119                 while (i < msglength) {
120                     i += buf.sendItems(i, myByteBuffer);
121                     myByteBuffer.flip();
122                     mySocketChannel.write(myByteBuffer);
123                     myByteBuffer.compact();
124                 }
125                 myByteBuffer.flip();
126                 while (myByteBuffer.hasRemaining()) {
127                     mySocketChannel.write(myByteBuffer);
128                     myByteBuffer.compact();
129                     myByteBuffer.flip();
130                 }
131 
132                 // Message postprocessing.
133                 buf.postSend();
134 
135                 // Report success of current I/O request.
136                 iorequest.reportSuccess();
137 
138                 iorequest = null;
139                 buf = null;
140             } catch (IOException exc) {
141                 // Report failure of current I/O request.
142                 if (iorequest != null) {
143                     iorequest.reportFailure(exc);
144                 }
145             } catch (InterruptedException exc) {
146                 ChannelClosedException exc2
147                         = new ChannelClosedException("Channel closed");
148 
149                 // Report failure of current I/O request.
150                 if (iorequest != null) {
151                     iorequest.reportFailure(exc2);
152                 }
153 
154                 // Report failure of any pending I/O requests.
155                 while ((iorequest = myOutgoingQueue.poll()) != null) {
156                     iorequest.reportFailure(exc2);
157                 }
158 
159                 // Terminate this thread.
160                 break sendloop;
161             } catch (RuntimeException exc) {
162                 // Report failure of current I/O request.
163                 if (iorequest != null) {
164                     iorequest.reportFailure(exc);
165                 }
166             } catch (Error exc) {
167                 // Report failure of current I/O request.
168                 if (iorequest != null) {
169                     iorequest.reportFailure(exc);
170                 }
171             }
172         }
173 
174         // This thread is terminating. Enable garbage collection of data
175         // members.
176         myNetworkChannel = null;
177         mySocketChannel = null;
178         myOutgoingQueue = null;
179         myByteBuffer = null;
180     }
181 
182 }