1 // ******************************************************************************
2 //
3 // Title: Force Field X.
4 // Description: Force Field X - Software for Molecular Biophysics.
5 // Copyright: Copyright (c) Michael J. Schnieders 2001-2025.
6 //
7 // This file is part of Force Field X.
8 //
9 // Force Field X is free software; you can redistribute it and/or modify it
10 // under the terms of the GNU General Public License version 3 as published by
11 // the Free Software Foundation.
12 //
13 // Force Field X is distributed in the hope that it will be useful, but WITHOUT
14 // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15 // FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16 // details.
17 //
18 // You should have received a copy of the GNU General Public License along with
19 // Force Field X; if not, write to the Free Software Foundation, Inc., 59 Temple
20 // Place, Suite 330, Boston, MA 02111-1307 USA
21 //
22 // Linking this library statically or dynamically with other modules is making a
23 // combined work based on this library. Thus, the terms and conditions of the
24 // GNU General Public License cover the whole combination.
25 //
26 // As a special exception, the copyright holders of this library give you
27 // permission to link this library with independent modules to produce an
28 // executable, regardless of the license terms of these independent modules, and
29 // to copy and distribute the resulting executable under terms of your choice,
30 // provided that you also meet, for each linked independent module, the terms
31 // and conditions of the license of that module. An independent module is a
32 // module which is not derived from or based on this library. If you modify this
33 // library, you may extend this exception to your version of the library, but
34 // you are not obligated to do so. If you do not wish to do so, delete this
35 // exception statement from your version.
36 //
37 // ******************************************************************************
38 package ffx.algorithms.thermodynamics;
39
40 import edu.rit.mp.DoubleBuf;
41 import edu.rit.pj.Comm;
42 import ffx.algorithms.thermodynamics.OrthogonalSpaceTempering.Histogram;
43
44 import java.io.IOException;
45 import java.io.InterruptedIOException;
46 import java.util.logging.Level;
47 import java.util.logging.Logger;
48
49 import static java.lang.String.format;
50 import static java.util.Arrays.stream;
51 import static org.apache.commons.math3.util.FastMath.round;
52
53 /**
54 * The AsynchronousSend sends/receives Histogram counts from multiple walkers asynchronously.
55 *
56 * @author Michael J. Schnieders
57 * @since 1.0
58 */
59 class SendAsynchronous extends Thread {
60
61 private static final Logger logger = Logger.getLogger(SendAsynchronous.class.getName());
62 /**
63 * Storage to send a recursion count. [rank, lambda, dU/dL, weight].
64 */
65 private final double[] sendCount;
66 /**
67 * DoubleBuf to wrap the recursion count for sending.
68 */
69 private final DoubleBuf sendCountBuf;
70 /**
71 * Storage to receive a recursion count. [rank, lambda, dU/dL, weight].
72 */
73 private final double[] receiveCount;
74 /**
75 * DoubleBuf to wrap the recursion count.
76 */
77 private final DoubleBuf receiveCountBuf;
78 /**
79 * World communicator.
80 */
81 private final Comm world = Comm.world();
82 /**
83 * Rank.
84 */
85 private final int rank = world.rank();
86 /**
87 * Number of processes.
88 */
89 private final int numProc = world.size();
90 /**
91 * Private reference to the Histogram instance.
92 */
93 private final Histogram histogram;
94
95 /**
96 * Constructor for a thread to asynchronously receive recursion counts.
97 *
98 * @param histogram Histogram instance.
99 */
100 SendAsynchronous(Histogram histogram) {
101 this.histogram = histogram;
102 // Send.
103 sendCount = new double[4];
104 sendCountBuf = DoubleBuf.buffer(sendCount);
105 // Receive.
106 receiveCount = new double[4];
107 receiveCountBuf = DoubleBuf.buffer(receiveCount);
108 }
109
110 /** Run the AsynchronousSend receive thread. */
111 @Override
112 public void run() {
113 while (true) {
114 try {
115 histogram.world.receive(null, receiveCountBuf);
116 } catch (InterruptedIOException ioe) {
117 String message = " SendAsynchronous was interrupted at world.receive; "
118 + "future message passing may be in an error state.";
119 logger.log(Level.WARNING, message, ioe);
120 break;
121 } catch (IOException e) {
122 String message = e.getMessage();
123 logger.log(Level.WARNING, message, e);
124 }
125
126 // 4x NaN is a message (usually sent by the same process)
127 // indicating that it is time to shut down.
128 boolean terminateSignal = stream(receiveCount).allMatch(Double::isNaN);
129 if (terminateSignal) {
130 logger.fine(" Termination signal received -- finishing execution.");
131 break;
132 }
133
134 int countRank = (int) round(receiveCount[0]);
135 double lambda = receiveCount[1];
136 double dUdL = receiveCount[2];
137 double weight = receiveCount[3];
138
139 // If independent, only add bias values from this walker
140 if (histogram.getIndependentWalkers() && countRank != rank) {
141 continue;
142 }
143
144 if (histogram.getResetStatistics() && lambda > histogram.getLambdaResetValue()) {
145 histogram.allocateRecursionKernel();
146 histogram.disableResetStatistics();
147 logger.info(format(" Cleared OST histogram (Lambda = %6.4f).", lambda));
148 }
149
150 // Increase the Recursion Kernel based on the input of current walker.
151 // Guaranteed to be from a different process.
152 histogram.addToRecursionKernelValue(lambda, dUdL, weight);
153
154 // Check if we have been interrupted.
155 if (isInterrupted()) {
156 logger.log(Level.INFO, " SendAsynchronous was interrupted -- finishing execution.");
157 // No pending message receipt, so no warning.
158 break;
159 }
160 }
161 }
162
163 /**
164 * Send a Histogram count to all other processes.
165 *
166 * @param lambda Current value of lambda.
167 * @param dUdL Current value of dU/dL.
168 * @param temperingWeight The weight of the count.
169 */
170 public void send(double lambda, double dUdL, double temperingWeight) {
171 sendCount[0] = rank;
172 sendCount[1] = lambda;
173 sendCount[2] = dUdL;
174 sendCount[3] = temperingWeight;
175
176 histogram.setLastReceivedLambda(lambda);
177 histogram.setLastReceiveddUdL(dUdL);
178
179 for (int i = 0; i < numProc; i++) {
180 try {
181 world.send(i, sendCountBuf);
182 } catch (Exception ex) {
183 String message = " Asynchronous Multiwalker OST send failed.";
184 logger.log(Level.SEVERE, message, ex);
185 }
186 }
187 }
188 }