View Javadoc
1   // ******************************************************************************
2   //
3   // Title:       Force Field X.
4   // Description: Force Field X - Software for Molecular Biophysics.
5   // Copyright:   Copyright (c) Michael J. Schnieders 2001-2025.
6   //
7   // This file is part of Force Field X.
8   //
9   // Force Field X is free software; you can redistribute it and/or modify it
10  // under the terms of the GNU General Public License version 3 as published by
11  // the Free Software Foundation.
12  //
13  // Force Field X is distributed in the hope that it will be useful, but WITHOUT
14  // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15  // FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16  // details.
17  //
18  // You should have received a copy of the GNU General Public License along with
19  // Force Field X; if not, write to the Free Software Foundation, Inc., 59 Temple
20  // Place, Suite 330, Boston, MA 02111-1307 USA
21  //
22  // Linking this library statically or dynamically with other modules is making a
23  // combined work based on this library. Thus, the terms and conditions of the
24  // GNU General Public License cover the whole combination.
25  //
26  // As a special exception, the copyright holders of this library give you
27  // permission to link this library with independent modules to produce an
28  // executable, regardless of the license terms of these independent modules, and
29  // to copy and distribute the resulting executable under terms of your choice,
30  // provided that you also meet, for each linked independent module, the terms
31  // and conditions of the license of that module. An independent module is a
32  // module which is not derived from or based on this library. If you modify this
33  // library, you may extend this exception to your version of the library, but
34  // you are not obligated to do so. If you do not wish to do so, delete this
35  // exception statement from your version.
36  //
37  // ******************************************************************************
38  package ffx.algorithms.thermodynamics;
39  
40  import edu.rit.mp.DoubleBuf;
41  import edu.rit.pj.Comm;
42  import ffx.algorithms.thermodynamics.OrthogonalSpaceTempering.Histogram;
43  
44  import java.io.IOException;
45  import java.io.InterruptedIOException;
46  import java.util.logging.Level;
47  import java.util.logging.Logger;
48  
49  import static java.lang.String.format;
50  import static java.util.Arrays.stream;
51  import static org.apache.commons.math3.util.FastMath.round;
52  
53  /**
54   * The AsynchronousSend sends/receives Histogram counts from multiple walkers asynchronously.
55   *
56   * @author Michael J. Schnieders
57   * @since 1.0
58   */
59  class SendAsynchronous extends Thread {
60  
61    private static final Logger logger = Logger.getLogger(SendAsynchronous.class.getName());
62    /**
63     * Storage to send a recursion count. [rank, lambda, dU/dL, weight].
64     */
65    private final double[] sendCount;
66    /**
67     * DoubleBuf to wrap the recursion count for sending.
68     */
69    private final DoubleBuf sendCountBuf;
70    /**
71     * Storage to receive a recursion count. [rank, lambda, dU/dL, weight].
72     */
73    private final double[] receiveCount;
74    /**
75     * DoubleBuf to wrap the recursion count.
76     */
77    private final DoubleBuf receiveCountBuf;
78    /**
79     * World communicator.
80     */
81    private final Comm world = Comm.world();
82    /**
83     * Rank.
84     */
85    private final int rank = world.rank();
86    /**
87     * Number of processes.
88     */
89    private final int numProc = world.size();
90    /**
91     * Private reference to the Histogram instance.
92     */
93    private final Histogram histogram;
94  
95    /**
96     * Constructor for a thread to asynchronously receive recursion counts.
97     *
98     * @param histogram Histogram instance.
99     */
100   SendAsynchronous(Histogram histogram) {
101     this.histogram = histogram;
102     // Send.
103     sendCount = new double[4];
104     sendCountBuf = DoubleBuf.buffer(sendCount);
105     // Receive.
106     receiveCount = new double[4];
107     receiveCountBuf = DoubleBuf.buffer(receiveCount);
108   }
109 
110   /** Run the AsynchronousSend receive thread. */
111   @Override
112   public void run() {
113     while (true) {
114       try {
115         histogram.world.receive(null, receiveCountBuf);
116       } catch (InterruptedIOException ioe) {
117         String message = " SendAsynchronous was interrupted at world.receive; "
118             + "future message passing may be in an error state.";
119         logger.log(Level.WARNING, message, ioe);
120         break;
121       } catch (IOException e) {
122         String message = e.getMessage();
123         logger.log(Level.WARNING, message, e);
124       }
125 
126       // 4x NaN is a message (usually sent by the same process)
127       // indicating that it is time to shut down.
128       boolean terminateSignal = stream(receiveCount).allMatch(Double::isNaN);
129       if (terminateSignal) {
130         logger.fine(" Termination signal received -- finishing execution.");
131         break;
132       }
133 
134       int countRank = (int) round(receiveCount[0]);
135       double lambda = receiveCount[1];
136       double dUdL = receiveCount[2];
137       double weight = receiveCount[3];
138 
139       // If independent, only add bias values from this walker
140       if (histogram.getIndependentWalkers() && countRank != rank) {
141         continue;
142       }
143 
144       if (histogram.getResetStatistics() && lambda > histogram.getLambdaResetValue()) {
145         histogram.allocateRecursionKernel();
146         histogram.disableResetStatistics();
147         logger.info(format(" Cleared OST histogram (Lambda = %6.4f).", lambda));
148       }
149 
150       // Increase the Recursion Kernel based on the input of current walker.
151       // Guaranteed to be from a different process.
152       histogram.addToRecursionKernelValue(lambda, dUdL, weight);
153 
154       // Check if we have been interrupted.
155       if (isInterrupted()) {
156         logger.log(Level.INFO, " SendAsynchronous was interrupted -- finishing execution.");
157         // No pending message receipt, so no warning.
158         break;
159       }
160     }
161   }
162 
163   /**
164    * Send a Histogram count to all other processes.
165    *
166    * @param lambda Current value of lambda.
167    * @param dUdL Current value of dU/dL.
168    * @param temperingWeight The weight of the count.
169    */
170   public void send(double lambda, double dUdL, double temperingWeight) {
171     sendCount[0] = rank;
172     sendCount[1] = lambda;
173     sendCount[2] = dUdL;
174     sendCount[3] = temperingWeight;
175 
176     histogram.setLastReceivedLambda(lambda);
177     histogram.setLastReceiveddUdL(dUdL);
178 
179     for (int i = 0; i < numProc; i++) {
180       try {
181         world.send(i, sendCountBuf);
182       } catch (Exception ex) {
183         String message = " Asynchronous Multiwalker OST send failed.";
184         logger.log(Level.SEVERE, message, ex);
185       }
186     }
187   }
188 }