1 //****************************************************************************** 2 // 3 // File: BackendFileInputStream.java 4 // Package: edu.rit.pj.cluster 5 // Unit: Class edu.rit.pj.cluster.BackendFileInputStream 6 // 7 // This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj.cluster; 41 42 import java.io.File; 43 import java.io.IOException; 44 import java.io.InputStream; 45 import java.io.InterruptedIOException; 46 import java.util.concurrent.LinkedBlockingQueue; 47 48 import edu.rit.mp.ByteBuf; 49 import edu.rit.util.Range; 50 51 /** 52 * Class BackendFileInputStream provides an object in a job backend process that 53 * reads a sequential file in the job frontend process. A backend file input 54 * stream is not constructed directly, rather it is created by a factory method 55 * in class {@linkplain BackendFileReader}. 56 * <P> 57 * <I>Note:</I> Class BackendFileInputStream does not do any buffering. Each 58 * method call sends a message to and receives a message from the job frontend. 59 * Consider layering a BufferedInputStream on top of the BackendFileInputStream. 60 * 61 * @author Alan Kaminsky 62 * @version 20-Nov-2006 63 */ 64 public class BackendFileInputStream 65 extends InputStream { 66 67 // Hidden data members. 68 private JobFrontendRef myJobFrontend; 69 private JobBackendRef myJobBackend; 70 71 // Queue of results from job frontend. 72 private LinkedBlockingQueue<Result> myResultQueue 73 = new LinkedBlockingQueue<Result>(); 74 75 private static class Result { 76 77 public int ffd; 78 public int readlen; 79 public long skiplen; 80 public IOException exc; 81 82 public Result(int ffd, 83 int readlen, 84 long skiplen, 85 IOException exc) { 86 this.ffd = ffd; 87 this.readlen = readlen; 88 this.skiplen = skiplen; 89 this.exc = exc; 90 } 91 } 92 93 // Frontend file descriptor. 94 private int ffd; 95 96 // Hidden constructors. 97 /** 98 * Construct a new backend file input stream. Call the <code>open()</code> 99 * method to open the file and obtain the frontend file descriptor. 100 * 101 * @param theJobFrontend Job Frontend. 102 * @param theJobBackend Job Backend. 103 */ 104 BackendFileInputStream(JobFrontendRef theJobFrontend, 105 JobBackendRef theJobBackend) { 106 this.myJobFrontend = theJobFrontend; 107 this.myJobBackend = theJobBackend; 108 } 109 110 /** 111 * Construct a new backend file input stream. Use the given frontend file 112 * descriptor. 113 * 114 * @param theJobFrontend Job Frontend. 115 * @param theJobBackend Job Backend. 116 * @param ffd Frontend file descriptor. 117 */ 118 BackendFileInputStream(JobFrontendRef theJobFrontend, 119 JobBackendRef theJobBackend, 120 int ffd) { 121 this.myJobFrontend = theJobFrontend; 122 this.myJobBackend = theJobBackend; 123 this.ffd = ffd; 124 } 125 126 // Exported operations. 127 /** 128 * Read a byte from this input stream. The byte is returned as an 129 * <code>int</code> in the range 0 .. 255. 130 * 131 * @exception IOException Thrown if an I/O error occurred. 132 * @return a int. 133 * @throws java.io.IOException if any. 134 */ 135 public int read() 136 throws IOException { 137 byte[] buf = new byte[1]; 138 int len = read(buf); 139 return len == -1 ? -1 : buf[0] & 0xFF; 140 } 141 142 /** 143 * Read the given byte array from this input stream. 144 * 145 * @param buf Byte array. 146 * @return Number of bytes actually read, or -1 if the end-of-stream was 147 * encountered. 148 * @exception NullPointerException (unchecked exception) Thrown if 149 * <code>buf</code> is null. 150 * @exception IOException Thrown if an I/O error occurred. 151 * @throws java.io.IOException if any. 152 */ 153 public int read(byte[] buf) 154 throws IOException { 155 return read(buf, 0, buf.length); 156 } 157 158 /** 159 * {@inheritDoc} 160 * 161 * Read a portion of the given byte array from this input stream. 162 * @exception NullPointerException (unchecked exception) Thrown if 163 * <code>buf</code> is null. 164 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 165 * <code>off</code> < 0, <code>len</code> 166 * < 0, or <code>off+len</code> > <code>buf.length</code>. 167 * @exception IOException Thrown if an I/O error occurred. 168 */ 169 public int read(byte[] buf, 170 int off, 171 int len) 172 throws IOException { 173 if (off < 0 || len < 0 || off + len > buf.length) { 174 throw new IndexOutOfBoundsException(); 175 } 176 verifyOpen(); 177 myJobFrontend.inputFileRead(myJobBackend, ffd, len); 178 Result r = getResult(); 179 if (r.readlen > 0) { 180 ((JobFrontendProxy) myJobFrontend).receive(ffd, 181 ByteBuf.sliceBuffer(buf, new Range(off, off + len - 1))); 182 } 183 return r.readlen; 184 } 185 186 /** 187 * {@inheritDoc} 188 * 189 * Skip the given number of bytes from this input stream. 190 * @exception IOException Thrown if an I/O error occurred. 191 */ 192 public long skip(long len) 193 throws IOException { 194 verifyOpen(); 195 if (len < 0L) { 196 return 0L; 197 } else { 198 myJobFrontend.inputFileSkip(myJobBackend, ffd, len); 199 return getResult().skiplen; 200 } 201 } 202 203 /** 204 * Close this input stream. 205 * 206 * @exception IOException Thrown if an I/O error occurred. 207 * @throws java.io.IOException if any. 208 */ 209 public void close() 210 throws IOException { 211 verifyOpen(); 212 try { 213 myJobFrontend.inputFileClose(myJobBackend, ffd); 214 getResult(); 215 } finally { 216 ffd = 0; 217 } 218 } 219 220 // Hidden operations. 221 /** 222 * Request the Job Frontend to open the file. 223 * 224 * @param bfd Backend file descriptor. 225 * @param file File. 226 * 227 * @return Frontend file descriptor. 228 * 229 * @exception IOException Thrown if an I/O error occurred. 230 */ 231 int open(int bfd, 232 File file) 233 throws IOException { 234 myJobFrontend.inputFileOpen(myJobBackend, bfd, file); 235 this.ffd = getResult().ffd; 236 return this.ffd; 237 } 238 239 /** 240 * Get the next result from the result queue. Throw an IOException if 241 * necessary. 242 * 243 * @return Result object. 244 * 245 * @exception IOException Thrown if an I/O error occurred. 246 */ 247 private Result getResult() 248 throws IOException { 249 try { 250 Result result = myResultQueue.take(); 251 if (result.exc != null) { 252 throw result.exc; 253 } 254 return result; 255 } catch (InterruptedException exc) { 256 IOException exc2 = new InterruptedIOException("I/O interrupted"); 257 exc2.initCause(exc); 258 throw exc2; 259 } 260 } 261 262 /** 263 * Put the given result into the result queue. 264 * 265 * @param ffd Frontend file descriptor. 266 * @param readlen Number of bytes actually read. 267 * @param skiplen Number of bytes actually skipped. 268 * @param exc Null if success, exception if failure. 269 */ 270 void putResult(int ffd, 271 int readlen, 272 long skiplen, 273 IOException exc) { 274 myResultQueue.offer(new Result(ffd, readlen, skiplen, exc)); 275 } 276 277 /** 278 * Verify that this file is open. 279 * 280 * @exception IOException Thrown if this file is not open. 281 */ 282 private void verifyOpen() 283 throws IOException { 284 if (ffd == 0) { 285 throw new IOException("File closed"); 286 } 287 } 288 289 }