View Javadoc
1   // ******************************************************************************
2   //
3   // Title:       Force Field X.
4   // Description: Force Field X - Software for Molecular Biophysics.
5   // Copyright:   Copyright (c) Michael J. Schnieders 2001-2024.
6   //
7   // This file is part of Force Field X.
8   //
9   // Force Field X is free software; you can redistribute it and/or modify it
10  // under the terms of the GNU General Public License version 3 as published by
11  // the Free Software Foundation.
12  //
13  // Force Field X is distributed in the hope that it will be useful, but WITHOUT
14  // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15  // FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16  // details.
17  //
18  // You should have received a copy of the GNU General Public License along with
19  // Force Field X; if not, write to the Free Software Foundation, Inc., 59 Temple
20  // Place, Suite 330, Boston, MA 02111-1307 USA
21  //
22  // Linking this library statically or dynamically with other modules is making a
23  // combined work based on this library. Thus, the terms and conditions of the
24  // GNU General Public License cover the whole combination.
25  //
26  // As a special exception, the copyright holders of this library give you
27  // permission to link this library with independent modules to produce an
28  // executable, regardless of the license terms of these independent modules, and
29  // to copy and distribute the resulting executable under terms of your choice,
30  // provided that you also meet, for each linked independent module, the terms
31  // and conditions of the license of that module. An independent module is a
32  // module which is not derived from or based on this library. If you modify this
33  // library, you may extend this exception to your version of the library, but
34  // you are not obligated to do so. If you do not wish to do so, delete this
35  // exception statement from your version.
36  //
37  // ******************************************************************************
38  package ffx.algorithms.thermodynamics;
39  
40  import static java.lang.String.format;
41  import static java.util.Arrays.stream;
42  import static org.apache.commons.math3.util.FastMath.round;
43  
44  import edu.rit.mp.DoubleBuf;
45  import edu.rit.pj.Comm;
46  import ffx.algorithms.thermodynamics.OrthogonalSpaceTempering.Histogram;
47  import java.io.IOException;
48  import java.io.InterruptedIOException;
49  import java.util.logging.Level;
50  import java.util.logging.Logger;
51  
52  /**
53   * The AsynchronousSend sends/receives Histogram counts from multiple walkers asynchronously.
54   *
55   * @author Michael J. Schnieders
56   * @since 1.0
57   */
58  class SendAsynchronous extends Thread {
59  
60    private static final Logger logger = Logger.getLogger(SendAsynchronous.class.getName());
61    /**
62     * Storage to send a recursion count. [rank, lambda, dU/dL, weight].
63     */
64    private final double[] sendCount;
65    /**
66     * DoubleBuf to wrap the recursion count for sending.
67     */
68    private final DoubleBuf sendCountBuf;
69    /**
70     * Storage to receive a recursion count. [rank, lambda, dU/dL, weight].
71     */
72    private final double[] receiveCount;
73    /**
74     * DoubleBuf to wrap the recursion count.
75     */
76    private final DoubleBuf receiveCountBuf;
77    /**
78     * World communicator.
79     */
80    private final Comm world = Comm.world();
81    /**
82     * Rank.
83     */
84    private final int rank = world.rank();
85    /**
86     * Number of processes.
87     */
88    private final int numProc = world.size();
89    /**
90     * Private reference to the Histogram instance.
91     */
92    private final Histogram histogram;
93  
94    /**
95     * Constructor for a thread to asynchronously receive recursion counts.
96     *
97     * @param histogram Histogram instance.
98     */
99    SendAsynchronous(Histogram histogram) {
100     this.histogram = histogram;
101     // Send.
102     sendCount = new double[4];
103     sendCountBuf = DoubleBuf.buffer(sendCount);
104     // Receive.
105     receiveCount = new double[4];
106     receiveCountBuf = DoubleBuf.buffer(receiveCount);
107   }
108 
109   /** Run the AsynchronousSend receive thread. */
110   @Override
111   public void run() {
112     while (true) {
113       try {
114         histogram.world.receive(null, receiveCountBuf);
115       } catch (InterruptedIOException ioe) {
116         String message = " SendAsynchronous was interrupted at world.receive; "
117             + "future message passing may be in an error state.";
118         logger.log(Level.WARNING, message, ioe);
119         break;
120       } catch (IOException e) {
121         String message = e.getMessage();
122         logger.log(Level.WARNING, message, e);
123       }
124 
125       // 4x NaN is a message (usually sent by the same process)
126       // indicating that it is time to shut down.
127       boolean terminateSignal = stream(receiveCount).allMatch(Double::isNaN);
128       if (terminateSignal) {
129         logger.fine(" Termination signal received -- finishing execution.");
130         break;
131       }
132 
133       int countRank = (int) round(receiveCount[0]);
134       double lambda = receiveCount[1];
135       double dUdL = receiveCount[2];
136       double weight = receiveCount[3];
137 
138       // If independent, only add bias values from this walker
139       if (histogram.getIndependentWalkers() && countRank != rank) {
140         continue;
141       }
142 
143       if (histogram.getResetStatistics() && lambda > histogram.getLambdaResetValue()) {
144         histogram.allocateRecursionKernel();
145         histogram.disableResetStatistics();
146         logger.info(format(" Cleared OST histogram (Lambda = %6.4f).", lambda));
147       }
148 
149       // Increase the Recursion Kernel based on the input of current walker.
150       // Guaranteed to be from a different process.
151       histogram.addToRecursionKernelValue(lambda, dUdL, weight);
152 
153       // Check if we have been interrupted.
154       if (isInterrupted()) {
155         logger.log(Level.INFO, " SendAsynchronous was interrupted -- finishing execution.");
156         // No pending message receipt, so no warning.
157         break;
158       }
159     }
160   }
161 
162   /**
163    * Send a Histogram count to all other processes.
164    *
165    * @param lambda Current value of lambda.
166    * @param dUdL Current value of dU/dL.
167    * @param temperingWeight The weight of the count.
168    */
169   public void send(double lambda, double dUdL, double temperingWeight) {
170     sendCount[0] = rank;
171     sendCount[1] = lambda;
172     sendCount[2] = dUdL;
173     sendCount[3] = temperingWeight;
174 
175     histogram.setLastReceivedLambda(lambda);
176     histogram.setLastReceiveddUdL(dUdL);
177 
178     for (int i = 0; i < numProc; i++) {
179       try {
180         world.send(i, sendCountBuf);
181       } catch (Exception ex) {
182         String message = " Asynchronous Multiwalker OST send failed.";
183         logger.log(Level.SEVERE, message, ex);
184       }
185     }
186   }
187 }