1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 package edu.rit.pj.cluster;
41
42 import java.io.File;
43 import java.io.FileInputStream;
44 import java.io.IOException;
45 import java.io.InputStream;
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.concurrent.LinkedBlockingQueue;
49
50
51
52
53
54
55
56
57 public class FrontendFileReader {
58
59
60 private JobFrontend myJobFrontend;
61
62
63 private Map<Integer, FileHandler> myFileHandlerForFFD
64 = new HashMap<Integer, FileHandler>();
65
66
67 private int myNextFFD = 2;
68
69
70
71
72
73
74
75
76
77
78 private class FileHandler
79 extends Thread {
80
81 private LinkedBlockingQueue<Invocation> myQueue
82 = new LinkedBlockingQueue<Invocation>();
83
84 private InputStream myInputStream;
85 private byte[] myBuffer = new byte[0];
86
87 private abstract static class Invocation {
88
89 public abstract boolean invoke()
90 throws IOException;
91 }
92
93
94
95
96 public FileHandler() {
97 setDaemon(true);
98 start();
99 }
100
101
102
103
104
105
106 public FileHandler(InputStream theInputStream) {
107 myInputStream = theInputStream;
108 setDaemon(true);
109 start();
110 }
111
112
113
114
115 public void run() {
116 try {
117 while (myQueue.take().invoke());
118 } catch (Throwable exc) {
119 myJobFrontend.terminateCancelJobOther(exc);
120 }
121 }
122
123
124
125
126
127
128
129
130 public void inputFileOpen(JobBackendRef theJobBackend,
131 int bfd,
132 File file) {
133 myQueue.offer(new InputFileOpenInvocation(theJobBackend, bfd, file));
134 }
135
136 private class InputFileOpenInvocation
137 extends Invocation {
138
139 private JobBackendRef theJobBackend;
140 private int bfd;
141 private File file;
142
143 public InputFileOpenInvocation(JobBackendRef theJobBackend,
144 int bfd,
145 File file) {
146 this.theJobBackend = theJobBackend;
147 this.bfd = bfd;
148 this.file = file;
149 }
150
151 public boolean invoke()
152 throws IOException {
153 return invokeInputFileOpen(theJobBackend, bfd, file);
154 }
155 }
156
157 private boolean invokeInputFileOpen(JobBackendRef theJobBackend,
158 int bfd,
159 File file)
160 throws IOException {
161 int ffd = 0;
162 IOException result = null;
163 boolean more = false;
164 try {
165 myInputStream = new FileInputStream(file);
166 synchronized (myFileHandlerForFFD) {
167 ffd = myNextFFD++;
168 myFileHandlerForFFD.put(ffd, this);
169 }
170 more = true;
171 } catch (IOException exc) {
172 result = exc;
173 }
174 theJobBackend.inputFileOpenResult(myJobFrontend, bfd, ffd, result);
175 return more;
176 }
177
178
179
180
181
182
183
184
185
186
187 public void inputFileRead(JobBackendRef theJobBackend,
188 int ffd,
189 int len) {
190 myQueue.offer(new InputFileReadInvocation(theJobBackend, ffd, len));
191 }
192
193 private class InputFileReadInvocation
194 extends Invocation {
195
196 private JobBackendRef theJobBackend;
197 private int ffd;
198 private int len;
199
200 public InputFileReadInvocation(JobBackendRef theJobBackend,
201 int ffd,
202 int len) {
203 this.theJobBackend = theJobBackend;
204 this.ffd = ffd;
205 this.len = len;
206 }
207
208 public boolean invoke()
209 throws IOException {
210 return invokeInputFileRead(theJobBackend, ffd, len);
211 }
212 }
213
214 private boolean invokeInputFileRead(JobBackendRef theJobBackend,
215 int ffd,
216 int len)
217 throws IOException {
218 int resultlen = 0;
219 IOException resultexc = null;
220 boolean more = false;
221 try {
222 if (myBuffer.length < len) {
223 myBuffer = new byte[len];
224 }
225 resultlen = myInputStream.read(myBuffer, 0, len);
226 more = true;
227 } catch (IOException exc) {
228 resultexc = exc;
229 try {
230 myInputStream.close();
231 } catch (IOException ignored) {
232 }
233 synchronized (myFileHandlerForFFD) {
234 myFileHandlerForFFD.remove(ffd);
235 }
236 }
237 theJobBackend.inputFileReadResult(myJobFrontend, ffd, myBuffer, resultlen, resultexc);
238 return more;
239 }
240
241
242
243
244
245
246
247
248 public void inputFileSkip(JobBackendRef theJobBackend,
249 int ffd,
250 long len) {
251 myQueue.offer(new InputFileSkipInvocation(theJobBackend, ffd, len));
252 }
253
254 private class InputFileSkipInvocation
255 extends Invocation {
256
257 private JobBackendRef theJobBackend;
258 private int ffd;
259 private long len;
260
261 public InputFileSkipInvocation(JobBackendRef theJobBackend,
262 int ffd,
263 long len) {
264 this.theJobBackend = theJobBackend;
265 this.ffd = ffd;
266 this.len = len;
267 }
268
269 public boolean invoke()
270 throws IOException {
271 return invokeInputFileSkip(theJobBackend, ffd, len);
272 }
273 }
274
275 private boolean invokeInputFileSkip(JobBackendRef theJobBackend,
276 int ffd,
277 long len)
278 throws IOException {
279 long resultlen = 0L;
280 IOException resultexc = null;
281 boolean more = false;
282 try {
283 resultlen = myInputStream.skip(len);
284 more = true;
285 } catch (IOException exc) {
286 resultexc = exc;
287 try {
288 myInputStream.close();
289 } catch (IOException ignored) {
290 }
291 synchronized (myFileHandlerForFFD) {
292 myFileHandlerForFFD.remove(ffd);
293 }
294 }
295 theJobBackend.inputFileSkipResult(myJobFrontend, ffd, resultlen, resultexc);
296 return more;
297 }
298
299
300
301
302
303
304
305 public void inputFileClose(JobBackendRef theJobBackend,
306 int ffd) {
307 myQueue.offer(new InputFileCloseInvocation(theJobBackend, ffd));
308 }
309
310 private class InputFileCloseInvocation
311 extends Invocation {
312
313 private JobBackendRef theJobBackend;
314 private int ffd;
315
316 public InputFileCloseInvocation(JobBackendRef theJobBackend,
317 int ffd) {
318 this.theJobBackend = theJobBackend;
319 this.ffd = ffd;
320 }
321
322 public boolean invoke()
323 throws IOException {
324 return invokeInputFileClose(theJobBackend, ffd);
325 }
326 }
327
328 private boolean invokeInputFileClose(JobBackendRef theJobBackend,
329 int ffd)
330 throws IOException {
331 IOException result = null;
332 try {
333 myInputStream.close();
334 } catch (IOException exc) {
335 result = exc;
336 }
337 synchronized (myFileHandlerForFFD) {
338 myFileHandlerForFFD.remove(ffd);
339 }
340 theJobBackend.inputFileCloseResult(myJobFrontend, ffd, result);
341 return false;
342 }
343 }
344
345
346
347
348
349
350
351 public FrontendFileReader(JobFrontend theJobFrontend) {
352 myJobFrontend = theJobFrontend;
353
354
355 myFileHandlerForFFD.put(1, new FileHandler(System.in));
356 }
357
358
359
360
361
362
363
364
365
366
367
368 public void inputFileOpen(JobBackendRef theJobBackend,
369 int bfd,
370 File file)
371 throws IOException {
372 new FileHandler().inputFileOpen(theJobBackend, bfd, file);
373 }
374
375
376
377
378
379
380
381
382
383
384
385
386 public void inputFileRead(JobBackendRef theJobBackend,
387 int ffd,
388 int len)
389 throws IOException {
390 FileHandler handler = null;
391 synchronized (myFileHandlerForFFD) {
392 handler = myFileHandlerForFFD.get(ffd);
393 }
394 if (handler != null) {
395 handler.inputFileRead(theJobBackend, ffd, len);
396 } else {
397 theJobBackend.inputFileReadResult(myJobFrontend, ffd, null, -1,
398 new IOException("File closed, ffd=" + ffd));
399 }
400 }
401
402
403
404
405
406
407
408
409
410
411 public void inputFileSkip(JobBackendRef theJobBackend,
412 int ffd,
413 long len)
414 throws IOException {
415 FileHandler handler = null;
416 synchronized (myFileHandlerForFFD) {
417 handler = myFileHandlerForFFD.get(ffd);
418 }
419 if (handler != null) {
420 handler.inputFileSkip(theJobBackend, ffd, len);
421 } else {
422 theJobBackend.inputFileSkipResult(myJobFrontend, ffd, 0L,
423 new IOException("File closed, ffd=" + ffd));
424 }
425 }
426
427
428
429
430
431
432
433
434
435 public void inputFileClose(JobBackendRef theJobBackend,
436 int ffd)
437 throws IOException {
438 FileHandler handler = null;
439 synchronized (myFileHandlerForFFD) {
440 handler = myFileHandlerForFFD.get(ffd);
441 }
442 if (handler != null) {
443 handler.inputFileClose(theJobBackend, ffd);
444 } else {
445 theJobBackend.inputFileCloseResult(myJobFrontend, ffd,
446 new IOException("File closed, ffd=" + ffd));
447 }
448 }
449
450 }