1 //******************************************************************************
2 //
3 // File: ObjectBuf.java
4 // Package: edu.rit.mp
5 // Unit: Class edu.rit.mp.ObjectBuf<T>
6 //
7 // This Java source file is copyright (C) 2007 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.mp;
41
42 import java.io.ByteArrayInputStream;
43 import java.io.ByteArrayOutputStream;
44 import java.io.IOException;
45 import java.io.ObjectInputStream;
46 import java.io.ObjectOutputStream;
47 import java.nio.ByteBuffer;
48
49 import edu.rit.mp.buf.EmptyObjectBuf;
50 import edu.rit.mp.buf.ObjectArrayBuf;
51 import edu.rit.mp.buf.ObjectArrayBuf_1;
52 import edu.rit.mp.buf.ObjectItemBuf;
53 import edu.rit.mp.buf.ObjectMatrixBuf;
54 import edu.rit.mp.buf.ObjectMatrixBuf_1;
55 import edu.rit.mp.buf.SharedObjectArrayBuf;
56 import edu.rit.mp.buf.SharedObjectArrayBuf_1;
57 import edu.rit.mp.buf.SharedObjectBuf;
58 import edu.rit.pj.reduction.SharedObject;
59 import edu.rit.pj.reduction.SharedObjectArray;
60 import edu.rit.util.Arrays;
61 import edu.rit.util.Range;
62
63 /**
64 * Class ObjectBuf is the abstract base class for a buffer of object items sent
65 * or received using the Message Protocol (MP). In a message, an object item is
66 * represented as a sequence of bytes using Java Object Serialization.
67 * <P>
68 * A buffer may be used to send one or more messages at the same time in
69 * multiple threads. If a buffer is being used to send a message or messages,
70 * the buffer must not be used to receive a message at the same time.
71 * <P>
72 * A buffer may be used to receive one message at a time. If a buffer is being
73 * used to receive a message, the buffer must not be used to receive another
74 * message in a different thread, and the buffer must not be used to send a
75 * message or messages.
76 * <P>
77 * A buffer is a conduit for retrieving and storing data in some underlying data
78 * structure. If the underlying data structure is multiple thread safe, then one
79 * thread can be retrieving or storing data via the buffer at the same time as
80 * other threads are accessing the data structure. If the underlying data
81 * structure is not multiple thread safe, then other threads must not access the
82 * data structure while one thread is retrieving or storing data via the buffer.
83 * <P>
84 * To create an ObjectBuf, call one of the following static factory methods:
85 * <UL>
86 * <LI><code>emptyBuffer()</code>
87 * <LI><code>buffer()</code>
88 * <LI><code>buffer (T)</code>
89 * <LI><code>buffer (T[])</code>
90 * <LI><code>sliceBuffer (T[], Range)</code>
91 * <LI><code>sliceBuffers (T[], Range[])</code>
92 * <LI><code>objectBuffer (T[])</code>
93 * <LI><code>buffer (T[][])</code>
94 * <LI><code>rowSliceBuffer (T[][], Range)</code>
95 * <LI><code>rowSliceBuffers (T[][], Range[])</code>
96 * <LI><code>colSliceBuffer (T[][], Range)</code>
97 * <LI><code>colSliceBuffers (T[][], Range[])</code>
98 * <LI><code>patchBuffer (T[][], Range, Range)</code>
99 * <LI><code>patchBuffers (T[][], Range[], Range[])</code>
100 * <LI><code>objectBuffer (T[][])</code>
101 * <LI><code>buffer (SharedObject<T>)</code>
102 * <LI><code>buffer (SharedObjectArray<T>)</code>
103 * <LI><code>sliceBuffer (SharedObjectArray<T>, Range)</code>
104 * <LI><code>sliceBuffers (SharedObjectArray<T>, Range[])</code>
105 * </UL>
106 * <P>
107 * There are two ways to create a buffer for an array of objects (type
108 * <code>T[]</code>):
109 * <OL TYPE=1>
110 * <LI>
111 * With the <code>buffer(T[])</code>, <code>sliceBuffer(T[],Range)</code>, and
112 * <code>sliceBuffers(T[],Range[])</code> methods. These methods create a buffer
113 * that sends and receives the array elements as multiple separate objects of
114 * type <code>T</code>. The receiver must allocate an array of the proper dimension
115 * to receive the incoming objects and must create a buffer that receives the
116 * array elements as separate objects.
117 *
118 * <LI>
119 * With the <code>objectBuffer(T[])</code> method. This method creates a buffer that
120 * sends and receives the entire array as one object of type <code>T[]</code>. The
121 * receiver must also create a buffer that receives the entire array as one
122 * object; the buffer's <code>item</code> field is automatically set to an array of
123 * the proper dimension.
124 * </OL>
125 * <P>
126 * There are two ways to create a buffer for a matrix of objects (type
127 * <code>T[][]</code>):
128 * <OL TYPE=1>
129 * <LI>
130 * With the <code>buffer(T[][])</code>, <code>rowSliceBuffer(T[][],Range)</code>,
131 * <code>rowSliceBuffers(T[][],Range[])</code>,
132 * <code>colSliceBuffer(T[][],Range)</code>,
133 * <code>colSliceBuffers(T[][],Range[])</code>, <code>patchBuffer(T[][],Range)</code>,
134 * and <code>patchBuffers(T[][],Range[])</code> methods. These methods create a
135 * buffer that sends and receives the matrix elements as multiple separate
136 * objects of type <code>T</code>. The receiver must allocate a matrix of the proper
137 * dimensions to receive the incoming objects and must create a buffer that
138 * receives the matrix elements as separate objects.
139 *
140 * <LI>
141 * With the <code>objectBuffer(T[][])</code> method. This method creates a buffer
142 * that sends and receives the entire matrix as one object of type
143 * <code>T[][]</code>. The receiver must also create a buffer that receives the
144 * matrix as one object; the buffer's <code>item</code> field is automatically set
145 * to a matrix of the proper dimensions.
146 * </OL>
147 * <P>
148 * <B><I>Important Note:</I></B> An ObjectBuf uses the protected field
149 * <code>mySerializedItems</code> to store the serialized representation of the
150 * objects in the buffer. If the buffer is used to receive a message, the
151 * serialized representation of the received objects is cached in
152 * <code>mySerializedItems</code>. If the buffer is used to send a message and
153 * <code>mySerializedItems</code> is empty, the objects in the buffer are
154 * serialized, the serialized representation is cached in
155 * <code>mySerializedItems</code>, and the serialized representation is sent in the
156 * message. If the buffer is used to send a message and
157 * <code>mySerializedItems</code> is not empty, the objects in the buffer are
158 * <I>not</I> serialized; rather, the cached serialized representation is sent.
159 * This is done to avoid re-serializing the objects if the buffer is used to
160 * send copies of a message to multiple destinations, or if the buffer is used
161 * to receive and then immediately send a message. However, if the state of any
162 * object in the buffer changes, the buffer's <code>reset()</code> method must be
163 * called; this tells the buffer to discard the cached serialized representation
164 * and re-serialize the objects in the buffer.
165 *
166 * @param <T> Data type of the objects in the buffer.
167 * @author Alan Kaminsky
168 * @version 03-Jul-2008
169 */
170 @SuppressWarnings("unchecked")
171 public abstract class ObjectBuf<T>
172 extends Buf {
173
174 // Hidden data members.
175 /**
176 * Byte array containing this buffer's object items in serialized form. If
177 * null, the object items need to be serialized.
178 */
179 protected byte[] mySerializedItems;
180
181 // Hidden constructors.
182 /**
183 * Construct a new object buffer.
184 *
185 * @param theLength Number of items.
186 * @exception IllegalArgumentException (unchecked exception) Thrown if
187 * <code>theLength</code> < 0.
188 */
189 protected ObjectBuf(int theLength) {
190 super(Constants.TYPE_OBJECT, theLength);
191 }
192
193 // Exported operations.
194 /**
195 * Create an empty buffer. The buffer's length is 0. The buffer's item type
196 * is Object.
197 *
198 * @return Empty buffer.
199 */
200 public static ObjectBuf<Object> emptyBuffer() {
201 return new EmptyObjectBuf();
202 }
203
204 /**
205 * Create a buffer for an object item. The item is stored in the
206 * <code>item</code> field of the buffer.
207 *
208 * @param <T> Data type of the objects in the buffer.
209 * @return Buffer.
210 */
211 public static <T> ObjectItemBuf<T> buffer() {
212 return new ObjectItemBuf<T>();
213 }
214
215 /**
216 * Create a buffer for an object item with the given initial value. The item
217 * is stored in the <code>item</code> field of the buffer.
218 *
219 * @param <T> Data type of the objects in the buffer.
220 * @param item Initial value of the <code>item</code> field.
221 * @return Buffer.
222 */
223 public static <T> ObjectItemBuf<T> buffer(T item) {
224 return new ObjectItemBuf<T>(item);
225 }
226
227 /**
228 * Create a buffer for the entire given object array. The returned buffer
229 * encompasses all the elements in <code>theArray</code>. The array elements are
230 * sent and received as multiple separate objects of type <code>T</code>.
231 *
232 * @param <T> Data type of the objects in the buffer.
233 * @param theArray Array.
234 * @return Buffer.
235 * @exception NullPointerException (unchecked exception) Thrown if
236 * <code>theArray</code> is null.
237 */
238 public static <T> ObjectBuf<T> buffer(T[] theArray) {
239 if (theArray == null) {
240 throw new NullPointerException("ObjectBuf.buffer(): theArray is null");
241 }
242 int nr = Arrays.length(theArray);
243 return new ObjectArrayBuf_1<T>(theArray, new Range(0, nr - 1));
244 }
245
246 /**
247 * Create a buffer for one slice of the given object array. The returned
248 * buffer encompasses <code>theRange</code> of elements in <code>theArray</code>.
249 * The range's stride may be 1 or greater than 1. The array elements are
250 * sent and received as multiple separate objects of type <code>T</code>.
251 *
252 * @param <T> Data type of the objects in the buffer.
253 * @param theArray Array.
254 * @param theRange Range of elements to include.
255 * @return Buffer.
256 * @exception NullPointerException (unchecked exception) Thrown if
257 * <code>theArray</code> is null or
258 * <code>theRange</code> is null.
259 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
260 * <code>theArray</code> does not include all the indexes in <code>theRange</code>.
261 */
262 public static <T> ObjectBuf<T> sliceBuffer(T[] theArray,
263 Range theRange) {
264 if (theArray == null) {
265 throw new NullPointerException("ObjectBuf.sliceBuffer(): theArray is null");
266 }
267 int nr = Arrays.length(theArray);
268 if (0 > theRange.lb() || theRange.ub() >= nr) {
269 throw new IndexOutOfBoundsException("ObjectBuf.sliceBuffer(): theArray index range = 0.."
270 + (nr - 1) + ", theRange = " + theRange);
271 }
272 if (theRange.stride() == 1) {
273 return new ObjectArrayBuf_1<T>(theArray, theRange);
274 } else {
275 return new ObjectArrayBuf<T>(theArray, theRange);
276 }
277 }
278
279 /**
280 * Create an array of buffers for multiple slices of the given object array.
281 * The returned buffer array has the same length as
282 * <code>theRanges</code>. Each element [<I>i</I>] of the returned buffer array
283 * encompasses the elements of <code>theArray</code> specified by
284 * <code>theRanges[i]</code>. Each range's stride may be 1 or greater than 1.
285 * The array elements are sent and received as multiple separate objects of
286 * type <code>T</code>.
287 *
288 * @param <T> Data type of the objects in the buffer.
289 * @param theArray Array.
290 * @param theRanges Array of ranges of elements to include.
291 * @return Array of buffers.
292 * @exception NullPointerException (unchecked exception) Thrown if
293 * <code>theArray</code> is null or
294 * <code>theRanges</code> or any element thereof is null.
295 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
296 * <code>theArray</code>'s allocation does not include any element of
297 * <code>theRanges</code>.
298 */
299 public static <T> ObjectBuf<T>[] sliceBuffers(T[] theArray,
300 Range[] theRanges) {
301 int n = theRanges.length;
302 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n];
303 for (int i = 0; i < n; ++i) {
304 result[i] = sliceBuffer(theArray, theRanges[i]);
305 }
306 return result;
307 }
308
309 /**
310 * Create a buffer for the entire given object array. The returned buffer
311 * encompasses all the elements in <code>theArray</code>. The array is sent and
312 * received as a single object of type <code>T[]</code>.
313 *
314 * @param <T> Data type of the objects in the buffer.
315 * @param theArray Array. May be null.
316 * @return Buffer.
317 */
318 public static <T> ObjectItemBuf<T[]> objectBuffer(T[] theArray) {
319 return new ObjectItemBuf<T[]>(theArray);
320 }
321
322 /**
323 * Create a buffer for the entire given object matrix. The returned buffer
324 * encompasses all the rows and all the columns in
325 * <code>theMatrix</code>. The matrix elements are sent and received as multiple
326 * separate objects of type <code>T</code>.
327 *
328 * @param <T> Data type of the objects in the buffer.
329 * @param theMatrix Matrix.
330 * @return Buffer.
331 * @exception NullPointerException (unchecked exception) Thrown if
332 * <code>theMatrix</code> is null.
333 */
334 public static <T> ObjectBuf<T> buffer(T[][] theMatrix) {
335 if (theMatrix == null) {
336 throw new NullPointerException("ObjectBuf.buffer(): theMatrix is null");
337 }
338 int nr = Arrays.rowLength(theMatrix);
339 int nc = Arrays.colLength(theMatrix, 0);
340 return new ObjectMatrixBuf_1<T>(theMatrix, new Range(0, nr - 1), new Range(0, nc - 1));
341 }
342
343 /**
344 * Create a buffer for one row slice of the given object matrix. The
345 * returned buffer encompasses <code>theRowRange</code> of rows, and all the
346 * columns, in <code>theMatrix</code>. The range's stride may be 1 or greater
347 * than 1. The matrix elements are sent and received as multiple separate
348 * objects of type <code>T</code>.
349 *
350 * @param <T> Data type of the objects in the buffer.
351 * @param theMatrix Matrix.
352 * @param theRowRange Range of rows to include.
353 * @return Buffer.
354 * @exception NullPointerException (unchecked exception) Thrown if
355 * <code>theMatrix</code> is null or
356 * <code>theRowRange</code> is null.
357 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
358 * <code>theMatrix</code>'s allocation does not include <code>theRowRange</code>.
359 */
360 public static <T> ObjectBuf<T> rowSliceBuffer(T[][] theMatrix,
361 Range theRowRange) {
362 if (theMatrix == null) {
363 throw new NullPointerException("ObjectBuf.rowSliceBuffer(): theMatrix is null");
364 }
365 int nr = Arrays.rowLength(theMatrix);
366 if (0 > theRowRange.lb() || theRowRange.ub() >= nr) {
367 throw new IndexOutOfBoundsException("ObjectBuf.rowSliceBuffer(): theMatrix row index range = 0.."
368 + (nr - 1) + ", theRowRange = " + theRowRange);
369 }
370 int nc = Arrays.colLength(theMatrix, theRowRange.lb());
371 if (theRowRange.stride() == 1) {
372 return new ObjectMatrixBuf_1<T>(theMatrix, theRowRange, new Range(0, nc - 1));
373 } else {
374 return new ObjectMatrixBuf<T>(theMatrix, theRowRange, new Range(0, nc - 1));
375 }
376 }
377
378 /**
379 * Create an array of buffers for multiple row slices of the given object
380 * matrix. The returned buffer array has the same length as
381 * <code>theRowRanges</code>. Each element [<I>i</I>] of the returned buffer
382 * array encompasses the rows of <code>theMatrix</code> specified by
383 * <code>theRowRanges[i]</code> and all the columns of <code>theMatrix</code>. Each
384 * range's stride may be 1 or greater than 1. The matrix elements are sent
385 * and received as multiple separate objects of type <code>T</code>.
386 *
387 * @param <T> Data type of the objects in the buffer.
388 * @param theMatrix Matrix.
389 * @param theRowRanges Array of ranges of rows to include.
390 * @return Array of buffers.
391 * @exception NullPointerException (unchecked exception) Thrown if
392 * <code>theMatrix</code> is null or
393 * <code>theRowRanges</code> or any element thereof is null.
394 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
395 * <code>theMatrix</code>'s allocation does not include any element of
396 * <code>theRowRanges</code>.
397 */
398 public static <T> ObjectBuf<T>[] rowSliceBuffers(T[][] theMatrix,
399 Range[] theRowRanges) {
400 int n = theRowRanges.length;
401 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n];
402 for (int i = 0; i < n; ++i) {
403 result[i] = rowSliceBuffer(theMatrix, theRowRanges[i]);
404 }
405 return result;
406 }
407
408 /**
409 * Create a buffer for one column slice of the given object matrix. The
410 * returned buffer encompasses all the rows, and <code>theColRange</code> of
411 * columns, in <code>theMatrix</code>. The range's stride may be 1 or greater
412 * than 1. The matrix elements are sent and received as multiple separate
413 * objects of type <code>T</code>.
414 *
415 * @param <T> Data type of the objects in the buffer.
416 * @param theMatrix Matrix.
417 * @param theColRange Range of columns to include.
418 * @return Buffer.
419 * @exception NullPointerException (unchecked exception) Thrown if
420 * <code>theMatrix</code> is null or
421 * <code>theColRange</code> is null.
422 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
423 * <code>theMatrix</code>'s allocation does not include <code>theColRange</code>.
424 */
425 public static <T> ObjectBuf<T> colSliceBuffer(T[][] theMatrix,
426 Range theColRange) {
427 if (theMatrix == null) {
428 throw new NullPointerException("ObjectBuf.colSliceBuffer(): theMatrix is null");
429 }
430 int nr = Arrays.rowLength(theMatrix);
431 int nc = Arrays.colLength(theMatrix, 0);
432 if (0 > theColRange.lb() || theColRange.ub() >= nc) {
433 throw new IndexOutOfBoundsException("ObjectBuf.colSliceBuffer(): theMatrix column index range = 0.."
434 + (nc - 1) + ", theColRange = " + theColRange);
435 }
436 if (theColRange.stride() == 1) {
437 return new ObjectMatrixBuf_1<T>(theMatrix, new Range(0, nr - 1), theColRange);
438 } else {
439 return new ObjectMatrixBuf<T>(theMatrix, new Range(0, nr - 1), theColRange);
440 }
441 }
442
443 /**
444 * Create an array of buffers for multiple column slices of the given object
445 * matrix. The returned buffer array has the same length as
446 * <code>theColRanges</code>. Each element [<I>i</I>] of the returned buffer
447 * array encompasses all the rows of <code>theMatrix</code> and the columns of
448 * <code>theMatrix</code> specified by <code>theColRanges[i]</code>. Each range's
449 * stride may be 1 or greater than 1. The matrix elements are sent and
450 * received as multiple separate objects of type <code>T</code>.
451 *
452 * @param <T> Data type of the objects in the buffer.
453 * @param theMatrix Matrix.
454 * @param theColRanges Array of ranges of columns to include.
455 * @return Array of buffers.
456 * @exception NullPointerException (unchecked exception) Thrown if
457 * <code>theMatrix</code> is null or
458 * <code>theColRanges</code> or any element thereof is null.
459 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
460 * <code>theMatrix</code>'s allocation does not include any element of
461 * <code>theColRanges</code>.
462 */
463 public static <T> ObjectBuf<T>[] colSliceBuffers(T[][] theMatrix,
464 Range[] theColRanges) {
465 int n = theColRanges.length;
466 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n];
467 for (int i = 0; i < n; ++i) {
468 result[i] = colSliceBuffer(theMatrix, theColRanges[i]);
469 }
470 return result;
471 }
472
473 /**
474 * Create a buffer for one patch of the given object matrix. The returned
475 * buffer encompasses <code>theRowRange</code> of rows, and <code>theColRange</code>
476 * of columns, in <code>theMatrix</code>. Each range's stride may be 1 or
477 * greater than 1. The matrix elements are sent and received as multiple
478 * separate objects of type <code>T</code>.
479 *
480 * @param <T> Data type of the objects in the buffer.
481 * @param theMatrix Matrix.
482 * @param theRowRange Range of rows to include.
483 * @param theColRange Range of columns to include.
484 * @return Buffer.
485 * @exception NullPointerException (unchecked exception) Thrown if
486 * <code>theMatrix</code> is null,
487 * <code>theRowRange</code> is null, or <code>theColRange</code> is null.
488 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
489 * <code>theMatrix</code>'s allocation does not include <code>theRowRange</code> and
490 * <code>theColRange</code>.
491 */
492 public static <T> ObjectBuf<T> patchBuffer(T[][] theMatrix,
493 Range theRowRange,
494 Range theColRange) {
495 if (theMatrix == null) {
496 throw new NullPointerException("ObjectBuf.patchBuffer(): theMatrix is null");
497 }
498 int nr = Arrays.rowLength(theMatrix);
499 if (0 > theRowRange.lb() || theRowRange.ub() >= nr) {
500 throw new IndexOutOfBoundsException("ObjectBuf.patchBuffer(): theMatrix row index range = 0.."
501 + (nr - 1) + ", theRowRange = " + theRowRange);
502 }
503 int nc = Arrays.colLength(theMatrix, theRowRange.lb());
504 if (0 > theColRange.lb() || theColRange.ub() >= nc) {
505 throw new IndexOutOfBoundsException("ObjectBuf.patchBuffer(): theMatrix column index range = 0.."
506 + (nc - 1) + ", theColRange = " + theColRange);
507 }
508 if (theRowRange.stride() == 1 && theColRange.stride() == 1) {
509 return new ObjectMatrixBuf_1<T>(theMatrix, theRowRange, theColRange);
510 } else {
511 return new ObjectMatrixBuf<T>(theMatrix, theRowRange, theColRange);
512 }
513 }
514
515 /**
516 * Create an array of buffers for multiple patches of the given object
517 * matrix. The length of the returned buffer array is equal to the length of
518 * <code>theRowRanges</code> times the length of <code>theColRanges</code>. Each
519 * element of the returned buffer array encompasses the rows given in one
520 * element of <code>theRowRanges</code> array, and the columns given in one
521 * element of <code>theColRanges</code> array, in all possible combinations, of
522 * <code>theMatrix</code>. Each range's stride may be 1 or greater than 1. The
523 * matrix elements are sent and received as multiple separate objects of
524 * type <code>T</code>.
525 *
526 * @param <T> Data type of the objects in the buffer.
527 * @param theMatrix Matrix.
528 * @param theRowRanges Array of ranges of rows to include.
529 * @param theColRanges Array of ranges of columns to include.
530 * @return Array of buffers.
531 * @exception NullPointerException (unchecked exception) Thrown if
532 * <code>theMatrix</code> is null,
533 * <code>theRowRanges</code> or any element thereof is null, or
534 * <code>theColRanges</code> or any element thereof is null.
535 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
536 * <code>theMatrix</code>'s allocation does not include any element of
537 * <code>theRowRanges</code> or
538 * <code>theColRanges</code>.
539 */
540 public static <T> ObjectBuf<T>[] patchBuffers(T[][] theMatrix,
541 Range[] theRowRanges,
542 Range[] theColRanges) {
543 int m = theRowRanges.length;
544 int n = theColRanges.length;
545 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[m * n];
546 int k = 0;
547 for (int i = 0; i < m; ++i) {
548 Range rowrange = theRowRanges[i];
549 for (int j = 0; j < n; ++j) {
550 result[k++]
551 = patchBuffer(theMatrix, rowrange, theColRanges[j]);
552 }
553 }
554 return result;
555 }
556
557 /**
558 * Create a buffer for the entire given object matrix. The returned buffer
559 * encompasses all the rows and all the columns in <code>theMatrix</code>. The
560 * matrix is sent and received as a single object of type <code>T[][]</code>.
561 *
562 * @param <T> Data type of the objects in the buffer.
563 * @param theMatrix Matrix. May be null.
564 * @return Buffer.
565 */
566 public static <T> ObjectItemBuf<T[][]> objectBuffer(T[][] theMatrix) {
567 return new ObjectItemBuf<T[][]>(theMatrix);
568 }
569
570 /**
571 * Create a buffer for a shared object item. The item is wrapped in an
572 * instance of class {@linkplain edu.rit.pj.reduction.SharedObject
573 * SharedObject}. Use the methods of the SharedObject object to access the
574 * actual item.
575 *
576 * @param <T> Data type of the objects in the buffer.
577 * @param item SharedObject object that wraps the item.
578 * @exception NullPointerException (unchecked exception) Thrown if
579 * <code>item</code> is null.
580 * @return a {@link edu.rit.mp.ObjectBuf} object.
581 */
582 public static <T> ObjectBuf<T> buffer(SharedObject<T> item) {
583 if (item == null) {
584 throw new NullPointerException("ObjectBuf.buffer(): item is null");
585 }
586 return new SharedObjectBuf<T>(item);
587 }
588
589 /**
590 * Create a buffer for the entire given shared object array. The returned
591 * buffer encompasses all the elements in <code>theArray</code>.
592 *
593 * @param <T> Data type of the objects in the buffer.
594 * @param theArray Array.
595 * @return Buffer.
596 * @exception NullPointerException (unchecked exception) Thrown if
597 * <code>theArray</code> is null.
598 */
599 public static <T> ObjectBuf<T> buffer(SharedObjectArray<T> theArray) {
600 if (theArray == null) {
601 throw new NullPointerException("ObjectBuf.buffer(): theArray is null");
602 }
603 int nr = theArray.length();
604 return new SharedObjectArrayBuf_1<T>(theArray, new Range(0, nr - 1));
605 }
606
607 /**
608 * Create a buffer for one slice of the given shared object array. The
609 * returned buffer encompasses <code>theRange</code> of elements in
610 * <code>theArray</code>. The range's stride may be 1 or greater than 1.
611 *
612 * @param <T> Data type of the objects in the buffer.
613 * @param theArray Array.
614 * @param theRange Range of elements to include.
615 * @return Buffer.
616 * @exception NullPointerException (unchecked exception) Thrown if
617 * <code>theArray</code> is null or
618 * <code>theRange</code> is null.
619 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
620 * <code>theArray</code> does not include all the indexes in <code>theRange</code>.
621 */
622 public static <T> ObjectBuf<T> sliceBuffer(SharedObjectArray<T> theArray,
623 Range theRange) {
624 if (theArray == null) {
625 throw new NullPointerException("ObjectBuf.sliceBuffer(): theArray is null");
626 }
627 int nr = theArray.length();
628 if (0 > theRange.lb() || theRange.ub() >= nr) {
629 throw new IndexOutOfBoundsException("ObjectBuf.sliceBuffer(): theArray index range = 0.."
630 + (nr - 1) + ", theRange = " + theRange);
631 }
632 if (theRange.stride() == 1) {
633 return new SharedObjectArrayBuf_1<T>(theArray, theRange);
634 } else {
635 return new SharedObjectArrayBuf<T>(theArray, theRange);
636 }
637 }
638
639 /**
640 * Create an array of buffers for multiple slices of the given shared object
641 * array. The returned buffer array has the same length as
642 * <code>theRanges</code>. Each element [<I>i</I>] of the returned buffer array
643 * encompasses the elements of <code>theArray</code> specified by
644 * <code>theRanges[i]</code>. Each range's stride may be 1 or greater than 1.
645 *
646 * @param <T> Data type of the objects in the buffer.
647 * @param theArray Array.
648 * @param theRanges Array of ranges of elements to include.
649 * @return Array of buffers.
650 * @exception NullPointerException (unchecked exception) Thrown if
651 * <code>theArray</code> is null or
652 * <code>theRanges</code> or any element thereof is null.
653 * @exception IndexOutOfBoundsException (unchecked exception) Thrown if
654 * <code>theArray</code>'s allocation does not include any element of
655 * <code>theRanges</code>.
656 */
657 public static <T> ObjectBuf<T>[] sliceBuffers(SharedObjectArray<T> theArray,
658 Range[] theRanges) {
659 int n = theRanges.length;
660 ObjectBuf<T>[] result = (ObjectBuf<T>[]) new ObjectBuf<?>[n];
661 for (int i = 0; i < n; ++i) {
662 result[i] = sliceBuffer(theArray, theRanges[i]);
663 }
664 return result;
665 }
666
667 /**
668 * Obtain the given item from this buffer.
669 * <P>
670 * The <code>get()</code> method must not block the calling thread; if it does,
671 * all message I/O in MP will be blocked.
672 *
673 * @param i Item index in the range 0 .. <code>length()</code>-1.
674 * @return Item at index <code>i</code>.
675 */
676 public abstract T get(int i);
677
678 /**
679 * Store the given item in this buffer.
680 * <P>
681 * The <code>put()</code> method must not block the calling thread; if it does,
682 * all message I/O in MP will be blocked.
683 *
684 * @param i Item index in the range 0 .. <code>length()</code>-1.
685 * @param item Item to be stored at index <code>i</code>.
686 */
687 public abstract void put(int i,
688 T item);
689
690 /**
691 * {@inheritDoc}
692 *
693 * Copy items from the given buffer to this buffer. The number of items
694 * copied is this buffer's length or <code>theSrc</code>'s length, whichever is
695 * smaller. If <code>theSrc</code> is this buffer, the <code>copy()</code> method
696 * does nothing.
697 * <P>
698 * The default implementation of the <code>copy()</code> method calls the
699 * <code>defaultCopy()</code> method. A subclass can override the
700 * <code>copy()</code> method to use a more efficient algorithm.
701 * <P>
702 * The default implementation of the <code>copy()</code> method also calls the
703 * <code>reset()</code> method.
704 * @exception ClassCastException (unchecked exception) Thrown if
705 * <code>theSrc</code>'s item data type is not the same as this buffer's item
706 * data type.
707 */
708 public void copy(Buf theSrc) {
709 if (theSrc != this) {
710 defaultCopy((ObjectBuf<T>) theSrc, this);
711 reset();
712 }
713 }
714
715 /**
716 * {@inheritDoc}
717 *
718 * Fill this buffer with the given item. The <code>item</code> is assigned to
719 * each element in this buffer.
720 * <P>
721 * The <code>item</code> must be an instance of class T or a subclass thereof.
722 * The <code>item</code> may be null. Note that since <code>item</code> is
723 * <I>assigned</I> to every buffer element, every buffer element ends up
724 * referring to the same <code>item</code>.
725 * <P>
726 * The <code>fill()</code> method calls the <code>reset()</code> method.
727 * @exception ClassCastException (unchecked exception) Thrown if the
728 * <code>item</code>'s data type is not the same as this buffer's item data
729 * type.
730 */
731 public void fill(Object item) {
732 T value = (T) item;
733 for (int i = 0; i < myLength; ++i) {
734 put(i, value);
735 }
736 reset();
737 }
738
739 /**
740 * Create a temporary buffer with the same type of items and the same length
741 * as this buffer. The new buffer items are stored in a newly created array,
742 * separate from the storage for this buffer's items.
743 *
744 * @return a {@link edu.rit.mp.Buf} object.
745 */
746 public Buf getTemporaryBuf() {
747 return buffer((T[]) new Object[myLength]);
748 }
749
750 /**
751 * Reset this buffer. Call <code>reset()</code> if the state of any object in
752 * this buffer changes.
753 */
754 public void reset() {
755 mySerializedItems = null;
756 }
757
758 // Hidden operations.
759 /**
760 * Called by the I/O thread before sending message items using this buffer.
761 *
762 * @exception IOException Thrown if an I/O error occurred.
763 */
764 void preSend()
765 throws IOException {
766 if (mySerializedItems == null) {
767 ByteArrayOutputStream baos = new ByteArrayOutputStream();
768 ObjectOutputStream oos = new ObjectOutputStream(baos);
769 oos.writeInt(myLength);
770 for (int i = 0; i < myLength; ++i) {
771 oos.writeObject(get(i));
772 }
773 oos.close();
774 mySerializedItems = baos.toByteArray();
775 myMessageLength = mySerializedItems.length;
776 }
777 }
778
779 /**
780 * {@inheritDoc}
781 *
782 * Send as many items as possible from this buffer to the given byte buffer.
783 * <P>
784 * The <code>sendItems()</code> method must not block the calling thread; if it
785 * does, all message I/O in MP will be blocked.
786 */
787 protected int sendItems(int i,
788 ByteBuffer buffer) {
789 int len = Math.min(myMessageLength - i, buffer.remaining());
790 buffer.put(mySerializedItems, i, len);
791 return len;
792 }
793
794 /**
795 * Called by the I/O thread before receiving message items using this
796 * buffer.
797 *
798 * @param theReadLength Actual number of items in message.
799 */
800 void preReceive(int theReadLength) {
801 mySerializedItems = new byte[theReadLength];
802 myMessageLength = theReadLength;
803 }
804
805 /**
806 * {@inheritDoc}
807 *
808 * Receive as many items as possible from the given byte buffer to this
809 * buffer.
810 * <P>
811 * The <code>receiveItems()</code> method must not block the calling thread; if
812 * it does, all message I/O in MP will be blocked.
813 */
814 protected int receiveItems(int i,
815 int num,
816 ByteBuffer buffer) {
817 int len = num;
818 len = Math.min(len, myMessageLength - i);
819 len = Math.min(len, buffer.remaining());
820 buffer.get(mySerializedItems, i, len);
821 return len;
822 }
823
824 /**
825 * Skip as many items as possible from the given byte buffer.
826 *
827 * @param num Number of items to skip.
828 * @param buffer Buffer.
829 *
830 * @return Number of items actually skipped.
831 */
832 int skipItems(int num,
833 ByteBuffer buffer) {
834 int n = Math.min(num, buffer.remaining());
835 buffer.position(buffer.position() + n);
836 return n;
837 }
838
839 /**
840 * Called by the I/O thread after receiving message items using this buffer.
841 *
842 * @param theStatus Status object that will be returned for the message; its
843 * contents may be altered if necessary.
844 * @param theClassLoader Alternate class loader to be used when receiving
845 * objects, or null.
846 *
847 * @exception IOException Thrown if an I/O error occurred.
848 */
849 void postReceive(Status theStatus,
850 ClassLoader theClassLoader)
851 throws IOException {
852 try {
853 byte[] savedSerializedItems = mySerializedItems;
854 ByteArrayInputStream bais
855 = new ByteArrayInputStream(mySerializedItems);
856 ObjectInputStream ois
857 = new MPObjectInputStream(bais, theClassLoader);
858 int nmsg = ois.readInt();
859 int n = Math.min(myLength, nmsg);
860 for (int i = 0; i < n; ++i) {
861 put(i, (T) ois.readObject());
862 }
863 ois.close();
864 theStatus.length = nmsg;
865 mySerializedItems = savedSerializedItems;
866 } catch (ClassNotFoundException exc) {
867 throw new IOException("ObjectBuf.postReceive(): Class not found", exc);
868 } catch (ClassCastException exc) {
869 throw new IOException("ObjectBuf.postReceive(): Wrong type", exc);
870 }
871 }
872
873 /**
874 * Copy items from the given source buffer to the given destination buffer.
875 * The number of items copied is <code>theSrc</code>'s length or
876 * <code>theDst</code>'s length, whichever is smaller. Each item is copied
877 * individually using the <code>get()</code> and <code>put()</code> methods. It is
878 * assumed that <code>theSrc</code> is not the same as <code>theDst</code>.
879 *
880 * @param <T> Data type of the objects in the buffer.
881 * @param theSrc Source of items to copy.
882 * @param theDst Destination of items to copy.
883 */
884 protected static <T> void defaultCopy(ObjectBuf<T> theSrc,
885 ObjectBuf<T> theDst) {
886 int n = Math.min(theSrc.myLength, theDst.myLength);
887 for (int i = 0; i < n; ++i) {
888 theDst.put(i, theSrc.get(i));
889 }
890 }
891
892 }