//******************************************************************************
//
// File:    BackendFileOutputStream.java
// Package: edu.rit.pj.cluster
// Unit:    Class edu.rit.pj.cluster.BackendFileOutputStream
//
// This Java source file is copyright (C) 2006 by Alan Kaminsky. All rights
// reserved. For further information, contact the author, Alan Kaminsky, at
// ark@cs.rit.edu.
//
// This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
// software; you can redistribute it and/or modify it under the terms of the GNU
// General Public License as published by the Free Software Foundation; either
// version 3 of the License, or (at your option) any later version.
//
// PJ is distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// Linking this library statically or dynamically with other modules is making a
// combined work based on this library. Thus, the terms and conditions of the GNU
// General Public License cover the whole combination.
//
// As a special exception, the copyright holders of this library give you
// permission to link this library with independent modules to produce an
// executable, regardless of the license terms of these independent modules, and
// to copy and distribute the resulting executable under terms of your choice,
// provided that you also meet, for each linked independent module, the terms
// and conditions of the license of that module. An independent module is a module
// which is not derived from or based on this library. If you modify this library,
// you may extend this exception to your version of the library, but you are not
// obligated to do so. If you do not wish to do so, delete this exception
// statement from your version.
34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj.cluster; 41 42 import java.io.File; 43 import java.io.IOException; 44 import java.io.InterruptedIOException; 45 import java.io.OutputStream; 46 import java.util.concurrent.LinkedBlockingQueue; 47 48 /** 49 * Class BackendFileOutputStream provides an object in a job backend process 50 * that writes a sequential file in the job frontend process. A backend file 51 * output stream is not constructed directly, rather it is created by a factory 52 * method in class {@linkplain BackendFileWriter}. 53 * <P> 54 * <I>Note:</I> Class BackendFileOutputStream does not do any buffering. Each 55 * method call sends a message to and receives a message from the job frontend. 56 * Consider layering a BufferedOutputStream on top of the 57 * BackendFileOutputStream. 58 * 59 * @author Alan Kaminsky 60 * @version 05-Nov-2006 61 */ 62 public class BackendFileOutputStream 63 extends OutputStream { 64 65 // Hidden data members. 66 private JobFrontendRef myJobFrontend; 67 private JobBackendRef myJobBackend; 68 69 // Queue of results from job frontend. 70 private LinkedBlockingQueue<Result> myResultQueue 71 = new LinkedBlockingQueue<Result>(); 72 73 private static class Result { 74 75 public int ffd; 76 public IOException exc; 77 78 public Result(int ffd, 79 IOException exc) { 80 this.ffd = ffd; 81 this.exc = exc; 82 } 83 } 84 85 // Frontend file descriptor. 86 private int ffd; 87 88 // Hidden constructors. 89 /** 90 * Construct a new backend file output stream. Call the <code>open()</code> 91 * method to open the file and obtain the frontend file descriptor. 92 * 93 * @param theJobFrontend Job Frontend. 94 * @param theJobBackend Job Backend. 
95 */ 96 BackendFileOutputStream(JobFrontendRef theJobFrontend, 97 JobBackendRef theJobBackend) { 98 this.myJobFrontend = theJobFrontend; 99 this.myJobBackend = theJobBackend; 100 } 101 102 /** 103 * Construct a new backend file output stream. Use the given frontend file 104 * descriptor. 105 * 106 * @param theJobFrontend Job Frontend. 107 * @param theJobBackend Job Backend. 108 * @param ffd Frontend file descriptor. 109 */ 110 BackendFileOutputStream(JobFrontendRef theJobFrontend, 111 JobBackendRef theJobBackend, 112 int ffd) { 113 this.myJobFrontend = theJobFrontend; 114 this.myJobBackend = theJobBackend; 115 this.ffd = ffd; 116 } 117 118 // Exported operations. 119 /** 120 * Write the given byte to this output stream. Only the least significant 121 * eight bits are written. 122 * 123 * @param b Byte. 124 * @exception IOException Thrown if an I/O error occurred. 125 * @throws java.io.IOException if any. 126 */ 127 public void write(int b) 128 throws IOException { 129 write(new byte[]{(byte) b}); 130 } 131 132 /** 133 * Write the given byte array to this output stream. 134 * 135 * @param buf Byte array. 136 * @exception NullPointerException (unchecked exception) Thrown if 137 * <code>buf</code> is null. 138 * @exception IOException Thrown if an I/O error occurred. 139 * @throws java.io.IOException if any. 140 */ 141 public void write(byte[] buf) 142 throws IOException { 143 write(buf, 0, buf.length); 144 } 145 146 /** 147 * {@inheritDoc} 148 * 149 * Write a portion of the given byte array to this output stream. 150 * @exception NullPointerException (unchecked exception) Thrown if 151 * <code>buf</code> is null. 152 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 153 * <code>off</code> < 0, <code>len</code> 154 * < 0, or <code>off+len</code> > <code>buf.length</code>. 155 * @exception IOException Thrown if an I/O error occurred. 
156 */ 157 public void write(byte[] buf, 158 int off, 159 int len) 160 throws IOException { 161 if (off < 0 || len < 0 || off + len > buf.length) { 162 throw new IndexOutOfBoundsException(); 163 } 164 verifyOpen(); 165 myJobFrontend.outputFileWrite(myJobBackend, ffd, buf, off, len); 166 getResult(); 167 } 168 169 /** 170 * Flush this output stream. 171 * 172 * @exception IOException Thrown if an I/O error occurred. 173 * @throws java.io.IOException if any. 174 */ 175 public void flush() 176 throws IOException { 177 verifyOpen(); 178 myJobFrontend.outputFileFlush(myJobBackend, ffd); 179 getResult(); 180 } 181 182 /** 183 * Close this output stream. 184 * 185 * @exception IOException Thrown if an I/O error occurred. 186 * @throws java.io.IOException if any. 187 */ 188 public void close() 189 throws IOException { 190 verifyOpen(); 191 try { 192 myJobFrontend.outputFileClose(myJobBackend, ffd); 193 getResult(); 194 } finally { 195 ffd = 0; 196 } 197 } 198 199 // Hidden operations. 200 /** 201 * Request the Job Frontend to open the file. 202 * 203 * @param bfd Backend file descriptor. 204 * @param file File. 205 * @param append True to append, false to overwrite. 206 * 207 * @return Frontend file descriptor. 208 * 209 * @exception IOException Thrown if an I/O error occurred. 210 */ 211 int open(int bfd, 212 File file, 213 boolean append) 214 throws IOException { 215 myJobFrontend.outputFileOpen(myJobBackend, bfd, file, append); 216 this.ffd = getResult().ffd; 217 return this.ffd; 218 } 219 220 /** 221 * Get the next result from the result queue. Throw an IOException if 222 * necessary. 223 * 224 * @return Result object. 225 * 226 * @exception IOException Thrown if an I/O error occurred. 
227 */ 228 private Result getResult() 229 throws IOException { 230 try { 231 Result result = myResultQueue.take(); 232 if (result.exc != null) { 233 throw result.exc; 234 } 235 return result; 236 } catch (InterruptedException exc) { 237 IOException exc2 = new InterruptedIOException("I/O interrupted"); 238 exc2.initCause(exc); 239 throw exc2; 240 } 241 } 242 243 /** 244 * Put the given result into the result queue. 245 * 246 * @param ffd Frontend file descriptor. 247 * @param exc Null if success, exception if failure. 248 */ 249 void putResult(int ffd, 250 IOException exc) { 251 myResultQueue.offer(new Result(ffd, exc)); 252 } 253 254 /** 255 * Verify that this file is open. 256 * 257 * @exception IOException Thrown if this file is not open. 258 */ 259 private void verifyOpen() 260 throws IOException { 261 if (ffd == 0) { 262 throw new IOException("File closed"); 263 } 264 } 265 266 }