1 //******************************************************************************
2 //
3 // File: FrontendFileWriter.java
4 // Package: edu.rit.pj.cluster
5 // Unit: Class edu.rit.pj.cluster.FrontendFileWriter
6 //
7 // This Java source file is copyright (C) 2007 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.FileOutputStream;
44 import java.io.IOException;
45 import java.io.OutputStream;
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.concurrent.LinkedBlockingQueue;
49
50 import edu.rit.mp.ByteBuf;
51
52 /**
53 * Class FrontendFileWriter provides an object that writes sequential files in
54 * the job frontend process.
55 *
56 * @author Alan Kaminsky
57 * @version 21-Jun-2007
58 */
59 public class FrontendFileWriter {
60
61 // Hidden data members.
62 private JobFrontend myJobFrontend;
63
64 // Mapping from frontend file descriptor to file handler.
65 private Map<Integer, FileHandler> myFileHandlerForFFD
66 = new HashMap<Integer, FileHandler>();
67
68 // Next frontend file descriptor.
69 private int myNextFFD = 3;
70
71 // Hidden helper classes.
72 /**
73 * Class FileHandler is an object that performs each file operation in a
74 * separate thread, so as not to block the job frontend's message processing
75 * thread.
76 *
77 * @author Alan Kaminsky
78 * @version 20-Nov-2006
79 */
80 private class FileHandler
81 extends Thread {
82
83 private LinkedBlockingQueue<Invocation> myQueue
84 = new LinkedBlockingQueue<Invocation>();
85
86 private OutputStream myOutputStream;
87
88 private byte[] myByteArray = new byte[0];
89 private ByteBuf myByteBuf = ByteBuf.buffer(myByteArray);
90
91 private abstract static class Invocation {
92
93 public abstract boolean invoke()
94 throws IOException;
95 }
96
97 /**
98 * Construct a new file handler.
99 */
100 public FileHandler() {
101 setDaemon(true);
102 start();
103 }
104
105 /**
106 * Construct a new file handler to write the given output stream.
107 *
108 * @param theOutputStream Output stream.
109 */
110 public FileHandler(OutputStream theOutputStream) {
111 myOutputStream = theOutputStream;
112 setDaemon(true);
113 start();
114 }
115
116 /**
117 * Run this file handler.
118 */
119 public void run() {
120 try {
121 while (myQueue.take().invoke());
122 } catch (Throwable exc) {
123 myJobFrontend.terminateCancelJobOther(exc);
124 }
125 }
126
127 /**
128 * Open the given output file for writing or appending.
129 *
130 * @param theJobBackend Job Backend that is calling this method.
131 * @param bfd Backend file descriptor.
132 * @param file File.
133 * @param append True to append, false to overwrite.
134 */
135 public void outputFileOpen(JobBackendRef theJobBackend,
136 int bfd,
137 File file,
138 boolean append) {
139 myQueue.offer(new OutputFileOpenInvocation(theJobBackend, bfd, file, append));
140 }
141
142 private class OutputFileOpenInvocation
143 extends Invocation {
144
145 private JobBackendRef theJobBackend;
146 private int bfd;
147 private File file;
148 private boolean append;
149
150 public OutputFileOpenInvocation(JobBackendRef theJobBackend,
151 int bfd,
152 File file,
153 boolean append) {
154 this.theJobBackend = theJobBackend;
155 this.bfd = bfd;
156 this.file = file;
157 this.append = append;
158 }
159
160 public boolean invoke()
161 throws IOException {
162 return invokeOutputFileOpen(theJobBackend, bfd, file, append);
163 }
164 }
165
166 private boolean invokeOutputFileOpen(JobBackendRef theJobBackend,
167 int bfd,
168 File file,
169 boolean append)
170 throws IOException {
171 int ffd = 0;
172 IOException result = null;
173 boolean more = false;
174 try {
175 myOutputStream = new FileOutputStream(file, append);
176 synchronized (myFileHandlerForFFD) {
177 ffd = myNextFFD++;
178 myFileHandlerForFFD.put(ffd, this);
179 }
180 more = true;
181 } catch (IOException exc) {
182 result = exc;
183 }
184 theJobBackend.outputFileOpenResult(myJobFrontend, bfd, ffd, result);
185 return more;
186 }
187
188 /**
189 * Write the given bytes to the given output file. <code>ffd</code> = 1
190 * refers to the job's standard output stream; <code>ffd</code> = 2 refers
191 * to the job's standard error stream; other values refer to a
192 * previously opened file.
193 *
194 * @param theJobBackend Job Backend that is calling this method.
195 * @param ffd Frontend file descriptor.
196 * @param len Number of bytes to write.
197 */
198 public void outputFileWrite(JobBackendRef theJobBackend,
199 int ffd,
200 int len) {
201 myQueue.offer(new OutputFileWriteInvocation(theJobBackend, ffd, len));
202 }
203
204 private class OutputFileWriteInvocation
205 extends Invocation {
206
207 private JobBackendRef theJobBackend;
208 private int ffd;
209 private int len;
210
211 public OutputFileWriteInvocation(JobBackendRef theJobBackend,
212 int ffd,
213 int len) {
214 this.theJobBackend = theJobBackend;
215 this.ffd = ffd;
216 this.len = len;
217 }
218
219 public boolean invoke()
220 throws IOException {
221 return invokeOutputFileWrite(theJobBackend, ffd, len);
222 }
223 }
224
225 private boolean invokeOutputFileWrite(JobBackendRef theJobBackend,
226 int ffd,
227 int len)
228 throws IOException {
229 IOException result = null;
230 boolean more = false;
231 try {
232 if (myByteArray.length < len) {
233 myByteArray = new byte[len];
234 myByteBuf = ByteBuf.buffer(myByteArray);
235 }
236 ((JobBackendProxy) theJobBackend).receive(ffd, myByteBuf);
237 myOutputStream.write(myByteArray, 0, len);
238 more = true;
239 } catch (IOException exc) {
240 result = exc;
241 try {
242 myOutputStream.close();
243 } catch (IOException ignored) {
244 }
245 synchronized (myFileHandlerForFFD) {
246 myFileHandlerForFFD.remove(ffd);
247 }
248 }
249 theJobBackend.outputFileWriteResult(myJobFrontend, ffd, result);
250 return more;
251 }
252
253 /**
254 * Flush accumulated bytes to the given output file.
255 *
256 * @param theJobBackend Job Backend that is calling this method.
257 * @param ffd Frontend file descriptor.
258 */
259 public void outputFileFlush(JobBackendRef theJobBackend,
260 int ffd) {
261 myQueue.offer(new OutputFileFlushInvocation(theJobBackend, ffd));
262 }
263
264 private class OutputFileFlushInvocation
265 extends Invocation {
266
267 private JobBackendRef theJobBackend;
268 private int ffd;
269
270 public OutputFileFlushInvocation(JobBackendRef theJobBackend,
271 int ffd) {
272 this.theJobBackend = theJobBackend;
273 this.ffd = ffd;
274 }
275
276 public boolean invoke()
277 throws IOException {
278 return invokeOutputFileFlush(theJobBackend, ffd);
279 }
280 }
281
282 private boolean invokeOutputFileFlush(JobBackendRef theJobBackend,
283 int ffd)
284 throws IOException {
285 IOException result = null;
286 boolean more = false;
287 try {
288 myOutputStream.flush();
289 more = true;
290 } catch (IOException exc) {
291 result = exc;
292 try {
293 myOutputStream.close();
294 } catch (IOException ignored) {
295 }
296 synchronized (myFileHandlerForFFD) {
297 myFileHandlerForFFD.remove(ffd);
298 }
299 }
300 theJobBackend.outputFileFlushResult(myJobFrontend, ffd, result);
301 return more;
302 }
303
304 /**
305 * Close the given output file.
306 *
307 * @param theJobBackend Job Backend that is calling this method.
308 * @param ffd Frontend file descriptor.
309 */
310 public void outputFileClose(JobBackendRef theJobBackend,
311 int ffd) {
312 myQueue.offer(new OutputFileCloseInvocation(theJobBackend, ffd));
313 }
314
315 private class OutputFileCloseInvocation
316 extends Invocation {
317
318 private JobBackendRef theJobBackend;
319 private int ffd;
320
321 public OutputFileCloseInvocation(JobBackendRef theJobBackend,
322 int ffd) {
323 this.theJobBackend = theJobBackend;
324 this.ffd = ffd;
325 }
326
327 public boolean invoke()
328 throws IOException {
329 return invokeOutputFileClose(theJobBackend, ffd);
330 }
331 }
332
333 private boolean invokeOutputFileClose(JobBackendRef theJobBackend,
334 int ffd)
335 throws IOException {
336 IOException result = null;
337 try {
338 myOutputStream.close();
339 } catch (IOException exc) {
340 result = exc;
341 }
342 synchronized (myFileHandlerForFFD) {
343 myFileHandlerForFFD.remove(ffd);
344 }
345 theJobBackend.outputFileCloseResult(myJobFrontend, ffd, result);
346 return false;
347 }
348 }
349
350 // Exported constructors.
351 /**
352 * Construct a new frontend file writer.
353 *
354 * @param theJobFrontend Job Frontend.
355 */
356 public FrontendFileWriter(JobFrontend theJobFrontend) {
357 myJobFrontend = theJobFrontend;
358
359 // Set up frontend file descriptor 1 (stdout) and 2 (stderr).
360 myFileHandlerForFFD.put(1, new FileHandler(System.out));
361 myFileHandlerForFFD.put(2, new FileHandler(System.err));
362 }
363
364 // Exported operations.
365 /**
366 * Open the given output file for writing or appending.
367 *
368 * @param theJobBackend Job Backend that is calling this method.
369 * @param bfd Backend file descriptor.
370 * @param file File.
371 * @param append True to append, false to overwrite.
372 * @exception IOException Thrown if an I/O error occurred.
373 * @throws java.io.IOException if any.
374 */
375 public void outputFileOpen(JobBackendRef theJobBackend,
376 int bfd,
377 File file,
378 boolean append)
379 throws IOException {
380 new FileHandler().outputFileOpen(theJobBackend, bfd, file, append);
381 }
382
383 /**
384 * Write the given bytes to the given output file. <code>ffd</code> = 1 refers
385 * to the job's standard output stream; <code>ffd</code> = 2 refers to the job's
386 * standard error stream; other values refer to a previously opened file.
387 *
388 * @param theJobBackend Job Backend that is calling this method.
389 * @param ffd Frontend file descriptor.
390 * @param len Number of bytes to write.
391 * @exception IOException Thrown if an I/O error occurred.
392 * @throws java.io.IOException if any.
393 */
394 public void outputFileWrite(JobBackendRef theJobBackend,
395 int ffd,
396 int len)
397 throws IOException {
398 FileHandler handler = null;
399 synchronized (myFileHandlerForFFD) {
400 handler = myFileHandlerForFFD.get(ffd);
401 }
402 if (handler != null) {
403 handler.outputFileWrite(theJobBackend, ffd, len);
404 } else {
405 theJobBackend.outputFileWriteResult(myJobFrontend, ffd,
406 new IOException("File closed, ffd=" + ffd));
407 }
408 }
409
410 /**
411 * Flush accumulated bytes to the given output file.
412 *
413 * @param theJobBackend Job Backend that is calling this method.
414 * @param ffd Frontend file descriptor.
415 * @exception IOException Thrown if an I/O error occurred.
416 * @throws java.io.IOException if any.
417 */
418 public void outputFileFlush(JobBackendRef theJobBackend,
419 int ffd)
420 throws IOException {
421 FileHandler handler = null;
422 synchronized (myFileHandlerForFFD) {
423 handler = myFileHandlerForFFD.get(ffd);
424 }
425 if (handler != null) {
426 handler.outputFileFlush(theJobBackend, ffd);
427 } else {
428 theJobBackend.outputFileFlushResult(myJobFrontend, ffd,
429 new IOException("File closed, ffd=" + ffd));
430 }
431 }
432
433 /**
434 * Close the given output file.
435 *
436 * @param theJobBackend Job Backend that is calling this method.
437 * @param ffd Frontend file descriptor.
438 * @exception IOException Thrown if an I/O error occurred.
439 * @throws java.io.IOException if any.
440 */
441 public void outputFileClose(JobBackendRef theJobBackend,
442 int ffd)
443 throws IOException {
444 FileHandler handler = null;
445 synchronized (myFileHandlerForFFD) {
446 handler = myFileHandlerForFFD.get(ffd);
447 }
448 if (handler != null) {
449 handler.outputFileClose(theJobBackend, ffd);
450 } else {
451 theJobBackend.outputFileCloseResult(myJobFrontend, ffd,
452 new IOException("File closed, ffd=" + ffd));
453 }
454 }
455
456 }