1 //****************************************************************************** 2 // 3 // File: ObjectBuf.java 4 // Package: edu.rit.mp 5 // Unit: Class edu.rit.mp.ObjectBuf<T> 6 // 7 // This Java source file is copyright (C) 2007 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 
34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.mp; 41 42 import java.io.ByteArrayInputStream; 43 import java.io.ByteArrayOutputStream; 44 import java.io.IOException; 45 import java.io.ObjectInputStream; 46 import java.io.ObjectOutputStream; 47 import java.nio.ByteBuffer; 48 49 import edu.rit.mp.buf.EmptyObjectBuf; 50 import edu.rit.mp.buf.ObjectArrayBuf; 51 import edu.rit.mp.buf.ObjectArrayBuf_1; 52 import edu.rit.mp.buf.ObjectItemBuf; 53 import edu.rit.mp.buf.ObjectMatrixBuf; 54 import edu.rit.mp.buf.ObjectMatrixBuf_1; 55 import edu.rit.mp.buf.SharedObjectArrayBuf; 56 import edu.rit.mp.buf.SharedObjectArrayBuf_1; 57 import edu.rit.mp.buf.SharedObjectBuf; 58 import edu.rit.pj.reduction.SharedObject; 59 import edu.rit.pj.reduction.SharedObjectArray; 60 import edu.rit.util.Arrays; 61 import edu.rit.util.Range; 62 63 /** 64 * Class ObjectBuf is the abstract base class for a buffer of object items sent 65 * or received using the Message Protocol (MP). In a message, an object item is 66 * represented as a sequence of bytes using Java Object Serialization. 67 * <P> 68 * A buffer may be used to send one or more messages at the same time in 69 * multiple threads. If a buffer is being used to send a message or messages, 70 * the buffer must not be used to receive a message at the same time. 71 * <P> 72 * A buffer may be used to receive one message at a time. If a buffer is being 73 * used to receive a message, the buffer must not be used to receive another 74 * message in a different thread, and the buffer must not be used to send a 75 * message or messages. 76 * <P> 77 * A buffer is a conduit for retrieving and storing data in some underlying data 78 * structure. 
If the underlying data structure is multiple-thread safe, then one
 * thread can be retrieving or storing data via the buffer at the same time as
 * other threads are accessing the data structure. If the underlying data
 * structure is not multiple-thread safe, then other threads must not access the
 * data structure while one thread is retrieving or storing data via the buffer.
 * <P>
 * To create an ObjectBuf, call one of the following static factory methods:
 * <UL>
 * <LI><code>emptyBuffer()</code>
 * <LI><code>buffer()</code>
 * <LI><code>buffer (T)</code>
 * <LI><code>buffer (T[])</code>
 * <LI><code>sliceBuffer (T[], Range)</code>
 * <LI><code>sliceBuffers (T[], Range[])</code>
 * <LI><code>objectBuffer (T[])</code>
 * <LI><code>buffer (T[][])</code>
 * <LI><code>rowSliceBuffer (T[][], Range)</code>
 * <LI><code>rowSliceBuffers (T[][], Range[])</code>
 * <LI><code>colSliceBuffer (T[][], Range)</code>
 * <LI><code>colSliceBuffers (T[][], Range[])</code>
 * <LI><code>patchBuffer (T[][], Range, Range)</code>
 * <LI><code>patchBuffers (T[][], Range[], Range[])</code>
 * <LI><code>objectBuffer (T[][])</code>
 * <LI><code>buffer (SharedObject&lt;T&gt;)</code>
 * <LI><code>buffer (SharedObjectArray&lt;T&gt;)</code>
 * <LI><code>sliceBuffer (SharedObjectArray&lt;T&gt;, Range)</code>
 * <LI><code>sliceBuffers (SharedObjectArray&lt;T&gt;, Range[])</code>
 * </UL>
 * <P>
 * There are two ways to create a buffer for an array of objects (type
 * <code>T[]</code>):
 * <OL TYPE=1>
 * <LI>
 * With the <code>buffer(T[])</code>, <code>sliceBuffer(T[],Range)</code>, and
 * <code>sliceBuffers(T[],Range[])</code> methods. These methods create a buffer
 * that sends and receives the array elements as multiple separate objects of
 * type <code>T</code>.
The receiver must allocate an array of the proper dimension
 * to receive the incoming objects and must create a buffer that receives the
 * array elements as separate objects.
 *
 * <LI>
 * With the <code>objectBuffer(T[])</code> method. This method creates a buffer that
 * sends and receives the entire array as one object of type <code>T[]</code>. The
 * receiver must also create a buffer that receives the entire array as one
 * object; the buffer's <code>item</code> field is automatically set to an array of
 * the proper dimension.
 * </OL>
 * <P>
 * There are two ways to create a buffer for a matrix of objects (type
 * <code>T[][]</code>):
 * <OL TYPE=1>
 * <LI>
 * With the <code>buffer(T[][])</code>, <code>rowSliceBuffer(T[][],Range)</code>,
 * <code>rowSliceBuffers(T[][],Range[])</code>,
 * <code>colSliceBuffer(T[][],Range)</code>,
 * <code>colSliceBuffers(T[][],Range[])</code>,
 * <code>patchBuffer(T[][],Range,Range)</code>, and
 * <code>patchBuffers(T[][],Range[],Range[])</code> methods. These methods create a
 * buffer that sends and receives the matrix elements as multiple separate
 * objects of type <code>T</code>. The receiver must allocate a matrix of the proper
 * dimensions to receive the incoming objects and must create a buffer that
 * receives the matrix elements as separate objects.
 *
 * <LI>
 * With the <code>objectBuffer(T[][])</code> method. This method creates a buffer
 * that sends and receives the entire matrix as one object of type
 * <code>T[][]</code>. The receiver must also create a buffer that receives the
 * matrix as one object; the buffer's <code>item</code> field is automatically set
 * to a matrix of the proper dimensions.
 * </OL>
 * <P>
 * <B><I>Important Note:</I></B> An ObjectBuf uses the protected field
 * <code>mySerializedItems</code> to store the serialized representation of the
 * objects in the buffer.
If the buffer is used to receive a message, the
 * serialized representation of the received objects is cached in
 * <code>mySerializedItems</code>. If the buffer is used to send a message and
 * <code>mySerializedItems</code> is empty (null), the objects in the buffer are
 * serialized, the serialized representation is cached in
 * <code>mySerializedItems</code>, and the serialized representation is sent in the
 * message. If the buffer is used to send a message and
 * <code>mySerializedItems</code> is not empty (non-null), the objects in the
 * buffer are <I>not</I> serialized; rather, the cached serialized representation
 * is sent. This is done to avoid re-serializing the objects if the buffer is
 * used to send copies of a message to multiple destinations, or if the buffer
 * is used to receive and then immediately send a message. However, if the state
 * of any object in the buffer changes, the buffer's <code>reset()</code> method
 * must be called; this tells the buffer to discard the cached serialized
 * representation and re-serialize the objects in the buffer.
 *
 * @param <T> Data type of the objects in the buffer.
 * @author Alan Kaminsky
 * @version 03-Jul-2008
 */
@SuppressWarnings("unchecked")
public abstract class ObjectBuf<T>
        extends Buf {

    // Hidden data members.
    /**
     * Byte array containing this buffer's object items in serialized form. If
     * null, the object items need to be serialized. The <code>reset()</code>
     * method sets this field back to null.
     */
    protected byte[] mySerializedItems;

    // Hidden constructors.
    /**
     * Construct a new object buffer. Passes <code>Constants.TYPE_OBJECT</code>
     * and <code>theLength</code> to the superclass constructor.
     *
     * @param theLength Number of items.
     * @exception IllegalArgumentException (unchecked exception) Thrown if
     * <code>theLength</code> &lt; 0.
     */
    protected ObjectBuf(int theLength) {
        super(Constants.TYPE_OBJECT, theLength);
    }

    // Exported operations.
    /**
     * Create an empty buffer. The buffer's length is 0.
The buffer's item type 196 * is Object. 197 * 198 * @return Empty buffer. 199 */ 200 public static ObjectBuf<Object> emptyBuffer() { 201 return new EmptyObjectBuf(); 202 } 203 204 /** 205 * Create a buffer for an object item. The item is stored in the 206 * <code>item</code> field of the buffer. 207 * 208 * @param <T> Data type of the objects in the buffer. 209 * @return Buffer. 210 */ 211 public static <T> ObjectItemBuf<T> buffer() { 212 return new ObjectItemBuf<T>(); 213 } 214 215 /** 216 * Create a buffer for an object item with the given initial value. The item 217 * is stored in the <code>item</code> field of the buffer. 218 * 219 * @param <T> Data type of the objects in the buffer. 220 * @param item Initial value of the <code>item</code> field. 221 * @return Buffer. 222 */ 223 public static <T> ObjectItemBuf<T> buffer(T item) { 224 return new ObjectItemBuf<T>(item); 225 } 226 227 /** 228 * Create a buffer for the entire given object array. The returned buffer 229 * encompasses all the elements in <code>theArray</code>. The array elements are 230 * sent and received as multiple separate objects of type <code>T</code>. 231 * 232 * @param <T> Data type of the objects in the buffer. 233 * @param theArray Array. 234 * @return Buffer. 235 * @exception NullPointerException (unchecked exception) Thrown if 236 * <code>theArray</code> is null. 237 */ 238 public static <T> ObjectBuf<T> buffer(T[] theArray) { 239 if (theArray == null) { 240 throw new NullPointerException("ObjectBuf.buffer(): theArray is null"); 241 } 242 int nr = Arrays.length(theArray); 243 return new ObjectArrayBuf_1<T>(theArray, new Range(0, nr - 1)); 244 } 245 246 /** 247 * Create a buffer for one slice of the given object array. The returned 248 * buffer encompasses <code>theRange</code> of elements in <code>theArray</code>. 249 * The range's stride may be 1 or greater than 1. The array elements are 250 * sent and received as multiple separate objects of type <code>T</code>. 
251 * 252 * @param <T> Data type of the objects in the buffer. 253 * @param theArray Array. 254 * @param theRange Range of elements to include. 255 * @return Buffer. 256 * @exception NullPointerException (unchecked exception) Thrown if 257 * <code>theArray</code> is null or 258 * <code>theRange</code> is null. 259 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 260 * <code>theArray</code> does not include all the indexes in <code>theRange</code>. 261 */ 262 public static <T> ObjectBuf<T> sliceBuffer(T[] theArray, 263 Range theRange) { 264 if (theArray == null) { 265 throw new NullPointerException("ObjectBuf.sliceBuffer(): theArray is null"); 266 } 267 int nr = Arrays.length(theArray); 268 if (0 > theRange.lb() || theRange.ub() >= nr) { 269 throw new IndexOutOfBoundsException("ObjectBuf.sliceBuffer(): theArray index range = 0.." 270 + (nr - 1) + ", theRange = " + theRange); 271 } 272 if (theRange.stride() == 1) { 273 return new ObjectArrayBuf_1<T>(theArray, theRange); 274 } else { 275 return new ObjectArrayBuf<T>(theArray, theRange); 276 } 277 } 278 279 /** 280 * Create an array of buffers for multiple slices of the given object array. 281 * The returned buffer array has the same length as 282 * <code>theRanges</code>. Each element [<I>i</I>] of the returned buffer array 283 * encompasses the elements of <code>theArray</code> specified by 284 * <code>theRanges[i]</code>. Each range's stride may be 1 or greater than 1. 285 * The array elements are sent and received as multiple separate objects of 286 * type <code>T</code>. 287 * 288 * @param <T> Data type of the objects in the buffer. 289 * @param theArray Array. 290 * @param theRanges Array of ranges of elements to include. 291 * @return Array of buffers. 292 * @exception NullPointerException (unchecked exception) Thrown if 293 * <code>theArray</code> is null or 294 * <code>theRanges</code> or any element thereof is null. 
295 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 296 * <code>theArray</code>'s allocation does not include any element of 297 * <code>theRanges</code>. 298 */ 299 public static <T> ObjectBuf<T>[] sliceBuffers(T[] theArray, 300 Range[] theRanges) { 301 int n = theRanges.length; 302 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n]; 303 for (int i = 0; i < n; ++i) { 304 result[i] = sliceBuffer(theArray, theRanges[i]); 305 } 306 return result; 307 } 308 309 /** 310 * Create a buffer for the entire given object array. The returned buffer 311 * encompasses all the elements in <code>theArray</code>. The array is sent and 312 * received as a single object of type <code>T[]</code>. 313 * 314 * @param <T> Data type of the objects in the buffer. 315 * @param theArray Array. May be null. 316 * @return Buffer. 317 */ 318 public static <T> ObjectItemBuf<T[]> objectBuffer(T[] theArray) { 319 return new ObjectItemBuf<T[]>(theArray); 320 } 321 322 /** 323 * Create a buffer for the entire given object matrix. The returned buffer 324 * encompasses all the rows and all the columns in 325 * <code>theMatrix</code>. The matrix elements are sent and received as multiple 326 * separate objects of type <code>T</code>. 327 * 328 * @param <T> Data type of the objects in the buffer. 329 * @param theMatrix Matrix. 330 * @return Buffer. 331 * @exception NullPointerException (unchecked exception) Thrown if 332 * <code>theMatrix</code> is null. 333 */ 334 public static <T> ObjectBuf<T> buffer(T[][] theMatrix) { 335 if (theMatrix == null) { 336 throw new NullPointerException("ObjectBuf.buffer(): theMatrix is null"); 337 } 338 int nr = Arrays.rowLength(theMatrix); 339 int nc = Arrays.colLength(theMatrix, 0); 340 return new ObjectMatrixBuf_1<T>(theMatrix, new Range(0, nr - 1), new Range(0, nc - 1)); 341 } 342 343 /** 344 * Create a buffer for one row slice of the given object matrix. 
The 345 * returned buffer encompasses <code>theRowRange</code> of rows, and all the 346 * columns, in <code>theMatrix</code>. The range's stride may be 1 or greater 347 * than 1. The matrix elements are sent and received as multiple separate 348 * objects of type <code>T</code>. 349 * 350 * @param <T> Data type of the objects in the buffer. 351 * @param theMatrix Matrix. 352 * @param theRowRange Range of rows to include. 353 * @return Buffer. 354 * @exception NullPointerException (unchecked exception) Thrown if 355 * <code>theMatrix</code> is null or 356 * <code>theRowRange</code> is null. 357 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 358 * <code>theMatrix</code>'s allocation does not include <code>theRowRange</code>. 359 */ 360 public static <T> ObjectBuf<T> rowSliceBuffer(T[][] theMatrix, 361 Range theRowRange) { 362 if (theMatrix == null) { 363 throw new NullPointerException("ObjectBuf.rowSliceBuffer(): theMatrix is null"); 364 } 365 int nr = Arrays.rowLength(theMatrix); 366 if (0 > theRowRange.lb() || theRowRange.ub() >= nr) { 367 throw new IndexOutOfBoundsException("ObjectBuf.rowSliceBuffer(): theMatrix row index range = 0.." 368 + (nr - 1) + ", theRowRange = " + theRowRange); 369 } 370 int nc = Arrays.colLength(theMatrix, theRowRange.lb()); 371 if (theRowRange.stride() == 1) { 372 return new ObjectMatrixBuf_1<T>(theMatrix, theRowRange, new Range(0, nc - 1)); 373 } else { 374 return new ObjectMatrixBuf<T>(theMatrix, theRowRange, new Range(0, nc - 1)); 375 } 376 } 377 378 /** 379 * Create an array of buffers for multiple row slices of the given object 380 * matrix. The returned buffer array has the same length as 381 * <code>theRowRanges</code>. Each element [<I>i</I>] of the returned buffer 382 * array encompasses the rows of <code>theMatrix</code> specified by 383 * <code>theRowRanges[i]</code> and all the columns of <code>theMatrix</code>. Each 384 * range's stride may be 1 or greater than 1. 
The matrix elements are sent 385 * and received as multiple separate objects of type <code>T</code>. 386 * 387 * @param <T> Data type of the objects in the buffer. 388 * @param theMatrix Matrix. 389 * @param theRowRanges Array of ranges of rows to include. 390 * @return Array of buffers. 391 * @exception NullPointerException (unchecked exception) Thrown if 392 * <code>theMatrix</code> is null or 393 * <code>theRowRanges</code> or any element thereof is null. 394 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 395 * <code>theMatrix</code>'s allocation does not include any element of 396 * <code>theRowRanges</code>. 397 */ 398 public static <T> ObjectBuf<T>[] rowSliceBuffers(T[][] theMatrix, 399 Range[] theRowRanges) { 400 int n = theRowRanges.length; 401 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n]; 402 for (int i = 0; i < n; ++i) { 403 result[i] = rowSliceBuffer(theMatrix, theRowRanges[i]); 404 } 405 return result; 406 } 407 408 /** 409 * Create a buffer for one column slice of the given object matrix. The 410 * returned buffer encompasses all the rows, and <code>theColRange</code> of 411 * columns, in <code>theMatrix</code>. The range's stride may be 1 or greater 412 * than 1. The matrix elements are sent and received as multiple separate 413 * objects of type <code>T</code>. 414 * 415 * @param <T> Data type of the objects in the buffer. 416 * @param theMatrix Matrix. 417 * @param theColRange Range of columns to include. 418 * @return Buffer. 419 * @exception NullPointerException (unchecked exception) Thrown if 420 * <code>theMatrix</code> is null or 421 * <code>theColRange</code> is null. 422 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 423 * <code>theMatrix</code>'s allocation does not include <code>theColRange</code>. 
424 */ 425 public static <T> ObjectBuf<T> colSliceBuffer(T[][] theMatrix, 426 Range theColRange) { 427 if (theMatrix == null) { 428 throw new NullPointerException("ObjectBuf.colSliceBuffer(): theMatrix is null"); 429 } 430 int nr = Arrays.rowLength(theMatrix); 431 int nc = Arrays.colLength(theMatrix, 0); 432 if (0 > theColRange.lb() || theColRange.ub() >= nc) { 433 throw new IndexOutOfBoundsException("ObjectBuf.colSliceBuffer(): theMatrix column index range = 0.." 434 + (nc - 1) + ", theColRange = " + theColRange); 435 } 436 if (theColRange.stride() == 1) { 437 return new ObjectMatrixBuf_1<T>(theMatrix, new Range(0, nr - 1), theColRange); 438 } else { 439 return new ObjectMatrixBuf<T>(theMatrix, new Range(0, nr - 1), theColRange); 440 } 441 } 442 443 /** 444 * Create an array of buffers for multiple column slices of the given object 445 * matrix. The returned buffer array has the same length as 446 * <code>theColRanges</code>. Each element [<I>i</I>] of the returned buffer 447 * array encompasses all the rows of <code>theMatrix</code> and the columns of 448 * <code>theMatrix</code> specified by <code>theColRanges[i]</code>. Each range's 449 * stride may be 1 or greater than 1. The matrix elements are sent and 450 * received as multiple separate objects of type <code>T</code>. 451 * 452 * @param <T> Data type of the objects in the buffer. 453 * @param theMatrix Matrix. 454 * @param theColRanges Array of ranges of columns to include. 455 * @return Array of buffers. 456 * @exception NullPointerException (unchecked exception) Thrown if 457 * <code>theMatrix</code> is null or 458 * <code>theColRanges</code> or any element thereof is null. 459 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 460 * <code>theMatrix</code>'s allocation does not include any element of 461 * <code>theColRanges</code>. 
462 */ 463 public static <T> ObjectBuf<T>[] colSliceBuffers(T[][] theMatrix, 464 Range[] theColRanges) { 465 int n = theColRanges.length; 466 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n]; 467 for (int i = 0; i < n; ++i) { 468 result[i] = colSliceBuffer(theMatrix, theColRanges[i]); 469 } 470 return result; 471 } 472 473 /** 474 * Create a buffer for one patch of the given object matrix. The returned 475 * buffer encompasses <code>theRowRange</code> of rows, and <code>theColRange</code> 476 * of columns, in <code>theMatrix</code>. Each range's stride may be 1 or 477 * greater than 1. The matrix elements are sent and received as multiple 478 * separate objects of type <code>T</code>. 479 * 480 * @param <T> Data type of the objects in the buffer. 481 * @param theMatrix Matrix. 482 * @param theRowRange Range of rows to include. 483 * @param theColRange Range of columns to include. 484 * @return Buffer. 485 * @exception NullPointerException (unchecked exception) Thrown if 486 * <code>theMatrix</code> is null, 487 * <code>theRowRange</code> is null, or <code>theColRange</code> is null. 488 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 489 * <code>theMatrix</code>'s allocation does not include <code>theRowRange</code> and 490 * <code>theColRange</code>. 491 */ 492 public static <T> ObjectBuf<T> patchBuffer(T[][] theMatrix, 493 Range theRowRange, 494 Range theColRange) { 495 if (theMatrix == null) { 496 throw new NullPointerException("ObjectBuf.patchBuffer(): theMatrix is null"); 497 } 498 int nr = Arrays.rowLength(theMatrix); 499 if (0 > theRowRange.lb() || theRowRange.ub() >= nr) { 500 throw new IndexOutOfBoundsException("ObjectBuf.patchBuffer(): theMatrix row index range = 0.." 
501 + (nr - 1) + ", theRowRange = " + theRowRange); 502 } 503 int nc = Arrays.colLength(theMatrix, theRowRange.lb()); 504 if (0 > theColRange.lb() || theColRange.ub() >= nc) { 505 throw new IndexOutOfBoundsException("ObjectBuf.patchBuffer(): theMatrix column index range = 0.." 506 + (nc - 1) + ", theColRange = " + theColRange); 507 } 508 if (theRowRange.stride() == 1 && theColRange.stride() == 1) { 509 return new ObjectMatrixBuf_1<T>(theMatrix, theRowRange, theColRange); 510 } else { 511 return new ObjectMatrixBuf<T>(theMatrix, theRowRange, theColRange); 512 } 513 } 514 515 /** 516 * Create an array of buffers for multiple patches of the given object 517 * matrix. The length of the returned buffer array is equal to the length of 518 * <code>theRowRanges</code> times the length of <code>theColRanges</code>. Each 519 * element of the returned buffer array encompasses the rows given in one 520 * element of <code>theRowRanges</code> array, and the columns given in one 521 * element of <code>theColRanges</code> array, in all possible combinations, of 522 * <code>theMatrix</code>. Each range's stride may be 1 or greater than 1. The 523 * matrix elements are sent and received as multiple separate objects of 524 * type <code>T</code>. 525 * 526 * @param <T> Data type of the objects in the buffer. 527 * @param theMatrix Matrix. 528 * @param theRowRanges Array of ranges of rows to include. 529 * @param theColRanges Array of ranges of columns to include. 530 * @return Array of buffers. 531 * @exception NullPointerException (unchecked exception) Thrown if 532 * <code>theMatrix</code> is null, 533 * <code>theRowRanges</code> or any element thereof is null, or 534 * <code>theColRanges</code> or any element thereof is null. 535 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 536 * <code>theMatrix</code>'s allocation does not include any element of 537 * <code>theRowRanges</code> or 538 * <code>theColRanges</code>. 
539 */ 540 public static <T> ObjectBuf<T>[] patchBuffers(T[][] theMatrix, 541 Range[] theRowRanges, 542 Range[] theColRanges) { 543 int m = theRowRanges.length; 544 int n = theColRanges.length; 545 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[m * n]; 546 int k = 0; 547 for (int i = 0; i < m; ++i) { 548 Range rowrange = theRowRanges[i]; 549 for (int j = 0; j < n; ++j) { 550 result[k++] 551 = patchBuffer(theMatrix, rowrange, theColRanges[j]); 552 } 553 } 554 return result; 555 } 556 557 /** 558 * Create a buffer for the entire given object matrix. The returned buffer 559 * encompasses all the rows and all the columns in <code>theMatrix</code>. The 560 * matrix is sent and received as a single object of type <code>T[][]</code>. 561 * 562 * @param <T> Data type of the objects in the buffer. 563 * @param theMatrix Matrix. May be null. 564 * @return Buffer. 565 */ 566 public static <T> ObjectItemBuf<T[][]> objectBuffer(T[][] theMatrix) { 567 return new ObjectItemBuf<T[][]>(theMatrix); 568 } 569 570 /** 571 * Create a buffer for a shared object item. The item is wrapped in an 572 * instance of class {@linkplain edu.rit.pj.reduction.SharedObject 573 * SharedObject}. Use the methods of the SharedObject object to access the 574 * actual item. 575 * 576 * @param <T> Data type of the objects in the buffer. 577 * @param item SharedObject object that wraps the item. 578 * @exception NullPointerException (unchecked exception) Thrown if 579 * <code>item</code> is null. 580 * @return a {@link edu.rit.mp.ObjectBuf} object. 581 */ 582 public static <T> ObjectBuf<T> buffer(SharedObject<T> item) { 583 if (item == null) { 584 throw new NullPointerException("ObjectBuf.buffer(): item is null"); 585 } 586 return new SharedObjectBuf<T>(item); 587 } 588 589 /** 590 * Create a buffer for the entire given shared object array. The returned 591 * buffer encompasses all the elements in <code>theArray</code>. 592 * 593 * @param <T> Data type of the objects in the buffer. 
594 * @param theArray Array. 595 * @return Buffer. 596 * @exception NullPointerException (unchecked exception) Thrown if 597 * <code>theArray</code> is null. 598 */ 599 public static <T> ObjectBuf<T> buffer(SharedObjectArray<T> theArray) { 600 if (theArray == null) { 601 throw new NullPointerException("ObjectBuf.buffer(): theArray is null"); 602 } 603 int nr = theArray.length(); 604 return new SharedObjectArrayBuf_1<T>(theArray, new Range(0, nr - 1)); 605 } 606 607 /** 608 * Create a buffer for one slice of the given shared object array. The 609 * returned buffer encompasses <code>theRange</code> of elements in 610 * <code>theArray</code>. The range's stride may be 1 or greater than 1. 611 * 612 * @param <T> Data type of the objects in the buffer. 613 * @param theArray Array. 614 * @param theRange Range of elements to include. 615 * @return Buffer. 616 * @exception NullPointerException (unchecked exception) Thrown if 617 * <code>theArray</code> is null or 618 * <code>theRange</code> is null. 619 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 620 * <code>theArray</code> does not include all the indexes in <code>theRange</code>. 621 */ 622 public static <T> ObjectBuf<T> sliceBuffer(SharedObjectArray<T> theArray, 623 Range theRange) { 624 if (theArray == null) { 625 throw new NullPointerException("ObjectBuf.sliceBuffer(): theArray is null"); 626 } 627 int nr = theArray.length(); 628 if (0 > theRange.lb() || theRange.ub() >= nr) { 629 throw new IndexOutOfBoundsException("ObjectBuf.sliceBuffer(): theArray index range = 0.." 630 + (nr - 1) + ", theRange = " + theRange); 631 } 632 if (theRange.stride() == 1) { 633 return new SharedObjectArrayBuf_1<T>(theArray, theRange); 634 } else { 635 return new SharedObjectArrayBuf<T>(theArray, theRange); 636 } 637 } 638 639 /** 640 * Create an array of buffers for multiple slices of the given shared object 641 * array. The returned buffer array has the same length as 642 * <code>theRanges</code>. 
Each element [<I>i</I>] of the returned buffer array 643 * encompasses the elements of <code>theArray</code> specified by 644 * <code>theRanges[i]</code>. Each range's stride may be 1 or greater than 1. 645 * 646 * @param <T> Data type of the objects in the buffer. 647 * @param theArray Array. 648 * @param theRanges Array of ranges of elements to include. 649 * @return Array of buffers. 650 * @exception NullPointerException (unchecked exception) Thrown if 651 * <code>theArray</code> is null or 652 * <code>theRanges</code> or any element thereof is null. 653 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if 654 * <code>theArray</code>'s allocation does not include any element of 655 * <code>theRanges</code>. 656 */ 657 public static <T> ObjectBuf<T>[] sliceBuffers(SharedObjectArray<T> theArray, 658 Range[] theRanges) { 659 int n = theRanges.length; 660 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n]; 661 for (int i = 0; i < n; ++i) { 662 result[i] = sliceBuffer(theArray, theRanges[i]); 663 } 664 return result; 665 } 666 667 /** 668 * Obtain the given item from this buffer. 669 * <P> 670 * The <code>get()</code> method must not block the calling thread; if it does, 671 * all message I/O in MP will be blocked. 672 * 673 * @param i Item index in the range 0 .. <code>length()</code>-1. 674 * @return Item at index <code>i</code>. 675 */ 676 public abstract T get(int i); 677 678 /** 679 * Store the given item in this buffer. 680 * <P> 681 * The <code>put()</code> method must not block the calling thread; if it does, 682 * all message I/O in MP will be blocked. 683 * 684 * @param i Item index in the range 0 .. <code>length()</code>-1. 685 * @param item Item to be stored at index <code>i</code>. 686 */ 687 public abstract void put(int i, 688 T item); 689 690 /** 691 * {@inheritDoc} 692 * 693 * Copy items from the given buffer to this buffer. 
The number of items 694 * copied is this buffer's length or <code>theSrc</code>'s length, whichever is 695 * smaller. If <code>theSrc</code> is this buffer, the <code>copy()</code> method 696 * does nothing. 697 * <P> 698 * The default implementation of the <code>copy()</code> method calls the 699 * <code>defaultCopy()</code> method. A subclass can override the 700 * <code>copy()</code> method to use a more efficient algorithm. 701 * <P> 702 * The default implementation of the <code>copy()</code> method also calls the 703 * <code>reset()</code> method. 704 * @exception ClassCastException (unchecked exception) Thrown if 705 * <code>theSrc</code>'s item data type is not the same as this buffer's item 706 * data type. 707 */ 708 public void copy(Buf theSrc) { 709 if (theSrc != this) { 710 defaultCopy((ObjectBuf<T>) theSrc, this); 711 reset(); 712 } 713 } 714 715 /** 716 * {@inheritDoc} 717 * 718 * Fill this buffer with the given item. The <code>item</code> is assigned to 719 * each element in this buffer. 720 * <P> 721 * The <code>item</code> must be an instance of class T or a subclass thereof. 722 * The <code>item</code> may be null. Note that since <code>item</code> is 723 * <I>assigned</I> to every buffer element, every buffer element ends up 724 * referring to the same <code>item</code>. 725 * <P> 726 * The <code>fill()</code> method calls the <code>reset()</code> method. 727 * @exception ClassCastException (unchecked exception) Thrown if the 728 * <code>item</code>'s data type is not the same as this buffer's item data 729 * type. 730 */ 731 public void fill(Object item) { 732 T value = (T) item; 733 for (int i = 0; i < myLength; ++i) { 734 put(i, value); 735 } 736 reset(); 737 } 738 739 /** 740 * Create a temporary buffer with the same type of items and the same length 741 * as this buffer. The new buffer items are stored in a newly created array, 742 * separate from the storage for this buffer's items. 743 * 744 * @return a {@link edu.rit.mp.Buf} object. 
745 */ 746 public Buf getTemporaryBuf() { 747 return buffer((T[]) new Object[myLength]); 748 } 749 750 /** 751 * Reset this buffer. Call <code>reset()</code> if the state of any object in 752 * this buffer changes. 753 */ 754 public void reset() { 755 mySerializedItems = null; 756 } 757 758 // Hidden operations. 759 /** 760 * Called by the I/O thread before sending message items using this buffer. 761 * 762 * @exception IOException Thrown if an I/O error occurred. 763 */ 764 void preSend() 765 throws IOException { 766 if (mySerializedItems == null) { 767 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 768 ObjectOutputStream oos = new ObjectOutputStream(baos); 769 oos.writeInt(myLength); 770 for (int i = 0; i < myLength; ++i) { 771 oos.writeObject(get(i)); 772 } 773 oos.close(); 774 mySerializedItems = baos.toByteArray(); 775 myMessageLength = mySerializedItems.length; 776 } 777 } 778 779 /** 780 * {@inheritDoc} 781 * 782 * Send as many items as possible from this buffer to the given byte buffer. 783 * <P> 784 * The <code>sendItems()</code> method must not block the calling thread; if it 785 * does, all message I/O in MP will be blocked. 786 */ 787 protected int sendItems(int i, 788 ByteBuffer buffer) { 789 int len = Math.min(myMessageLength - i, buffer.remaining()); 790 buffer.put(mySerializedItems, i, len); 791 return len; 792 } 793 794 /** 795 * Called by the I/O thread before receiving message items using this 796 * buffer. 797 * 798 * @param theReadLength Actual number of items in message. 799 */ 800 void preReceive(int theReadLength) { 801 mySerializedItems = new byte[theReadLength]; 802 myMessageLength = theReadLength; 803 } 804 805 /** 806 * {@inheritDoc} 807 * 808 * Receive as many items as possible from the given byte buffer to this 809 * buffer. 810 * <P> 811 * The <code>receiveItems()</code> method must not block the calling thread; if 812 * it does, all message I/O in MP will be blocked. 
813 */ 814 protected int receiveItems(int i, 815 int num, 816 ByteBuffer buffer) { 817 int len = num; 818 len = Math.min(len, myMessageLength - i); 819 len = Math.min(len, buffer.remaining()); 820 buffer.get(mySerializedItems, i, len); 821 return len; 822 } 823 824 /** 825 * Skip as many items as possible from the given byte buffer. 826 * 827 * @param num Number of items to skip. 828 * @param buffer Buffer. 829 * 830 * @return Number of items actually skipped. 831 */ 832 int skipItems(int num, 833 ByteBuffer buffer) { 834 int n = Math.min(num, buffer.remaining()); 835 buffer.position(buffer.position() + n); 836 return n; 837 } 838 839 /** 840 * Called by the I/O thread after receiving message items using this buffer. 841 * 842 * @param theStatus Status object that will be returned for the message; its 843 * contents may be altered if necessary. 844 * @param theClassLoader Alternate class loader to be used when receiving 845 * objects, or null. 846 * 847 * @exception IOException Thrown if an I/O error occurred. 848 */ 849 void postReceive(Status theStatus, 850 ClassLoader theClassLoader) 851 throws IOException { 852 try { 853 byte[] savedSerializedItems = mySerializedItems; 854 ByteArrayInputStream bais 855 = new ByteArrayInputStream(mySerializedItems); 856 ObjectInputStream ois 857 = new MPObjectInputStream(bais, theClassLoader); 858 int nmsg = ois.readInt(); 859 int n = Math.min(myLength, nmsg); 860 for (int i = 0; i < n; ++i) { 861 put(i, (T) ois.readObject()); 862 } 863 ois.close(); 864 theStatus.length = nmsg; 865 mySerializedItems = savedSerializedItems; 866 } catch (ClassNotFoundException exc) { 867 throw new IOException("ObjectBuf.postReceive(): Class not found", exc); 868 } catch (ClassCastException exc) { 869 throw new IOException("ObjectBuf.postReceive(): Wrong type", exc); 870 } 871 } 872 873 /** 874 * Copy items from the given source buffer to the given destination buffer. 
875 * The number of items copied is <code>theSrc</code>'s length or 876 * <code>theDst</code>'s length, whichever is smaller. Each item is copied 877 * individually using the <code>get()</code> and <code>put()</code> methods. It is 878 * assumed that <code>theSrc</code> is not the same as <code>theDst</code>. 879 * 880 * @param <T> Data type of the objects in the buffer. 881 * @param theSrc Source of items to copy. 882 * @param theDst Destination of items to copy. 883 */ 884 protected static <T> void defaultCopy(ObjectBuf<T> theSrc, 885 ObjectBuf<T> theDst) { 886 int n = Math.min(theSrc.myLength, theDst.myLength); 887 for (int i = 0; i < n; ++i) { 888 theDst.put(i, theSrc.get(i)); 889 } 890 } 891 892 }