1 //******************************************************************************
2 //
3 // File: ReplicatedCharacter.java
4 // Package: edu.rit.pj.replica
5 // Unit: Class edu.rit.pj.replica.ReplicatedCharacter
6 //
7 // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8 // reserved. For further information, contact the author, Alan Kaminsky, at
9 // ark@cs.rit.edu.
10 //
11 // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12 // software; you can redistribute it and/or modify it under the terms of the GNU
13 // General Public License as published by the Free Software Foundation; either
14 // version 3 of the License, or (at your option) any later version.
15 //
16 // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18 // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19 //
20 // Linking this library statically or dynamically with other modules is making a
21 // combined work based on this library. Thus, the terms and conditions of the GNU
22 // General Public License cover the whole combination.
23 //
24 // As a special exception, the copyright holders of this library give you
25 // permission to link this library with independent modules to produce an
26 // executable, regardless of the license terms of these independent modules, and
27 // to copy and distribute the resulting executable under terms of your choice,
28 // provided that you also meet, for each linked independent module, the terms
29 // and conditions of the license of that module. An independent module is a module
30 // which is not derived from or based on this library. If you modify this library,
31 // you may extend this exception to your version of the library, but you are not
32 // obligated to do so. If you do not wish to do so, delete this exception
33 // statement from your version.
34 //
35 // A copy of the GNU General Public License is provided in the file gpl.txt. You
36 // may also obtain a copy of the GNU General Public License on the World Wide
37 // Web at http://www.gnu.org/licenses/gpl.html.
38 //
39 //******************************************************************************
40 package edu.rit.pj.replica;
41
42 import java.io.IOException;
43 import java.io.Serial;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import edu.rit.mp.CharacterBuf;
47 import edu.rit.mp.buf.CharacterItemBuf;
48 import edu.rit.pj.Comm;
49 import edu.rit.pj.reduction.CharacterOp;
50
51 /**
52 * Class ReplicatedCharacter provides a replicated, shared reduction variable
53 * for a value of type <code>char</code>.
54 * <P>
55 * A replicated, shared reduction variable is intended to be used in a cluster
56 * or hybrid parallel program for a data item shared among all the processes in
57 * the program and all the threads in each process. To use a replicated, shared
58 * reduction variable, do the following in each process of the parallel program:
59 * <OL TYPE=1>
60 * <LI>
61 * Construct an instance of class ReplicatedCharacter, specifying the reduction
62 * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
63 * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
64 * and the message tag to use for sending updates among the processes. At this
65 * point a <I>replica</I> of the variable exists in each process.
66 *
67 * <LI>
68 * To read the variable, call the <code>get()</code> method. The current value of
69 * the local process's replicated variable is returned.
70 *
71 * <LI>
72 * To update the variable, call the <code>reduce()</code> method, specifying a new
73 * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
74 * (described below) on the local process's replicated variable with the new
75 * value. If the variable changed as a result of the reduction, the variable's
76 * (updated) value is flooded to all the processes in the communicator. Finally,
77 * the <code>reduce()</code> method returns the variable's value.
78 * <P>
79 * Whenever one of the aforementioned flooded messages arrives, a separate
80 * thread performs an atomic reduction on the local process's variable with the
81 * received value.
82 * <P>
83 * An atomic reduction consists of these steps, performed atomically: Call the
84 * reduction operator's <code>op()</code> method, passing in the current value of
85 * the local process's replicated variable and the new value (either the new
86 * value specified as an argument of <code>reduce()</code>, or the new value
87 * received in a flooded message). Then store the <code>op()</code> method's return
88 * value back into the local process's replicated variable.
89 * </OL>
90 * <P>
91 * Class ReplicatedCharacter does not itself guarantee consistency of the
92 * replicas' values. This is to avoid the message passing overhead of a
93 * distributed state update protocol. Instead, the parallel program must be
94 * written to operate correctly when the variable is updated as described above.
95 * Note that the value of a process's local replica can change asynchronously at
96 * any time, either because a thread in the current process updated the
97 * variable, or because a flooded message updated the variable.
98 * <P>
99 * Class ReplicatedCharacter is multiple thread safe. The methods use lock-free
100 * atomic compare-and-set.
101 * <P>
102 * <I>Note:</I> Class ReplicatedCharacter is implemented using class
103 * java.util.concurrent.atomic.AtomicInteger.
104 *
105 * @author Alan Kaminsky
106 * @version 13-Sep-2008
107 */
108 @SuppressWarnings("serial")
109 public class ReplicatedCharacter
110 extends Number {
111
112 @Serial
113 private static final long serialVersionUID = 1L;
114
115 // Hidden data members.
116 private final CharacterOp myOp;
117 private final AtomicInteger myValue;
118 private final int myTag;
119 private final Comm myComm;
120 private final Receiver myReceiver;
121
122 // Hidden helper classes.
123 /**
124 * Class Receiver receives and processes flooded messages to update this
125 * replicated, shared reduction variable.
126 *
127 * @author Alan Kaminsky
128 * @version 12-Sep-2008
129 */
130 private class Receiver
131 extends Thread {
132
133 public void run() {
134 CharacterItemBuf buf = CharacterBuf.buffer();
135
136 try {
137 for (;;) {
138 // Receive a flooded message.
139 myComm.floodReceive(myTag, buf);
140
141 // Do an atomic reduction.
142 char oldvalue, newvalue;
143 do {
144 oldvalue = (char) myValue.get();
145 newvalue = myOp.op(oldvalue, buf.item);
146 } while (!myValue.compareAndSet(oldvalue, newvalue));
147 }
148 } catch (Throwable exc) {
149 exc.printStackTrace(System.err);
150 }
151 }
152 }
153
154 // Exported constructors.
155 /**
156 * Construct a new replicated, shared character reduction variable with the
157 * given reduction operator. The initial value is 0. A message tag of 0 is
158 * used. The world communicator is used.
159 *
160 * @param op Reduction operator.
161 * @exception NullPointerException (unchecked exception) Thrown if
162 * <code>op</code> is null.
163 */
164 public ReplicatedCharacter(CharacterOp op) {
165 this(op, (char) 0, 0, Comm.world());
166 }
167
168 /**
169 * Construct a new replicated, shared character reduction variable with the
170 * given reduction operator and initial value. A message tag of 0 is used.
171 * The world communicator is used.
172 *
173 * @param op Reduction operator.
174 * @param initialValue Initial value.
175 * @exception NullPointerException (unchecked exception) Thrown if
176 * <code>op</code> is null.
177 */
178 public ReplicatedCharacter(CharacterOp op,
179 char initialValue) {
180 this(op, initialValue, 0, Comm.world());
181 }
182
183 /**
184 * Construct a new replicated, shared character reduction variable with the
185 * given reduction operator, initial value, and message tag. The world
186 * communicator is used.
187 *
188 * @param op Reduction operator.
189 * @param initialValue Initial value.
190 * @param tag Message tag.
191 * @exception NullPointerException (unchecked exception) Thrown if
192 * <code>op</code> is null. Thrown if
193 * <code>comm</code> is null.
194 */
195 public ReplicatedCharacter(CharacterOp op,
196 char initialValue,
197 int tag) {
198 this(op, initialValue, tag, Comm.world());
199 }
200
201 /**
202 * Construct a new replicated, shared character reduction variable with the
203 * given reduction operator, initial value, message tag, and communicator.
204 *
205 * @param op Reduction operator.
206 * @param initialValue Initial value.
207 * @param tag Message tag.
208 * @param comm Communicator.
209 * @exception NullPointerException (unchecked exception) Thrown if
210 * <code>op</code> is null. Thrown if
211 * <code>comm</code> is null.
212 */
213 public ReplicatedCharacter(CharacterOp op,
214 char initialValue,
215 int tag,
216 Comm comm) {
217 if (op == null) {
218 throw new NullPointerException("ReplicatedCharacter(): op is null");
219 }
220 if (comm == null) {
221 throw new NullPointerException("ReplicatedCharacter(): comm is null");
222 }
223 myOp = op;
224 myValue = new AtomicInteger(initialValue);
225 myTag = tag;
226 myComm = comm;
227 myReceiver = new Receiver();
228 myReceiver.setDaemon(true);
229 myReceiver.start();
230 }
231
232 // Exported operations.
233 /**
234 * Returns this replicated, shared reduction variable's current value.
235 *
236 * @return Current value.
237 */
238 public char get() {
239 return (char) myValue.get();
240 }
241
242 /**
243 * Update this replicated, shared reduction variable's current value. This
244 * variable is combined with the given value using the reduction operation
245 * specified to the constructor (<I>op</I>). The result is stored back into
246 * this variable and is returned; the result may also be flooded to all
247 * processes in the communicator.
248 *
249 * @param value Value.
250 * @return (This variable) <I>op</I> (<code>value</code>).
251 * @exception IOException Thrown if an I/O error occurred.
252 * @throws java.io.IOException if any.
253 */
254 public char reduce(char value)
255 throws IOException {
256 // Do an atomic reduction.
257 char oldvalue, newvalue;
258 do {
259 oldvalue = (char) myValue.get();
260 newvalue = myOp.op(oldvalue, value);
261 } while (!myValue.compareAndSet(oldvalue, newvalue));
262
263 // If value changed, send a flooded message.
264 if (newvalue != oldvalue) {
265 myComm.floodSend(myTag, CharacterBuf.buffer(newvalue));
266 }
267
268 // Return updated value.
269 return newvalue;
270 }
271
272 /**
273 * Returns a string version of this reduction variable.
274 *
275 * @return String version.
276 */
277 public String toString() {
278 return Character.toString(get());
279 }
280
281 /**
282 * Returns this reduction variable's current value converted to type
283 * <code>int</code>.
284 *
285 * @return Current value.
286 */
287 public int intValue() {
288 return get();
289 }
290
291 /**
292 * Returns this reduction variable's current value converted to type
293 * <code>long</code>.
294 *
295 * @return Current value.
296 */
297 public long longValue() {
298 return get();
299 }
300
301 /**
302 * Returns this reduction variable's current value converted to type
303 * <code>float</code>.
304 *
305 * @return Current value.
306 */
307 public float floatValue() {
308 return get();
309 }
310
311 /**
312 * Returns this reduction variable's current value converted to type
313 * <code>double</code>.
314 *
315 * @return Current value.
316 */
317 public double doubleValue() {
318 return get();
319 }
320
321 }