1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 package ffx.algorithms.commands;
39
40 import ffx.algorithms.cli.AlgorithmsCommand;
41 import ffx.utilities.FFXBinding;
42 import ffx.utilities.PortUtils;
43 import picocli.CommandLine.Command;
44 import picocli.CommandLine.Option;
45
46 import java.io.File;
47 import java.io.IOException;
48 import java.nio.file.Files;
49 import java.util.List;
50
51 import static java.lang.String.format;
52
53
54
55
56
57
58
59
60
61
62
63 @Command(description = " The Scheduler runs parallel jobs over nodes.", name = "Scheduler")
64 public class Scheduler extends AlgorithmsCommand {
65
66
67
68
69 @Option(names = {"-v", "--verbose"}, paramLabel = "", defaultValue = "false",
70 description = "Turn on verbose logging during backend Parallel Java startup.")
71 private boolean v = false;
72
73
74
75
76
77 @Option(names = {"-p", "--threadsPerProcess"}, paramLabel = "all", defaultValue = "-1",
78 description = "The number of cores (threads) per process (requires -Dpj.nt=X satisfies X <= p).")
79 private int p = -1;
80
81
82
83
84 @Option(names = {"-P", "--port"}, paramLabel = "20617", defaultValue = "20617",
85 description = "Set the port the front end server will listen on.")
86 private int port = 20617;
87
88
89
90
91 @Option(names = {"-W", "--webPort"}, paramLabel = "8080",
92 description = "Set the port the server will serve a webpage to.")
93 private int webPort = 8080;
94
95
96
97
98 @Option(names = {"--ib", "--ipoib"},
99 description = "Replace the \"hpc\" domain with the \"ipoib\" domain to use the Argon high-speed network.")
100 private boolean ipoib = false;
101
102
103
104
105 @Option(names = {"-e", "--hostfile"}, paramLabel = "PE_HOSTFILE",
106 description = "Environment variable that points to the host file.")
107 private String hostfileName = "PE_HOSTFILE";
108
109
110
111
112 @Option(names = {"-m", "--memory"}, paramLabel = "2G",
113 description = "String value of -Xmx to pass to worker nodes.")
114 private String memory = "2G";
115
116
117
118
119 public Scheduler() {
120 super();
121 }
122
123
124
125
126
127
128 public Scheduler(FFXBinding binding) {
129 super(binding);
130 }
131
132
133
134
135
136
137 public Scheduler(String[] args) {
138 super(args);
139 }
140
141
142
143
144 @Override
145 public Scheduler run() {
146
147 if (!init()) {
148 return this;
149 }
150
151
152 int CPUs = Runtime.getRuntime().availableProcessors();
153
154
155 int processes = 1;
156
157 if (p > 0) {
158 int n = p;
159 if (n < CPUs && CPUs % n == 0) {
160 processes = CPUs / n;
161 CPUs = n;
162 }
163 }
164
165
166 String hostsFile = System.getenv(hostfileName);
167
168
169 String[] hostnames = new String[1];
170 hostnames[0] = "localhost";
171
172 if (hostsFile == null) {
173 logger.info(" The " + hostfileName + " environment variable is empty.");
174 logger.info(" Only localhost will be used.\n");
175 } else {
176
177 File host = new File(hostsFile);
178 if (!host.exists() || !host.canRead()) {
179 logger.info(" The file path specified by the " + hostfileName
180 + " environment variable does not exist or cannot be read.");
181 logger.info(" Only localhost will be used.\n");
182 } else {
183
184 try {
185 List<String> nodes = Files.readAllLines(host.toPath());
186 hostnames = new String[nodes.size()];
187 int i = 0;
188 for (String line : nodes) {
189 hostnames[i] = line.split(" +")[0];
190 if (ipoib) {
191 hostnames[i] = hostnames[i].replace("hpc", "ipoib");
192 }
193 i++;
194 }
195 } catch (IOException e) {
196 logger.info(" Error reading host file: " + e.getMessage());
197 logger.info(" Only localhost will be used.\n");
198 }
199 }
200 }
201
202
203 if (port <= 1) {
204 logger.info(" The scheduler port must be greater than 1; the default of 20617 will be used.");
205 port = 20617;
206 }
207
208
209 while (!PortUtils.isTcpPortAvailable(port)) {
210 logger.info(format(" Scheduler port %d is not available.", port));
211 if (++port > PortUtils.MAX_TCP_PORT) {
212 logger.severe(" Reached port 65535 without finding an open scheduler port!");
213 }
214 }
215 logger.info(format(" Scheduler port: %d.", port));
216 String logFile = format("scheduler.%d.log", port);
217
218
219 while (!PortUtils.isTcpPortAvailable(webPort) || webPort == port) {
220 logger.info(format(" Web port %d is not available.", webPort));
221 if (++webPort > PortUtils.MAX_TCP_PORT) {
222 logger.severe(" Reached port 65535 without finding an open web port!");
223 }
224 }
225 logger.info(format(" Web port: %d.", webPort));
226
227
228 String frontend = hostnames[0];
229 StringBuilder sb = new StringBuilder();
230 sb.append("# Force Field X Cluster Configuration File\n");
231 sb.append("cluster Force Field X Cluster\n");
232 sb.append(format("logfile %s\n", logFile));
233 sb.append("webhost 127.0.0.1\n");
234 sb.append(format("webport %d\n", webPort));
235 sb.append("schedulerhost localhost\n");
236 sb.append(format("schedulerport %d\n", port));
237 sb.append(format("frontendhost %s\n", frontend));
238
239
240 String javaHome = System.getProperty("java.home");
241 String java = javaHome + "/bin/java";
242
243
244 String classpath = System.getProperty("java.class.path");
245
246
247 String arg = "-Xmx" + memory;
248
249 if (v) {
250 arg += " -Dpj.verbose=true";
251 }
252
253
254 arg += " --add-modules jdk.incubator.vector";
255
256
257 int i = 0;
258 for (int proc = 0; proc < processes; proc++) {
259 for (String node : hostnames) {
260 sb.append("backend node" + i + " "
261 + CPUs + " "
262 + node + " "
263 + java + " "
264 + classpath + " "
265 + arg + "\n");
266 i++;
267 }
268 }
269
270
271 String pjConfig = "cluster.txt";
272 File config = new File(pjConfig);
273 try {
274 Files.writeString(config.toPath(), sb.toString());
275 } catch (IOException e) {
276 logger.severe(" Error writing configuration file: " + e.getMessage());
277 return this;
278 }
279
280
281 String ffxHome = System.getProperty("basedir");
282
283 try {
284 ProcessBuilder pb = new ProcessBuilder(ffxHome + "/bin/scheduler", pjConfig);
285
286
287
288
289 Process process = pb.start();
290 process.waitFor();
291 } catch (IOException | InterruptedException e) {
292 logger.severe(" Error executing scheduler: " + e.getMessage());
293 if (e instanceof InterruptedException) {
294 Thread.currentThread().interrupt();
295 }
296 }
297
298 return this;
299 }
300
301 }