1 //******************************************************************************
2 //
3 // File: BackendFileInputStream.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.BackendFileInputStream
6 //
7 // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.IOException;
44 import java.io.InputStream;
45 import java.io.InterruptedIOException;
46 import java.util.concurrent.LinkedBlockingQueue;
47
48 import edu.rit.mp.ByteBuf;
49 import edu.rit.util.Range;
50
51 /**
52 * Class BackendFileInputStream provides an object in a job backend process that
53 * reads a sequential file in the job frontend process. A backend file input
54 * stream is not constructed directly, rather it is created by a factory method
55 * in class {@linkplain BackendFileReader}.
56 * <P>
57 * <I>Note:</I> Class BackendFileInputStream does not do any buffering. Each
58 * method call sends a message to and receives a message from the job frontend.
59 * Consider layering a BufferedInputStream on top of the BackendFileInputStream.
60 *
61 * @author Alan Kaminsky
62 * @version 20-Nov-2006
63 */
64 public class BackendFileInputStream
65 extends InputStream {
66
67 // Hidden data members.
68 private JobFrontendRef myJobFrontend;
69 private JobBackendRef myJobBackend;
70
71 // Queue of results from job frontend.
72 private LinkedBlockingQueue<Result> myResultQueue
73 = new LinkedBlockingQueue<Result>();
74
75 private static class Result {
76
77 public int ffd;
78 public int readlen;
79 public long skiplen;
80 public IOException exc;
81
82 public Result(int ffd,
83 int readlen,
84 long skiplen,
85 IOException exc) {
86 this.ffd = ffd;
87 this.readlen = readlen;
88 this.skiplen = skiplen;
89 this.exc = exc;
90 }
91 }
92
93 // Frontend file descriptor.
94 private int ffd;
95
96 // Hidden constructors.
97 /**
98 * Construct a new backend file input stream. Call the <code>open()</code>
99 * method to open the file and obtain the frontend file descriptor.
100 *
101 * @param theJobFrontend Job Frontend.
102 * @param theJobBackend Job Backend.
103 */
104 BackendFileInputStream(JobFrontendRef theJobFrontend,
105 JobBackendRef theJobBackend) {
106 this.myJobFrontend = theJobFrontend;
107 this.myJobBackend = theJobBackend;
108 }
109
110 /**
111 * Construct a new backend file input stream. Use the given frontend file
112 * descriptor.
113 *
114 * @param theJobFrontend Job Frontend.
115 * @param theJobBackend Job Backend.
116 * @param ffd Frontend file descriptor.
117 */
118 BackendFileInputStream(JobFrontendRef theJobFrontend,
119 JobBackendRef theJobBackend,
120 int ffd) {
121 this.myJobFrontend = theJobFrontend;
122 this.myJobBackend = theJobBackend;
123 this.ffd = ffd;
124 }
125
126 // Exported operations.
127 /**
128 * Read a byte from this input stream. The byte is returned as an
129 * <code>int</code> in the range 0 .. 255.
130 *
131 * @exception IOException Thrown if an I/O error occurred.
132 * @return a int.
133 * @throws java.io.IOException if any.
134 */
135 public int read()
136 throws IOException {
137 byte[] buf = new byte[1];
138 int len = read(buf);
139 return len == -1 ? -1 : buf[0] & 0xFF;
140 }
141
142 /**
143 * Read the given byte array from this input stream.
144 *
145 * @param buf Byte array.
146 * @return Number of bytes actually read, or -1 if the end-of-stream was
147 * encountered.
148 * @exception NullPointerException (unchecked exception) Thrown if
149 * <code>buf</code> is null.
150 * @exception IOException Thrown if an I/O error occurred.
151 * @throws java.io.IOException if any.
152 */
153 public int read(byte[] buf)
154 throws IOException {
155 return read(buf, 0, buf.length);
156 }
157
158 /**
159 * {@inheritDoc}
160 *
161 * Read a portion of the given byte array from this input stream.
162 * @exception NullPointerException (unchecked exception) Thrown if
163 * <code>buf</code> is null.
164 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
165 * <code>off</code> < 0, <code>len</code>
166 * < 0, or <code>off+len</code> > <code>buf.length</code>.
167 * @exception IOException Thrown if an I/O error occurred.
168 */
169 public int read(byte[] buf,
170 int off,
171 int len)
172 throws IOException {
173 if (off < 0 || len < 0 || off + len > buf.length) {
174 throw new IndexOutOfBoundsException();
175 }
176 verifyOpen();
177 myJobFrontend.inputFileRead(myJobBackend, ffd, len);
178 Result r = getResult();
179 if (r.readlen > 0) {
180 ((JobFrontendProxy) myJobFrontend).receive(ffd,
181 ByteBuf.sliceBuffer(buf, new Range(off, off + len - 1)));
182 }
183 return r.readlen;
184 }
185
186 /**
187 * {@inheritDoc}
188 *
189 * Skip the given number of bytes from this input stream.
190 * @exception IOException Thrown if an I/O error occurred.
191 */
192 public long skip(long len)
193 throws IOException {
194 verifyOpen();
195 if (len < 0L) {
196 return 0L;
197 } else {
198 myJobFrontend.inputFileSkip(myJobBackend, ffd, len);
199 return getResult().skiplen;
200 }
201 }
202
203 /**
204 * Close this input stream.
205 *
206 * @exception IOException Thrown if an I/O error occurred.
207 * @throws java.io.IOException if any.
208 */
209 public void close()
210 throws IOException {
211 verifyOpen();
212 try {
213 myJobFrontend.inputFileClose(myJobBackend, ffd);
214 getResult();
215 } finally {
216 ffd = 0;
217 }
218 }
219
220 // Hidden operations.
221 /**
222 * Request the Job Frontend to open the file.
223 *
224 * @param bfd Backend file descriptor.
225 * @param file File.
226 *
227 * @return Frontend file descriptor.
228 *
229 * @exception IOException Thrown if an I/O error occurred.
230 */
231 int open(int bfd,
232 File file)
233 throws IOException {
234 myJobFrontend.inputFileOpen(myJobBackend, bfd, file);
235 this.ffd = getResult().ffd;
236 return this.ffd;
237 }
238
239 /**
240 * Get the next result from the result queue. Throw an IOException if
241 * necessary.
242 *
243 * @return Result object.
244 *
245 * @exception IOException Thrown if an I/O error occurred.
246 */
247 private Result getResult()
248 throws IOException {
249 try {
250 Result result = myResultQueue.take();
251 if (result.exc != null) {
252 throw result.exc;
253 }
254 return result;
255 } catch (InterruptedException exc) {
256 IOException exc2 = new InterruptedIOException("I/O interrupted");
257 exc2.initCause(exc);
258 throw exc2;
259 }
260 }
261
262 /**
263 * Put the given result into the result queue.
264 *
265 * @param ffd Frontend file descriptor.
266 * @param readlen Number of bytes actually read.
267 * @param skiplen Number of bytes actually skipped.
268 * @param exc Null if success, exception if failure.
269 */
270 void putResult(int ffd,
271 int readlen,
272 long skiplen,
273 IOException exc) {
274 myResultQueue.offer(new Result(ffd, readlen, skiplen, exc));
275 }
276
277 /**
278 * Verify that this file is open.
279 *
280 * @exception IOException Thrown if this file is not open.
281 */
282 private void verifyOpen()
283 throws IOException {
284 if (ffd == 0) {
285 throw new IOException("File closed");
286 }
287 }
288
289 }