View Javadoc
1   //******************************************************************************
2   //
3   // File:    BackendFileOutputStream.java
4   // Package: edu.rit.pj.cluster
5   // Unit:    Class edu.rit.pj.cluster.BackendFileOutputStream
6   //
7   // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.cluster;
41  
42  import java.io.File;
43  import java.io.IOException;
44  import java.io.InterruptedIOException;
45  import java.io.OutputStream;
46  import java.util.concurrent.LinkedBlockingQueue;
47  
48  /**
49   * Class BackendFileOutputStream provides an object in a job backend process
50   * that writes a sequential file in the job frontend process. A backend file
51   * output stream is not constructed directly, rather it is created by a factory
52   * method in class {@linkplain BackendFileWriter}.
53   * <P>
54   * <I>Note:</I> Class BackendFileOutputStream does not do any buffering. Each
55   * method call sends a message to and receives a message from the job frontend.
56   * Consider layering a BufferedOutputStream on top of the
57   * BackendFileOutputStream.
58   *
59   * @author Alan Kaminsky
60   * @version 05-Nov-2006
61   */
62  public class BackendFileOutputStream
63          extends OutputStream {
64  
65  // Hidden data members.
66      private JobFrontendRef myJobFrontend;
67      private JobBackendRef myJobBackend;
68  
69      // Queue of results from job frontend.
70      private LinkedBlockingQueue<Result> myResultQueue
71              = new LinkedBlockingQueue<Result>();
72  
73      private static class Result {
74  
75          public int ffd;
76          public IOException exc;
77  
78          public Result(int ffd,
79                  IOException exc) {
80              this.ffd = ffd;
81              this.exc = exc;
82          }
83      }
84  
85      // Frontend file descriptor.
86      private int ffd;
87  
88  // Hidden constructors.
89      /**
90       * Construct a new backend file output stream. Call the <code>open()</code>
91       * method to open the file and obtain the frontend file descriptor.
92       *
93       * @param theJobFrontend Job Frontend.
94       * @param theJobBackend Job Backend.
95       */
96      BackendFileOutputStream(JobFrontendRef theJobFrontend,
97              JobBackendRef theJobBackend) {
98          this.myJobFrontend = theJobFrontend;
99          this.myJobBackend = theJobBackend;
100     }
101 
102     /**
103      * Construct a new backend file output stream. Use the given frontend file
104      * descriptor.
105      *
106      * @param theJobFrontend Job Frontend.
107      * @param theJobBackend Job Backend.
108      * @param ffd Frontend file descriptor.
109      */
110     BackendFileOutputStream(JobFrontendRef theJobFrontend,
111             JobBackendRef theJobBackend,
112             int ffd) {
113         this.myJobFrontend = theJobFrontend;
114         this.myJobBackend = theJobBackend;
115         this.ffd = ffd;
116     }
117 
118 // Exported operations.
119     /**
120      * Write the given byte to this output stream. Only the least significant
121      * eight bits are written.
122      *
123      * @param b Byte.
124      * @exception IOException Thrown if an I/O error occurred.
125      * @throws java.io.IOException if any.
126      */
127     public void write(int b)
128             throws IOException {
129         write(new byte[]{(byte) b});
130     }
131 
132     /**
133      * Write the given byte array to this output stream.
134      *
135      * @param buf Byte array.
136      * @exception NullPointerException (unchecked exception) Thrown if
137      * <code>buf</code> is null.
138      * @exception IOException Thrown if an I/O error occurred.
139      * @throws java.io.IOException if any.
140      */
141     public void write(byte[] buf)
142             throws IOException {
143         write(buf, 0, buf.length);
144     }
145 
146     /**
147      * {@inheritDoc}
148      *
149      * Write a portion of the given byte array to this output stream.
150      * @exception NullPointerException (unchecked exception) Thrown if
151      * <code>buf</code> is null.
152      * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
153      * <code>off</code> &lt; 0, <code>len</code>
154      * &lt; 0, or <code>off+len</code> &gt; <code>buf.length</code>.
155      * @exception IOException Thrown if an I/O error occurred.
156      */
157     public void write(byte[] buf,
158             int off,
159             int len)
160             throws IOException {
161         if (off < 0 || len < 0 || off + len > buf.length) {
162             throw new IndexOutOfBoundsException();
163         }
164         verifyOpen();
165         myJobFrontend.outputFileWrite(myJobBackend, ffd, buf, off, len);
166         getResult();
167     }
168 
169     /**
170      * Flush this output stream.
171      *
172      * @exception IOException Thrown if an I/O error occurred.
173      * @throws java.io.IOException if any.
174      */
175     public void flush()
176             throws IOException {
177         verifyOpen();
178         myJobFrontend.outputFileFlush(myJobBackend, ffd);
179         getResult();
180     }
181 
182     /**
183      * Close this output stream.
184      *
185      * @exception IOException Thrown if an I/O error occurred.
186      * @throws java.io.IOException if any.
187      */
188     public void close()
189             throws IOException {
190         verifyOpen();
191         try {
192             myJobFrontend.outputFileClose(myJobBackend, ffd);
193             getResult();
194         } finally {
195             ffd = 0;
196         }
197     }
198 
199 // Hidden operations.
200     /**
201      * Request the Job Frontend to open the file.
202      *
203      * @param bfd Backend file descriptor.
204      * @param file File.
205      * @param append True to append, false to overwrite.
206      *
207      * @return Frontend file descriptor.
208      *
209      * @exception IOException Thrown if an I/O error occurred.
210      */
211     int open(int bfd,
212             File file,
213             boolean append)
214             throws IOException {
215         myJobFrontend.outputFileOpen(myJobBackend, bfd, file, append);
216         this.ffd = getResult().ffd;
217         return this.ffd;
218     }
219 
220     /**
221      * Get the next result from the result queue. Throw an IOException if
222      * necessary.
223      *
224      * @return Result object.
225      *
226      * @exception IOException Thrown if an I/O error occurred.
227      */
228     private Result getResult()
229             throws IOException {
230         try {
231             Result result = myResultQueue.take();
232             if (result.exc != null) {
233                 throw result.exc;
234             }
235             return result;
236         } catch (InterruptedException exc) {
237             IOException exc2 = new InterruptedIOException("I/O interrupted");
238             exc2.initCause(exc);
239             throw exc2;
240         }
241     }
242 
243     /**
244      * Put the given result into the result queue.
245      *
246      * @param ffd Frontend file descriptor.
247      * @param exc Null if success, exception if failure.
248      */
249     void putResult(int ffd,
250             IOException exc) {
251         myResultQueue.offer(new Result(ffd, exc));
252     }
253 
254     /**
255      * Verify that this file is open.
256      *
257      * @exception IOException Thrown if this file is not open.
258      */
259     private void verifyOpen()
260             throws IOException {
261         if (ffd == 0) {
262             throw new IOException("File closed");
263         }
264     }
265 
266 }