View Javadoc
1   //******************************************************************************
2   //
3   // File:    FrontendFileReader.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.FrontendFileReader
6   //
7   // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import java.io.File;
43  import java.io.FileInputStream;
44  import java.io.IOException;
45  import java.io.InputStream;
46  import java.util.HashMap;
47  import java.util.Map;
48  import java.util.concurrent.LinkedBlockingQueue;
49  
50  /**
51   * Class FrontendFileReader provides an object that reads sequential files in
52   * the job frontend process.
53   *
54   * @author Alan Kaminsky
55   * @version 05-Nov-2006
56   */
57  public class FrontendFileReader {
58  
59  // Hidden data members.
60      private JobFrontend myJobFrontend;
61  
62      // Mapping from frontend file descriptor to file handler.
63      private Map<Integer, FileHandler> myFileHandlerForFFD
64              = new HashMap<Integer, FileHandler>();
65  
66      // Next frontend file descriptor.
67      private int myNextFFD = 2;
68  
69  // Hidden helper classes.
70      /**
71       * Class FileHandler is an object that performs each file operation in a
72       * separate thread, so as not to block the job frontend's message processing
73       * thread.
74       *
75       * @author Alan Kaminsky
76       * @version 05-Nov-2006
77       */
78      private class FileHandler
79              extends Thread {
80  
81          private LinkedBlockingQueue<Invocation> myQueue
82                  = new LinkedBlockingQueue<Invocation>();
83  
84          private InputStream myInputStream;
85          private byte[] myBuffer = new byte[0];
86  
87          private abstract static class Invocation {
88  
89              public abstract boolean invoke()
90                      throws IOException;
91          }
92  
93          /**
94           * Construct a new file handler.
95           */
96          public FileHandler() {
97              setDaemon(true);
98              start();
99          }
100 
101         /**
102          * Construct a new file handler to read the given input stream.
103          *
104          * @param theInputStream Input stream.
105          */
106         public FileHandler(InputStream theInputStream) {
107             myInputStream = theInputStream;
108             setDaemon(true);
109             start();
110         }
111 
112         /**
113          * Run this file handler.
114          */
115         public void run() {
116             try {
117                 while (myQueue.take().invoke());
118             } catch (Throwable exc) {
119                 myJobFrontend.terminateCancelJobOther(exc);
120             }
121         }
122 
123         /**
124          * Open the given input file for reading.
125          *
126          * @param theJobBackend Job Backend that is calling this method.
127          * @param bfd Backend file descriptor.
128          * @param file File.
129          */
130         public void inputFileOpen(JobBackendRef theJobBackend,
131                 int bfd,
132                 File file) {
133             myQueue.offer(new InputFileOpenInvocation(theJobBackend, bfd, file));
134         }
135 
136         private class InputFileOpenInvocation
137                 extends Invocation {
138 
139             private JobBackendRef theJobBackend;
140             private int bfd;
141             private File file;
142 
143             public InputFileOpenInvocation(JobBackendRef theJobBackend,
144                     int bfd,
145                     File file) {
146                 this.theJobBackend = theJobBackend;
147                 this.bfd = bfd;
148                 this.file = file;
149             }
150 
151             public boolean invoke()
152                     throws IOException {
153                 return invokeInputFileOpen(theJobBackend, bfd, file);
154             }
155         }
156 
157         private boolean invokeInputFileOpen(JobBackendRef theJobBackend,
158                 int bfd,
159                 File file)
160                 throws IOException {
161             int ffd = 0;
162             IOException result = null;
163             boolean more = false;
164             try {
165                 myInputStream = new FileInputStream(file);
166                 synchronized (myFileHandlerForFFD) {
167                     ffd = myNextFFD++;
168                     myFileHandlerForFFD.put(ffd, this);
169                 }
170                 more = true;
171             } catch (IOException exc) {
172                 result = exc;
173             }
174             theJobBackend.inputFileOpenResult(myJobFrontend, bfd, ffd, result);
175             return more;
176         }
177 
178         /**
179          * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
180          * job's standard input stream; other values refer to a previously
181          * opened file.
182          *
183          * @param theJobBackend Job Backend that is calling this method.
184          * @param ffd Frontend file descriptor.
185          * @param len Number of bytes to read.
186          */
187         public void inputFileRead(JobBackendRef theJobBackend,
188                 int ffd,
189                 int len) {
190             myQueue.offer(new InputFileReadInvocation(theJobBackend, ffd, len));
191         }
192 
193         private class InputFileReadInvocation
194                 extends Invocation {
195 
196             private JobBackendRef theJobBackend;
197             private int ffd;
198             private int len;
199 
200             public InputFileReadInvocation(JobBackendRef theJobBackend,
201                     int ffd,
202                     int len) {
203                 this.theJobBackend = theJobBackend;
204                 this.ffd = ffd;
205                 this.len = len;
206             }
207 
208             public boolean invoke()
209                     throws IOException {
210                 return invokeInputFileRead(theJobBackend, ffd, len);
211             }
212         }
213 
214         private boolean invokeInputFileRead(JobBackendRef theJobBackend,
215                 int ffd,
216                 int len)
217                 throws IOException {
218             int resultlen = 0;
219             IOException resultexc = null;
220             boolean more = false;
221             try {
222                 if (myBuffer.length < len) {
223                     myBuffer = new byte[len];
224                 }
225                 resultlen = myInputStream.read(myBuffer, 0, len);
226                 more = true;
227             } catch (IOException exc) {
228                 resultexc = exc;
229                 try {
230                     myInputStream.close();
231                 } catch (IOException ignored) {
232                 }
233                 synchronized (myFileHandlerForFFD) {
234                     myFileHandlerForFFD.remove(ffd);
235                 }
236             }
237             theJobBackend.inputFileReadResult(myJobFrontend, ffd, myBuffer, resultlen, resultexc);
238             return more;
239         }
240 
241         /**
242          * Skip bytes from the given input file.
243          *
244          * @param theJobBackend Job Backend that is calling this method.
245          * @param ffd Frontend file descriptor.
246          * @param len Number of bytes to skip.
247          */
248         public void inputFileSkip(JobBackendRef theJobBackend,
249                 int ffd,
250                 long len) {
251             myQueue.offer(new InputFileSkipInvocation(theJobBackend, ffd, len));
252         }
253 
254         private class InputFileSkipInvocation
255                 extends Invocation {
256 
257             private JobBackendRef theJobBackend;
258             private int ffd;
259             private long len;
260 
261             public InputFileSkipInvocation(JobBackendRef theJobBackend,
262                     int ffd,
263                     long len) {
264                 this.theJobBackend = theJobBackend;
265                 this.ffd = ffd;
266                 this.len = len;
267             }
268 
269             public boolean invoke()
270                     throws IOException {
271                 return invokeInputFileSkip(theJobBackend, ffd, len);
272             }
273         }
274 
275         private boolean invokeInputFileSkip(JobBackendRef theJobBackend,
276                 int ffd,
277                 long len)
278                 throws IOException {
279             long resultlen = 0L;
280             IOException resultexc = null;
281             boolean more = false;
282             try {
283                 resultlen = myInputStream.skip(len);
284                 more = true;
285             } catch (IOException exc) {
286                 resultexc = exc;
287                 try {
288                     myInputStream.close();
289                 } catch (IOException ignored) {
290                 }
291                 synchronized (myFileHandlerForFFD) {
292                     myFileHandlerForFFD.remove(ffd);
293                 }
294             }
295             theJobBackend.inputFileSkipResult(myJobFrontend, ffd, resultlen, resultexc);
296             return more;
297         }
298 
299         /**
300          * Close the given input file.
301          *
302          * @param theJobBackend Job Backend that is calling this method.
303          * @param ffd Frontend file descriptor.
304          */
305         public void inputFileClose(JobBackendRef theJobBackend,
306                 int ffd) {
307             myQueue.offer(new InputFileCloseInvocation(theJobBackend, ffd));
308         }
309 
310         private class InputFileCloseInvocation
311                 extends Invocation {
312 
313             private JobBackendRef theJobBackend;
314             private int ffd;
315 
316             public InputFileCloseInvocation(JobBackendRef theJobBackend,
317                     int ffd) {
318                 this.theJobBackend = theJobBackend;
319                 this.ffd = ffd;
320             }
321 
322             public boolean invoke()
323                     throws IOException {
324                 return invokeInputFileClose(theJobBackend, ffd);
325             }
326         }
327 
328         private boolean invokeInputFileClose(JobBackendRef theJobBackend,
329                 int ffd)
330                 throws IOException {
331             IOException result = null;
332             try {
333                 myInputStream.close();
334             } catch (IOException exc) {
335                 result = exc;
336             }
337             synchronized (myFileHandlerForFFD) {
338                 myFileHandlerForFFD.remove(ffd);
339             }
340             theJobBackend.inputFileCloseResult(myJobFrontend, ffd, result);
341             return false;
342         }
343     }
344 
345 // Exported constructors.
346     /**
347      * Construct a new frontend file reader.
348      *
349      * @param theJobFrontend Job Frontend.
350      */
351     public FrontendFileReader(JobFrontend theJobFrontend) {
352         myJobFrontend = theJobFrontend;
353 
354         // Set up frontend file descriptor 1 (stdin).
355         myFileHandlerForFFD.put(1, new FileHandler(System.in));
356     }
357 
358 // Exported operations.
359     /**
360      * Open the given input file for reading.
361      *
362      * @param theJobBackend Job Backend that is calling this method.
363      * @param bfd Backend file descriptor.
364      * @param file File.
365      * @exception IOException Thrown if an I/O error occurred.
366      * @throws java.io.IOException if any.
367      */
368     public void inputFileOpen(JobBackendRef theJobBackend,
369             int bfd,
370             File file)
371             throws IOException {
372         new FileHandler().inputFileOpen(theJobBackend, bfd, file);
373     }
374 
375     /**
376      * Read bytes from the given input file. <code>ffd</code> = 1 refers to the
377      * job's standard input stream; other values refer to a previously opened
378      * file.
379      *
380      * @param theJobBackend Job Backend that is calling this method.
381      * @param ffd Frontend file descriptor.
382      * @param len Number of bytes to read.
383      * @exception IOException Thrown if an I/O error occurred.
384      * @throws java.io.IOException if any.
385      */
386     public void inputFileRead(JobBackendRef theJobBackend,
387             int ffd,
388             int len)
389             throws IOException {
390         FileHandler handler = null;
391         synchronized (myFileHandlerForFFD) {
392             handler = myFileHandlerForFFD.get(ffd);
393         }
394         if (handler != null) {
395             handler.inputFileRead(theJobBackend, ffd, len);
396         } else {
397             theJobBackend.inputFileReadResult(myJobFrontend, ffd, null, -1,
398                     new IOException("File closed, ffd=" + ffd));
399         }
400     }
401 
402     /**
403      * Skip bytes from the given input file.
404      *
405      * @param theJobBackend Job Backend that is calling this method.
406      * @param ffd Frontend file descriptor.
407      * @param len Number of bytes to skip.
408      * @exception IOException Thrown if an I/O error occurred.
409      * @throws java.io.IOException if any.
410      */
411     public void inputFileSkip(JobBackendRef theJobBackend,
412             int ffd,
413             long len)
414             throws IOException {
415         FileHandler handler = null;
416         synchronized (myFileHandlerForFFD) {
417             handler = myFileHandlerForFFD.get(ffd);
418         }
419         if (handler != null) {
420             handler.inputFileSkip(theJobBackend, ffd, len);
421         } else {
422             theJobBackend.inputFileSkipResult(myJobFrontend, ffd, 0L,
423                     new IOException("File closed, ffd=" + ffd));
424         }
425     }
426 
427     /**
428      * Close the given input file.
429      *
430      * @param theJobBackend Job Backend that is calling this method.
431      * @param ffd Frontend file descriptor.
432      * @exception IOException Thrown if an I/O error occurred.
433      * @throws java.io.IOException if any.
434      */
435     public void inputFileClose(JobBackendRef theJobBackend,
436             int ffd)
437             throws IOException {
438         FileHandler handler = null;
439         synchronized (myFileHandlerForFFD) {
440             handler = myFileHandlerForFFD.get(ffd);
441         }
442         if (handler != null) {
443             handler.inputFileClose(theJobBackend, ffd);
444         } else {
445             theJobBackend.inputFileCloseResult(myJobFrontend, ffd,
446                     new IOException("File closed, ffd=" + ffd));
447         }
448     }
449 
450 }