1 //****************************************************************************** 2 // 3 // File: ReplicatedFloat.java 4 // Package: edu.rit.pj.replica 5 // Unit: Class edu.rit.pj.replica.ReplicatedFloat 6 // 7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj.replica; 41 42 import java.io.IOException; 43 import java.util.concurrent.atomic.AtomicInteger; 44 45 import edu.rit.mp.FloatBuf; 46 import edu.rit.mp.buf.FloatItemBuf; 47 import edu.rit.pj.Comm; 48 import edu.rit.pj.reduction.FloatOp; 49 50 /** 51 * Class ReplicatedFloat provides a replicated, shared reduction variable for a 52 * value of type <code>float</code>. 53 * <P> 54 * A replicated, shared reduction variable is intended to be used in a cluster 55 * or hybrid parallel program for a data item shared among all the processes in 56 * the program and all the threads in each process. To use a replicated, shared 57 * reduction variable, do the following in each process of the parallel program: 58 * <OL TYPE=1> 59 * <LI> 60 * Construct an instance of class ReplicatedFloat, specifying the reduction 61 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing 62 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm}) 63 * and the message tag to use for sending updates among the processes. At this 64 * point a <I>replica</I> of the variable exists in each process. 65 * 66 * <LI> 67 * To read the variable, call the <code>get()</code> method. The current value of 68 * the local process's replicated variable is returned. 69 * 70 * <LI> 71 * To update the variable, call the <code>reduce()</code> method, specifying a new 72 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I> 73 * (described below) on the local process's replicated variable with the new 74 * value. If the variable changed as a result of the reduction, the variable's 75 * (updated) value is flooded to all the processes in the communicator. Finally, 76 * the <code>reduce()</code> method returns the variable's value. 77 * <P> 78 * Whenever one of the aforementioned flooded messages arrives, a separate 79 * thread performs an atomic reduction on the local process's variable with the 80 * received value. 81 * <P> 82 * An atomic reduction consists of these steps, performed atomically: Call the 83 * reduction operator's <code>op()</code> method, passing in the current value of 84 * the local process's replicated variable and the new value (either the new 85 * value specified as an argument of <code>reduce()</code>, or the new value 86 * received in a flooded message). Then store the <code>op()</code> method's return 87 * value back into the local process's replicated variable. 88 * </OL> 89 * <P> 90 * Class ReplicatedFloat does not itself guarantee consistency of the replicas' 91 * values. This is to avoid the message passing overhead of a distributed state 92 * update protocol. Instead, the parallel program must be written to operate 93 * correctly when the variable is updated as described above. Note that the 94 * value of a process's local replica can change asynchronously at any time, 95 * either because a thread in the current process updated the variable, or 96 * because a flooded message updated the variable. 97 * <P> 98 * Class ReplicatedFloat is multiple thread safe. The methods use lock-free 99 * atomic compare-and-set. 100 * <P> 101 * <I>Note:</I> Class ReplicatedFloat is implemented using class 102 * java.util.concurrent.atomic.AtomicInteger. 103 * 104 * @author Alan Kaminsky 105 * @version 13-Sep-2008 106 */ 107 @SuppressWarnings("serial") 108 public class ReplicatedFloat 109 extends Number { 110 111 // Hidden data members. 112 private final FloatOp myOp; 113 private final AtomicInteger myValue; 114 private final int myTag; 115 private final Comm myComm; 116 private final Receiver myReceiver; 117 118 // Hidden helper classes. 119 /** 120 * Class Receiver receives and processes flooded messages to update this 121 * replicated, shared reduction variable. 122 * 123 * @author Alan Kaminsky 124 * @version 13-Sep-2008 125 */ 126 private class Receiver 127 extends Thread { 128 129 public void run() { 130 FloatItemBuf buf = FloatBuf.buffer(); 131 132 try { 133 for (;;) { 134 // Receive a flooded message. 135 myComm.floodReceive(myTag, buf); 136 137 // Do an atomic reduction. 138 int oldint, newint; 139 float oldvalue, newvalue; 140 do { 141 oldint = myValue.get(); 142 oldvalue = Float.intBitsToFloat(oldint); 143 newvalue = myOp.op(oldvalue, buf.item); 144 newint = Float.floatToIntBits(newvalue); 145 } while (!myValue.compareAndSet(oldint, newint)); 146 } 147 } catch (Throwable exc) { 148 exc.printStackTrace(System.err); 149 } 150 } 151 } 152 153 // Exported constructors. 154 /** 155 * Construct a new replicated, shared float reduction variable with the 156 * given reduction operator. The initial value is 0. A message tag of 0 is 157 * used. The world communicator is used. 158 * 159 * @param op Reduction operator. 160 * @exception NullPointerException (unchecked exception) Thrown if 161 * <code>op</code> is null. 162 */ 163 public ReplicatedFloat(FloatOp op) { 164 this(op, 0.0f, 0, Comm.world()); 165 } 166 167 /** 168 * Construct a new replicated, shared float reduction variable with the 169 * given reduction operator and initial value. A message tag of 0 is used. 170 * The world communicator is used. 171 * 172 * @param op Reduction operator. 173 * @param initialValue Initial value. 174 * @exception NullPointerException (unchecked exception) Thrown if 175 * <code>op</code> is null. 176 */ 177 public ReplicatedFloat(FloatOp op, 178 float initialValue) { 179 this(op, initialValue, 0, Comm.world()); 180 } 181 182 /** 183 * Construct a new replicated, shared float reduction variable with the 184 * given reduction operator, initial value, and message tag. The world 185 * communicator is used. 186 * 187 * @param op Reduction operator. 188 * @param initialValue Initial value. 189 * @param tag Message tag. 190 * @exception NullPointerException (unchecked exception) Thrown if 191 * <code>op</code> is null. Thrown if 192 * <code>comm</code> is null. 193 */ 194 public ReplicatedFloat(FloatOp op, 195 float initialValue, 196 int tag) { 197 this(op, initialValue, tag, Comm.world()); 198 } 199 200 /** 201 * Construct a new replicated, shared float reduction variable with the 202 * given reduction operator, initial value, message tag, and communicator. 203 * 204 * @param op Reduction operator. 205 * @param initialValue Initial value. 206 * @param tag Message tag. 207 * @param comm Communicator. 208 * @exception NullPointerException (unchecked exception) Thrown if 209 * <code>op</code> is null. Thrown if 210 * <code>comm</code> is null. 211 */ 212 public ReplicatedFloat(FloatOp op, 213 float initialValue, 214 int tag, 215 Comm comm) { 216 if (op == null) { 217 throw new NullPointerException("ReplicatedFloat(): op is null"); 218 } 219 if (comm == null) { 220 throw new NullPointerException("ReplicatedFloat(): comm is null"); 221 } 222 myOp = op; 223 myValue = new AtomicInteger(Float.floatToIntBits(initialValue)); 224 myTag = tag; 225 myComm = comm; 226 myReceiver = new Receiver(); 227 myReceiver.setDaemon(true); 228 myReceiver.start(); 229 } 230 231 // Exported operations. 232 /** 233 * Returns this replicated, shared reduction variable's current value. 234 * 235 * @return Current value. 236 */ 237 public float get() { 238 return Float.intBitsToFloat(myValue.get()); 239 } 240 241 /** 242 * Update this replicated, shared reduction variable's current value. This 243 * variable is combined with the given value using the reduction operation 244 * specified to the constructor (<I>op</I>). The result is stored back into 245 * this variable and is returned; the result may also be flooded to all 246 * processes in the communicator. 247 * 248 * @param value Value. 249 * @return (This variable) <I>op</I> (<code>value</code>). 250 * @exception IOException Thrown if an I/O error occurred. 251 * @throws java.io.IOException if any. 252 */ 253 public float reduce(float value) 254 throws IOException { 255 // Do an atomic reduction. 256 int oldint, newint; 257 float oldvalue, newvalue; 258 do { 259 oldint = myValue.get(); 260 oldvalue = Float.intBitsToFloat(oldint); 261 newvalue = myOp.op(oldvalue, value); 262 newint = Float.floatToIntBits(newvalue); 263 } while (!myValue.compareAndSet(oldint, newint)); 264 265 // If value changed, send a flooded message. 266 if (newvalue != oldvalue) { 267 myComm.floodSend(myTag, FloatBuf.buffer(newvalue)); 268 } 269 270 // Return updated value. 271 return newvalue; 272 } 273 274 /** 275 * Returns a string version of this reduction variable. 276 * 277 * @return String version. 278 */ 279 public String toString() { 280 return Float.toString(get()); 281 } 282 283 /** 284 * Returns this reduction variable's current value converted to type 285 * <code>int</code>. 286 * 287 * @return Current value. 288 */ 289 public int intValue() { 290 return (int) get(); 291 } 292 293 /** 294 * Returns this reduction variable's current value converted to type 295 * <code>long</code>. 296 * 297 * @return Current value. 298 */ 299 public long longValue() { 300 return (long) get(); 301 } 302 303 /** 304 * Returns this reduction variable's current value converted to type 305 * <code>float</code>. 306 * 307 * @return Current value. 308 */ 309 public float floatValue() { 310 return get(); 311 } 312 313 /** 314 * Returns this reduction variable's current value converted to type 315 * <code>double</code>. 316 * 317 * @return Current value. 318 */ 319 public double doubleValue() { 320 return get(); 321 } 322 323 }