Commons Exec 中,ExecuteWatchdog 类实现了 TimeoutObserver 接口,而 TimeoutObserver 接口的实现类将被 Commons Exec 的看门狗(Watchdog)通知,这里采用了观察者模式。TimeoutObserver 接口只提供了一个方法,签名如下:
timeoutOccured(Watchdog w) 当看门狗超时时调用
下面代码将演示执行“cmd.exe /c ping www.hxstrive.com -n 20”命令,该命令执行时长约 20 秒左右。我们通过 DefaultExecutor 的 setWatchdog() 方法添加一个看门狗 ExecuteWatchdog,看门狗将在 5秒后自动结束执行启动的外部进程。代码如下:
import org.apache.commons.exec.*; import java.io.ByteArrayOutputStream; /** * 使用 Commons Exec 调用 ping 命令,采用 Watchdog 监听超市 * @author hxstrive.com 2021/12/25 */ public class WatchdogDemo1 { public static void main(String[] args) throws Exception { // 1.构建命令行 CommandLine commandLine = new CommandLine("cmd.exe"); commandLine.addArgument("/c"); commandLine.addArgument("ping"); commandLine.addArgument("www.hxstrive.com"); commandLine.addArguments(new String[]{"-n", "20"}); // 2.建立接收输出信息的流 // 接收结果流 ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); // 接收错误信息流 ByteArrayOutputStream errorStream = new ByteArrayOutputStream(); // 3.创建执行器 DefaultExecutor executor = new DefaultExecutor(); executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream)); // 4.创建看门狗,5秒后自动结束进程 // 进程的超时时间(以毫秒为单位),一定是大于 0 或 'INFINITE_TIMEOUT' ExecuteWatchdog watchdog = new ExecuteWatchdog(5000); executor.setWatchdog(watchdog); // 5.执行命令 executor.execute(commandLine); // 6.输出结果 System.out.println(outputStream.toString("GBK")); System.out.println(errorStream.toString("GBK")); } }
这里将通过分析源码,简单了解 Commons Exec 是怎样去实现的。详细步骤如下:
我们先从示例中设置看门狗的地方进行一步一步的跟踪。下面是设置看门狗的代码:
// 3.创建执行器 DefaultExecutor executor = new DefaultExecutor(); // 4.创建看门狗,5秒后自动结束进程 // 进程的超时时间(以毫秒为单位),一定是大于 0 或 'INFINITE_TIMEOUT' ExecuteWatchdog watchdog = new ExecuteWatchdog(5000); executor.setWatchdog(watchdog);
setWatchdog() 方法代码如下:
public void setWatchdog(final ExecuteWatchdog watchDog) { this.watchdog = watchDog; }
这里仅仅是将看门狗对象设置到 DefaultExecutor 执行器中,最终目的是在执行命令时启动看门狗。所以我们可以分析命令执行方法 execute(),代码如下:
public void execute(final CommandLine command, final Map<String, String> environment, final ExecuteResultHandler handler) throws ExecuteException, IOException { if (workingDirectory != null && !workingDirectory.exists()) { throw new IOException(workingDirectory + " doesn't exist."); } if (watchdog != null) { // 将看门狗标的启动标志设置为false,表示看门狗没有启动 watchdog.setProcessNotStarted(); } // 创建一个 Runnable 类 final Runnable runnable = new Runnable() { public void run() { int exitValue = Executor.INVALID_EXITVALUE; try { // 调用 executeInternal() 方法去执行命令 exitValue = executeInternal(command, environment, workingDirectory, streamHandler); // 处理完成 handler.onProcessComplete(exitValue); } catch (final ExecuteException e) { // 处理失败 handler.onProcessFailed(e); } catch (final Exception e) { // 处理失败 handler.onProcessFailed(new ExecuteException("Execution failed", exitValue, e)); } } }; // 根据上面创建的 Runnable 类创建线程且启动该线程(Exec 默认执行器) this.executorThread = createThread(runnable, "Exec Default Executor"); getExecutorThread().start(); }
上面代码中,通过调用 executeInternal() 方法去执行具体的命令。
executeInternal() 方法代码如下:
private int executeInternal(final CommandLine command, final Map<String, String> environment, final File dir, final ExecuteStreamHandler streams) throws IOException { setExceptionCaught(null); // 调用命令,返回一个代表子进程的 Process 对象 // 根据该对象,我们可以销毁子进程、获取子进程的输入、输出和错误流 final Process process = this.launch(command, environment, dir); try { // 将子进程的输入、输出和错误流设置给 ExecuteStreamHandler(实现类 PumpStreamHandler) streams.setProcessInputStream(process.getOutputStream()); streams.setProcessOutputStream(process.getInputStream()); streams.setProcessErrorStream(process.getErrorStream()); } catch (final IOException e) { // 如果出错了,则直接销毁子进程 process.destroy(); throw e; } // 开始处理流 // 在 PumpStreamHandler 实现类中,开始分别启动输入、输出和错误线程去处理流 streams.start(); try { // 如果 VM 退出,则将该进程添加到要销毁的进程列表中 // add the process to the list of those to destroy if the VM exits if (this.getProcessDestroyer() != null) { this.getProcessDestroyer().add(process); } // 【重点】 // 将看门狗与新创建的进程相关联 // associate the watchdog with the newly created process if (watchdog != null) { watchdog.start(process); } int exitValue = Executor.INVALID_EXITVALUE; try { // 导致当前线程等待,一直要等到由该 Process 对象表示的进程已经终止 exitValue = process.waitFor(); } catch (final InterruptedException e) { // 线程中断异常,则销毁子进程 process.destroy(); } finally { // see http://bugs.sun.com/view_bug.do?bug_id=6420270 // see https://issues.apache.org/jira/browse/EXEC-46 // Process.waitFor should clear interrupt status when throwing InterruptedException // but we have to do that manually Thread.interrupted(); } if (watchdog != null) { // 停止看门狗 watchdog.stop(); } try { // 停止输入、输出和错误流的处理 streams.stop(); } catch (final IOException e) { setExceptionCaught(e); } closeProcessStreams(process); if (getExceptionCaught() != null) { throw getExceptionCaught(); } if (watchdog != null) { try { watchdog.checkException(); } catch (final IOException e) { throw e; } catch (final Exception e) { throw new IOException(e.getMessage()); } } if (this.isFailure(exitValue)) { throw new ExecuteException("Process exited with an error: " + exitValue, exitValue); } return exitValue; } finally { // 如果 VM 退出,则将进程移至要销毁的进程列表 // remove the process to the list of those to destroy if the VM exits if (this.getProcessDestroyer() != null) { this.getProcessDestroyer().remove(process); } } }
上面方法中,使用 watchdog.start(process) 启动看门狗,如下:
// ExecuteWatchdog.java public synchronized void start(final Process processToMonitor) { if (processToMonitor == null) { throw new NullPointerException("process is null."); } if (this.process != null) { throw new IllegalStateException("Already running."); } this.caught = null; this.killedProcess = false; this.watch = true; this.process = processToMonitor; this.processStarted = true; this.notifyAll(); if (this.hasWatchdog) { // 见 Watchdog.java 的 start() 方法 watchdog.start(); } } // Watchdog.java public synchronized void start() { stopped = false; final Thread t = new Thread(this, "WATCHDOG"); t.setDaemon(true); t.start(); }
使用 watchdog.stop() 销毁看门狗,如下:
// ExecuteWatchdog.java public synchronized void stop() { if (hasWatchdog) { watchdog.stop(); } watch = false; process = null; } // Watchdog.java public synchronized void stop() { stopped = true; notifyAll(); }
到这里,我们就知道 DefaultExecutor 类是怎样去启动和停止看门狗的。哪看门狗是如何帮我们监听子进程执行是否超时呢?这就需要仔细分析 Watchdog 类,分析如下:
public class Watchdog implements Runnable { // 存放所有的观察者对象 private final Vector<TimeoutObserver> observers = new Vector<TimeoutObserver>(1); // 存放子进程应该运行的时长,超过该时长则销毁子进程 private final long timeout; // 是否停止 private boolean stopped = false; public Watchdog(final long timeout) { if (timeout < 1) { throw new IllegalArgumentException("timeout must not be less than 1."); } this.timeout = timeout; } //... }
继续查看 run() 方法,如下:
public void run() { final long startTime = System.currentTimeMillis(); boolean isWaiting; synchronized (this) { long timeLeft = timeout - (System.currentTimeMillis() - startTime); isWaiting = timeLeft > 0; while (!stopped && isWaiting) { try { wait(timeLeft); } catch (final InterruptedException e) { } timeLeft = timeout - (System.currentTimeMillis() - startTime); isWaiting = timeLeft > 0; } } // notify the listeners outside of the synchronized block (see EXEC-60) if (!isWaiting) { // 销毁子进程,因为子进程运行超时了 fireTimeoutOccured(); } }
fireTimeoutOccured() 方法代码如下:
protected final void fireTimeoutOccured() { final Enumeration<TimeoutObserver> e = observers.elements(); while (e.hasMoreElements()) { // 逐个通知观察者 TimeoutObserver 的 timeoutOccured 方法 e.nextElement().timeoutOccured(this); } }