1 //****************************************************************************** 2 // 3 // File: ReplicatedObject.java 4 // Package: edu.rit.pj.replica 5 // Unit: Class edu.rit.pj.replica.ReplicatedObject 6 // 7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj.replica; 41 42 import java.io.IOException; 43 import java.util.concurrent.atomic.AtomicReference; 44 45 import edu.rit.mp.ObjectBuf; 46 import edu.rit.mp.buf.ObjectItemBuf; 47 import edu.rit.pj.Comm; 48 import edu.rit.pj.reduction.ObjectOp; 49 50 /** 51 * Class ReplicatedObject provides a replicated, shared reduction variable for a 52 * value of type T (a nonprimitive type). 53 * <P> 54 * A replicated, shared reduction variable is intended to be used in a cluster 55 * or hybrid parallel program for a data item shared among all the processes in 56 * the program and all the threads in each process. To use a replicated, shared 57 * reduction variable, do the following in each process of the parallel program: 58 * <OL TYPE=1> 59 * <LI> 60 * Construct an instance of class ReplicatedObject, specifying the reduction 61 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing 62 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm}) 63 * and the message tag to use for sending updates among the processes. At this 64 * point a <I>replica</I> of the variable exists in each process. 65 * 66 * <LI> 67 * To read the variable, call the <code>get()</code> method. The current value of 68 * the local process's replicated variable is returned. 69 * 70 * <LI> 71 * To update the variable, call the <code>reduce()</code> method, specifying a new 72 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I> 73 * (described below) on the local process's replicated variable with the new 74 * value. If the variable changed as a result of the reduction, the variable's 75 * (updated) value is flooded to all the processes in the communicator. Finally, 76 * the <code>reduce()</code> method returns the variable's value. 77 * <P> 78 * Whenever one of the aforementioned flooded messages arrives, a separate 79 * thread performs an atomic reduction on the local process's variable with the 80 * received value. 81 * <P> 82 * An atomic reduction consists of these steps, performed atomically: Call the 83 * reduction operator's <code>op()</code> method, passing in the current value of 84 * the local process's replicated variable and the new value (either the new 85 * value specified as an argument of <code>reduce()</code>, or the new value 86 * received in a flooded message). Then store the <code>op()</code> method's return 87 * value back into the local process's replicated variable. 88 * </OL> 89 * <P> 90 * Class ReplicatedObject does not itself guarantee consistency of the replicas' 91 * values. This is to avoid the message passing overhead of a distributed state 92 * update protocol. Instead, the parallel program must be written to operate 93 * correctly when the variable is updated as described above. Note that the 94 * value of a process's local replica can change asynchronously at any time, 95 * either because a thread in the current process updated the variable, or 96 * because a flooded message updated the variable. 97 * <P> 98 * Class ReplicatedObject is multiple thread safe. The methods use lock-free 99 * atomic compare-and-set. 100 * <P> 101 * <I>Note:</I> Class ReplicatedObject is implemented using class 102 * java.util.concurrent.atomic.AtomicReference. 103 * 104 * @param <T> Object data type. 105 * @author Alan Kaminsky 106 * @version 13-Sep-2008 107 */ 108 public class ReplicatedObject<T> { 109 110 // Hidden data members. 111 private ObjectOp<T> myOp; 112 private AtomicReference<T> myValue; 113 private int myTag; 114 private Comm myComm; 115 private Receiver myReceiver; 116 117 // Hidden helper classes. 118 /** 119 * Class Receiver receives and processes flooded messages to update this 120 * replicated, shared reduction variable. 121 * 122 * @author Alan Kaminsky 123 * @version 13-Sep-2008 124 */ 125 private class Receiver 126 extends Thread { 127 128 public void run() { 129 ObjectItemBuf<T> buf = ObjectBuf.buffer(); 130 131 try { 132 for (;;) { 133 // Receive a flooded message. 134 myComm.floodReceive(myTag, buf); 135 136 // Do an atomic reduction. 137 T oldvalue, newvalue; 138 do { 139 oldvalue = myValue.get(); 140 newvalue = myOp.op(oldvalue, buf.item); 141 } while (!myValue.compareAndSet(oldvalue, newvalue)); 142 } 143 } catch (Throwable exc) { 144 exc.printStackTrace(System.err); 145 } 146 } 147 } 148 149 // Exported constructors. 150 /** 151 * Construct a new replicated, shared object reduction variable with the 152 * given reduction operator. The initial value is null. A message tag of 0 153 * is used. The world communicator is used. 154 * 155 * @param op Reduction operator. 156 * @exception NullPointerException (unchecked exception) Thrown if 157 * <code>op</code> is null. 158 */ 159 public ReplicatedObject(ObjectOp<T> op) { 160 this(op, null, 0, Comm.world()); 161 } 162 163 /** 164 * Construct a new replicated, shared object reduction variable with the 165 * given reduction operator and initial value. A message tag of 0 is used. 166 * The world communicator is used. 167 * 168 * @param op Reduction operator. 169 * @param initialValue Initial value. 170 * @exception NullPointerException (unchecked exception) Thrown if 171 * <code>op</code> is null. 172 */ 173 public ReplicatedObject(ObjectOp<T> op, 174 T initialValue) { 175 this(op, initialValue, 0, Comm.world()); 176 } 177 178 /** 179 * Construct a new replicated, shared object reduction variable with the 180 * given reduction operator, initial value, and message tag. The world 181 * communicator is used. 182 * 183 * @param op Reduction operator. 184 * @param initialValue Initial value. 185 * @param tag Message tag. 186 * @exception NullPointerException (unchecked exception) Thrown if 187 * <code>op</code> is null. Thrown if 188 * <code>comm</code> is null. 189 */ 190 public ReplicatedObject(ObjectOp<T> op, 191 T initialValue, 192 int tag) { 193 this(op, initialValue, tag, Comm.world()); 194 } 195 196 /** 197 * Construct a new replicated, shared object reduction variable with the 198 * given reduction operator, initial value, message tag, and communicator. 199 * 200 * @param op Reduction operator. 201 * @param initialValue Initial value. 202 * @param tag Message tag. 203 * @param comm Communicator. 204 * @exception NullPointerException (unchecked exception) Thrown if 205 * <code>op</code> is null. Thrown if 206 * <code>comm</code> is null. 207 */ 208 public ReplicatedObject(ObjectOp<T> op, 209 T initialValue, 210 int tag, 211 Comm comm) { 212 if (op == null) { 213 throw new NullPointerException("ReplicatedObject(): op is null"); 214 } 215 if (comm == null) { 216 throw new NullPointerException("ReplicatedObject(): comm is null"); 217 } 218 myOp = op; 219 myValue = new AtomicReference<T>(initialValue); 220 myTag = tag; 221 myComm = comm; 222 myReceiver = new Receiver(); 223 myReceiver.setDaemon(true); 224 myReceiver.start(); 225 } 226 227 // Exported operations. 228 /** 229 * Returns this replicated, shared reduction variable's current value. 230 * 231 * @return Current value. 232 */ 233 public T get() { 234 return myValue.get(); 235 } 236 237 /** 238 * Update this replicated, shared reduction variable's current value. This 239 * variable is combined with the given value using the reduction operation 240 * specified to the constructor (<I>op</I>). The result is stored back into 241 * this variable and is returned; the result may also be flooded to all 242 * processes in the communicator. 243 * 244 * @param value Value. 245 * @return (This variable) <I>op</I> (<code>value</code>). 246 * @exception IOException Thrown if an I/O error occurred. 247 * @throws java.io.IOException if any. 248 */ 249 public T reduce(T value) 250 throws IOException { 251 // Do an atomic reduction. 252 T oldvalue, newvalue; 253 do { 254 oldvalue = myValue.get(); 255 newvalue = myOp.op(oldvalue, value); 256 } while (!myValue.compareAndSet(oldvalue, newvalue)); 257 258 // If value changed, send a flooded message. 259 if (newvalue != oldvalue) { 260 myComm.floodSend(myTag, ObjectBuf.buffer(newvalue)); 261 } 262 263 // Return updated value. 264 return newvalue; 265 } 266 267 /** 268 * Returns a string version of this reduction variable. 269 * 270 * @return String version. 271 */ 272 public String toString() { 273 return myValue.toString(); 274 } 275 276 }