1 //******************************************************************************
2 //
3 // File: ReplicatedDouble.java
4 // Package: edu.rit.pj.replica
5 // Unit: Class edu.rit.pj.replica.ReplicatedDouble
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.replica;
41
42 import java.io.IOException;
43 import java.io.Serial;
44 import java.util.concurrent.atomic.AtomicLong;
45
46 import edu.rit.mp.DoubleBuf;
47 import edu.rit.mp.buf.DoubleItemBuf;
48 import edu.rit.pj.Comm;
49 import edu.rit.pj.reduction.DoubleOp;
50
51 /**
52 * Class ReplicatedDouble provides a replicated, shared reduction variable for a
53 * value of type <code>double</code>.
54 * <P>
55 * A replicated, shared reduction variable is intended to be used in a cluster
56 * or hybrid parallel program for a data item shared among all the processes in
57 * the program and all the threads in each process. To use a replicated, shared
58 * reduction variable, do the following in each process of the parallel program:
59 * <OL TYPE=1>
60 * <LI>
61 * Construct an instance of class ReplicatedDouble, specifying the reduction
62 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
63 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
64 * and the message tag to use for sending updates among the processes. At this
65 * point a <I>replica</I> of the variable exists in each process.
66 *
67 * <LI>
68 * To read the variable, call the <code>get()</code> method. The current value of
69 * the local process's replicated variable is returned.
70 *
71 * <LI>
72 * To update the variable, call the <code>reduce()</code> method, specifying a new
73 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
74 * (described below) on the local process's replicated variable with the new
75 * value. If the variable changed as a result of the reduction, the variable's
76 * (updated) value is flooded to all the processes in the communicator. Finally,
77 * the <code>reduce()</code> method returns the variable's value.
78 * <P>
79 * Whenever one of the aforementioned flooded messages arrives, a separate
80 * thread performs an atomic reduction on the local process's variable with the
81 * received value.
82 * <P>
83 * An atomic reduction consists of these steps, performed atomically: Call the
84 * reduction operator's <code>op()</code> method, passing in the current value of
85 * the local process's replicated variable and the new value (either the new
86 * value specified as an argument of <code>reduce()</code>, or the new value
87 * received in a flooded message). Then store the <code>op()</code> method's return
88 * value back into the local process's replicated variable.
89 * </OL>
90 * <P>
91 * Class ReplicatedDouble does not itself guarantee consistency of the replicas'
92 * values. This is to avoid the message passing overhead of a distributed state
93 * update protocol. Instead, the parallel program must be written to operate
94 * correctly when the variable is updated as described above. Note that the
95 * value of a process's local replica can change asynchronously at any time,
96 * either because a thread in the current process updated the variable, or
97 * because a flooded message updated the variable.
98 * <P>
99 * Class ReplicatedDouble is multiple thread safe. The methods use lock-free
100 * atomic compare-and-set.
101 * <P>
102 * <I>Note:</I> Class ReplicatedDouble is implemented using class
103 * java.util.concurrent.atomic.AtomicLong.
104 *
105 * @author Alan Kaminsky
106 * @version 13-Sep-2008
107 */
108 @SuppressWarnings("serial")
109 public class ReplicatedDouble
110 extends Number {
111
112 @Serial
113 private static final long serialVersionUID = 1L;
114
115 // Hidden data members.
116 private DoubleOp myOp;
117 private AtomicLong myValue;
118 private int myTag;
119 private Comm myComm;
120 private Receiver myReceiver;
121
122 // Hidden helper classes.
123 /**
124 * Class Receiver receives and processes flooded messages to update this
125 * replicated, shared reduction variable.
126 *
127 * @author Alan Kaminsky
128 * @version 13-Sep-2008
129 */
130 private class Receiver
131 extends Thread {
132
133 public void run() {
134 DoubleItemBuf buf = DoubleBuf.buffer();
135
136 try {
137 for (;;) {
138 // Receive a flooded message.
139 myComm.floodReceive(myTag, buf);
140
141 // Do an atomic reduction.
142 long oldlong, newlong;
143 double oldvalue, newvalue;
144 do {
145 oldlong = myValue.get();
146 oldvalue = Double.longBitsToDouble(oldlong);
147 newvalue = myOp.op(oldvalue, buf.item);
148 newlong = Double.doubleToLongBits(newvalue);
149 } while (!myValue.compareAndSet(oldlong, newlong));
150 }
151 } catch (Throwable exc) {
152 exc.printStackTrace(System.err);
153 }
154 }
155 }
156
157 // Exported constructors.
158 /**
159 * Construct a new replicated, shared double reduction variable with the
160 * given reduction operator. The initial value is 0. A message tag of 0 is
161 * used. The world communicator is used.
162 *
163 * @param op Reduction operator.
164 * @exception NullPointerException (unchecked exception) Thrown if
165 * <code>op</code> is null.
166 */
167 public ReplicatedDouble(DoubleOp op) {
168 this(op, 0.0, 0, Comm.world());
169 }
170
171 /**
172 * Construct a new replicated, shared double reduction variable with the
173 * given reduction operator and initial value. A message tag of 0 is used.
174 * The world communicator is used.
175 *
176 * @param op Reduction operator.
177 * @param initialValue Initial value.
178 * @exception NullPointerException (unchecked exception) Thrown if
179 * <code>op</code> is null.
180 */
181 public ReplicatedDouble(DoubleOp op,
182 double initialValue) {
183 this(op, initialValue, 0, Comm.world());
184 }
185
186 /**
187 * Construct a new replicated, shared double reduction variable with the
188 * given reduction operator, initial value, and message tag. The world
189 * communicator is used.
190 *
191 * @param op Reduction operator.
192 * @param initialValue Initial value.
193 * @param tag Message tag.
194 * @exception NullPointerException (unchecked exception) Thrown if
195 * <code>op</code> is null. Thrown if
196 * <code>comm</code> is null.
197 */
198 public ReplicatedDouble(DoubleOp op,
199 double initialValue,
200 int tag) {
201 this(op, initialValue, tag, Comm.world());
202 }
203
204 /**
205 * Construct a new replicated, shared double reduction variable with the
206 * given reduction operator, initial value, message tag, and communicator.
207 *
208 * @param op Reduction operator.
209 * @param initialValue Initial value.
210 * @param tag Message tag.
211 * @param comm Communicator.
212 * @exception NullPointerException (unchecked exception) Thrown if
213 * <code>op</code> is null. Thrown if
214 * <code>comm</code> is null.
215 */
216 public ReplicatedDouble(DoubleOp op,
217 double initialValue,
218 int tag,
219 Comm comm) {
220 if (op == null) {
221 throw new NullPointerException("ReplicatedDouble(): op is null");
222 }
223 if (comm == null) {
224 throw new NullPointerException("ReplicatedDouble(): comm is null");
225 }
226 myOp = op;
227 myValue = new AtomicLong(Double.doubleToLongBits(initialValue));
228 myTag = tag;
229 myComm = comm;
230 myReceiver = new Receiver();
231 myReceiver.setDaemon(true);
232 myReceiver.start();
233 }
234
235 // Exported operations.
236 /**
237 * Returns this replicated, shared reduction variable's current value.
238 *
239 * @return Current value.
240 */
241 public double get() {
242 return Double.longBitsToDouble(myValue.get());
243 }
244
245 /**
246 * Update this replicated, shared reduction variable's current value. This
247 * variable is combined with the given value using the reduction operation
248 * specified to the constructor (<I>op</I>). The result is stored back into
249 * this variable and is returned; the result may also be flooded to all
250 * processes in the communicator.
251 *
252 * @param value Value.
253 * @return (This variable) <I>op</I> (<code>value</code>).
254 * @exception IOException Thrown if an I/O error occurred.
255 * @throws java.io.IOException if any.
256 */
257 public double reduce(double value)
258 throws IOException {
259 // Do an atomic reduction.
260 long oldlong, newlong;
261 double oldvalue, newvalue;
262 do {
263 oldlong = myValue.get();
264 oldvalue = Double.longBitsToDouble(oldlong);
265 newvalue = myOp.op(oldvalue, value);
266 newlong = Double.doubleToLongBits(newvalue);
267 } while (!myValue.compareAndSet(oldlong, newlong));
268
269 // If value changed, send a flooded message.
270 if (newvalue != oldvalue) {
271 myComm.floodSend(myTag, DoubleBuf.buffer(newvalue));
272 }
273
274 // Return updated value.
275 return newvalue;
276 }
277
278 /**
279 * Returns a string version of this reduction variable.
280 *
281 * @return String version.
282 */
283 public String toString() {
284 return Double.toString(get());
285 }
286
287 /**
288 * Returns this reduction variable's current value converted to type
289 * <code>int</code>.
290 *
291 * @return Current value.
292 */
293 public int intValue() {
294 return (int) get();
295 }
296
297 /**
298 * Returns this reduction variable's current value converted to type
299 * <code>long</code>.
300 *
301 * @return Current value.
302 */
303 public long longValue() {
304 return (long) get();
305 }
306
307 /**
308 * Returns this reduction variable's current value converted to type
309 * <code>float</code>.
310 *
311 * @return Current value.
312 */
313 public float floatValue() {
314 return (float) get();
315 }
316
317 /**
318 * Returns this reduction variable's current value converted to type
319 * <code>double</code>.
320 *
321 * @return Current value.
322 */
323 public double doubleValue() {
324 return get();
325 }
326
327 }