View Javadoc
1   //******************************************************************************
2   //
3   // File:    ReplicatedBoolean.java
4   // Package: edu.rit.pj.replica
5   // Unit:    Class edu.rit.pj.replica.ReplicatedBoolean
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.replica;
41  
42  import java.io.IOException;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import edu.rit.mp.BooleanBuf;
46  import edu.rit.mp.buf.BooleanItemBuf;
47  import edu.rit.pj.Comm;
48  import edu.rit.pj.reduction.BooleanOp;
49  
50  /**
51   * Class ReplicatedBoolean provides a replicated, shared reduction variable for
52   * a value of type <code>boolean</code>.
53   * <P>
54   * A replicated, shared reduction variable is intended to be used in a cluster
55   * or hybrid parallel program for a data item shared among all the processes in
56   * the program and all the threads in each process. To use a replicated, shared
57   * reduction variable, do the following in each process of the parallel program:
58   * <OL TYPE=1>
59   * <LI>
60   * Construct an instance of class ReplicatedBoolean, specifying the reduction
61   * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
62   * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
63   * and the message tag to use for sending updates among the processes. At this
64   * point a <I>replica</I> of the variable exists in each process.
65   *
66   * <LI>
67   * To read the variable, call the <code>get()</code> method. The current value of
68   * the local process's replicated variable is returned.
69   *
70   * <LI>
71   * To update the variable, call the <code>reduce()</code> method, specifying a new
72   * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
73   * (described below) on the local process's replicated variable with the new
74   * value. If the variable changed as a result of the reduction, the variable's
75   * (updated) value is flooded to all the processes in the communicator. Finally,
76   * the <code>reduce()</code> method returns the variable's value.
77   * <P>
78   * Whenever one of the aforementioned flooded messages arrives, a separate
79   * thread performs an atomic reduction on the local process's variable with the
80   * received value.
81   * <P>
82   * An atomic reduction consists of these steps, performed atomically: Call the
83   * reduction operator's <code>op()</code> method, passing in the current value of
84   * the local process's replicated variable and the new value (either the new
85   * value specified as an argument of <code>reduce()</code>, or the new value
86   * received in a flooded message). Then store the <code>op()</code> method's return
87   * value back into the local process's replicated variable.
88   * </OL>
89   * <P>
90   * Class ReplicatedBoolean does not itself guarantee consistency of the
91   * replicas' values. This is to avoid the message passing overhead of a
92   * distributed state update protocol. Instead, the parallel program must be
93   * written to operate correctly when the variable is updated as described above.
94   * Note that the value of a process's local replica can change asynchronously at
95   * any time, either because a thread in the current process updated the
96   * variable, or because a flooded message updated the variable.
97   * <P>
98   * Class ReplicatedBoolean is multiple thread safe. The methods use lock-free
99   * atomic compare-and-set.
100  * <P>
101  * <I>Note:</I> Class ReplicatedBoolean is implemented using class
102  * java.util.concurrent.atomic.AtomicBoolean.
103  *
104  * @author Alan Kaminsky
105  * @version 13-Sep-2008
106  */
107 public class ReplicatedBoolean {
108 
109 // Hidden data members.
110     private BooleanOp myOp;
111     private AtomicBoolean myValue;
112     private int myTag;
113     private Comm myComm;
114     private Receiver myReceiver;
115 
116 // Hidden helper classes.
117     /**
118      * Class Receiver receives and processes flooded messages to update this
119      * replicated, shared reduction variable.
120      *
121      * @author Alan Kaminsky
122      * @version 13-Sep-2008
123      */
124     private class Receiver
125             extends Thread {
126 
127         public void run() {
128             BooleanItemBuf buf = BooleanBuf.buffer();
129 
130             try {
131                 for (;;) {
132                     // Receive a flooded message.
133                     myComm.floodReceive(myTag, buf);
134 
135                     // Do an atomic reduction.
136                     boolean oldvalue, newvalue;
137                     do {
138                         oldvalue = myValue.get();
139                         newvalue = myOp.op(oldvalue, buf.item);
140                     } while (!myValue.compareAndSet(oldvalue, newvalue));
141                 }
142             } catch (Throwable exc) {
143                 exc.printStackTrace(System.err);
144             }
145         }
146     }
147 
148 // Exported constructors.
149     /**
150      * Construct a new replicated, shared Boolean reduction variable with the
151      * given reduction operator. The initial value is false. A message tag of 0
152      * is used. The world communicator is used.
153      *
154      * @param op Reduction operator.
155      * @exception NullPointerException (unchecked exception) Thrown if
156      * <code>op</code> is null.
157      */
158     public ReplicatedBoolean(BooleanOp op) {
159         this(op, false, 0, Comm.world());
160     }
161 
162     /**
163      * Construct a new replicated, shared Boolean reduction variable with the
164      * given reduction operator and initial value. A message tag of 0 is used.
165      * The world communicator is used.
166      *
167      * @param op Reduction operator.
168      * @param initialValue Initial value.
169      * @exception NullPointerException (unchecked exception) Thrown if
170      * <code>op</code> is null.
171      */
172     public ReplicatedBoolean(BooleanOp op,
173             boolean initialValue) {
174         this(op, initialValue, 0, Comm.world());
175     }
176 
177     /**
178      * Construct a new replicated, shared Boolean reduction variable with the
179      * given reduction operator, initial value, and message tag. The world
180      * communicator is used.
181      *
182      * @param op Reduction operator.
183      * @param initialValue Initial value.
184      * @param tag Message tag.
185      * @exception NullPointerException (unchecked exception) Thrown if
186      * <code>op</code> is null. Thrown if
187      * <code>comm</code> is null.
188      */
189     public ReplicatedBoolean(BooleanOp op,
190             boolean initialValue,
191             int tag) {
192         this(op, initialValue, tag, Comm.world());
193     }
194 
195     /**
196      * Construct a new replicated, shared Boolean reduction variable with the
197      * given reduction operator, initial value, message tag, and communicator.
198      *
199      * @param op Reduction operator.
200      * @param initialValue Initial value.
201      * @param tag Message tag.
202      * @param comm Communicator.
203      * @exception NullPointerException (unchecked exception) Thrown if
204      * <code>op</code> is null. Thrown if
205      * <code>comm</code> is null.
206      */
207     public ReplicatedBoolean(BooleanOp op,
208             boolean initialValue,
209             int tag,
210             Comm comm) {
211         if (op == null) {
212             throw new NullPointerException("ReplicatedBoolean(): op is null");
213         }
214         if (comm == null) {
215             throw new NullPointerException("ReplicatedBoolean(): comm is null");
216         }
217         myOp = op;
218         myValue = new AtomicBoolean(initialValue);
219         myTag = tag;
220         myComm = comm;
221         myReceiver = new Receiver();
222         myReceiver.setDaemon(true);
223         myReceiver.start();
224     }
225 
226 // Exported operations.
227     /**
228      * Returns this replicated, shared reduction variable's current value.
229      *
230      * @return Current value.
231      */
232     public boolean get() {
233         return myValue.get();
234     }
235 
236     /**
237      * Update this replicated, shared reduction variable's current value. This
238      * variable is combined with the given value using the reduction operation
239      * specified to the constructor (<I>op</I>). The result is stored back into
240      * this variable and is returned; the result may also be flooded to all
241      * processes in the communicator.
242      *
243      * @param value Value.
244      * @return (This variable) <I>op</I> (<code>value</code>).
245      * @exception IOException Thrown if an I/O error occurred.
246      * @throws java.io.IOException if any.
247      */
248     public boolean reduce(boolean value)
249             throws IOException {
250         // Do an atomic reduction.
251         boolean oldvalue, newvalue;
252         do {
253             oldvalue = myValue.get();
254             newvalue = myOp.op(oldvalue, value);
255         } while (!myValue.compareAndSet(oldvalue, newvalue));
256 
257         // If value changed, send a flooded message.
258         if (newvalue != oldvalue) {
259             myComm.floodSend(myTag, BooleanBuf.buffer(newvalue));
260         }
261 
262         // Return updated value.
263         return newvalue;
264     }
265 
266     /**
267      * Returns a string version of this reduction variable.
268      *
269      * @return String version.
270      */
271     public String toString() {
272         return Boolean.toString(get());
273     }
274 
275 }