1 //******************************************************************************
2 //
3 // File: ReplicatedBoolean.java
4 // Package: edu.rit.pj.replica
5 // Unit: Class edu.rit.pj.replica.ReplicatedBoolean
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.replica;
41
42 import java.io.IOException;
43 import java.util.concurrent.atomic.AtomicBoolean;
44
45 import edu.rit.mp.BooleanBuf;
46 import edu.rit.mp.buf.BooleanItemBuf;
47 import edu.rit.pj.Comm;
48 import edu.rit.pj.reduction.BooleanOp;
49
50 /**
51 * Class ReplicatedBoolean provides a replicated, shared reduction variable for
52 * a value of type <code>boolean</code>.
53 * <P>
54 * A replicated, shared reduction variable is intended to be used in a cluster
55 * or hybrid parallel program for a data item shared among all the processes in
56 * the program and all the threads in each process. To use a replicated, shared
57 * reduction variable, do the following in each process of the parallel program:
58 * <OL TYPE=1>
59 * <LI>
60 * Construct an instance of class ReplicatedBoolean, specifying the reduction
61 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
62 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
63 * and the message tag to use for sending updates among the processes. At this
64 * point a <I>replica</I> of the variable exists in each process.
65 *
66 * <LI>
67 * To read the variable, call the <code>get()</code> method. The current value of
68 * the local process's replicated variable is returned.
69 *
70 * <LI>
71 * To update the variable, call the <code>reduce()</code> method, specifying a new
72 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
73 * (described below) on the local process's replicated variable with the new
74 * value. If the variable changed as a result of the reduction, the variable's
75 * (updated) value is flooded to all the processes in the communicator. Finally,
76 * the <code>reduce()</code> method returns the variable's value.
77 * <P>
78 * Whenever one of the aforementioned flooded messages arrives, a separate
79 * thread performs an atomic reduction on the local process's variable with the
80 * received value.
81 * <P>
82 * An atomic reduction consists of these steps, performed atomically: Call the
83 * reduction operator's <code>op()</code> method, passing in the current value of
84 * the local process's replicated variable and the new value (either the new
85 * value specified as an argument of <code>reduce()</code>, or the new value
86 * received in a flooded message). Then store the <code>op()</code> method's return
87 * value back into the local process's replicated variable.
88 * </OL>
89 * <P>
90 * Class ReplicatedBoolean does not itself guarantee consistency of the
91 * replicas' values. This is to avoid the message passing overhead of a
92 * distributed state update protocol. Instead, the parallel program must be
93 * written to operate correctly when the variable is updated as described above.
94 * Note that the value of a process's local replica can change asynchronously at
95 * any time, either because a thread in the current process updated the
96 * variable, or because a flooded message updated the variable.
97 * <P>
98 * Class ReplicatedBoolean is multiple thread safe. The methods use lock-free
99 * atomic compare-and-set.
100 * <P>
101 * <I>Note:</I> Class ReplicatedBoolean is implemented using class
102 * java.util.concurrent.atomic.AtomicBoolean.
103 *
104 * @author Alan Kaminsky
105 * @version 13-Sep-2008
106 */
107 public class ReplicatedBoolean {
108
109 // Hidden data members.
110 private BooleanOp myOp;
111 private AtomicBoolean myValue;
112 private int myTag;
113 private Comm myComm;
114 private Receiver myReceiver;
115
116 // Hidden helper classes.
117 /**
118 * Class Receiver receives and processes flooded messages to update this
119 * replicated, shared reduction variable.
120 *
121 * @author Alan Kaminsky
122 * @version 13-Sep-2008
123 */
124 private class Receiver
125 extends Thread {
126
127 public void run() {
128 BooleanItemBuf buf = BooleanBuf.buffer();
129
130 try {
131 for (;;) {
132 // Receive a flooded message.
133 myComm.floodReceive(myTag, buf);
134
135 // Do an atomic reduction.
136 boolean oldvalue, newvalue;
137 do {
138 oldvalue = myValue.get();
139 newvalue = myOp.op(oldvalue, buf.item);
140 } while (!myValue.compareAndSet(oldvalue, newvalue));
141 }
142 } catch (Throwable exc) {
143 exc.printStackTrace(System.err);
144 }
145 }
146 }
147
148 // Exported constructors.
149 /**
150 * Construct a new replicated, shared Boolean reduction variable with the
151 * given reduction operator. The initial value is false. A message tag of 0
152 * is used. The world communicator is used.
153 *
154 * @param op Reduction operator.
155 * @exception NullPointerException (unchecked exception) Thrown if
156 * <code>op</code> is null.
157 */
158 public ReplicatedBoolean(BooleanOp op) {
159 this(op, false, 0, Comm.world());
160 }
161
162 /**
163 * Construct a new replicated, shared Boolean reduction variable with the
164 * given reduction operator and initial value. A message tag of 0 is used.
165 * The world communicator is used.
166 *
167 * @param op Reduction operator.
168 * @param initialValue Initial value.
169 * @exception NullPointerException (unchecked exception) Thrown if
170 * <code>op</code> is null.
171 */
172 public ReplicatedBoolean(BooleanOp op,
173 boolean initialValue) {
174 this(op, initialValue, 0, Comm.world());
175 }
176
177 /**
178 * Construct a new replicated, shared Boolean reduction variable with the
179 * given reduction operator, initial value, and message tag. The world
180 * communicator is used.
181 *
182 * @param op Reduction operator.
183 * @param initialValue Initial value.
184 * @param tag Message tag.
185 * @exception NullPointerException (unchecked exception) Thrown if
186 * <code>op</code> is null. Thrown if
187 * <code>comm</code> is null.
188 */
189 public ReplicatedBoolean(BooleanOp op,
190 boolean initialValue,
191 int tag) {
192 this(op, initialValue, tag, Comm.world());
193 }
194
195 /**
196 * Construct a new replicated, shared Boolean reduction variable with the
197 * given reduction operator, initial value, message tag, and communicator.
198 *
199 * @param op Reduction operator.
200 * @param initialValue Initial value.
201 * @param tag Message tag.
202 * @param comm Communicator.
203 * @exception NullPointerException (unchecked exception) Thrown if
204 * <code>op</code> is null. Thrown if
205 * <code>comm</code> is null.
206 */
207 public ReplicatedBoolean(BooleanOp op,
208 boolean initialValue,
209 int tag,
210 Comm comm) {
211 if (op == null) {
212 throw new NullPointerException("ReplicatedBoolean(): op is null");
213 }
214 if (comm == null) {
215 throw new NullPointerException("ReplicatedBoolean(): comm is null");
216 }
217 myOp = op;
218 myValue = new AtomicBoolean(initialValue);
219 myTag = tag;
220 myComm = comm;
221 myReceiver = new Receiver();
222 myReceiver.setDaemon(true);
223 myReceiver.start();
224 }
225
226 // Exported operations.
227 /**
228 * Returns this replicated, shared reduction variable's current value.
229 *
230 * @return Current value.
231 */
232 public boolean get() {
233 return myValue.get();
234 }
235
236 /**
237 * Update this replicated, shared reduction variable's current value. This
238 * variable is combined with the given value using the reduction operation
239 * specified to the constructor (<I>op</I>). The result is stored back into
240 * this variable and is returned; the result may also be flooded to all
241 * processes in the communicator.
242 *
243 * @param value Value.
244 * @return (This variable) <I>op</I> (<code>value</code>).
245 * @exception IOException Thrown if an I/O error occurred.
246 * @throws java.io.IOException if any.
247 */
248 public boolean reduce(boolean value)
249 throws IOException {
250 // Do an atomic reduction.
251 boolean oldvalue, newvalue;
252 do {
253 oldvalue = myValue.get();
254 newvalue = myOp.op(oldvalue, value);
255 } while (!myValue.compareAndSet(oldvalue, newvalue));
256
257 // If value changed, send a flooded message.
258 if (newvalue != oldvalue) {
259 myComm.floodSend(myTag, BooleanBuf.buffer(newvalue));
260 }
261
262 // Return updated value.
263 return newvalue;
264 }
265
266 /**
267 * Returns a string version of this reduction variable.
268 *
269 * @return String version.
270 */
271 public String toString() {
272 return Boolean.toString(get());
273 }
274
275 }