1 //****************************************************************************** 2 // 3 // File: ReplicatedLong.java 4 // Package: edu.rit.pj.replica 5 // Unit: Class edu.rit.pj.replica.ReplicatedLong 6 // 7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj.replica; 41 42 import java.io.IOException; 43 import java.io.Serial; 44 import java.util.concurrent.atomic.AtomicLong; 45 46 import edu.rit.mp.LongBuf; 47 import edu.rit.mp.buf.LongItemBuf; 48 import edu.rit.pj.Comm; 49 import edu.rit.pj.reduction.LongOp; 50 51 /** 52 * Class ReplicatedLong provides a replicated, shared reduction variable for a 53 * value of type <code>long</code>. 54 * <P> 55 * A replicated, shared reduction variable is intended to be used in a cluster 56 * or hybrid parallel program for a data item shared among all the processes in 57 * the program and all the threads in each process. To use a replicated, shared 58 * reduction variable, do the following in each process of the parallel program: 59 * <OL TYPE=1> 60 * <LI> 61 * Construct an instance of class ReplicatedLong, specifying the reduction 62 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing 63 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm}) 64 * and the message tag to use for sending updates among the processes. At this 65 * point a <I>replica</I> of the variable exists in each process. 66 * 67 * <LI> 68 * To read the variable, call the <code>get()</code> method. The current value of 69 * the local process's replicated variable is returned. 70 * 71 * <LI> 72 * To update the variable, call the <code>reduce()</code> method, specifying a new 73 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I> 74 * (described below) on the local process's replicated variable with the new 75 * value. If the variable changed as a result of the reduction, the variable's 76 * (updated) value is flooded to all the processes in the communicator. Finally, 77 * the <code>reduce()</code> method returns the variable's value. 78 * <P> 79 * Whenever one of the aforementioned flooded messages arrives, a separate 80 * thread performs an atomic reduction on the local process's variable with the 81 * received value. 82 * <P> 83 * An atomic reduction consists of these steps, performed atomically: Call the 84 * reduction operator's <code>op()</code> method, passing in the current value of 85 * the local process's replicated variable and the new value (either the new 86 * value specified as an argument of <code>reduce()</code>, or the new value 87 * received in a flooded message). Then store the <code>op()</code> method's return 88 * value back into the local process's replicated variable. 89 * </OL> 90 * <P> 91 * Class ReplicatedLong does not itself guarantee consistency of the replicas' 92 * values. This is to avoid the message passing overhead of a distributed state 93 * update protocol. Instead, the parallel program must be written to operate 94 * correctly when the variable is updated as described above. Note that the 95 * value of a process's local replica can change asynchronously at any time, 96 * either because a thread in the current process updated the variable, or 97 * because a flooded message updated the variable. 98 * <P> 99 * Class ReplicatedLong is multiple thread safe. The methods use lock-free 100 * atomic compare-and-set. 101 * <P> 102 * <I>Note:</I> Class ReplicatedLong is implemented using class 103 * java.util.concurrent.atomic.AtomicLong. 104 * 105 * @author Alan Kaminsky 106 * @version 13-Sep-2008 107 */ 108 @SuppressWarnings("serial") 109 public class ReplicatedLong 110 extends Number { 111 112 @Serial 113 private static final long serialVersionUID = 1L; 114 115 // Hidden data members. 116 private LongOp myOp; 117 private AtomicLong myValue; 118 private int myTag; 119 private Comm myComm; 120 private Receiver myReceiver; 121 122 // Hidden helper classes. 123 /** 124 * Class Receiver receives and processes flooded messages to update this 125 * replicated, shared reduction variable. 126 * 127 * @author Alan Kaminsky 128 * @version 13-Sep-2008 129 */ 130 private class Receiver 131 extends Thread { 132 133 public void run() { 134 LongItemBuf buf = LongBuf.buffer(); 135 136 try { 137 for (;;) { 138 // Receive a flooded message. 139 myComm.floodReceive(myTag, buf); 140 141 // Do an atomic reduction. 142 long oldvalue, newvalue; 143 do { 144 oldvalue = myValue.get(); 145 newvalue = myOp.op(oldvalue, buf.item); 146 } while (!myValue.compareAndSet(oldvalue, newvalue)); 147 } 148 } catch (Throwable exc) { 149 exc.printStackTrace(System.err); 150 } 151 } 152 } 153 154 // Exported constructors. 155 /** 156 * Construct a new replicated, shared long reduction variable with the given 157 * reduction operator. The initial value is 0. A message tag of 0 is used. 158 * The world communicator is used. 159 * 160 * @param op Reduction operator. 161 * @exception NullPointerException (unchecked exception) Thrown if 162 * <code>op</code> is null. 163 */ 164 public ReplicatedLong(LongOp op) { 165 this(op, 0L, 0, Comm.world()); 166 } 167 168 /** 169 * Construct a new replicated, shared long reduction variable with the given 170 * reduction operator and initial value. A message tag of 0 is used. The 171 * world communicator is used. 172 * 173 * @param op Reduction operator. 174 * @param initialValue Initial value. 175 * @exception NullPointerException (unchecked exception) Thrown if 176 * <code>op</code> is null. 177 */ 178 public ReplicatedLong(LongOp op, 179 long initialValue) { 180 this(op, initialValue, 0, Comm.world()); 181 } 182 183 /** 184 * Construct a new replicated, shared long reduction variable with the given 185 * reduction operator, initial value, and message tag. The world 186 * communicator is used. 187 * 188 * @param op Reduction operator. 189 * @param initialValue Initial value. 190 * @param tag Message tag. 191 * @exception NullPointerException (unchecked exception) Thrown if 192 * <code>op</code> is null. Thrown if 193 * <code>comm</code> is null. 194 */ 195 public ReplicatedLong(LongOp op, 196 long initialValue, 197 int tag) { 198 this(op, initialValue, tag, Comm.world()); 199 } 200 201 /** 202 * Construct a new replicated, shared long reduction variable with the given 203 * reduction operator, initial value, message tag, and communicator. 204 * 205 * @param op Reduction operator. 206 * @param initialValue Initial value. 207 * @param tag Message tag. 208 * @param comm Communicator. 209 * @exception NullPointerException (unchecked exception) Thrown if 210 * <code>op</code> is null. Thrown if 211 * <code>comm</code> is null. 212 */ 213 public ReplicatedLong(LongOp op, 214 long initialValue, 215 int tag, 216 Comm comm) { 217 if (op == null) { 218 throw new NullPointerException("ReplicatedLong(): op is null"); 219 } 220 if (comm == null) { 221 throw new NullPointerException("ReplicatedLong(): comm is null"); 222 } 223 myOp = op; 224 myValue = new AtomicLong(initialValue); 225 myTag = tag; 226 myComm = comm; 227 myReceiver = new Receiver(); 228 myReceiver.setDaemon(true); 229 myReceiver.start(); 230 } 231 232 // Exported operations. 233 /** 234 * Returns this replicated, shared reduction variable's current value. 235 * 236 * @return Current value. 237 */ 238 public long get() { 239 return myValue.get(); 240 } 241 242 /** 243 * Update this replicated, shared reduction variable's current value. This 244 * variable is combined with the given value using the reduction operation 245 * specified to the constructor (<I>op</I>). The result is stored back into 246 * this variable and is returned; the result may also be flooded to all 247 * processes in the communicator. 248 * 249 * @param value Value. 250 * @return (This variable) <I>op</I> (<code>value</code>). 251 * @exception IOException Thrown if an I/O error occurred. 252 * @throws java.io.IOException if any. 253 */ 254 public long reduce(long value) 255 throws IOException { 256 // Do an atomic reduction. 257 long oldvalue, newvalue; 258 do { 259 oldvalue = myValue.get(); 260 newvalue = myOp.op(oldvalue, value); 261 } while (!myValue.compareAndSet(oldvalue, newvalue)); 262 263 // If value changed, send a flooded message. 264 if (newvalue != oldvalue) { 265 myComm.floodSend(myTag, LongBuf.buffer(newvalue)); 266 } 267 268 // Return updated value. 269 return newvalue; 270 } 271 272 /** 273 * Returns a string version of this reduction variable. 274 * 275 * @return String version. 276 */ 277 public String toString() { 278 return Long.toString(get()); 279 } 280 281 /** 282 * Returns this reduction variable's current value converted to type 283 * <code>int</code>. 284 * 285 * @return Current value. 286 */ 287 public int intValue() { 288 return (int) get(); 289 } 290 291 /** 292 * Returns this reduction variable's current value converted to type 293 * <code>long</code>. 294 * 295 * @return Current value. 296 */ 297 public long longValue() { 298 return get(); 299 } 300 301 /** 302 * Returns this reduction variable's current value converted to type 303 * <code>float</code>. 304 * 305 * @return Current value. 306 */ 307 public float floatValue() { 308 return (float) get(); 309 } 310 311 /** 312 * Returns this reduction variable's current value converted to type 313 * <code>double</code>. 314 * 315 * @return Current value. 316 */ 317 public double doubleValue() { 318 return (double) get(); 319 } 320 321 }