1 //******************************************************************************
2 //
3 // File: BackendFileOutputStream.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.BackendFileOutputStream
6 //
7 // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.IOException;
44 import java.io.InterruptedIOException;
45 import java.io.OutputStream;
46 import java.util.concurrent.LinkedBlockingQueue;
47
48 /**
49 * Class BackendFileOutputStream provides an object in a job backend process
50 * that writes a sequential file in the job frontend process. A backend file
51 * output stream is not constructed directly, rather it is created by a factory
52 * method in class {@linkplain BackendFileWriter}.
53 * <P>
54 * <I>Note:</I> Class BackendFileOutputStream does not do any buffering. Each
55 * method call sends a message to and receives a message from the job frontend.
56 * Consider layering a BufferedOutputStream on top of the
57 * BackendFileOutputStream.
58 *
59 * @author Alan Kaminsky
60 * @version 05-Nov-2006
61 */
62 public class BackendFileOutputStream
63 extends OutputStream {
64
65 // Hidden data members.
66 private JobFrontendRef myJobFrontend;
67 private JobBackendRef myJobBackend;
68
69 // Queue of results from job frontend.
70 private LinkedBlockingQueue<Result> myResultQueue
71 = new LinkedBlockingQueue<Result>();
72
73 private static class Result {
74
75 public int ffd;
76 public IOException exc;
77
78 public Result(int ffd,
79 IOException exc) {
80 this.ffd = ffd;
81 this.exc = exc;
82 }
83 }
84
85 // Frontend file descriptor.
86 private int ffd;
87
88 // Hidden constructors.
89 /**
90 * Construct a new backend file output stream. Call the <code>open()</code>
91 * method to open the file and obtain the frontend file descriptor.
92 *
93 * @param theJobFrontend Job Frontend.
94 * @param theJobBackend Job Backend.
95 */
96 BackendFileOutputStream(JobFrontendRef theJobFrontend,
97 JobBackendRef theJobBackend) {
98 this.myJobFrontend = theJobFrontend;
99 this.myJobBackend = theJobBackend;
100 }
101
102 /**
103 * Construct a new backend file output stream. Use the given frontend file
104 * descriptor.
105 *
106 * @param theJobFrontend Job Frontend.
107 * @param theJobBackend Job Backend.
108 * @param ffd Frontend file descriptor.
109 */
110 BackendFileOutputStream(JobFrontendRef theJobFrontend,
111 JobBackendRef theJobBackend,
112 int ffd) {
113 this.myJobFrontend = theJobFrontend;
114 this.myJobBackend = theJobBackend;
115 this.ffd = ffd;
116 }
117
118 // Exported operations.
119 /**
120 * Write the given byte to this output stream. Only the least significant
121 * eight bits are written.
122 *
123 * @param b Byte.
124 * @exception IOException Thrown if an I/O error occurred.
125 * @throws java.io.IOException if any.
126 */
127 public void write(int b)
128 throws IOException {
129 write(new byte[]{(byte) b});
130 }
131
132 /**
133 * Write the given byte array to this output stream.
134 *
135 * @param buf Byte array.
136 * @exception NullPointerException (unchecked exception) Thrown if
137 * <code>buf</code> is null.
138 * @exception IOException Thrown if an I/O error occurred.
139 * @throws java.io.IOException if any.
140 */
141 public void write(byte[] buf)
142 throws IOException {
143 write(buf, 0, buf.length);
144 }
145
146 /**
147 * {@inheritDoc}
148 *
149 * Write a portion of the given byte array to this output stream.
150 * @exception NullPointerException (unchecked exception) Thrown if
151 * <code>buf</code> is null.
152 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
153 * <code>off</code> < 0, <code>len</code>
154 * < 0, or <code>off+len</code> > <code>buf.length</code>.
155 * @exception IOException Thrown if an I/O error occurred.
156 */
157 public void write(byte[] buf,
158 int off,
159 int len)
160 throws IOException {
161 if (off < 0 || len < 0 || off + len > buf.length) {
162 throw new IndexOutOfBoundsException();
163 }
164 verifyOpen();
165 myJobFrontend.outputFileWrite(myJobBackend, ffd, buf, off, len);
166 getResult();
167 }
168
169 /**
170 * Flush this output stream.
171 *
172 * @exception IOException Thrown if an I/O error occurred.
173 * @throws java.io.IOException if any.
174 */
175 public void flush()
176 throws IOException {
177 verifyOpen();
178 myJobFrontend.outputFileFlush(myJobBackend, ffd);
179 getResult();
180 }
181
182 /**
183 * Close this output stream.
184 *
185 * @exception IOException Thrown if an I/O error occurred.
186 * @throws java.io.IOException if any.
187 */
188 public void close()
189 throws IOException {
190 verifyOpen();
191 try {
192 myJobFrontend.outputFileClose(myJobBackend, ffd);
193 getResult();
194 } finally {
195 ffd = 0;
196 }
197 }
198
199 // Hidden operations.
200 /**
201 * Request the Job Frontend to open the file.
202 *
203 * @param bfd Backend file descriptor.
204 * @param file File.
205 * @param append True to append, false to overwrite.
206 *
207 * @return Frontend file descriptor.
208 *
209 * @exception IOException Thrown if an I/O error occurred.
210 */
211 int open(int bfd,
212 File file,
213 boolean append)
214 throws IOException {
215 myJobFrontend.outputFileOpen(myJobBackend, bfd, file, append);
216 this.ffd = getResult().ffd;
217 return this.ffd;
218 }
219
220 /**
221 * Get the next result from the result queue. Throw an IOException if
222 * necessary.
223 *
224 * @return Result object.
225 *
226 * @exception IOException Thrown if an I/O error occurred.
227 */
228 private Result getResult()
229 throws IOException {
230 try {
231 Result result = myResultQueue.take();
232 if (result.exc != null) {
233 throw result.exc;
234 }
235 return result;
236 } catch (InterruptedException exc) {
237 IOException exc2 = new InterruptedIOException("I/O interrupted");
238 exc2.initCause(exc);
239 throw exc2;
240 }
241 }
242
243 /**
244 * Put the given result into the result queue.
245 *
246 * @param ffd Frontend file descriptor.
247 * @param exc Null if success, exception if failure.
248 */
249 void putResult(int ffd,
250 IOException exc) {
251 myResultQueue.offer(new Result(ffd, exc));
252 }
253
254 /**
255 * Verify that this file is open.
256 *
257 * @exception IOException Thrown if this file is not open.
258 */
259 private void verifyOpen()
260 throws IOException {
261 if (ffd == 0) {
262 throw new IOException("File closed");
263 }
264 }
265
266 }