View Javadoc
1   //******************************************************************************
2   //
3   // File:    FrontendFileWriter.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.FrontendFileWriter
6   //
7   // This Java source file is copyright (C) 2007 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import java.io.File;
43  import java.io.FileOutputStream;
44  import java.io.IOException;
45  import java.io.OutputStream;
46  import java.util.HashMap;
47  import java.util.Map;
48  import java.util.concurrent.LinkedBlockingQueue;
49  
50  import edu.rit.mp.ByteBuf;
51  
52  /**
53   * Class FrontendFileWriter provides an object that writes sequential files in
54   * the job frontend process.
55   *
56   * @author Alan Kaminsky
57   * @version 21-Jun-2007
58   */
59  public class FrontendFileWriter {
60  
61  // Hidden data members.
62      private JobFrontend myJobFrontend;
63  
64      // Mapping from frontend file descriptor to file handler.
65      private Map<Integer, FileHandler> myFileHandlerForFFD
66              = new HashMap<Integer, FileHandler>();
67  
68      // Next frontend file descriptor.
69      private int myNextFFD = 3;
70  
71  // Hidden helper classes.
72      /**
73       * Class FileHandler is an object that performs each file operation in a
74       * separate thread, so as not to block the job frontend's message processing
75       * thread.
76       *
77       * @author Alan Kaminsky
78       * @version 20-Nov-2006
79       */
80      private class FileHandler
81              extends Thread {
82  
83          private LinkedBlockingQueue<Invocation> myQueue
84                  = new LinkedBlockingQueue<Invocation>();
85  
86          private OutputStream myOutputStream;
87  
88          private byte[] myByteArray = new byte[0];
89          private ByteBuf myByteBuf = ByteBuf.buffer(myByteArray);
90  
91          private abstract static class Invocation {
92  
93              public abstract boolean invoke()
94                      throws IOException;
95          }
96  
97          /**
98           * Construct a new file handler.
99           */
100         public FileHandler() {
101             setDaemon(true);
102             start();
103         }
104 
105         /**
106          * Construct a new file handler to write the given output stream.
107          *
108          * @param theOutputStream Output stream.
109          */
110         public FileHandler(OutputStream theOutputStream) {
111             myOutputStream = theOutputStream;
112             setDaemon(true);
113             start();
114         }
115 
116         /**
117          * Run this file handler.
118          */
119         public void run() {
120             try {
121                 while (myQueue.take().invoke());
122             } catch (Throwable exc) {
123                 myJobFrontend.terminateCancelJobOther(exc);
124             }
125         }
126 
127         /**
128          * Open the given output file for writing or appending.
129          *
130          * @param theJobBackend Job Backend that is calling this method.
131          * @param bfd Backend file descriptor.
132          * @param file File.
133          * @param append True to append, false to overwrite.
134          */
135         public void outputFileOpen(JobBackendRef theJobBackend,
136                 int bfd,
137                 File file,
138                 boolean append) {
139             myQueue.offer(new OutputFileOpenInvocation(theJobBackend, bfd, file, append));
140         }
141 
142         private class OutputFileOpenInvocation
143                 extends Invocation {
144 
145             private JobBackendRef theJobBackend;
146             private int bfd;
147             private File file;
148             private boolean append;
149 
150             public OutputFileOpenInvocation(JobBackendRef theJobBackend,
151                     int bfd,
152                     File file,
153                     boolean append) {
154                 this.theJobBackend = theJobBackend;
155                 this.bfd = bfd;
156                 this.file = file;
157                 this.append = append;
158             }
159 
160             public boolean invoke()
161                     throws IOException {
162                 return invokeOutputFileOpen(theJobBackend, bfd, file, append);
163             }
164         }
165 
166         private boolean invokeOutputFileOpen(JobBackendRef theJobBackend,
167                 int bfd,
168                 File file,
169                 boolean append)
170                 throws IOException {
171             int ffd = 0;
172             IOException result = null;
173             boolean more = false;
174             try {
175                 myOutputStream = new FileOutputStream(file, append);
176                 synchronized (myFileHandlerForFFD) {
177                     ffd = myNextFFD++;
178                     myFileHandlerForFFD.put(ffd, this);
179                 }
180                 more = true;
181             } catch (IOException exc) {
182                 result = exc;
183             }
184             theJobBackend.outputFileOpenResult(myJobFrontend, bfd, ffd, result);
185             return more;
186         }
187 
188         /**
189          * Write the given bytes to the given output file. <code>ffd</code> = 1
190          * refers to the job's standard output stream; <code>ffd</code> = 2 refers
191          * to the job's standard error stream; other values refer to a
192          * previously opened file.
193          *
194          * @param theJobBackend Job Backend that is calling this method.
195          * @param ffd Frontend file descriptor.
196          * @param len Number of bytes to write.
197          */
198         public void outputFileWrite(JobBackendRef theJobBackend,
199                 int ffd,
200                 int len) {
201             myQueue.offer(new OutputFileWriteInvocation(theJobBackend, ffd, len));
202         }
203 
204         private class OutputFileWriteInvocation
205                 extends Invocation {
206 
207             private JobBackendRef theJobBackend;
208             private int ffd;
209             private int len;
210 
211             public OutputFileWriteInvocation(JobBackendRef theJobBackend,
212                     int ffd,
213                     int len) {
214                 this.theJobBackend = theJobBackend;
215                 this.ffd = ffd;
216                 this.len = len;
217             }
218 
219             public boolean invoke()
220                     throws IOException {
221                 return invokeOutputFileWrite(theJobBackend, ffd, len);
222             }
223         }
224 
225         private boolean invokeOutputFileWrite(JobBackendRef theJobBackend,
226                 int ffd,
227                 int len)
228                 throws IOException {
229             IOException result = null;
230             boolean more = false;
231             try {
232                 if (myByteArray.length < len) {
233                     myByteArray = new byte[len];
234                     myByteBuf = ByteBuf.buffer(myByteArray);
235                 }
236                 ((JobBackendProxy) theJobBackend).receive(ffd, myByteBuf);
237                 myOutputStream.write(myByteArray, 0, len);
238                 more = true;
239             } catch (IOException exc) {
240                 result = exc;
241                 try {
242                     myOutputStream.close();
243                 } catch (IOException ignored) {
244                 }
245                 synchronized (myFileHandlerForFFD) {
246                     myFileHandlerForFFD.remove(ffd);
247                 }
248             }
249             theJobBackend.outputFileWriteResult(myJobFrontend, ffd, result);
250             return more;
251         }
252 
253         /**
254          * Flush accumulated bytes to the given output file.
255          *
256          * @param theJobBackend Job Backend that is calling this method.
257          * @param ffd Frontend file descriptor.
258          */
259         public void outputFileFlush(JobBackendRef theJobBackend,
260                 int ffd) {
261             myQueue.offer(new OutputFileFlushInvocation(theJobBackend, ffd));
262         }
263 
264         private class OutputFileFlushInvocation
265                 extends Invocation {
266 
267             private JobBackendRef theJobBackend;
268             private int ffd;
269 
270             public OutputFileFlushInvocation(JobBackendRef theJobBackend,
271                     int ffd) {
272                 this.theJobBackend = theJobBackend;
273                 this.ffd = ffd;
274             }
275 
276             public boolean invoke()
277                     throws IOException {
278                 return invokeOutputFileFlush(theJobBackend, ffd);
279             }
280         }
281 
282         private boolean invokeOutputFileFlush(JobBackendRef theJobBackend,
283                 int ffd)
284                 throws IOException {
285             IOException result = null;
286             boolean more = false;
287             try {
288                 myOutputStream.flush();
289                 more = true;
290             } catch (IOException exc) {
291                 result = exc;
292                 try {
293                     myOutputStream.close();
294                 } catch (IOException ignored) {
295                 }
296                 synchronized (myFileHandlerForFFD) {
297                     myFileHandlerForFFD.remove(ffd);
298                 }
299             }
300             theJobBackend.outputFileFlushResult(myJobFrontend, ffd, result);
301             return more;
302         }
303 
304         /**
305          * Close the given output file.
306          *
307          * @param theJobBackend Job Backend that is calling this method.
308          * @param ffd Frontend file descriptor.
309          */
310         public void outputFileClose(JobBackendRef theJobBackend,
311                 int ffd) {
312             myQueue.offer(new OutputFileCloseInvocation(theJobBackend, ffd));
313         }
314 
315         private class OutputFileCloseInvocation
316                 extends Invocation {
317 
318             private JobBackendRef theJobBackend;
319             private int ffd;
320 
321             public OutputFileCloseInvocation(JobBackendRef theJobBackend,
322                     int ffd) {
323                 this.theJobBackend = theJobBackend;
324                 this.ffd = ffd;
325             }
326 
327             public boolean invoke()
328                     throws IOException {
329                 return invokeOutputFileClose(theJobBackend, ffd);
330             }
331         }
332 
333         private boolean invokeOutputFileClose(JobBackendRef theJobBackend,
334                 int ffd)
335                 throws IOException {
336             IOException result = null;
337             try {
338                 myOutputStream.close();
339             } catch (IOException exc) {
340                 result = exc;
341             }
342             synchronized (myFileHandlerForFFD) {
343                 myFileHandlerForFFD.remove(ffd);
344             }
345             theJobBackend.outputFileCloseResult(myJobFrontend, ffd, result);
346             return false;
347         }
348     }
349 
350 // Exported constructors.
351     /**
352      * Construct a new frontend file writer.
353      *
354      * @param theJobFrontend Job Frontend.
355      */
356     public FrontendFileWriter(JobFrontend theJobFrontend) {
357         myJobFrontend = theJobFrontend;
358 
359         // Set up frontend file descriptor 1 (stdout) and 2 (stderr).
360         myFileHandlerForFFD.put(1, new FileHandler(System.out));
361         myFileHandlerForFFD.put(2, new FileHandler(System.err));
362     }
363 
364 // Exported operations.
365     /**
366      * Open the given output file for writing or appending.
367      *
368      * @param theJobBackend Job Backend that is calling this method.
369      * @param bfd Backend file descriptor.
370      * @param file File.
371      * @param append True to append, false to overwrite.
372      * @exception IOException Thrown if an I/O error occurred.
373      * @throws java.io.IOException if any.
374      */
375     public void outputFileOpen(JobBackendRef theJobBackend,
376             int bfd,
377             File file,
378             boolean append)
379             throws IOException {
380         new FileHandler().outputFileOpen(theJobBackend, bfd, file, append);
381     }
382 
383     /**
384      * Write the given bytes to the given output file. <code>ffd</code> = 1 refers
385      * to the job's standard output stream; <code>ffd</code> = 2 refers to the job's
386      * standard error stream; other values refer to a previously opened file.
387      *
388      * @param theJobBackend Job Backend that is calling this method.
389      * @param ffd Frontend file descriptor.
390      * @param len Number of bytes to write.
391      * @exception IOException Thrown if an I/O error occurred.
392      * @throws java.io.IOException if any.
393      */
394     public void outputFileWrite(JobBackendRef theJobBackend,
395             int ffd,
396             int len)
397             throws IOException {
398         FileHandler handler = null;
399         synchronized (myFileHandlerForFFD) {
400             handler = myFileHandlerForFFD.get(ffd);
401         }
402         if (handler != null) {
403             handler.outputFileWrite(theJobBackend, ffd, len);
404         } else {
405             theJobBackend.outputFileWriteResult(myJobFrontend, ffd,
406                     new IOException("File closed, ffd=" + ffd));
407         }
408     }
409 
410     /**
411      * Flush accumulated bytes to the given output file.
412      *
413      * @param theJobBackend Job Backend that is calling this method.
414      * @param ffd Frontend file descriptor.
415      * @exception IOException Thrown if an I/O error occurred.
416      * @throws java.io.IOException if any.
417      */
418     public void outputFileFlush(JobBackendRef theJobBackend,
419             int ffd)
420             throws IOException {
421         FileHandler handler = null;
422         synchronized (myFileHandlerForFFD) {
423             handler = myFileHandlerForFFD.get(ffd);
424         }
425         if (handler != null) {
426             handler.outputFileFlush(theJobBackend, ffd);
427         } else {
428             theJobBackend.outputFileFlushResult(myJobFrontend, ffd,
429                     new IOException("File closed, ffd=" + ffd));
430         }
431     }
432 
433     /**
434      * Close the given output file.
435      *
436      * @param theJobBackend Job Backend that is calling this method.
437      * @param ffd Frontend file descriptor.
438      * @exception IOException Thrown if an I/O error occurred.
439      * @throws java.io.IOException if any.
440      */
441     public void outputFileClose(JobBackendRef theJobBackend,
442             int ffd)
443             throws IOException {
444         FileHandler handler = null;
445         synchronized (myFileHandlerForFFD) {
446             handler = myFileHandlerForFFD.get(ffd);
447         }
448         if (handler != null) {
449             handler.outputFileClose(theJobBackend, ffd);
450         } else {
451             theJobBackend.outputFileCloseResult(myJobFrontend, ffd,
452                     new IOException("File closed, ffd=" + ffd));
453         }
454     }
455 
456 }