View Javadoc
1   //******************************************************************************
2   //
3   // Title:       Force Field X.
4   // Description: Force Field X - Software for Molecular Biophysics.
5   // Copyright:   Copyright (c) Michael J. Schnieders 2001-2025.
6   //
7   // This file is part of Force Field X.
8   //
9   // Force Field X is free software; you can redistribute it and/or modify it
10  // under the terms of the GNU General Public License version 3 as published by
11  // the Free Software Foundation.
12  //
13  // Force Field X is distributed in the hope that it will be useful, but WITHOUT
14  // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15  // FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16  // details.
17  //
18  // You should have received a copy of the GNU General Public License along with
19  // Force Field X; if not, write to the Free Software Foundation, Inc., 59 Temple
20  // Place, Suite 330, Boston, MA 02111-1307 USA
21  //
22  // Linking this library statically or dynamically with other modules is making a
23  // combined work based on this library. Thus, the terms and conditions of the
24  // GNU General Public License cover the whole combination.
25  //
26  // As a special exception, the copyright holders of this library give you
27  // permission to link this library with independent modules to produce an
28  // executable, regardless of the license terms of these independent modules, and
29  // to copy and distribute the resulting executable under terms of your choice,
30  // provided that you also meet, for each linked independent module, the terms
31  // and conditions of the license of that module. An independent module is a
32  // module which is not derived from or based on this library. If you modify this
33  // library, you may extend this exception to your version of the library, but
34  // you are not obligated to do so. If you do not wish to do so, delete this
35  // exception statement from your version.
36  //
37  //******************************************************************************
38  package ffx.algorithms.commands;
39  
40  import ffx.algorithms.cli.AlgorithmsCommand;
41  import ffx.utilities.FFXBinding;
42  import ffx.utilities.PortUtils;
43  import picocli.CommandLine.Command;
44  import picocli.CommandLine.Option;
45  
46  import java.io.File;
47  import java.io.IOException;
48  import java.nio.file.Files;
49  import java.util.List;
50  
51  import static java.lang.String.format;
52  
53  /**
54   * Run a Parallel Java Job Scheduler on a cluster with multiple nodes.
55   *
56   * <br>
57   * Usage:
58   * <br>
59   * ffxc Scheduler [options] &lt;filename&gt;
60   *
61   * @author Michael Schnieders
62   */
63  @Command(description = " The Scheduler runs parallel jobs over nodes.", name = "Scheduler")
64  public class Scheduler extends AlgorithmsCommand {
65  
66    /**
67     * -v or --verbose Turn on verbose logging during backend Parallel Java startup.
68     */
69    @Option(names = {"-v", "--verbose"}, paramLabel = "", defaultValue = "false",
70        description = "Turn on verbose logging during backend Parallel Java startup.")
71    private boolean v = false;
72  
73    /**
74     * -p or --threadsPerProcess The number of cores (threads) per process (requires -Dpj.nt=x satisfies x <= p).
75     * The default is all available cores.
76     */
77    @Option(names = {"-p", "--threadsPerProcess"}, paramLabel = "all", defaultValue = "-1",
78        description = "The number of cores (threads) per process (requires -Dpj.nt=X satisfies X <= p).")
79    private int p = -1;
80  
81    /**
82     * -P or --port to define the port the Server will use.
83     */
84    @Option(names = {"-P", "--port"}, paramLabel = "20617", defaultValue = "20617",
85        description = "Set the port the front end server will listen on.")
86    private int port = 20617;
87  
88    /**
89     * -W or --webPort to define the port the Server will serve a webpage to (generally not used).
90     */
91    @Option(names = {"-W", "--webPort"}, paramLabel = "8080",
92        description = "Set the port the server will serve a webpage to.")
93    private int webPort = 8080;
94  
95    /**
96     * --ib or --infiniband Replace the "hpc" domain with the "ipoib" domain to use the Argon high-speed network.
97     */
98    @Option(names = {"--ib", "--ipoib"},
99        description = "Replace the \"hpc\" domain with the \"ipoib\" domain to use the Argon high-speed network.")
100   private boolean ipoib = false;
101 
102   /**
103    * -e or --hostfile to define the environment variable that points to the host file (default is PE_HOSTFILE).
104    */
105   @Option(names = {"-e", "--hostfile"}, paramLabel = "PE_HOSTFILE",
106       description = "Environment variable that points to the host file.")
107   private String hostfileName = "PE_HOSTFILE";
108 
109   /**
110    * -m or --memory to define the string value of -Xmx to pass to worker nodes (default is '2G').
111    */
112   @Option(names = {"-m", "--memory"}, paramLabel = "2G",
113       description = "String value of -Xmx to pass to worker nodes.")
114   private String memory = "2G";
115 
116   /**
117    * Scheduler Constructor.
118    */
119   public Scheduler() {
120     super();
121   }
122 
123   /**
124    * Scheduler Constructor.
125    *
126    * @param binding The Binding to use.
127    */
128   public Scheduler(FFXBinding binding) {
129     super(binding);
130   }
131 
132   /**
133    * Scheduler constructor that sets the command line arguments.
134    *
135    * @param args Command line arguments.
136    */
137   public Scheduler(String[] args) {
138     super(args);
139   }
140 
141   /**
142    * {@inheritDoc}
143    */
144   @Override
145   public Scheduler run() {
146 
147     if (!init()) {
148       return this;
149     }
150 
151     // Determine the number of CPUs per node
152     int CPUs = Runtime.getRuntime().availableProcessors();
153 
154     // The default is 1 process per node.
155     int processes = 1;
156     // More than 1 process per node can be selected if no cores are wasted.
157     if (p > 0) {
158       int n = p;
159       if (n < CPUs && CPUs % n == 0) {
160         processes = CPUs / n;
161         CPUs = n;
162       }
163     }
164 
165     // Read in the Parallel Environment Host File environment variable.
166     String hostsFile = System.getenv(hostfileName);
167 
168     // Default to using only the current node (i.e. "localhost")
169     String[] hostnames = new String[1];
170     hostnames[0] = "localhost";
171 
172     if (hostsFile == null) {
173       logger.info(" The " + hostfileName + " environment variable is empty.");
174       logger.info(" Only localhost will be used.\n");
175     } else {
176       // Check that the supplied file exists and is readable.
177       File host = new File(hostsFile);
178       if (!host.exists() || !host.canRead()) {
179         logger.info(" The file path specified by the " + hostfileName
180             + " environment variable does not exist or cannot be read.");
181         logger.info(" Only localhost will be used.\n");
182       } else {
183         // Read in the hosts.
184         try {
185           List<String> nodes = Files.readAllLines(host.toPath());
186           hostnames = new String[nodes.size()];
187           int i = 0;
188           for (String line : nodes) {
189             hostnames[i] = line.split(" +")[0];
190             if (ipoib) {
191               hostnames[i] = hostnames[i].replace("hpc", "ipoib");
192             }
193             i++;
194           }
195         } catch (IOException e) {
196           logger.info(" Error reading host file: " + e.getMessage());
197           logger.info(" Only localhost will be used.\n");
198         }
199       }
200     }
201 
202     // Check for an invalid requested port.
203     if (port <= 1) {
204       logger.info(" The scheduler port must be greater than 1; the default of 20617 will be used.");
205       port = 20617;
206     }
207 
208     // Check the availability of the desired Scheduler port.
209     while (!PortUtils.isTcpPortAvailable(port)) {
210       logger.info(format(" Scheduler port %d is not available.", port));
211       if (++port > PortUtils.MAX_TCP_PORT) {
212         logger.severe(" Reached port 65535 without finding an open scheduler port!");
213       }
214     }
215     logger.info(format(" Scheduler port: %d.", port));
216     String logFile = format("scheduler.%d.log", port);
217 
218     // Check the availability of the desired web port.
219     while (!PortUtils.isTcpPortAvailable(webPort) || webPort == port) {
220       logger.info(format(" Web port %d is not available.", webPort));
221       if (++webPort > PortUtils.MAX_TCP_PORT) {
222         logger.severe(" Reached port 65535 without finding an open web port!");
223       }
224     }
225     logger.info(format(" Web port: %d.", webPort));
226 
227     // Create the Parallel Java cluster configuration file.
228     String frontend = hostnames[0];
229     StringBuilder sb = new StringBuilder();
230     sb.append("# Force Field X Cluster Configuration File\n");
231     sb.append("cluster Force Field X Cluster\n");
232     sb.append(format("logfile %s\n", logFile));
233     sb.append("webhost 127.0.0.1\n");
234     sb.append(format("webport %d\n", webPort));
235     sb.append("schedulerhost localhost\n");
236     sb.append(format("schedulerport %d\n", port));
237     sb.append(format("frontendhost %s\n", frontend));
238 
239     // Locate the Java executable being used.
240     String javaHome = System.getProperty("java.home");
241     String java = javaHome + "/bin/java";
242 
243     // Set the classpath.
244     String classpath = System.getProperty("java.class.path");
245 
246     // args = "-Xmx" + memory
247     String arg = "-Xmx" + memory;
248 
249     if (v) {
250       arg += " -Dpj.verbose=true";
251     }
252 
253     // Add support for vector instructions.
254     arg += " --add-modules jdk.incubator.vector";
255 
256     // Create an entry for each process
257     int i = 0;
258     for (int proc = 0; proc < processes; proc++) {
259       for (String node : hostnames) {
260         sb.append("backend node" + i + " "
261             + CPUs + " "
262             + node + " "
263             + java + " "
264             + classpath + " "
265             + arg + "\n");
266         i++;
267       }
268     }
269 
270     // Write the Parallel Java config file.
271     String pjConfig = "cluster.txt";
272     File config = new File(pjConfig);
273     try {
274       Files.writeString(config.toPath(), sb.toString());
275     } catch (IOException e) {
276       logger.severe(" Error writing configuration file: " + e.getMessage());
277       return this;
278     }
279 
280     // Locate the version of FFX being used.
281     String ffxHome = System.getProperty("basedir");
282     // Run the Parallel Java Scheduler.
283     try {
284       ProcessBuilder pb = new ProcessBuilder(ffxHome + "/bin/scheduler", pjConfig);
285       // Merge stderr into stdout
286       // pb.redirectErrorStream(true);
287       // Inherit parent's output
288       // pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
289       Process process = pb.start();
290       process.waitFor();
291     } catch (IOException | InterruptedException e) {
292       logger.severe(" Error executing scheduler: " + e.getMessage());
293       if (e instanceof InterruptedException) {
294         Thread.currentThread().interrupt();
295       }
296     }
297 
298     return this;
299   }
300 
301 }