1 //******************************************************************************
2 //
3 // File: ReplicatedObject.java
4 // Package: edu.rit.pj.replica
5 // Unit: Class edu.rit.pj.replica.ReplicatedObject
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.replica;
41
42 import java.io.IOException;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import edu.rit.mp.ObjectBuf;
46 import edu.rit.mp.buf.ObjectItemBuf;
47 import edu.rit.pj.Comm;
48 import edu.rit.pj.reduction.ObjectOp;
49
50 /**
51 * Class ReplicatedObject provides a replicated, shared reduction variable for a
52 * value of type T (a nonprimitive type).
53 * <P>
54 * A replicated, shared reduction variable is intended to be used in a cluster
55 * or hybrid parallel program for a data item shared among all the processes in
56 * the program and all the threads in each process. To use a replicated, shared
57 * reduction variable, do the following in each process of the parallel program:
58 * <OL TYPE=1>
59 * <LI>
60 * Construct an instance of class ReplicatedObject, specifying the reduction
61 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
62 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
63 * and the message tag to use for sending updates among the processes. At this
64 * point a <I>replica</I> of the variable exists in each process.
65 *
66 * <LI>
67 * To read the variable, call the <code>get()</code> method. The current value of
68 * the local process's replicated variable is returned.
69 *
70 * <LI>
71 * To update the variable, call the <code>reduce()</code> method, specifying a new
72 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
73 * (described below) on the local process's replicated variable with the new
74 * value. If the variable changed as a result of the reduction, the variable's
75 * (updated) value is flooded to all the processes in the communicator. Finally,
76 * the <code>reduce()</code> method returns the variable's value.
77 * <P>
78 * Whenever one of the aforementioned flooded messages arrives, a separate
79 * thread performs an atomic reduction on the local process's variable with the
80 * received value.
81 * <P>
82 * An atomic reduction consists of these steps, performed atomically: Call the
83 * reduction operator's <code>op()</code> method, passing in the current value of
84 * the local process's replicated variable and the new value (either the new
85 * value specified as an argument of <code>reduce()</code>, or the new value
86 * received in a flooded message). Then store the <code>op()</code> method's return
87 * value back into the local process's replicated variable.
88 * </OL>
89 * <P>
90 * Class ReplicatedObject does not itself guarantee consistency of the replicas'
91 * values. This is to avoid the message passing overhead of a distributed state
92 * update protocol. Instead, the parallel program must be written to operate
93 * correctly when the variable is updated as described above. Note that the
94 * value of a process's local replica can change asynchronously at any time,
95 * either because a thread in the current process updated the variable, or
96 * because a flooded message updated the variable.
97 * <P>
98 * Class ReplicatedObject is multiple thread safe. The methods use lock-free
99 * atomic compare-and-set.
100 * <P>
101 * <I>Note:</I> Class ReplicatedObject is implemented using class
102 * java.util.concurrent.atomic.AtomicReference.
103 *
104 * @param <T> Object data type.
105 * @author Alan Kaminsky
106 * @version 13-Sep-2008
107 */
108 public class ReplicatedObject<T> {
109
110 // Hidden data members.
111 private ObjectOp<T> myOp;
112 private AtomicReference<T> myValue;
113 private int myTag;
114 private Comm myComm;
115 private Receiver myReceiver;
116
117 // Hidden helper classes.
118 /**
119 * Class Receiver receives and processes flooded messages to update this
120 * replicated, shared reduction variable.
121 *
122 * @author Alan Kaminsky
123 * @version 13-Sep-2008
124 */
125 private class Receiver
126 extends Thread {
127
128 public void run() {
129 ObjectItemBuf<T> buf = ObjectBuf.buffer();
130
131 try {
132 for (;;) {
133 // Receive a flooded message.
134 myComm.floodReceive(myTag, buf);
135
136 // Do an atomic reduction.
137 T oldvalue, newvalue;
138 do {
139 oldvalue = myValue.get();
140 newvalue = myOp.op(oldvalue, buf.item);
141 } while (!myValue.compareAndSet(oldvalue, newvalue));
142 }
143 } catch (Throwable exc) {
144 exc.printStackTrace(System.err);
145 }
146 }
147 }
148
149 // Exported constructors.
150 /**
151 * Construct a new replicated, shared object reduction variable with the
152 * given reduction operator. The initial value is null. A message tag of 0
153 * is used. The world communicator is used.
154 *
155 * @param op Reduction operator.
156 * @exception NullPointerException (unchecked exception) Thrown if
157 * <code>op</code> is null.
158 */
159 public ReplicatedObject(ObjectOp<T> op) {
160 this(op, null, 0, Comm.world());
161 }
162
163 /**
164 * Construct a new replicated, shared object reduction variable with the
165 * given reduction operator and initial value. A message tag of 0 is used.
166 * The world communicator is used.
167 *
168 * @param op Reduction operator.
169 * @param initialValue Initial value.
170 * @exception NullPointerException (unchecked exception) Thrown if
171 * <code>op</code> is null.
172 */
173 public ReplicatedObject(ObjectOp<T> op,
174 T initialValue) {
175 this(op, initialValue, 0, Comm.world());
176 }
177
178 /**
179 * Construct a new replicated, shared object reduction variable with the
180 * given reduction operator, initial value, and message tag. The world
181 * communicator is used.
182 *
183 * @param op Reduction operator.
184 * @param initialValue Initial value.
185 * @param tag Message tag.
186 * @exception NullPointerException (unchecked exception) Thrown if
187 * <code>op</code> is null. Thrown if
188 * <code>comm</code> is null.
189 */
190 public ReplicatedObject(ObjectOp<T> op,
191 T initialValue,
192 int tag) {
193 this(op, initialValue, tag, Comm.world());
194 }
195
196 /**
197 * Construct a new replicated, shared object reduction variable with the
198 * given reduction operator, initial value, message tag, and communicator.
199 *
200 * @param op Reduction operator.
201 * @param initialValue Initial value.
202 * @param tag Message tag.
203 * @param comm Communicator.
204 * @exception NullPointerException (unchecked exception) Thrown if
205 * <code>op</code> is null. Thrown if
206 * <code>comm</code> is null.
207 */
208 public ReplicatedObject(ObjectOp<T> op,
209 T initialValue,
210 int tag,
211 Comm comm) {
212 if (op == null) {
213 throw new NullPointerException("ReplicatedObject(): op is null");
214 }
215 if (comm == null) {
216 throw new NullPointerException("ReplicatedObject(): comm is null");
217 }
218 myOp = op;
219 myValue = new AtomicReference<T>(initialValue);
220 myTag = tag;
221 myComm = comm;
222 myReceiver = new Receiver();
223 myReceiver.setDaemon(true);
224 myReceiver.start();
225 }
226
227 // Exported operations.
228 /**
229 * Returns this replicated, shared reduction variable's current value.
230 *
231 * @return Current value.
232 */
233 public T get() {
234 return myValue.get();
235 }
236
237 /**
238 * Update this replicated, shared reduction variable's current value. This
239 * variable is combined with the given value using the reduction operation
240 * specified to the constructor (<I>op</I>). The result is stored back into
241 * this variable and is returned; the result may also be flooded to all
242 * processes in the communicator.
243 *
244 * @param value Value.
245 * @return (This variable) <I>op</I> (<code>value</code>).
246 * @exception IOException Thrown if an I/O error occurred.
247 * @throws java.io.IOException if any.
248 */
249 public T reduce(T value)
250 throws IOException {
251 // Do an atomic reduction.
252 T oldvalue, newvalue;
253 do {
254 oldvalue = myValue.get();
255 newvalue = myOp.op(oldvalue, value);
256 } while (!myValue.compareAndSet(oldvalue, newvalue));
257
258 // If value changed, send a flooded message.
259 if (newvalue != oldvalue) {
260 myComm.floodSend(myTag, ObjectBuf.buffer(newvalue));
261 }
262
263 // Return updated value.
264 return newvalue;
265 }
266
267 /**
268 * Returns a string version of this reduction variable.
269 *
270 * @return String version.
271 */
272 public String toString() {
273 return myValue.toString();
274 }
275
276 }