Commons Exec 中,ExecuteWatchdog 类实现了 TimeoutObserver 接口,而 TimeoutObserver 接口的实现类将被 Commons Exec 的看门狗(Watchdog)通知,这里采用了观察者模式。TimeoutObserver 接口只提供了一个方法,签名如下:
timeoutOccured(Watchdog w) 当看门狗超时时调用
下面代码将演示执行“cmd.exe /c ping www.hxstrive.com -n 20”命令,该命令执行时长约 20 秒左右。我们通过 DefaultExecutor 的 setWatchdog() 方法添加一个看门狗 ExecuteWatchdog,看门狗将在 5秒后自动结束执行启动的外部进程。代码如下:
import org.apache.commons.exec.*;
import java.io.ByteArrayOutputStream;
/**
* 使用 Commons Exec 调用 ping 命令,采用 Watchdog 监听超市
* @author hxstrive.com 2021/12/25
*/
public class WatchdogDemo1 {
public static void main(String[] args) throws Exception {
// 1.构建命令行
CommandLine commandLine = new CommandLine("cmd.exe");
commandLine.addArgument("/c");
commandLine.addArgument("ping");
commandLine.addArgument("www.hxstrive.com");
commandLine.addArguments(new String[]{"-n", "20"});
// 2.建立接收输出信息的流
// 接收结果流
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
// 接收错误信息流
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
// 3.创建执行器
DefaultExecutor executor = new DefaultExecutor();
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
// 4.创建看门狗,5秒后自动结束进程
// 进程的超时时间(以毫秒为单位),一定是大于 0 或 'INFINITE_TIMEOUT'
ExecuteWatchdog watchdog = new ExecuteWatchdog(5000);
executor.setWatchdog(watchdog);
// 5.执行命令
executor.execute(commandLine);
// 6.输出结果
System.out.println(outputStream.toString("GBK"));
System.out.println(errorStream.toString("GBK"));
}
}这里将通过分析源码,简单了解 Commons Exec 是怎样去实现的。详细步骤如下:
我们先从示例中设置看门狗的地方进行一步一步的跟踪。下面是设置看门狗的代码:
// 3.创建执行器 DefaultExecutor executor = new DefaultExecutor(); // 4.创建看门狗,5秒后自动结束进程 // 进程的超时时间(以毫秒为单位),一定是大于 0 或 'INFINITE_TIMEOUT' ExecuteWatchdog watchdog = new ExecuteWatchdog(5000); executor.setWatchdog(watchdog);
setWatchdog() 方法代码如下:
public void setWatchdog(final ExecuteWatchdog watchDog) {
this.watchdog = watchDog;
}这里仅仅是将看门狗对象设置到 DefaultExecutor 执行器中,最终目的是在执行命令时启动看门狗。所以我们可以分析命令执行方法 execute(),代码如下:
public void execute(final CommandLine command, final Map<String, String> environment,
final ExecuteResultHandler handler) throws ExecuteException, IOException {
if (workingDirectory != null && !workingDirectory.exists()) {
throw new IOException(workingDirectory + " doesn't exist.");
}
if (watchdog != null) {
// 将看门狗标的启动标志设置为false,表示看门狗没有启动
watchdog.setProcessNotStarted();
}
// 创建一个 Runnable 类
final Runnable runnable = new Runnable()
{
public void run()
{
int exitValue = Executor.INVALID_EXITVALUE;
try {
// 调用 executeInternal() 方法去执行命令
exitValue = executeInternal(command, environment, workingDirectory, streamHandler);
// 处理完成
handler.onProcessComplete(exitValue);
} catch (final ExecuteException e) {
// 处理失败
handler.onProcessFailed(e);
} catch (final Exception e) {
// 处理失败
handler.onProcessFailed(new ExecuteException("Execution failed", exitValue, e));
}
}
};
// 根据上面创建的 Runnable 类创建线程且启动该线程(Exec 默认执行器)
this.executorThread = createThread(runnable, "Exec Default Executor");
getExecutorThread().start();
}上面代码中,通过调用 executeInternal() 方法去执行具体的命令。
executeInternal() 方法代码如下:
private int executeInternal(final CommandLine command, final Map<String, String> environment,
final File dir, final ExecuteStreamHandler streams) throws IOException {
setExceptionCaught(null);
// 调用命令,返回一个代表子进程的 Process 对象
// 根据该对象,我们可以销毁子进程、获取子进程的输入、输出和错误流
final Process process = this.launch(command, environment, dir);
try {
// 将子进程的输入、输出和错误流设置给 ExecuteStreamHandler(实现类 PumpStreamHandler)
streams.setProcessInputStream(process.getOutputStream());
streams.setProcessOutputStream(process.getInputStream());
streams.setProcessErrorStream(process.getErrorStream());
} catch (final IOException e) {
// 如果出错了,则直接销毁子进程
process.destroy();
throw e;
}
// 开始处理流
// 在 PumpStreamHandler 实现类中,开始分别启动输入、输出和错误线程去处理流
streams.start();
try {
// 如果 VM 退出,则将该进程添加到要销毁的进程列表中
// add the process to the list of those to destroy if the VM exits
if (this.getProcessDestroyer() != null) {
this.getProcessDestroyer().add(process);
}
// 【重点】
// 将看门狗与新创建的进程相关联
// associate the watchdog with the newly created process
if (watchdog != null) {
watchdog.start(process);
}
int exitValue = Executor.INVALID_EXITVALUE;
try {
// 导致当前线程等待,一直要等到由该 Process 对象表示的进程已经终止
exitValue = process.waitFor();
} catch (final InterruptedException e) {
// 线程中断异常,则销毁子进程
process.destroy();
}
finally {
// see http://bugs.sun.com/view_bug.do?bug_id=6420270
// see https://issues.apache.org/jira/browse/EXEC-46
// Process.waitFor should clear interrupt status when throwing InterruptedException
// but we have to do that manually
Thread.interrupted();
}
if (watchdog != null) {
// 停止看门狗
watchdog.stop();
}
try {
// 停止输入、输出和错误流的处理
streams.stop();
}
catch (final IOException e) {
setExceptionCaught(e);
}
closeProcessStreams(process);
if (getExceptionCaught() != null) {
throw getExceptionCaught();
}
if (watchdog != null) {
try {
watchdog.checkException();
} catch (final IOException e) {
throw e;
} catch (final Exception e) {
throw new IOException(e.getMessage());
}
}
if (this.isFailure(exitValue)) {
throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
}
return exitValue;
} finally {
// 如果 VM 退出,则将进程移至要销毁的进程列表
// remove the process to the list of those to destroy if the VM exits
if (this.getProcessDestroyer() != null) {
this.getProcessDestroyer().remove(process);
}
}
}上面方法中,使用 watchdog.start(process) 启动看门狗,如下:
// ExecuteWatchdog.java
public synchronized void start(final Process processToMonitor) {
if (processToMonitor == null) {
throw new NullPointerException("process is null.");
}
if (this.process != null) {
throw new IllegalStateException("Already running.");
}
this.caught = null;
this.killedProcess = false;
this.watch = true;
this.process = processToMonitor;
this.processStarted = true;
this.notifyAll();
if (this.hasWatchdog) {
// 见 Watchdog.java 的 start() 方法
watchdog.start();
}
}
// Watchdog.java
public synchronized void start() {
stopped = false;
final Thread t = new Thread(this, "WATCHDOG");
t.setDaemon(true);
t.start();
}使用 watchdog.stop() 销毁看门狗,如下:
// ExecuteWatchdog.java
public synchronized void stop() {
if (hasWatchdog) {
watchdog.stop();
}
watch = false;
process = null;
}
// Watchdog.java
public synchronized void stop() {
stopped = true;
notifyAll();
}到这里,我们就知道 DefaultExecutor 类是怎样去启动和停止看门狗的。哪看门狗是如何帮我们监听子进程执行是否超时呢?这就需要仔细分析 Watchdog 类,分析如下:
public class Watchdog implements Runnable {
// 存放所有的观察者对象
private final Vector<TimeoutObserver> observers = new Vector<TimeoutObserver>(1);
// 存放子进程应该运行的时长,超过该时长则销毁子进程
private final long timeout;
// 是否停止
private boolean stopped = false;
public Watchdog(final long timeout) {
if (timeout < 1) {
throw new IllegalArgumentException("timeout must not be less than 1.");
}
this.timeout = timeout;
}
//...
}继续查看 run() 方法,如下:
public void run() {
final long startTime = System.currentTimeMillis();
boolean isWaiting;
synchronized (this) {
long timeLeft = timeout - (System.currentTimeMillis() - startTime);
isWaiting = timeLeft > 0;
while (!stopped && isWaiting) {
try {
wait(timeLeft);
} catch (final InterruptedException e) {
}
timeLeft = timeout - (System.currentTimeMillis() - startTime);
isWaiting = timeLeft > 0;
}
}
// notify the listeners outside of the synchronized block (see EXEC-60)
if (!isWaiting) {
// 销毁子进程,因为子进程运行超时了
fireTimeoutOccured();
}
}fireTimeoutOccured() 方法代码如下:
protected final void fireTimeoutOccured() {
final Enumeration<TimeoutObserver> e = observers.elements();
while (e.hasMoreElements()) {
// 逐个通知观察者 TimeoutObserver 的 timeoutOccured 方法
e.nextElement().timeoutOccured(this);
}
}