1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 package ffx.algorithms.thermodynamics;
39
40 import edu.rit.mp.DoubleBuf;
41 import edu.rit.pj.Comm;
42 import ffx.algorithms.thermodynamics.OrthogonalSpaceTempering.Histogram;
43
44 import java.io.IOException;
45 import java.io.InterruptedIOException;
46 import java.util.logging.Level;
47 import java.util.logging.Logger;
48
49 import static java.lang.String.format;
50 import static java.util.Arrays.stream;
51 import static org.apache.commons.math3.util.FastMath.round;
52
53
54
55
56
57
58
59 class SendAsynchronous extends Thread {
60
61 private static final Logger logger = Logger.getLogger(SendAsynchronous.class.getName());
62
63
64
65 private final double[] sendCount;
66
67
68
69 private final DoubleBuf sendCountBuf;
70
71
72
73 private final double[] receiveCount;
74
75
76
77 private final DoubleBuf receiveCountBuf;
78
79
80
81 private final Comm world = Comm.world();
82
83
84
85 private final int rank = world.rank();
86
87
88
89 private final int numProc = world.size();
90
91
92
93 private final Histogram histogram;
94
95
96
97
98
99
100 SendAsynchronous(Histogram histogram) {
101 this.histogram = histogram;
102
103 sendCount = new double[4];
104 sendCountBuf = DoubleBuf.buffer(sendCount);
105
106 receiveCount = new double[4];
107 receiveCountBuf = DoubleBuf.buffer(receiveCount);
108 }
109
110
111 @Override
112 public void run() {
113 while (true) {
114 try {
115 histogram.world.receive(null, receiveCountBuf);
116 } catch (InterruptedIOException ioe) {
117 String message = " SendAsynchronous was interrupted at world.receive; "
118 + "future message passing may be in an error state.";
119 logger.log(Level.WARNING, message, ioe);
120 break;
121 } catch (IOException e) {
122 String message = e.getMessage();
123 logger.log(Level.WARNING, message, e);
124 }
125
126
127
128 boolean terminateSignal = stream(receiveCount).allMatch(Double::isNaN);
129 if (terminateSignal) {
130 logger.fine(" Termination signal received -- finishing execution.");
131 break;
132 }
133
134 int countRank = (int) round(receiveCount[0]);
135 double lambda = receiveCount[1];
136 double dUdL = receiveCount[2];
137 double weight = receiveCount[3];
138
139
140 if (histogram.getIndependentWalkers() && countRank != rank) {
141 continue;
142 }
143
144 if (histogram.getResetStatistics() && lambda > histogram.getLambdaResetValue()) {
145 histogram.allocateRecursionKernel();
146 histogram.disableResetStatistics();
147 logger.info(format(" Cleared OST histogram (Lambda = %6.4f).", lambda));
148 }
149
150
151
152 histogram.addToRecursionKernelValue(lambda, dUdL, weight);
153
154
155 if (isInterrupted()) {
156 logger.log(Level.INFO, " SendAsynchronous was interrupted -- finishing execution.");
157
158 break;
159 }
160 }
161 }
162
163
164
165
166
167
168
169
170 public void send(double lambda, double dUdL, double temperingWeight) {
171 sendCount[0] = rank;
172 sendCount[1] = lambda;
173 sendCount[2] = dUdL;
174 sendCount[3] = temperingWeight;
175
176 histogram.setLastReceivedLambda(lambda);
177 histogram.setLastReceiveddUdL(dUdL);
178
179 for (int i = 0; i < numProc; i++) {
180 try {
181 world.send(i, sendCountBuf);
182 } catch (Exception ex) {
183 String message = " Asynchronous Multiwalker OST send failed.";
184 logger.log(Level.SEVERE, message, ex);
185 }
186 }
187 }
188 }