View Javadoc
1   //******************************************************************************
2   //
3   // File:    IORequest.java
4   // Package: edu.rit.mp
5   // Unit:    Class edu.rit.mp.IORequest
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.mp;
41  
42  import java.io.IOException;
43  import java.io.InterruptedIOException;
44  import java.lang.reflect.Constructor;
45  
46  /**
47   * Class IORequest encapsulates the state of a message being sent or received in
48   * the Message Protocol (MP).
49   * <P>
50   * Class {@linkplain ChannelGroup}'s non-blocking <code>sendNoWait()</code> method
51   * includes an IORequest argument. This allows the caller to initiate the send
52   * operation and continue processing while the channel group sends the message
53   * in a separate thread. To wait for the message to be sent, the caller must
54   * call the IORequest object's <code>waitForFinish()</code> method.
55   * <P>
56   * Class {@linkplain ChannelGroup}'s non-blocking <code>receiveNoWait()</code>
57   * method includes an IORequest argument. This allows the caller to initiate the
58   * receive operation and continue processing while the channel group receives
59   * the message in a separate thread. To wait for the message to be received, the
60   * caller must call the IORequest object's <code>waitForFinish()</code> method,
61   * which returns a {@linkplain Status} object giving the results of the receive
62   * operation.
63   *
64   * @author Alan Kaminsky
65   * @version 12-Oct-2008
66   */
67  public class IORequest {
68  
69  // Hidden data members.
70      /**
71       * Channel on which to send or receive message. Null means "any channel"
72       * (only valid for receiving).
73       */
74      protected Channel myChannel;
75  
76      /**
77       * Message tag range lower bound. For sending, myTagLb gives the outgoing
78       * message tag, and myTagUb is not used. For receiving, an incoming message
79       * tag in the range myTagLb..myTabUb inclusive will match this I/O request.
80       */
81      protected int myTagLb;
82  
83      /**
84       * Message tag range upper bound.
85       */
86      protected int myTagUb;
87  
88      /**
89       * Source or destination of message items.
90       */
91      protected Buf myBuf;
92  
93      /**
94       * Status of a received message.
95       */
96      protected Status myStatus;
97  
98      // Exception that occurred, or null if none.
99      IOException myIOException;
100     RuntimeException myRuntimeException;
101     Error myError;
102 
103     // State: PENDING = request still in progress; SUCCEEDED = request finished
104     // successfully; FAILED = request finished unsuccessfully and myIOException,
105     // myRuntimeException, or myError contains the exception object to throw.
106     int myState = PENDING;
107     static final int PENDING = 0;
108     static final int SUCCEEDED = 1;
109     static final int FAILED = 2;
110 
111 // Exported constructors.
112     /**
113      * Construct a new I/O request object.
114      */
115     public IORequest() {
116     }
117 
118     /**
119      * Initialize this I/O request object.
120      *
121      * @param theChannel Channel on which to send or receive message. Null
122      * denotes "any channel."
123      * @param theTagLb Message tag range lower bound.
124      * @param theTagUb Message tag range upper bound.
125      * @param theBuf Source or destination of message items.
126      */
127     void initialize(Channel theChannel,
128             int theTagLb,
129             int theTagUb,
130             Buf theBuf) {
131         myChannel = theChannel;
132         myTagLb = theTagLb;
133         myTagUb = theTagUb;
134         myBuf = theBuf;
135         myStatus = null;
136         myIOException = null;
137         myRuntimeException = null;
138         myError = null;
139         myState = PENDING;
140     }
141 
142 // Exported operations.
143     /**
144      * Determine if this I/O request has finished.
145      *
146      * @return False if this I/O request has not finished, true if this I/O
147      * request has finished successfully.
148      * @exception IOException Thrown if this I/O request has finished and an I/O
149      * error occurred.
150      * @throws java.io.IOException if any.
151      */
152     public synchronized boolean isFinished()
153             throws IOException {
154         if (myState == PENDING) {
155             return false;
156         }
157         rethrow("IORequest: Exception during send or receive");
158         return true;
159     }
160 
161     /**
162      * Wait until the send or receive operation corresponding to this I/O
163      * request has finished. For a receive operation, a {@linkplain Status}
164      * object containing the results of the receive operation is returned; for a
165      * send operation, null is returned.
166      *
167      * @return Receive status for a receive operation, or null for a send
168      * operation.
169      * @exception IOException Thrown if an I/O error occurred.
170      * @throws java.io.IOException if any.
171      */
172     public synchronized Status waitForFinish()
173             throws IOException {
174         try {
175             while (myState == PENDING) {
176                 wait();
177             }
178             rethrow("IORequest: Exception during send or receive");
179             return myStatus;
180         } catch (InterruptedException exc) {
181             IOException exc2
182                     = new InterruptedIOException("IORequest: waitForFinish() interrupted");
183             exc2.initCause(exc);
184             throw exc2;
185         }
186     }
187 
188     /**
189      * Returns a string version of this I/O request.
190      *
191      * @return String version.
192      */
193     public String toString() {
194         StringBuilder b = new StringBuilder();
195         b.append("IORequest(myChannel=");
196         b.append(myChannel);
197         b.append(",myTagLb=");
198         b.append(myTagLb);
199         b.append(",myTagUb=");
200         b.append(myTagUb);
201         b.append(",myBuf=");
202         b.append(myBuf);
203         b.append(",myState=");
204         b.append(myState);
205         b.append(")");
206         return b.toString();
207     }
208 
209 // Hidden operations.
210     /**
211      * Determine if this I/O request matches the given IORequest.
212      *
213      * @param that I/O request to match.
214      *
215      * @return True if this I/O request matches <code>theIORequest</code>, false
216      * otherwise.
217      */
218     boolean match(IORequest that) {
219         return (this.myChannel == null
220                 || that.myChannel == null
221                 || this.myChannel == that.myChannel)
222                 && (this.myTagLb <= that.myTagLb)
223                 && (that.myTagLb <= this.myTagUb)
224                 && (this.myBuf.myMessageType == that.myBuf.myMessageType);
225     }
226 
227     /**
228      * Determine if this I/O request matches the given channel, message tag, and
229      * message type.
230      *
231      * @param channel Channel.
232      * @param tag Message tag.
233      * @param type Message type.
234      *
235      * @return True if this I/O request matches the given information, false
236      * otherwise.
237      */
238     boolean match(Channel channel,
239             int tag,
240             byte type) {
241         return (this.myChannel == null
242                 || channel == null
243                 || this.myChannel == channel)
244                 && (this.myTagLb <= tag)
245                 && (tag <= this.myTagUb)
246                 && (this.myBuf.myMessageType == type);
247     }
248 
249     /**
250      * Report that this I/O request succeeded.
251      */
252     protected synchronized void reportSuccess() {
253         myState = SUCCEEDED;
254         notifyAll();
255     }
256 
257     /**
258      * Report that this I/O request failed with an I/O exception.
259      *
260      * @param theIOException I/O exception.
261      */
262     protected synchronized void reportFailure(IOException theIOException) {
263         myIOException = theIOException;
264         myState = FAILED;
265         notifyAll();
266     }
267 
268     /**
269      * Report that this I/O request failed with a runtime exception.
270      *
271      * @param theRuntimeException Runtime exception.
272      */
273     synchronized void reportFailure(RuntimeException theRuntimeException) {
274         myRuntimeException = theRuntimeException;
275         myState = FAILED;
276         notifyAll();
277     }
278 
279     /**
280      * Report that this I/O request failed with an error.
281      *
282      * @param theError Error.
283      */
284     synchronized void reportFailure(Error theError) {
285         myError = theError;
286         myState = FAILED;
287         notifyAll();
288     }
289 
290     /**
291      * Rethrow the exception reported to this I/O request if any.
292      *
293      * @param msg Detail message for rethrown exception.
294      *
295      * @exception IOException Thrown if an I/O error occurred.
296      */
297     private void rethrow(String msg)
298             throws IOException {
299         if (myIOException != null) {
300             rethrowIOException(msg);
301         } else if (myRuntimeException != null) {
302             rethrowRuntimeException(msg);
303         } else if (myError != null) {
304             rethrowError(msg);
305         }
306     }
307 
308     /**
309      * Rethrow the I/O exception reported to this I/O request.
310      *
311      * @param msg Detail message for rethrown exception.
312      *
313      * @exception IOException Thrown if an I/O error occurred.
314      */
315     private void rethrowIOException(String msg)
316             throws IOException {
317         IOException exc2 = null;
318         try {
319             Class<? extends IOException> excClass
320                     = myIOException.getClass();
321             Constructor<? extends IOException> excConstructor
322                     = excClass.getConstructor(String.class);
323             exc2 = excConstructor.newInstance(msg);
324         } catch (Throwable exc) {
325             exc2 = new IOException(msg);
326         }
327         exc2.initCause(myIOException);
328         throw exc2;
329     }
330 
331     /**
332      * Rethrow the runtime exception reported to this I/O request.
333      *
334      * @param msg Detail message for rethrown exception.
335      */
336     private void rethrowRuntimeException(String msg) {
337         RuntimeException exc2 = null;
338         try {
339             Class<? extends RuntimeException> excClass
340                     = myRuntimeException.getClass();
341             Constructor<? extends RuntimeException> excConstructor
342                     = excClass.getConstructor(String.class);
343             exc2 = excConstructor.newInstance(msg);
344         } catch (Throwable exc) {
345             exc2 = new RuntimeException(msg);
346         }
347         exc2.initCause(myRuntimeException);
348         throw exc2;
349     }
350 
351     /**
352      * Rethrow the error reported to this I/O request.
353      *
354      * @param msg Detail message for rethrown exception.
355      */
356     private void rethrowError(String msg) {
357         Error exc2 = null;
358         try {
359             Class<? extends Error> excClass
360                     = myError.getClass();
361             Constructor<? extends Error> excConstructor
362                     = excClass.getConstructor(String.class);
363             exc2 = excConstructor.newInstance(msg);
364         } catch (Throwable exc) {
365             exc2 = new Error(msg);
366         }
367         exc2.initCause(myError);
368         throw exc2;
369     }
370 
371 }