1 //****************************************************************************** 2 // 3 // File: ReplicatedBoolean.java 4 // Package: edu.rit.pj.replica 5 // Unit: Class edu.rit.pj.replica.ReplicatedBoolean 6 // 7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights 8 // reserved. For further information, contact the author, Alan Kaminsky, at 9 // ark@cs.rit.edu. 10 // 11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free 12 // software; you can redistribute it and/or modify it under the terms of the GNU 13 // General Public License as published by the Free Software Foundation; either 14 // version 3 of the License, or (at your option) any later version. 15 // 16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY 17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details. 19 // 20 // Linking this library statically or dynamically with other modules is making a 21 // combined work based on this library. Thus, the terms and conditions of the GNU 22 // General Public License cover the whole combination. 23 // 24 // As a special exception, the copyright holders of this library give you 25 // permission to link this library with independent modules to produce an 26 // executable, regardless of the license terms of these independent modules, and 27 // to copy and distribute the resulting executable under terms of your choice, 28 // provided that you also meet, for each linked independent module, the terms 29 // and conditions of the license of that module. An independent module is a module 30 // which is not derived from or based on this library. If you modify this library, 31 // you may extend this exception to your version of the library, but you are not 32 // obligated to do so. If you do not wish to do so, delete this exception 33 // statement from your version. 34 // 35 // A copy of the GNU General Public License is provided in the file gpl.txt. You 36 // may also obtain a copy of the GNU General Public License on the World Wide 37 // Web at http://www.gnu.org/licenses/gpl.html. 38 // 39 //****************************************************************************** 40 package edu.rit.pj.replica; 41 42 import java.io.IOException; 43 import java.util.concurrent.atomic.AtomicBoolean; 44 45 import edu.rit.mp.BooleanBuf; 46 import edu.rit.mp.buf.BooleanItemBuf; 47 import edu.rit.pj.Comm; 48 import edu.rit.pj.reduction.BooleanOp; 49 50 /** 51 * Class ReplicatedBoolean provides a replicated, shared reduction variable for 52 * a value of type <code>boolean</code>. 53 * <P> 54 * A replicated, shared reduction variable is intended to be used in a cluster 55 * or hybrid parallel program for a data item shared among all the processes in 56 * the program and all the threads in each process. To use a replicated, shared 57 * reduction variable, do the following in each process of the parallel program: 58 * <OL TYPE=1> 59 * <LI> 60 * Construct an instance of class ReplicatedBoolean, specifying the reduction 61 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing 62 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm}) 63 * and the message tag to use for sending updates among the processes. At this 64 * point a <I>replica</I> of the variable exists in each process. 65 * 66 * <LI> 67 * To read the variable, call the <code>get()</code> method. The current value of 68 * the local process's replicated variable is returned. 69 * 70 * <LI> 71 * To update the variable, call the <code>reduce()</code> method, specifying a new 72 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I> 73 * (described below) on the local process's replicated variable with the new 74 * value. If the variable changed as a result of the reduction, the variable's 75 * (updated) value is flooded to all the processes in the communicator. Finally, 76 * the <code>reduce()</code> method returns the variable's value. 77 * <P> 78 * Whenever one of the aforementioned flooded messages arrives, a separate 79 * thread performs an atomic reduction on the local process's variable with the 80 * received value. 81 * <P> 82 * An atomic reduction consists of these steps, performed atomically: Call the 83 * reduction operator's <code>op()</code> method, passing in the current value of 84 * the local process's replicated variable and the new value (either the new 85 * value specified as an argument of <code>reduce()</code>, or the new value 86 * received in a flooded message). Then store the <code>op()</code> method's return 87 * value back into the local process's replicated variable. 88 * </OL> 89 * <P> 90 * Class ReplicatedBoolean does not itself guarantee consistency of the 91 * replicas' values. This is to avoid the message passing overhead of a 92 * distributed state update protocol. Instead, the parallel program must be 93 * written to operate correctly when the variable is updated as described above. 94 * Note that the value of a process's local replica can change asynchronously at 95 * any time, either because a thread in the current process updated the 96 * variable, or because a flooded message updated the variable. 97 * <P> 98 * Class ReplicatedBoolean is multiple thread safe. The methods use lock-free 99 * atomic compare-and-set. 100 * <P> 101 * <I>Note:</I> Class ReplicatedBoolean is implemented using class 102 * java.util.concurrent.atomic.AtomicBoolean. 103 * 104 * @author Alan Kaminsky 105 * @version 13-Sep-2008 106 */ 107 public class ReplicatedBoolean { 108 109 // Hidden data members. 110 private BooleanOp myOp; 111 private AtomicBoolean myValue; 112 private int myTag; 113 private Comm myComm; 114 private Receiver myReceiver; 115 116 // Hidden helper classes. 117 /** 118 * Class Receiver receives and processes flooded messages to update this 119 * replicated, shared reduction variable. 120 * 121 * @author Alan Kaminsky 122 * @version 13-Sep-2008 123 */ 124 private class Receiver 125 extends Thread { 126 127 public void run() { 128 BooleanItemBuf buf = BooleanBuf.buffer(); 129 130 try { 131 for (;;) { 132 // Receive a flooded message. 133 myComm.floodReceive(myTag, buf); 134 135 // Do an atomic reduction. 136 boolean oldvalue, newvalue; 137 do { 138 oldvalue = myValue.get(); 139 newvalue = myOp.op(oldvalue, buf.item); 140 } while (!myValue.compareAndSet(oldvalue, newvalue)); 141 } 142 } catch (Throwable exc) { 143 exc.printStackTrace(System.err); 144 } 145 } 146 } 147 148 // Exported constructors. 149 /** 150 * Construct a new replicated, shared Boolean reduction variable with the 151 * given reduction operator. The initial value is false. A message tag of 0 152 * is used. The world communicator is used. 153 * 154 * @param op Reduction operator. 155 * @exception NullPointerException (unchecked exception) Thrown if 156 * <code>op</code> is null. 157 */ 158 public ReplicatedBoolean(BooleanOp op) { 159 this(op, false, 0, Comm.world()); 160 } 161 162 /** 163 * Construct a new replicated, shared Boolean reduction variable with the 164 * given reduction operator and initial value. A message tag of 0 is used. 165 * The world communicator is used. 166 * 167 * @param op Reduction operator. 168 * @param initialValue Initial value. 169 * @exception NullPointerException (unchecked exception) Thrown if 170 * <code>op</code> is null. 171 */ 172 public ReplicatedBoolean(BooleanOp op, 173 boolean initialValue) { 174 this(op, initialValue, 0, Comm.world()); 175 } 176 177 /** 178 * Construct a new replicated, shared Boolean reduction variable with the 179 * given reduction operator, initial value, and message tag. The world 180 * communicator is used. 181 * 182 * @param op Reduction operator. 183 * @param initialValue Initial value. 184 * @param tag Message tag. 185 * @exception NullPointerException (unchecked exception) Thrown if 186 * <code>op</code> is null. Thrown if 187 * <code>comm</code> is null. 188 */ 189 public ReplicatedBoolean(BooleanOp op, 190 boolean initialValue, 191 int tag) { 192 this(op, initialValue, tag, Comm.world()); 193 } 194 195 /** 196 * Construct a new replicated, shared Boolean reduction variable with the 197 * given reduction operator, initial value, message tag, and communicator. 198 * 199 * @param op Reduction operator. 200 * @param initialValue Initial value. 201 * @param tag Message tag. 202 * @param comm Communicator. 203 * @exception NullPointerException (unchecked exception) Thrown if 204 * <code>op</code> is null. Thrown if 205 * <code>comm</code> is null. 206 */ 207 public ReplicatedBoolean(BooleanOp op, 208 boolean initialValue, 209 int tag, 210 Comm comm) { 211 if (op == null) { 212 throw new NullPointerException("ReplicatedBoolean(): op is null"); 213 } 214 if (comm == null) { 215 throw new NullPointerException("ReplicatedBoolean(): comm is null"); 216 } 217 myOp = op; 218 myValue = new AtomicBoolean(initialValue); 219 myTag = tag; 220 myComm = comm; 221 myReceiver = new Receiver(); 222 myReceiver.setDaemon(true); 223 myReceiver.start(); 224 } 225 226 // Exported operations. 227 /** 228 * Returns this replicated, shared reduction variable's current value. 229 * 230 * @return Current value. 231 */ 232 public boolean get() { 233 return myValue.get(); 234 } 235 236 /** 237 * Update this replicated, shared reduction variable's current value. This 238 * variable is combined with the given value using the reduction operation 239 * specified to the constructor (<I>op</I>). The result is stored back into 240 * this variable and is returned; the result may also be flooded to all 241 * processes in the communicator. 242 * 243 * @param value Value. 244 * @return (This variable) <I>op</I> (<code>value</code>). 245 * @exception IOException Thrown if an I/O error occurred. 246 * @throws java.io.IOException if any. 247 */ 248 public boolean reduce(boolean value) 249 throws IOException { 250 // Do an atomic reduction. 251 boolean oldvalue, newvalue; 252 do { 253 oldvalue = myValue.get(); 254 newvalue = myOp.op(oldvalue, value); 255 } while (!myValue.compareAndSet(oldvalue, newvalue)); 256 257 // If value changed, send a flooded message. 258 if (newvalue != oldvalue) { 259 myComm.floodSend(myTag, BooleanBuf.buffer(newvalue)); 260 } 261 262 // Return updated value. 263 return newvalue; 264 } 265 266 /** 267 * Returns a string version of this reduction variable. 268 * 269 * @return String version. 270 */ 271 public String toString() { 272 return Boolean.toString(get()); 273 } 274 275 }