1 //******************************************************************************
2 //
3 // File: FrontendFileReader.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.FrontendFileReader
6 //
7 // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.FileInputStream;
44 import java.io.IOException;
45 import java.io.InputStream;
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.concurrent.LinkedBlockingQueue;
49
50 /**
51 * Class FrontendFileReader provides an object that reads sequential files in
52 * the job frontend process.
53 *
54 * @author Alan Kaminsky
55 * @version 05-Nov-2006
56 */
57 public class FrontendFileReader {
58
59 // Hidden data members.
60 private JobFrontend myJobFrontend;
61
62 // Mapping from frontend file descriptor to file handler.
63 private Map<Integer, FileHandler> myFileHandlerForFFD
64 = new HashMap<Integer, FileHandler>();
65
66 // Next frontend file descriptor.
67 private int myNextFFD = 2;
68
69 // Hidden helper classes.
70 /**
71 * Class FileHandler is an object that performs each file operation in a
72 * separate thread, so as not to block the job frontend's message processing
73 * thread.
74 *
75 * @author Alan Kaminsky
76 * @version 05-Nov-2006
77 */
78 private class FileHandler
79 extends Thread {
80
81 private LinkedBlockingQueue<Invocation> myQueue
82 = new LinkedBlockingQueue<Invocation>();
83
84 private InputStream myInputStream;
85 private byte[] myBuffer = new byte[0];
86
87 private abstract static class Invocation {
88
89 public abstract boolean invoke()
90 throws IOException;
91 }
92
93 /**
94 * Construct a new file handler.
95 */
96 public FileHandler() {
97 setDaemon(true);
98 start();
99 }
100
101 /**
102 * Construct a new file handler to read the given input stream.
103 *
104 * @param theInputStream Input stream.
105 */
106 public FileHandler(InputStream theInputStream) {
107 myInputStream = theInputStream;
108 setDaemon(true);
109 start();
110 }
111
112 /**
113 * Run this file handler.
114 */
115 public void run() {
116 try {
117 while (myQueue.take().invoke());
118 } catch (Throwable exc) {
119 myJobFrontend.terminateCancelJobOther(exc);
120 }
121 }
122
123 /**
124 * Open the given input file for reading.
125 *
126 * @param theJobBackend Job Backend that is calling this method.
127 * @param bfd Backend file descriptor.
128 * @param file File.
129 */
130 public void inputFileOpen(JobBackendRef theJobBackend,
131 int bfd,
132 File file) {
133 myQueue.offer(new InputFileOpenInvocation(theJobBackend, bfd, file));
134 }
135
136 private class InputFileOpenInvocation
137 extends Invocation {
138
139 private JobBackendRef theJobBackend;
140 private int bfd;
141 private File file;
142
143 public InputFileOpenInvocation(JobBackendRef theJobBackend,
144 int bfd,
145 File file) {
146 this.theJobBackend = theJobBackend;
147 this.bfd = bfd;
148 this.file = file;
149 }
150
151 public boolean invoke()
152 throws IOException {
153 return invokeInputFileOpen(theJobBackend, bfd, file);
154 }
155 }
156
157 private boolean invokeInputFileOpen(JobBackendRef theJobBackend,
158 int bfd,
159 File file)
160 throws IOException {
161 int ffd = 0;
162 IOException result = null;
163 boolean more = false;
164 try {
165 myInputStream = new FileInputStream(file);
166 synchronized (myFileHandlerForFFD) {
167 ffd = myNextFFD++;
168 myFileHandlerForFFD.put(ffd, this);
169 }
170 more = true;
171 } catch (IOException exc) {
172 result = exc;
173 }
174 theJobBackend.inputFileOpenResult(myJobFrontend, bfd, ffd, result);
175 return more;
176 }
177
178 /**
179 * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
180 * job's standard input stream; other values refer to a previously
181 * opened file.
182 *
183 * @param theJobBackend Job Backend that is calling this method.
184 * @param ffd Frontend file descriptor.
185 * @param len Number of bytes to read.
186 */
187 public void inputFileRead(JobBackendRef theJobBackend,
188 int ffd,
189 int len) {
190 myQueue.offer(new InputFileReadInvocation(theJobBackend, ffd, len));
191 }
192
193 private class InputFileReadInvocation
194 extends Invocation {
195
196 private JobBackendRef theJobBackend;
197 private int ffd;
198 private int len;
199
200 public InputFileReadInvocation(JobBackendRef theJobBackend,
201 int ffd,
202 int len) {
203 this.theJobBackend = theJobBackend;
204 this.ffd = ffd;
205 this.len = len;
206 }
207
208 public boolean invoke()
209 throws IOException {
210 return invokeInputFileRead(theJobBackend, ffd, len);
211 }
212 }
213
214 private boolean invokeInputFileRead(JobBackendRef theJobBackend,
215 int ffd,
216 int len)
217 throws IOException {
218 int resultlen = 0;
219 IOException resultexc = null;
220 boolean more = false;
221 try {
222 if (myBuffer.length < len) {
223 myBuffer = new byte[len];
224 }
225 resultlen = myInputStream.read(myBuffer, 0, len);
226 more = true;
227 } catch (IOException exc) {
228 resultexc = exc;
229 try {
230 myInputStream.close();
231 } catch (IOException ignored) {
232 }
233 synchronized (myFileHandlerForFFD) {
234 myFileHandlerForFFD.remove(ffd);
235 }
236 }
237 theJobBackend.inputFileReadResult(myJobFrontend, ffd, myBuffer, resultlen, resultexc);
238 return more;
239 }
240
241 /**
242 * Skip bytes from the given input file.
243 *
244 * @param theJobBackend Job Backend that is calling this method.
245 * @param ffd Frontend file descriptor.
246 * @param len Number of bytes to skip.
247 */
248 public void inputFileSkip(JobBackendRef theJobBackend,
249 int ffd,
250 long len) {
251 myQueue.offer(new InputFileSkipInvocation(theJobBackend, ffd, len));
252 }
253
254 private class InputFileSkipInvocation
255 extends Invocation {
256
257 private JobBackendRef theJobBackend;
258 private int ffd;
259 private long len;
260
261 public InputFileSkipInvocation(JobBackendRef theJobBackend,
262 int ffd,
263 long len) {
264 this.theJobBackend = theJobBackend;
265 this.ffd = ffd;
266 this.len = len;
267 }
268
269 public boolean invoke()
270 throws IOException {
271 return invokeInputFileSkip(theJobBackend, ffd, len);
272 }
273 }
274
275 private boolean invokeInputFileSkip(JobBackendRef theJobBackend,
276 int ffd,
277 long len)
278 throws IOException {
279 long resultlen = 0L;
280 IOException resultexc = null;
281 boolean more = false;
282 try {
283 resultlen = myInputStream.skip(len);
284 more = true;
285 } catch (IOException exc) {
286 resultexc = exc;
287 try {
288 myInputStream.close();
289 } catch (IOException ignored) {
290 }
291 synchronized (myFileHandlerForFFD) {
292 myFileHandlerForFFD.remove(ffd);
293 }
294 }
295 theJobBackend.inputFileSkipResult(myJobFrontend, ffd, resultlen, resultexc);
296 return more;
297 }
298
299 /**
300 * Close the given input file.
301 *
302 * @param theJobBackend Job Backend that is calling this method.
303 * @param ffd Frontend file descriptor.
304 */
305 public void inputFileClose(JobBackendRef theJobBackend,
306 int ffd) {
307 myQueue.offer(new InputFileCloseInvocation(theJobBackend, ffd));
308 }
309
310 private class InputFileCloseInvocation
311 extends Invocation {
312
313 private JobBackendRef theJobBackend;
314 private int ffd;
315
316 public InputFileCloseInvocation(JobBackendRef theJobBackend,
317 int ffd) {
318 this.theJobBackend = theJobBackend;
319 this.ffd = ffd;
320 }
321
322 public boolean invoke()
323 throws IOException {
324 return invokeInputFileClose(theJobBackend, ffd);
325 }
326 }
327
328 private boolean invokeInputFileClose(JobBackendRef theJobBackend,
329 int ffd)
330 throws IOException {
331 IOException result = null;
332 try {
333 myInputStream.close();
334 } catch (IOException exc) {
335 result = exc;
336 }
337 synchronized (myFileHandlerForFFD) {
338 myFileHandlerForFFD.remove(ffd);
339 }
340 theJobBackend.inputFileCloseResult(myJobFrontend, ffd, result);
341 return false;
342 }
343 }
344
345 // Exported constructors.
346 /**
347 * Construct a new frontend file reader.
348 *
349 * @param theJobFrontend Job Frontend.
350 */
351 public FrontendFileReader(JobFrontend theJobFrontend) {
352 myJobFrontend = theJobFrontend;
353
354 // Set up frontend file descriptor 1 (stdin).
355 myFileHandlerForFFD.put(1, new FileHandler(System.in));
356 }
357
358 // Exported operations.
359 /**
360 * Open the given input file for reading.
361 *
362 * @param theJobBackend Job Backend that is calling this method.
363 * @param bfd Backend file descriptor.
364 * @param file File.
365 * @exception IOException Thrown if an I/O error occurred.
366 * @throws java.io.IOException if any.
367 */
368 public void inputFileOpen(JobBackendRef theJobBackend,
369 int bfd,
370 File file)
371 throws IOException {
372 new FileHandler().inputFileOpen(theJobBackend, bfd, file);
373 }
374
375 /**
376 * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
377 * job's standard input stream; other values refer to a previously opened
378 * file.
379 *
380 * @param theJobBackend Job Backend that is calling this method.
381 * @param ffd Frontend file descriptor.
382 * @param len Number of bytes to read.
383 * @exception IOException Thrown if an I/O error occurred.
384 * @throws java.io.IOException if any.
385 */
386 public void inputFileRead(JobBackendRef theJobBackend,
387 int ffd,
388 int len)
389 throws IOException {
390 FileHandler handler = null;
391 synchronized (myFileHandlerForFFD) {
392 handler = myFileHandlerForFFD.get(ffd);
393 }
394 if (handler != null) {
395 handler.inputFileRead(theJobBackend, ffd, len);
396 } else {
397 theJobBackend.inputFileReadResult(myJobFrontend, ffd, null, -1,
398 new IOException("File closed, ffd=" + ffd));
399 }
400 }
401
402 /**
403 * Skip bytes from the given input file.
404 *
405 * @param theJobBackend Job Backend that is calling this method.
406 * @param ffd Frontend file descriptor.
407 * @param len Number of bytes to skip.
408 * @exception IOException Thrown if an I/O error occurred.
409 * @throws java.io.IOException if any.
410 */
411 public void inputFileSkip(JobBackendRef theJobBackend,
412 int ffd,
413 long len)
414 throws IOException {
415 FileHandler handler = null;
416 synchronized (myFileHandlerForFFD) {
417 handler = myFileHandlerForFFD.get(ffd);
418 }
419 if (handler != null) {
420 handler.inputFileSkip(theJobBackend, ffd, len);
421 } else {
422 theJobBackend.inputFileSkipResult(myJobFrontend, ffd, 0L,
423 new IOException("File closed, ffd=" + ffd));
424 }
425 }
426
427 /**
428 * Close the given input file.
429 *
430 * @param theJobBackend Job Backend that is calling this method.
431 * @param ffd Frontend file descriptor.
432 * @exception IOException Thrown if an I/O error occurred.
433 * @throws java.io.IOException if any.
434 */
435 public void inputFileClose(JobBackendRef theJobBackend,
436 int ffd)
437 throws IOException {
438 FileHandler handler = null;
439 synchronized (myFileHandlerForFFD) {
440 handler = myFileHandlerForFFD.get(ffd);
441 }
442 if (handler != null) {
443 handler.inputFileClose(theJobBackend, ffd);
444 } else {
445 theJobBackend.inputFileCloseResult(myJobFrontend, ffd,
446 new IOException("File closed, ffd=" + ffd));
447 }
448 }
449
450 }