View Javadoc
1   //******************************************************************************
2   //
3   // File:    ReplicatedObject.java
4   // Package: edu.rit.pj.replica
5   // Unit:    Class edu.rit.pj.replica.ReplicatedObject
6   //
7   // This Java source file is copyright (C) 2008 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  package edu.rit.pj.replica;
41  
42  import java.io.IOException;
43  import java.util.concurrent.atomic.AtomicReference;
44  
45  import edu.rit.mp.ObjectBuf;
46  import edu.rit.mp.buf.ObjectItemBuf;
47  import edu.rit.pj.Comm;
48  import edu.rit.pj.reduction.ObjectOp;
49  
50  /**
51   * Class ReplicatedObject provides a replicated, shared reduction variable for a
52   * value of type T (a nonprimitive type).
53   * <P>
54   * A replicated, shared reduction variable is intended to be used in a cluster
55   * or hybrid parallel program for a data item shared among all the processes in
56   * the program and all the threads in each process. To use a replicated, shared
57   * reduction variable, do the following in each process of the parallel program:
58   * <OL TYPE=1>
59   * <LI>
60   * Construct an instance of class ReplicatedObject, specifying the reduction
61   * operator (class {@linkplain edu.rit.pj.reduction.Op}) to use when performing
62   * updates, and specifying the communicator (class {@linkplain edu.rit.pj.Comm})
63   * and the message tag to use for sending updates among the processes. At this
64   * point a <I>replica</I> of the variable exists in each process.
65   *
66   * <LI>
67   * To read the variable, call the <code>get()</code> method. The current value of
68   * the local process's replicated variable is returned.
69   *
70   * <LI>
71   * To update the variable, call the <code>reduce()</code> method, specifying a new
72   * value. The <code>reduce()</code> method performs an <I>atomic reduction</I>
73   * (described below) on the local process's replicated variable with the new
74   * value. If the variable changed as a result of the reduction, the variable's
75   * (updated) value is flooded to all the processes in the communicator. Finally,
76   * the <code>reduce()</code> method returns the variable's value.
77   * <P>
78   * Whenever one of the aforementioned flooded messages arrives, a separate
79   * thread performs an atomic reduction on the local process's variable with the
80   * received value.
81   * <P>
82   * An atomic reduction consists of these steps, performed atomically: Call the
83   * reduction operator's <code>op()</code> method, passing in the current value of
84   * the local process's replicated variable and the new value (either the new
85   * value specified as an argument of <code>reduce()</code>, or the new value
86   * received in a flooded message). Then store the <code>op()</code> method's return
87   * value back into the local process's replicated variable.
88   * </OL>
89   * <P>
90   * Class ReplicatedObject does not itself guarantee consistency of the replicas'
91   * values. This is to avoid the message passing overhead of a distributed state
92   * update protocol. Instead, the parallel program must be written to operate
93   * correctly when the variable is updated as described above. Note that the
94   * value of a process's local replica can change asynchronously at any time,
95   * either because a thread in the current process updated the variable, or
96   * because a flooded message updated the variable.
97   * <P>
98   * Class ReplicatedObject is multiple thread safe. The methods use lock-free
99   * atomic compare-and-set.
100  * <P>
101  * <I>Note:</I> Class ReplicatedObject is implemented using class
102  * java.util.concurrent.atomic.AtomicReference.
103  *
104  * @param <T> Object data type.
105  * @author Alan Kaminsky
106  * @version 13-Sep-2008
107  */
108 public class ReplicatedObject<T> {
109 
110 // Hidden data members.
111     private ObjectOp<T> myOp;
112     private AtomicReference<T> myValue;
113     private int myTag;
114     private Comm myComm;
115     private Receiver myReceiver;
116 
117 // Hidden helper classes.
118     /**
119      * Class Receiver receives and processes flooded messages to update this
120      * replicated, shared reduction variable.
121      *
122      * @author Alan Kaminsky
123      * @version 13-Sep-2008
124      */
125     private class Receiver
126             extends Thread {
127 
128         public void run() {
129             ObjectItemBuf<T> buf = ObjectBuf.buffer();
130 
131             try {
132                 for (;;) {
133                     // Receive a flooded message.
134                     myComm.floodReceive(myTag, buf);
135 
136                     // Do an atomic reduction.
137                     T oldvalue, newvalue;
138                     do {
139                         oldvalue = myValue.get();
140                         newvalue = myOp.op(oldvalue, buf.item);
141                     } while (!myValue.compareAndSet(oldvalue, newvalue));
142                 }
143             } catch (Throwable exc) {
144                 exc.printStackTrace(System.err);
145             }
146         }
147     }
148 
149 // Exported constructors.
150     /**
151      * Construct a new replicated, shared object reduction variable with the
152      * given reduction operator. The initial value is null. A message tag of 0
153      * is used. The world communicator is used.
154      *
155      * @param op Reduction operator.
156      * @exception NullPointerException (unchecked exception) Thrown if
157      * <code>op</code> is null.
158      */
159     public ReplicatedObject(ObjectOp<T> op) {
160         this(op, null, 0, Comm.world());
161     }
162 
163     /**
164      * Construct a new replicated, shared object reduction variable with the
165      * given reduction operator and initial value. A message tag of 0 is used.
166      * The world communicator is used.
167      *
168      * @param op Reduction operator.
169      * @param initialValue Initial value.
170      * @exception NullPointerException (unchecked exception) Thrown if
171      * <code>op</code> is null.
172      */
173     public ReplicatedObject(ObjectOp<T> op,
174             T initialValue) {
175         this(op, initialValue, 0, Comm.world());
176     }
177 
178     /**
179      * Construct a new replicated, shared object reduction variable with the
180      * given reduction operator, initial value, and message tag. The world
181      * communicator is used.
182      *
183      * @param op Reduction operator.
184      * @param initialValue Initial value.
185      * @param tag Message tag.
186      * @exception NullPointerException (unchecked exception) Thrown if
187      * <code>op</code> is null. Thrown if
188      * <code>comm</code> is null.
189      */
190     public ReplicatedObject(ObjectOp<T> op,
191             T initialValue,
192             int tag) {
193         this(op, initialValue, tag, Comm.world());
194     }
195 
196     /**
197      * Construct a new replicated, shared object reduction variable with the
198      * given reduction operator, initial value, message tag, and communicator.
199      *
200      * @param op Reduction operator.
201      * @param initialValue Initial value.
202      * @param tag Message tag.
203      * @param comm Communicator.
204      * @exception NullPointerException (unchecked exception) Thrown if
205      * <code>op</code> is null. Thrown if
206      * <code>comm</code> is null.
207      */
208     public ReplicatedObject(ObjectOp<T> op,
209             T initialValue,
210             int tag,
211             Comm comm) {
212         if (op == null) {
213             throw new NullPointerException("ReplicatedObject(): op is null");
214         }
215         if (comm == null) {
216             throw new NullPointerException("ReplicatedObject(): comm is null");
217         }
218         myOp = op;
219         myValue = new AtomicReference<T>(initialValue);
220         myTag = tag;
221         myComm = comm;
222         myReceiver = new Receiver();
223         myReceiver.setDaemon(true);
224         myReceiver.start();
225     }
226 
227 // Exported operations.
228     /**
229      * Returns this replicated, shared reduction variable's current value.
230      *
231      * @return Current value.
232      */
233     public T get() {
234         return myValue.get();
235     }
236 
237     /**
238      * Update this replicated, shared reduction variable's current value. This
239      * variable is combined with the given value using the reduction operation
240      * specified to the constructor (<I>op</I>). The result is stored back into
241      * this variable and is returned; the result may also be flooded to all
242      * processes in the communicator.
243      *
244      * @param value Value.
245      * @return (This variable) <I>op</I> (<code>value</code>).
246      * @exception IOException Thrown if an I/O error occurred.
247      * @throws java.io.IOException if any.
248      */
249     public T reduce(T value)
250             throws IOException {
251         // Do an atomic reduction.
252         T oldvalue, newvalue;
253         do {
254             oldvalue = myValue.get();
255             newvalue = myOp.op(oldvalue, value);
256         } while (!myValue.compareAndSet(oldvalue, newvalue));
257 
258         // If value changed, send a flooded message.
259         if (newvalue != oldvalue) {
260             myComm.floodSend(myTag, ObjectBuf.buffer(newvalue));
261         }
262 
263         // Return updated value.
264         return newvalue;
265     }
266 
267     /**
268      * Returns a string version of this reduction variable.
269      *
270      * @return String version.
271      */
272     public String toString() {
273         return myValue.toString();
274     }
275 
276 }