1 //******************************************************************************
2 //
3 // File: ReplicatedFloat.java
4 // Package: edu.rit.pj.replica
5 // Unit: Class edu.rit.pj.replica.ReplicatedFloat
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.replica;
41
42 import java.io.IOException;
43 import java.util.concurrent.atomic.AtomicInteger;
44
45 import edu.rit.mp.FloatBuf;
46 import edu.rit.mp.buf.FloatItemBuf;
47 import edu.rit.pj.Comm;
48 import edu.rit.pj.reduction.FloatOp;
49
50 /**
51 * Class ReplicatedFloat provides a replicated, shared reduction variable for a
52 * value of type <code>float</code>.
53 * <P>
54 * A replicated, shared reduction variable is intended to be used in a cluster
55 * or hybrid parallel program for a data item shared among all the processes in
56 * the program and all the threads in each process. To use a replicated, shared
57 * reduction variable, do the following in each process of the parallel program:
58 * <OL TYPE=1>
59 * <LI>
60 * Construct an instance of class ReplicatedFloat, specifying the reduction
61 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
62 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
63 * and the message tag to use for sending updates among the processes. At this
64 * point a <I>replica</I> of the variable exists in each process.
65 *
66 * <LI>
67 * To read the variable, call the <code>get()</code> method. The current value of
68 * the local process's replicated variable is returned.
69 *
70 * <LI>
71 * To update the variable, call the <code>reduce()</code> method, specifying a new
72 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
73 * (described below) on the local process's replicated variable with the new
74 * value. If the variable changed as a result of the reduction, the variable's
75 * (updated) value is flooded to all the processes in the communicator. Finally,
76 * the <code>reduce()</code> method returns the variable's value.
77 * <P>
78 * Whenever one of the aforementioned flooded messages arrives, a separate
79 * thread performs an atomic reduction on the local process's variable with the
80 * received value.
81 * <P>
82 * An atomic reduction consists of these steps, performed atomically: Call the
83 * reduction operator's <code>op()</code> method, passing in the current value of
84 * the local process's replicated variable and the new value (either the new
85 * value specified as an argument of <code>reduce()</code>, or the new value
86 * received in a flooded message). Then store the <code>op()</code> method's return
87 * value back into the local process's replicated variable.
88 * </OL>
89 * <P>
90 * Class ReplicatedFloat does not itself guarantee consistency of the replicas'
91 * values. This is to avoid the message passing overhead of a distributed state
92 * update protocol. Instead, the parallel program must be written to operate
93 * correctly when the variable is updated as described above. Note that the
94 * value of a process's local replica can change asynchronously at any time,
95 * either because a thread in the current process updated the variable, or
96 * because a flooded message updated the variable.
97 * <P>
98 * Class ReplicatedFloat is multiple thread safe. The methods use lock-free
99 * atomic compare-and-set.
100 * <P>
101 * <I>Note:</I> Class ReplicatedFloat is implemented using class
102 * java.util.concurrent.atomic.AtomicInteger.
103 *
104 * @author Alan Kaminsky
105 * @version 13-Sep-2008
106 */
107 @SuppressWarnings("serial")
108 public class ReplicatedFloat
109 extends Number {
110
111 // Hidden data members.
112 private final FloatOp myOp;
113 private final AtomicInteger myValue;
114 private final int myTag;
115 private final Comm myComm;
116 private final Receiver myReceiver;
117
118 // Hidden helper classes.
119 /**
120 * Class Receiver receives and processes flooded messages to update this
121 * replicated, shared reduction variable.
122 *
123 * @author Alan Kaminsky
124 * @version 13-Sep-2008
125 */
126 private class Receiver
127 extends Thread {
128
129 public void run() {
130 FloatItemBuf buf = FloatBuf.buffer();
131
132 try {
133 for (;;) {
134 // Receive a flooded message.
135 myComm.floodReceive(myTag, buf);
136
137 // Do an atomic reduction.
138 int oldint, newint;
139 float oldvalue, newvalue;
140 do {
141 oldint = myValue.get();
142 oldvalue = Float.intBitsToFloat(oldint);
143 newvalue = myOp.op(oldvalue, buf.item);
144 newint = Float.floatToIntBits(newvalue);
145 } while (!myValue.compareAndSet(oldint, newint));
146 }
147 } catch (Throwable exc) {
148 exc.printStackTrace(System.err);
149 }
150 }
151 }
152
153 // Exported constructors.
154 /**
155 * Construct a new replicated, shared float reduction variable with the
156 * given reduction operator. The initial value is 0. A message tag of 0 is
157 * used. The world communicator is used.
158 *
159 * @param op Reduction operator.
160 * @exception NullPointerException (unchecked exception) Thrown if
161 * <code>op</code> is null.
162 */
163 public ReplicatedFloat(FloatOp op) {
164 this(op, 0.0f, 0, Comm.world());
165 }
166
167 /**
168 * Construct a new replicated, shared float reduction variable with the
169 * given reduction operator and initial value. A message tag of 0 is used.
170 * The world communicator is used.
171 *
172 * @param op Reduction operator.
173 * @param initialValue Initial value.
174 * @exception NullPointerException (unchecked exception) Thrown if
175 * <code>op</code> is null.
176 */
177 public ReplicatedFloat(FloatOp op,
178 float initialValue) {
179 this(op, initialValue, 0, Comm.world());
180 }
181
182 /**
183 * Construct a new replicated, shared float reduction variable with the
184 * given reduction operator, initial value, and message tag. The world
185 * communicator is used.
186 *
187 * @param op Reduction operator.
188 * @param initialValue Initial value.
189 * @param tag Message tag.
190 * @exception NullPointerException (unchecked exception) Thrown if
191 * <code>op</code> is null. Thrown if
192 * <code>comm</code> is null.
193 */
194 public ReplicatedFloat(FloatOp op,
195 float initialValue,
196 int tag) {
197 this(op, initialValue, tag, Comm.world());
198 }
199
200 /**
201 * Construct a new replicated, shared float reduction variable with the
202 * given reduction operator, initial value, message tag, and communicator.
203 *
204 * @param op Reduction operator.
205 * @param initialValue Initial value.
206 * @param tag Message tag.
207 * @param comm Communicator.
208 * @exception NullPointerException (unchecked exception) Thrown if
209 * <code>op</code> is null. Thrown if
210 * <code>comm</code> is null.
211 */
212 public ReplicatedFloat(FloatOp op,
213 float initialValue,
214 int tag,
215 Comm comm) {
216 if (op == null) {
217 throw new NullPointerException("ReplicatedFloat(): op is null");
218 }
219 if (comm == null) {
220 throw new NullPointerException("ReplicatedFloat(): comm is null");
221 }
222 myOp = op;
223 myValue = new AtomicInteger(Float.floatToIntBits(initialValue));
224 myTag = tag;
225 myComm = comm;
226 myReceiver = new Receiver();
227 myReceiver.setDaemon(true);
228 myReceiver.start();
229 }
230
231 // Exported operations.
232 /**
233 * Returns this replicated, shared reduction variable's current value.
234 *
235 * @return Current value.
236 */
237 public float get() {
238 return Float.intBitsToFloat(myValue.get());
239 }
240
241 /**
242 * Update this replicated, shared reduction variable's current value. This
243 * variable is combined with the given value using the reduction operation
244 * specified to the constructor (<I>op</I>). The result is stored back into
245 * this variable and is returned; the result may also be flooded to all
246 * processes in the communicator.
247 *
248 * @param value Value.
249 * @return (This variable) <I>op</I> (<code>value</code>).
250 * @exception IOException Thrown if an I/O error occurred.
251 * @throws java.io.IOException if any.
252 */
253 public float reduce(float value)
254 throws IOException {
255 // Do an atomic reduction.
256 int oldint, newint;
257 float oldvalue, newvalue;
258 do {
259 oldint = myValue.get();
260 oldvalue = Float.intBitsToFloat(oldint);
261 newvalue = myOp.op(oldvalue, value);
262 newint = Float.floatToIntBits(newvalue);
263 } while (!myValue.compareAndSet(oldint, newint));
264
265 // If value changed, send a flooded message.
266 if (newvalue != oldvalue) {
267 myComm.floodSend(myTag, FloatBuf.buffer(newvalue));
268 }
269
270 // Return updated value.
271 return newvalue;
272 }
273
274 /**
275 * Returns a string version of this reduction variable.
276 *
277 * @return String version.
278 */
279 public String toString() {
280 return Float.toString(get());
281 }
282
283 /**
284 * Returns this reduction variable's current value converted to type
285 * <code>int</code>.
286 *
287 * @return Current value.
288 */
289 public int intValue() {
290 return (int) get();
291 }
292
293 /**
294 * Returns this reduction variable's current value converted to type
295 * <code>long</code>.
296 *
297 * @return Current value.
298 */
299 public long longValue() {
300 return (long) get();
301 }
302
303 /**
304 * Returns this reduction variable's current value converted to type
305 * <code>float</code>.
306 *
307 * @return Current value.
308 */
309 public float floatValue() {
310 return get();
311 }
312
313 /**
314 * Returns this reduction variable's current value converted to type
315 * <code>double</code>.
316 *
317 * @return Current value.
318 */
319 public double doubleValue() {
320 return get();
321 }
322
323 }