1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.FileOutputStream;
44 import java.io.IOException;
45 import java.io.OutputStream;
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.concurrent.LinkedBlockingQueue;
49
50 import edu.rit.mp.ByteBuf;
51
52
53
54
55
56
57
58
59 public class FrontendFileWriter {
60
61
62 private JobFrontend myJobFrontend;
63
64
65 private Map<Integer, FileHandler> myFileHandlerForFFD
66 = new HashMap<Integer, FileHandler>();
67
68
69 private int myNextFFD = 3;
70
71
72
73
74
75
76
77
78
79
80 private class FileHandler
81 extends Thread {
82
83 private LinkedBlockingQueue<Invocation> myQueue
84 = new LinkedBlockingQueue<Invocation>();
85
86 private OutputStream myOutputStream;
87
88 private byte[] myByteArray = new byte[0];
89 private ByteBuf myByteBuf = ByteBuf.buffer(myByteArray);
90
91 private abstract static class Invocation {
92
93 public abstract boolean invoke()
94 throws IOException;
95 }
96
97
98
99
100 public FileHandler() {
101 setDaemon(true);
102 start();
103 }
104
105
106
107
108
109
110 public FileHandler(OutputStream theOutputStream) {
111 myOutputStream = theOutputStream;
112 setDaemon(true);
113 start();
114 }
115
116
117
118
119 public void run() {
120 try {
121 while (myQueue.take().invoke());
122 } catch (Throwable exc) {
123 myJobFrontend.terminateCancelJobOther(exc);
124 }
125 }
126
127
128
129
130
131
132
133
134
135 public void outputFileOpen(JobBackendRef theJobBackend,
136 int bfd,
137 File file,
138 boolean append) {
139 myQueue.offer(new OutputFileOpenInvocation(theJobBackend, bfd, file, append));
140 }
141
142 private class OutputFileOpenInvocation
143 extends Invocation {
144
145 private JobBackendRef theJobBackend;
146 private int bfd;
147 private File file;
148 private boolean append;
149
150 public OutputFileOpenInvocation(JobBackendRef theJobBackend,
151 int bfd,
152 File file,
153 boolean append) {
154 this.theJobBackend = theJobBackend;
155 this.bfd = bfd;
156 this.file = file;
157 this.append = append;
158 }
159
160 public boolean invoke()
161 throws IOException {
162 return invokeOutputFileOpen(theJobBackend, bfd, file, append);
163 }
164 }
165
166 private boolean invokeOutputFileOpen(JobBackendRef theJobBackend,
167 int bfd,
168 File file,
169 boolean append)
170 throws IOException {
171 int ffd = 0;
172 IOException result = null;
173 boolean more = false;
174 try {
175 myOutputStream = new FileOutputStream(file, append);
176 synchronized (myFileHandlerForFFD) {
177 ffd = myNextFFD++;
178 myFileHandlerForFFD.put(ffd, this);
179 }
180 more = true;
181 } catch (IOException exc) {
182 result = exc;
183 }
184 theJobBackend.outputFileOpenResult(myJobFrontend, bfd, ffd, result);
185 return more;
186 }
187
188
189
190
191
192
193
194
195
196
197
198 public void outputFileWrite(JobBackendRef theJobBackend,
199 int ffd,
200 int len) {
201 myQueue.offer(new OutputFileWriteInvocation(theJobBackend, ffd, len));
202 }
203
204 private class OutputFileWriteInvocation
205 extends Invocation {
206
207 private JobBackendRef theJobBackend;
208 private int ffd;
209 private int len;
210
211 public OutputFileWriteInvocation(JobBackendRef theJobBackend,
212 int ffd,
213 int len) {
214 this.theJobBackend = theJobBackend;
215 this.ffd = ffd;
216 this.len = len;
217 }
218
219 public boolean invoke()
220 throws IOException {
221 return invokeOutputFileWrite(theJobBackend, ffd, len);
222 }
223 }
224
225 private boolean invokeOutputFileWrite(JobBackendRef theJobBackend,
226 int ffd,
227 int len)
228 throws IOException {
229 IOException result = null;
230 boolean more = false;
231 try {
232 if (myByteArray.length < len) {
233 myByteArray = new byte[len];
234 myByteBuf = ByteBuf.buffer(myByteArray);
235 }
236 ((JobBackendProxy) theJobBackend).receive(ffd, myByteBuf);
237 myOutputStream.write(myByteArray, 0, len);
238 more = true;
239 } catch (IOException exc) {
240 result = exc;
241 try {
242 myOutputStream.close();
243 } catch (IOException ignored) {
244 }
245 synchronized (myFileHandlerForFFD) {
246 myFileHandlerForFFD.remove(ffd);
247 }
248 }
249 theJobBackend.outputFileWriteResult(myJobFrontend, ffd, result);
250 return more;
251 }
252
253
254
255
256
257
258
259 public void outputFileFlush(JobBackendRef theJobBackend,
260 int ffd) {
261 myQueue.offer(new OutputFileFlushInvocation(theJobBackend, ffd));
262 }
263
264 private class OutputFileFlushInvocation
265 extends Invocation {
266
267 private JobBackendRef theJobBackend;
268 private int ffd;
269
270 public OutputFileFlushInvocation(JobBackendRef theJobBackend,
271 int ffd) {
272 this.theJobBackend = theJobBackend;
273 this.ffd = ffd;
274 }
275
276 public boolean invoke()
277 throws IOException {
278 return invokeOutputFileFlush(theJobBackend, ffd);
279 }
280 }
281
282 private boolean invokeOutputFileFlush(JobBackendRef theJobBackend,
283 int ffd)
284 throws IOException {
285 IOException result = null;
286 boolean more = false;
287 try {
288 myOutputStream.flush();
289 more = true;
290 } catch (IOException exc) {
291 result = exc;
292 try {
293 myOutputStream.close();
294 } catch (IOException ignored) {
295 }
296 synchronized (myFileHandlerForFFD) {
297 myFileHandlerForFFD.remove(ffd);
298 }
299 }
300 theJobBackend.outputFileFlushResult(myJobFrontend, ffd, result);
301 return more;
302 }
303
304
305
306
307
308
309
310 public void outputFileClose(JobBackendRef theJobBackend,
311 int ffd) {
312 myQueue.offer(new OutputFileCloseInvocation(theJobBackend, ffd));
313 }
314
315 private class OutputFileCloseInvocation
316 extends Invocation {
317
318 private JobBackendRef theJobBackend;
319 private int ffd;
320
321 public OutputFileCloseInvocation(JobBackendRef theJobBackend,
322 int ffd) {
323 this.theJobBackend = theJobBackend;
324 this.ffd = ffd;
325 }
326
327 public boolean invoke()
328 throws IOException {
329 return invokeOutputFileClose(theJobBackend, ffd);
330 }
331 }
332
333 private boolean invokeOutputFileClose(JobBackendRef theJobBackend,
334 int ffd)
335 throws IOException {
336 IOException result = null;
337 try {
338 myOutputStream.close();
339 } catch (IOException exc) {
340 result = exc;
341 }
342 synchronized (myFileHandlerForFFD) {
343 myFileHandlerForFFD.remove(ffd);
344 }
345 theJobBackend.outputFileCloseResult(myJobFrontend, ffd, result);
346 return false;
347 }
348 }
349
350
351
352
353
354
355
356 public FrontendFileWriter(JobFrontend theJobFrontend) {
357 myJobFrontend = theJobFrontend;
358
359
360 myFileHandlerForFFD.put(1, new FileHandler(System.out));
361 myFileHandlerForFFD.put(2, new FileHandler(System.err));
362 }
363
364
365
366
367
368
369
370
371
372
373
374
375 public void outputFileOpen(JobBackendRef theJobBackend,
376 int bfd,
377 File file,
378 boolean append)
379 throws IOException {
380 new FileHandler().outputFileOpen(theJobBackend, bfd, file, append);
381 }
382
383
384
385
386
387
388
389
390
391
392
393
394 public void outputFileWrite(JobBackendRef theJobBackend,
395 int ffd,
396 int len)
397 throws IOException {
398 FileHandler handler = null;
399 synchronized (myFileHandlerForFFD) {
400 handler = myFileHandlerForFFD.get(ffd);
401 }
402 if (handler != null) {
403 handler.outputFileWrite(theJobBackend, ffd, len);
404 } else {
405 theJobBackend.outputFileWriteResult(myJobFrontend, ffd,
406 new IOException("File closed, ffd=" + ffd));
407 }
408 }
409
410
411
412
413
414
415
416
417
418 public void outputFileFlush(JobBackendRef theJobBackend,
419 int ffd)
420 throws IOException {
421 FileHandler handler = null;
422 synchronized (myFileHandlerForFFD) {
423 handler = myFileHandlerForFFD.get(ffd);
424 }
425 if (handler != null) {
426 handler.outputFileFlush(theJobBackend, ffd);
427 } else {
428 theJobBackend.outputFileFlushResult(myJobFrontend, ffd,
429 new IOException("File closed, ffd=" + ffd));
430 }
431 }
432
433
434
435
436
437
438
439
440
441 public void outputFileClose(JobBackendRef theJobBackend,
442 int ffd)
443 throws IOException {
444 FileHandler handler = null;
445 synchronized (myFileHandlerForFFD) {
446 handler = myFileHandlerForFFD.get(ffd);
447 }
448 if (handler != null) {
449 handler.outputFileClose(theJobBackend, ffd);
450 } else {
451 theJobBackend.outputFileCloseResult(myJobFrontend, ffd,
452 new IOException("File closed, ffd=" + ffd));
453 }
454 }
455
456 }