View Javadoc
1   //******************************************************************************
2   //
3   // File:    ReplicatedFloat.java
4   // Package: edu.rit.pj.replica
5   // Unit:    Class edu.rit.pj.replica.ReplicatedFloat
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.replica;
41  
42  import java.io.IOException;
43  import java.util.concurrent.atomic.AtomicInteger;
44  
45  import edu.rit.mp.FloatBuf;
46  import edu.rit.mp.buf.FloatItemBuf;
47  import edu.rit.pj.Comm;
48  import edu.rit.pj.reduction.FloatOp;
49  
50  /**
51   * Class ReplicatedFloat provides a replicated, shared reduction variable for a
52   * value of type <code>float</code>.
53   * <P>
54   * A replicated, shared reduction variable is intended to be used in a cluster
55   * or hybrid parallel program for a data item shared among all the processes in
56   * the program and all the threads in each process. To use a replicated, shared
57   * reduction variable, do the following in each process of the parallel program:
58   * <OL TYPE=1>
59   * <LI>
60   * Construct an instance of class ReplicatedFloat, specifying the reduction
61   * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
62   * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
63   * and the message tag to use for sending updates among the processes. At this
64   * point a <I>replica</I> of the variable exists in each process.
65   *
66   * <LI>
67   * To read the variable, call the <code>get()</code> method. The current value of
68   * the local process's replicated variable is returned.
69   *
70   * <LI>
71   * To update the variable, call the <code>reduce()</code> method, specifying a new
72   * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
73   * (described below) on the local process's replicated variable with the new
74   * value. If the variable changed as a result of the reduction, the variable's
75   * (updated) value is flooded to all the processes in the communicator. Finally,
76   * the <code>reduce()</code> method returns the variable's value.
77   * <P>
78   * Whenever one of the aforementioned flooded messages arrives, a separate
79   * thread performs an atomic reduction on the local process's variable with the
80   * received value.
81   * <P>
82   * An atomic reduction consists of these steps, performed atomically: Call the
83   * reduction operator's <code>op()</code> method, passing in the current value of
84   * the local process's replicated variable and the new value (either the new
85   * value specified as an argument of <code>reduce()</code>, or the new value
86   * received in a flooded message). Then store the <code>op()</code> method's return
87   * value back into the local process's replicated variable.
88   * </OL>
89   * <P>
90   * Class ReplicatedFloat does not itself guarantee consistency of the replicas'
91   * values. This is to avoid the message passing overhead of a distributed state
92   * update protocol. Instead, the parallel program must be written to operate
93   * correctly when the variable is updated as described above. Note that the
94   * value of a process's local replica can change asynchronously at any time,
95   * either because a thread in the current process updated the variable, or
96   * because a flooded message updated the variable.
97   * <P>
98   * Class ReplicatedFloat is multiple thread safe. The methods use lock-free
99   * atomic compare-and-set.
100  * <P>
101  * <I>Note:</I> Class ReplicatedFloat is implemented using class
102  * java.util.concurrent.atomic.AtomicInteger.
103  *
104  * @author Alan Kaminsky
105  * @version 13-Sep-2008
106  */
107 @SuppressWarnings("serial")
108 public class ReplicatedFloat
109         extends Number {
110 
111 // Hidden data members.
112     private final FloatOp myOp;
113     private final AtomicInteger myValue;
114     private final int myTag;
115     private final Comm myComm;
116     private final Receiver myReceiver;
117 
118 // Hidden helper classes.
119     /**
120      * Class Receiver receives and processes flooded messages to update this
121      * replicated, shared reduction variable.
122      *
123      * @author Alan Kaminsky
124      * @version 13-Sep-2008
125      */
126     private class Receiver
127             extends Thread {
128 
129         public void run() {
130             FloatItemBuf buf = FloatBuf.buffer();
131 
132             try {
133                 for (;;) {
134                     // Receive a flooded message.
135                     myComm.floodReceive(myTag, buf);
136 
137                     // Do an atomic reduction.
138                     int oldint, newint;
139                     float oldvalue, newvalue;
140                     do {
141                         oldint = myValue.get();
142                         oldvalue = Float.intBitsToFloat(oldint);
143                         newvalue = myOp.op(oldvalue, buf.item);
144                         newint = Float.floatToIntBits(newvalue);
145                     } while (!myValue.compareAndSet(oldint, newint));
146                 }
147             } catch (Throwable exc) {
148                 exc.printStackTrace(System.err);
149             }
150         }
151     }
152 
153 // Exported constructors.
154     /**
155      * Construct a new replicated, shared float reduction variable with the
156      * given reduction operator. The initial value is 0. A message tag of 0 is
157      * used. The world communicator is used.
158      *
159      * @param op Reduction operator.
160      * @exception NullPointerException (unchecked exception) Thrown if
161      * <code>op</code> is null.
162      */
163     public ReplicatedFloat(FloatOp op) {
164         this(op, 0.0f, 0, Comm.world());
165     }
166 
167     /**
168      * Construct a new replicated, shared float reduction variable with the
169      * given reduction operator and initial value. A message tag of 0 is used.
170      * The world communicator is used.
171      *
172      * @param op Reduction operator.
173      * @param initialValue Initial value.
174      * @exception NullPointerException (unchecked exception) Thrown if
175      * <code>op</code> is null.
176      */
177     public ReplicatedFloat(FloatOp op,
178             float initialValue) {
179         this(op, initialValue, 0, Comm.world());
180     }
181 
182     /**
183      * Construct a new replicated, shared float reduction variable with the
184      * given reduction operator, initial value, and message tag. The world
185      * communicator is used.
186      *
187      * @param op Reduction operator.
188      * @param initialValue Initial value.
189      * @param tag Message tag.
190      * @exception NullPointerException (unchecked exception) Thrown if
191      * <code>op</code> is null. Thrown if
192      * <code>comm</code> is null.
193      */
194     public ReplicatedFloat(FloatOp op,
195             float initialValue,
196             int tag) {
197         this(op, initialValue, tag, Comm.world());
198     }
199 
200     /**
201      * Construct a new replicated, shared float reduction variable with the
202      * given reduction operator, initial value, message tag, and communicator.
203      *
204      * @param op Reduction operator.
205      * @param initialValue Initial value.
206      * @param tag Message tag.
207      * @param comm Communicator.
208      * @exception NullPointerException (unchecked exception) Thrown if
209      * <code>op</code> is null. Thrown if
210      * <code>comm</code> is null.
211      */
212     public ReplicatedFloat(FloatOp op,
213             float initialValue,
214             int tag,
215             Comm comm) {
216         if (op == null) {
217             throw new NullPointerException("ReplicatedFloat(): op is null");
218         }
219         if (comm == null) {
220             throw new NullPointerException("ReplicatedFloat(): comm is null");
221         }
222         myOp = op;
223         myValue = new AtomicInteger(Float.floatToIntBits(initialValue));
224         myTag = tag;
225         myComm = comm;
226         myReceiver = new Receiver();
227         myReceiver.setDaemon(true);
228         myReceiver.start();
229     }
230 
231 // Exported operations.
232     /**
233      * Returns this replicated, shared reduction variable's current value.
234      *
235      * @return Current value.
236      */
237     public float get() {
238         return Float.intBitsToFloat(myValue.get());
239     }
240 
241     /**
242      * Update this replicated, shared reduction variable's current value. This
243      * variable is combined with the given value using the reduction operation
244      * specified to the constructor (<I>op</I>). The result is stored back into
245      * this variable and is returned; the result may also be flooded to all
246      * processes in the communicator.
247      *
248      * @param value Value.
249      * @return (This variable) <I>op</I> (<code>value</code>).
250      * @exception IOException Thrown if an I/O error occurred.
251      * @throws java.io.IOException if any.
252      */
253     public float reduce(float value)
254             throws IOException {
255         // Do an atomic reduction.
256         int oldint, newint;
257         float oldvalue, newvalue;
258         do {
259             oldint = myValue.get();
260             oldvalue = Float.intBitsToFloat(oldint);
261             newvalue = myOp.op(oldvalue, value);
262             newint = Float.floatToIntBits(newvalue);
263         } while (!myValue.compareAndSet(oldint, newint));
264 
265         // If value changed, send a flooded message.
266         if (newvalue != oldvalue) {
267             myComm.floodSend(myTag, FloatBuf.buffer(newvalue));
268         }
269 
270         // Return updated value.
271         return newvalue;
272     }
273 
274     /**
275      * Returns a string version of this reduction variable.
276      *
277      * @return String version.
278      */
279     public String toString() {
280         return Float.toString(get());
281     }
282 
283     /**
284      * Returns this reduction variable's current value converted to type
285      * <code>int</code>.
286      *
287      * @return Current value.
288      */
289     public int intValue() {
290         return (int) get();
291     }
292 
293     /**
294      * Returns this reduction variable's current value converted to type
295      * <code>long</code>.
296      *
297      * @return Current value.
298      */
299     public long longValue() {
300         return (long) get();
301     }
302 
303     /**
304      * Returns this reduction variable's current value converted to type
305      * <code>float</code>.
306      *
307      * @return Current value.
308      */
309     public float floatValue() {
310         return get();
311     }
312 
313     /**
314      * Returns this reduction variable's current value converted to type
315      * <code>double</code>.
316      *
317      * @return Current value.
318      */
319     public double doubleValue() {
320         return get();
321     }
322 
323 }