View Javadoc
1   //******************************************************************************
2   //
3   // File:    ReplicatedShort.java
4   // Package: edu.rit.pj.replica
5   // Unit:    Class edu.rit.pj.replica.ReplicatedShort
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.replica;
41  
42  import java.io.IOException;
43  import java.io.Serial;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import edu.rit.mp.ShortBuf;
47  import edu.rit.mp.buf.ShortItemBuf;
48  import edu.rit.pj.Comm;
49  import edu.rit.pj.reduction.ShortOp;
50  
51  /**
52   * Class ReplicatedShort provides a replicated, shared reduction variable for a
53   * value of type <code>short</code>.
54   * <P>
55   * A replicated, shared reduction variable is intended to be used in a cluster
56   * or hybrid parallel program for a data item shared among all the processes in
57   * the program and all the threads in each process. To use a replicated, shared
58   * reduction variable, do the following in each process of the parallel program:
59   * <OL TYPE=1>
60   * <LI>
61   * Construct an instance of class ReplicatedShort, specifying the reduction
62   * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
63   * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
64   * and the message tag to use for sending updates among the processes. At this
65   * point a <I>replica</I> of the variable exists in each process.
66   *
67   * <LI>
68   * To read the variable, call the <code>get()</code> method. The current value of
69   * the local process's replicated variable is returned.
70   *
71   * <LI>
72   * To update the variable, call the <code>reduce()</code> method, specifying a new
73   * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
74   * (described below) on the local process's replicated variable with the new
75   * value. If the variable changed as a result of the reduction, the variable's
76   * (updated) value is flooded to all the processes in the communicator. Finally,
77   * the <code>reduce()</code> method returns the variable's value.
78   * <P>
79   * Whenever one of the aforementioned flooded messages arrives, a separate
80   * thread performs an atomic reduction on the local process's variable with the
81   * received value.
82   * <P>
83   * An atomic reduction consists of these steps, performed atomically: Call the
84   * reduction operator's <code>op()</code> method, passing in the current value of
85   * the local process's replicated variable and the new value (either the new
86   * value specified as an argument of <code>reduce()</code>, or the new value
87   * received in a flooded message). Then store the <code>op()</code> method's return
88   * value back into the local process's replicated variable.
89   * </OL>
90   * <P>
91   * Class ReplicatedShort does not itself guarantee consistency of the replicas'
92   * values. This is to avoid the message passing overhead of a distributed state
93   * update protocol. Instead, the parallel program must be written to operate
94   * correctly when the variable is updated as described above. Note that the
95   * value of a process's local replica can change asynchronously at any time,
96   * either because a thread in the current process updated the variable, or
97   * because a flooded message updated the variable.
98   * <P>
99   * Class ReplicatedShort is multiple thread safe. The methods use lock-free
100  * atomic compare-and-set.
101  * <P>
102  * <I>Note:</I> Class ReplicatedShort is implemented using class
103  * java.util.concurrent.atomic.AtomicInteger.
104  *
105  * @author Alan Kaminsky
106  * @version 13-Sep-2008
107  */
108 @SuppressWarnings("serial")
109 public class ReplicatedShort
110         extends Number {
111 
112     @Serial
113     private static final long serialVersionUID = 1L;
114 
115 // Hidden data members.
116     private ShortOp myOp;
117     private AtomicInteger myValue;
118     private int myTag;
119     private Comm myComm;
120     private Receiver myReceiver;
121 
122 // Hidden helper classes.
123     /**
124      * Class Receiver receives and processes flooded messages to update this
125      * replicated, shared reduction variable.
126      *
127      * @author Alan Kaminsky
128      * @version 13-Sep-2008
129      */
130     private class Receiver
131             extends Thread {
132 
133         public void run() {
134             ShortItemBuf buf = ShortBuf.buffer();
135 
136             try {
137                 for (;;) {
138                     // Receive a flooded message.
139                     myComm.floodReceive(myTag, buf);
140 
141                     // Do an atomic reduction.
142                     short oldvalue, newvalue;
143                     do {
144                         oldvalue = (short) myValue.get();
145                         newvalue = myOp.op(oldvalue, buf.item);
146                     } while (!myValue.compareAndSet(oldvalue, newvalue));
147                 }
148             } catch (Throwable exc) {
149                 exc.printStackTrace(System.err);
150             }
151         }
152     }
153 
154 // Exported constructors.
155     /**
156      * Construct a new replicated, shared short reduction variable with the
157      * given reduction operator. The initial value is 0. A message tag of 0 is
158      * used. The world communicator is used.
159      *
160      * @param op Reduction operator.
161      * @exception NullPointerException (unchecked exception) Thrown if
162      * <code>op</code> is null.
163      */
164     public ReplicatedShort(ShortOp op) {
165         this(op, (short) 0, 0, Comm.world());
166     }
167 
168     /**
169      * Construct a new replicated, shared short reduction variable with the
170      * given reduction operator and initial value. A message tag of 0 is used.
171      * The world communicator is used.
172      *
173      * @param op Reduction operator.
174      * @param initialValue Initial value.
175      * @exception NullPointerException (unchecked exception) Thrown if
176      * <code>op</code> is null.
177      */
178     public ReplicatedShort(ShortOp op,
179             short initialValue) {
180         this(op, initialValue, 0, Comm.world());
181     }
182 
183     /**
184      * Construct a new replicated, shared short reduction variable with the
185      * given reduction operator, initial value, and message tag. The world
186      * communicator is used.
187      *
188      * @param op Reduction operator.
189      * @param initialValue Initial value.
190      * @param tag Message tag.
191      * @exception NullPointerException (unchecked exception) Thrown if
192      * <code>op</code> is null. Thrown if
193      * <code>comm</code> is null.
194      */
195     public ReplicatedShort(ShortOp op,
196             short initialValue,
197             int tag) {
198         this(op, initialValue, tag, Comm.world());
199     }
200 
201     /**
202      * Construct a new replicated, shared short reduction variable with the
203      * given reduction operator, initial value, message tag, and communicator.
204      *
205      * @param op Reduction operator.
206      * @param initialValue Initial value.
207      * @param tag Message tag.
208      * @param comm Communicator.
209      * @exception NullPointerException (unchecked exception) Thrown if
210      * <code>op</code> is null. Thrown if
211      * <code>comm</code> is null.
212      */
213     public ReplicatedShort(ShortOp op,
214             short initialValue,
215             int tag,
216             Comm comm) {
217         if (op == null) {
218             throw new NullPointerException("ReplicatedShort(): op is null");
219         }
220         if (comm == null) {
221             throw new NullPointerException("ReplicatedShort(): comm is null");
222         }
223         myOp = op;
224         myValue = new AtomicInteger(initialValue);
225         myTag = tag;
226         myComm = comm;
227         myReceiver = new Receiver();
228         myReceiver.setDaemon(true);
229         myReceiver.start();
230     }
231 
232 // Exported operations.
233     /**
234      * Returns this replicated, shared reduction variable's current value.
235      *
236      * @return Current value.
237      */
238     public short get() {
239         return (short) myValue.get();
240     }
241 
242     /**
243      * Update this replicated, shared reduction variable's current value. This
244      * variable is combined with the given value using the reduction operation
245      * specified to the constructor (<I>op</I>). The result is stored back into
246      * this variable and is returned; the result may also be flooded to all
247      * processes in the communicator.
248      *
249      * @param value Value.
250      * @return (This variable) <I>op</I> (<code>value</code>).
251      * @exception IOException Thrown if an I/O error occurred.
252      * @throws java.io.IOException if any.
253      */
254     public short reduce(short value)
255             throws IOException {
256         // Do an atomic reduction.
257         short oldvalue, newvalue;
258         do {
259             oldvalue = (short) myValue.get();
260             newvalue = myOp.op(oldvalue, value);
261         } while (!myValue.compareAndSet(oldvalue, newvalue));
262 
263         // If value changed, send a flooded message.
264         if (newvalue != oldvalue) {
265             myComm.floodSend(myTag, ShortBuf.buffer(newvalue));
266         }
267 
268         // Return updated value.
269         return newvalue;
270     }
271 
272     /**
273      * Returns a string version of this reduction variable.
274      *
275      * @return String version.
276      */
277     public String toString() {
278         return Short.toString(get());
279     }
280 
281     /**
282      * Returns this reduction variable's current value converted to type
283      * <code>int</code>.
284      *
285      * @return Current value.
286      */
287     public int intValue() {
288         return get();
289     }
290 
291     /**
292      * Returns this reduction variable's current value converted to type
293      * <code>long</code>.
294      *
295      * @return Current value.
296      */
297     public long longValue() {
298         return get();
299     }
300 
301     /**
302      * Returns this reduction variable's current value converted to type
303      * <code>float</code>.
304      *
305      * @return Current value.
306      */
307     public float floatValue() {
308         return get();
309     }
310 
311     /**
312      * Returns this reduction variable's current value converted to type
313      * <code>double</code>.
314      *
315      * @return Current value.
316      */
317     public double doubleValue() {
318         return get();
319     }
320 
321 }