1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 package ffx.algorithms.thermodynamics;
39
40 import static java.lang.String.format;
41 import static java.util.Arrays.stream;
42 import static org.apache.commons.math3.util.FastMath.round;
43
44 import edu.rit.mp.DoubleBuf;
45 import edu.rit.pj.Comm;
46 import ffx.algorithms.thermodynamics.OrthogonalSpaceTempering.Histogram;
47 import java.io.IOException;
48 import java.io.InterruptedIOException;
49 import java.util.logging.Level;
50 import java.util.logging.Logger;
51
52
53
54
55
56
57
58 class SendAsynchronous extends Thread {
59
60 private static final Logger logger = Logger.getLogger(SendAsynchronous.class.getName());
61
62
63
64 private final double[] sendCount;
65
66
67
68 private final DoubleBuf sendCountBuf;
69
70
71
72 private final double[] receiveCount;
73
74
75
76 private final DoubleBuf receiveCountBuf;
77
78
79
80 private final Comm world = Comm.world();
81
82
83
84 private final int rank = world.rank();
85
86
87
88 private final int numProc = world.size();
89
90
91
92 private final Histogram histogram;
93
94
95
96
97
98
99 SendAsynchronous(Histogram histogram) {
100 this.histogram = histogram;
101
102 sendCount = new double[4];
103 sendCountBuf = DoubleBuf.buffer(sendCount);
104
105 receiveCount = new double[4];
106 receiveCountBuf = DoubleBuf.buffer(receiveCount);
107 }
108
109
110 @Override
111 public void run() {
112 while (true) {
113 try {
114 histogram.world.receive(null, receiveCountBuf);
115 } catch (InterruptedIOException ioe) {
116 String message = " SendAsynchronous was interrupted at world.receive; "
117 + "future message passing may be in an error state.";
118 logger.log(Level.WARNING, message, ioe);
119 break;
120 } catch (IOException e) {
121 String message = e.getMessage();
122 logger.log(Level.WARNING, message, e);
123 }
124
125
126
127 boolean terminateSignal = stream(receiveCount).allMatch(Double::isNaN);
128 if (terminateSignal) {
129 logger.fine(" Termination signal received -- finishing execution.");
130 break;
131 }
132
133 int countRank = (int) round(receiveCount[0]);
134 double lambda = receiveCount[1];
135 double dUdL = receiveCount[2];
136 double weight = receiveCount[3];
137
138
139 if (histogram.getIndependentWalkers() && countRank != rank) {
140 continue;
141 }
142
143 if (histogram.getResetStatistics() && lambda > histogram.getLambdaResetValue()) {
144 histogram.allocateRecursionKernel();
145 histogram.disableResetStatistics();
146 logger.info(format(" Cleared OST histogram (Lambda = %6.4f).", lambda));
147 }
148
149
150
151 histogram.addToRecursionKernelValue(lambda, dUdL, weight);
152
153
154 if (isInterrupted()) {
155 logger.log(Level.INFO, " SendAsynchronous was interrupted -- finishing execution.");
156
157 break;
158 }
159 }
160 }
161
162
163
164
165
166
167
168
169 public void send(double lambda, double dUdL, double temperingWeight) {
170 sendCount[0] = rank;
171 sendCount[1] = lambda;
172 sendCount[2] = dUdL;
173 sendCount[3] = temperingWeight;
174
175 histogram.setLastReceivedLambda(lambda);
176 histogram.setLastReceiveddUdL(dUdL);
177
178 for (int i = 0; i < numProc; i++) {
179 try {
180 world.send(i, sendCountBuf);
181 } catch (Exception ex) {
182 String message = " Asynchronous Multiwalker OST send failed.";
183 logger.log(Level.SEVERE, message, ex);
184 }
185 }
186 }
187 }