View Javadoc
1   //******************************************************************************
2   //
3   // File:    BackendFileInputStream.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.BackendFileInputStream
6   //
7   // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import java.io.File;
43  import java.io.IOException;
44  import java.io.InputStream;
45  import java.io.InterruptedIOException;
46  import java.util.concurrent.LinkedBlockingQueue;
47  
48  import edu.rit.mp.ByteBuf;
49  import edu.rit.util.Range;
50  
51  /**
52   * Class BackendFileInputStream provides an object in a job backend process that
53   * reads a sequential file in the job frontend process. A backend file input
54   * stream is not constructed directly, rather it is created by a factory method
55   * in class {@linkplain BackendFileReader}.
56   * <P>
57   * <I>Note:</I> Class BackendFileInputStream does not do any buffering. Each
58   * method call sends a message to and receives a message from the job frontend.
59   * Consider layering a BufferedInputStream on top of the BackendFileInputStream.
60   *
61   * @author Alan Kaminsky
62   * @version 20-Nov-2006
63   */
64  public class BackendFileInputStream
65          extends InputStream {
66  
67  // Hidden data members.
68      private JobFrontendRef myJobFrontend;
69      private JobBackendRef myJobBackend;
70  
71      // Queue of results from job frontend.
72      private LinkedBlockingQueue<Result> myResultQueue
73              = new LinkedBlockingQueue<Result>();
74  
75      private static class Result {
76  
77          public int ffd;
78          public int readlen;
79          public long skiplen;
80          public IOException exc;
81  
82          public Result(int ffd,
83                  int readlen,
84                  long skiplen,
85                  IOException exc) {
86              this.ffd = ffd;
87              this.readlen = readlen;
88              this.skiplen = skiplen;
89              this.exc = exc;
90          }
91      }
92  
93      // Frontend file descriptor.
94      private int ffd;
95  
96  // Hidden constructors.
97      /**
98       * Construct a new backend file input stream. Call the <code>open()</code>
99       * method to open the file and obtain the frontend file descriptor.
100      *
101      * @param theJobFrontend Job Frontend.
102      * @param theJobBackend Job Backend.
103      */
104     BackendFileInputStream(JobFrontendRef theJobFrontend,
105             JobBackendRef theJobBackend) {
106         this.myJobFrontend = theJobFrontend;
107         this.myJobBackend = theJobBackend;
108     }
109 
110     /**
111      * Construct a new backend file input stream. Use the given frontend file
112      * descriptor.
113      *
114      * @param theJobFrontend Job Frontend.
115      * @param theJobBackend Job Backend.
116      * @param ffd Frontend file descriptor.
117      */
118     BackendFileInputStream(JobFrontendRef theJobFrontend,
119             JobBackendRef theJobBackend,
120             int ffd) {
121         this.myJobFrontend = theJobFrontend;
122         this.myJobBackend = theJobBackend;
123         this.ffd = ffd;
124     }
125 
126 // Exported operations.
127     /**
128      * Read a byte from this input stream. The byte is returned as an
129      * <code>int</code> in the range 0 .. 255.
130      *
131      * @exception IOException Thrown if an I/O error occurred.
132      * @return a int.
133      * @throws java.io.IOException if any.
134      */
135     public int read()
136             throws IOException {
137         byte[] buf = new byte[1];
138         int len = read(buf);
139         return len == -1 ? -1 : buf[0] & 0xFF;
140     }
141 
142     /**
143      * Read the given byte array from this input stream.
144      *
145      * @param buf Byte array.
146      * @return Number of bytes actually read, or -1 if the end-of-stream was
147      * encountered.
148      * @exception NullPointerException (unchecked exception) Thrown if
149      * <code>buf</code> is null.
150      * @exception IOException Thrown if an I/O error occurred.
151      * @throws java.io.IOException if any.
152      */
153     public int read(byte[] buf)
154             throws IOException {
155         return read(buf, 0, buf.length);
156     }
157 
158     /**
159      * {@inheritDoc}
160      *
161      * Read a portion of the given byte array from this input stream.
162      * @exception NullPointerException (unchecked exception) Thrown if
163      * <code>buf</code> is null.
164      * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
165      * <code>off</code> &lt; 0, <code>len</code>
166      * &lt; 0, or <code>off+len</code> &gt; <code>buf.length</code>.
167      * @exception IOException Thrown if an I/O error occurred.
168      */
169     public int read(byte[] buf,
170             int off,
171             int len)
172             throws IOException {
173         if (off < 0 || len < 0 || off + len > buf.length) {
174             throw new IndexOutOfBoundsException();
175         }
176         verifyOpen();
177         myJobFrontend.inputFileRead(myJobBackend, ffd, len);
178         Result r = getResult();
179         if (r.readlen > 0) {
180             ((JobFrontendProxy) myJobFrontend).receive(ffd,
181                     ByteBuf.sliceBuffer(buf, new Range(off, off + len - 1)));
182         }
183         return r.readlen;
184     }
185 
186     /**
187      * {@inheritDoc}
188      *
189      * Skip the given number of bytes from this input stream.
190      * @exception IOException Thrown if an I/O error occurred.
191      */
192     public long skip(long len)
193             throws IOException {
194         verifyOpen();
195         if (len < 0L) {
196             return 0L;
197         } else {
198             myJobFrontend.inputFileSkip(myJobBackend, ffd, len);
199             return getResult().skiplen;
200         }
201     }
202 
203     /**
204      * Close this input stream.
205      *
206      * @exception IOException Thrown if an I/O error occurred.
207      * @throws java.io.IOException if any.
208      */
209     public void close()
210             throws IOException {
211         verifyOpen();
212         try {
213             myJobFrontend.inputFileClose(myJobBackend, ffd);
214             getResult();
215         } finally {
216             ffd = 0;
217         }
218     }
219 
220 // Hidden operations.
221     /**
222      * Request the Job Frontend to open the file.
223      *
224      * @param bfd Backend file descriptor.
225      * @param file File.
226      *
227      * @return Frontend file descriptor.
228      *
229      * @exception IOException Thrown if an I/O error occurred.
230      */
231     int open(int bfd,
232             File file)
233             throws IOException {
234         myJobFrontend.inputFileOpen(myJobBackend, bfd, file);
235         this.ffd = getResult().ffd;
236         return this.ffd;
237     }
238 
239     /**
240      * Get the next result from the result queue. Throw an IOException if
241      * necessary.
242      *
243      * @return Result object.
244      *
245      * @exception IOException Thrown if an I/O error occurred.
246      */
247     private Result getResult()
248             throws IOException {
249         try {
250             Result result = myResultQueue.take();
251             if (result.exc != null) {
252                 throw result.exc;
253             }
254             return result;
255         } catch (InterruptedException exc) {
256             IOException exc2 = new InterruptedIOException("I/O interrupted");
257             exc2.initCause(exc);
258             throw exc2;
259         }
260     }
261 
262     /**
263      * Put the given result into the result queue.
264      *
265      * @param ffd Frontend file descriptor.
266      * @param readlen Number of bytes actually read.
267      * @param skiplen Number of bytes actually skipped.
268      * @param exc Null if success, exception if failure.
269      */
270     void putResult(int ffd,
271             int readlen,
272             long skiplen,
273             IOException exc) {
274         myResultQueue.offer(new Result(ffd, readlen, skiplen, exc));
275     }
276 
277     /**
278      * Verify that this file is open.
279      *
280      * @exception IOException Thrown if this file is not open.
281      */
282     private void verifyOpen()
283             throws IOException {
284         if (ffd == 0) {
285             throw new IOException("File closed");
286         }
287     }
288 
289 }