Commons Exec 示例:ExecuteWatchdog 类的用法

本文将介绍怎样利用 ExecuteWatchdog 类去监听我们使用 Commons Exec 启动的外部进程,如果外部进程执行时长超过指定的时长,自动停止外部进程运行。

Commons Exec 中,ExecuteWatchdog 类实现了 TimeoutObserver 接口,而 TimeoutObserver 接口的实现类将被 Commons Exec 的看门狗(Watchdog)通知,这里采用了观察者模式。TimeoutObserver 接口只提供了一个方法,签名如下:

  • timeoutOccured(Watchdog w) 当看门狗超时时调用

示例

下面代码将演示执行“cmd.exe /c ping www.hxstrive.com -n 20”命令,该命令执行时长约 20 秒左右。我们通过 DefaultExecutor 的 setWatchdog() 方法添加一个看门狗 ExecuteWatchdog,看门狗将在 5秒后自动结束执行启动的外部进程。代码如下:

import org.apache.commons.exec.*;
import java.io.ByteArrayOutputStream;

/**
 * 使用 Commons Exec 调用 ping 命令,采用 Watchdog 监听超市
 * @author hxstrive.com 2021/12/25
 */
public class WatchdogDemo1 {

    public static void main(String[] args) throws Exception {
        // 1.构建命令行
        CommandLine commandLine = new CommandLine("cmd.exe");
        commandLine.addArgument("/c");
        commandLine.addArgument("ping");
        commandLine.addArgument("www.hxstrive.com");
        commandLine.addArguments(new String[]{"-n", "20"});

        // 2.建立接收输出信息的流
        // 接收结果流
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        // 接收错误信息流
        ByteArrayOutputStream errorStream = new ByteArrayOutputStream();

        // 3.创建执行器
        DefaultExecutor executor = new DefaultExecutor();
        executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));

        // 4.创建看门狗,5秒后自动结束进程
        // 进程的超时时间(以毫秒为单位),一定是大于 0 或 'INFINITE_TIMEOUT'
        ExecuteWatchdog watchdog = new ExecuteWatchdog(5000);
        executor.setWatchdog(watchdog);

        // 5.执行命令
        executor.execute(commandLine);

        // 6.输出结果
        System.out.println(outputStream.toString("GBK"));
        System.out.println(errorStream.toString("GBK"));
    }

}

原理

这里将通过分析源码,简单了解 Commons Exec 是怎样去实现的。详细步骤如下:

DefaultExecutor 类

我们先从示例中设置看门狗的地方进行一步一步的跟踪。下面是设置看门狗的代码:

// 3.创建执行器
DefaultExecutor executor = new DefaultExecutor();
// 4.创建看门狗,5秒后自动结束进程
// 进程的超时时间(以毫秒为单位),一定是大于 0 或 'INFINITE_TIMEOUT'
ExecuteWatchdog watchdog = new ExecuteWatchdog(5000);
executor.setWatchdog(watchdog);

setWatchdog() 方法代码如下:

public void setWatchdog(final ExecuteWatchdog watchDog) {
    this.watchdog = watchDog;
}

这里仅仅是将看门狗对象设置到 DefaultExecutor 执行器中,最终目的是在执行命令时启动看门狗。所以我们可以分析命令执行方法 execute(),代码如下:

public void execute(final CommandLine command, final Map<String, String> environment,
        final ExecuteResultHandler handler) throws ExecuteException, IOException {

    if (workingDirectory != null && !workingDirectory.exists()) {
        throw new IOException(workingDirectory + " doesn't exist.");
    }

    if (watchdog != null) {
        // 将看门狗标的启动标志设置为false,表示看门狗没有启动
        watchdog.setProcessNotStarted();
    }

    // 创建一个 Runnable 类
    final Runnable runnable = new Runnable()
    {
        public void run()
        {
            int exitValue = Executor.INVALID_EXITVALUE;
            try {
                // 调用 executeInternal() 方法去执行命令
                exitValue = executeInternal(command, environment, workingDirectory, streamHandler);
                // 处理完成
                handler.onProcessComplete(exitValue);
            } catch (final ExecuteException e) {
                // 处理失败
                handler.onProcessFailed(e);
            } catch (final Exception e) {
                // 处理失败
                handler.onProcessFailed(new ExecuteException("Execution failed", exitValue, e));
            }
        }
    };

    // 根据上面创建的 Runnable 类创建线程且启动该线程(Exec 默认执行器)
    this.executorThread = createThread(runnable, "Exec Default Executor");
    getExecutorThread().start();
}

上面代码中,通过调用 executeInternal() 方法去执行具体的命令。

executeInternal() 方法代码如下:

private int executeInternal(final CommandLine command, final Map<String, String> environment,
        final File dir, final ExecuteStreamHandler streams) throws IOException {

    setExceptionCaught(null);

    // 调用命令,返回一个代表子进程的 Process 对象
    // 根据该对象,我们可以销毁子进程、获取子进程的输入、输出和错误流
    final Process process = this.launch(command, environment, dir);

    try {
        // 将子进程的输入、输出和错误流设置给 ExecuteStreamHandler(实现类 PumpStreamHandler)
        streams.setProcessInputStream(process.getOutputStream());
        streams.setProcessOutputStream(process.getInputStream());
        streams.setProcessErrorStream(process.getErrorStream());
    } catch (final IOException e) {
        // 如果出错了,则直接销毁子进程
        process.destroy();
        throw e;
    }

    // 开始处理流
    // 在 PumpStreamHandler 实现类中,开始分别启动输入、输出和错误线程去处理流
    streams.start();

    try {
        // 如果 VM 退出,则将该进程添加到要销毁的进程列表中
        // add the process to the list of those to destroy if the VM exits
        if (this.getProcessDestroyer() != null) {
          this.getProcessDestroyer().add(process);
        }

        // 【重点】
        // 将看门狗与新创建的进程相关联            
        // associate the watchdog with the newly created process
        if (watchdog != null) {
            watchdog.start(process);
        }

        int exitValue = Executor.INVALID_EXITVALUE;

        try {
            // 导致当前线程等待,一直要等到由该 Process 对象表示的进程已经终止
            exitValue = process.waitFor();
        } catch (final InterruptedException e) {
            // 线程中断异常,则销毁子进程
            process.destroy();
        }
        finally {
            // see http://bugs.sun.com/view_bug.do?bug_id=6420270
            // see https://issues.apache.org/jira/browse/EXEC-46
            // Process.waitFor should clear interrupt status when throwing InterruptedException
            // but we have to do that manually
            Thread.interrupted();
        }            

        if (watchdog != null) {
            // 停止看门狗
            watchdog.stop();
        }

        try {
            // 停止输入、输出和错误流的处理
            streams.stop();
        }
        catch (final IOException e) {
            setExceptionCaught(e);
        }

        closeProcessStreams(process);

        if (getExceptionCaught() != null) {
            throw getExceptionCaught();
        }

        if (watchdog != null) {
            try {
                watchdog.checkException();
            } catch (final IOException e) {
                throw e;
            } catch (final Exception e) {
                throw new IOException(e.getMessage());
            }
        }

        if (this.isFailure(exitValue)) {
            throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
        }

        return exitValue;
    } finally {
        // 如果 VM 退出,则将进程移至要销毁的进程列表
        // remove the process to the list of those to destroy if the VM exits
        if (this.getProcessDestroyer() != null) {
          this.getProcessDestroyer().remove(process);
        }
    }
}

上面方法中,使用 watchdog.start(process) 启动看门狗,如下:

// ExecuteWatchdog.java
public synchronized void start(final Process processToMonitor) {
    if (processToMonitor == null) {
        throw new NullPointerException("process is null.");
    }
    if (this.process != null) {
        throw new IllegalStateException("Already running.");
    }
    this.caught = null;
    this.killedProcess = false;
    this.watch = true;
    this.process = processToMonitor;
    this.processStarted = true;
    this.notifyAll();
    if (this.hasWatchdog) {
        // 见 Watchdog.java 的 start() 方法
        watchdog.start();
    }
}

// Watchdog.java
public synchronized void start() {
    stopped = false;
    final Thread t = new Thread(this, "WATCHDOG");
    t.setDaemon(true);
    t.start();
}

使用 watchdog.stop() 销毁看门狗,如下:

// ExecuteWatchdog.java
public synchronized void stop() {
    if (hasWatchdog) {
        watchdog.stop();
    }
    watch = false;
    process = null;
}

// Watchdog.java
public synchronized void stop() {
    stopped = true;
    notifyAll();
}

到这里,我们就知道 DefaultExecutor 类是怎样去启动和停止看门狗的。哪看门狗是如何帮我们监听子进程执行是否超时呢?这就需要仔细分析 Watchdog 类,分析如下:

public class Watchdog implements Runnable {
    // 存放所有的观察者对象
    private final Vector<TimeoutObserver> observers = new Vector<TimeoutObserver>(1);
    // 存放子进程应该运行的时长,超过该时长则销毁子进程
    private final long timeout;
    // 是否停止
    private boolean stopped = false;

    public Watchdog(final long timeout) {
        if (timeout < 1) {
            throw new IllegalArgumentException("timeout must not be less than 1.");
        }
        this.timeout = timeout;
    }
    //...
}

继续查看 run() 方法,如下:

public void run() {
    final long startTime = System.currentTimeMillis();
    boolean isWaiting;
    synchronized (this) {
        long timeLeft = timeout - (System.currentTimeMillis() - startTime);
        isWaiting = timeLeft > 0;
        while (!stopped && isWaiting) {
            try {
                wait(timeLeft);
            } catch (final InterruptedException e) {
            }
            timeLeft = timeout - (System.currentTimeMillis() - startTime);
            isWaiting = timeLeft > 0;
        }
    }

    // notify the listeners outside of the synchronized block (see EXEC-60)
    if (!isWaiting) {
        // 销毁子进程,因为子进程运行超时了
        fireTimeoutOccured();
    }
}

fireTimeoutOccured() 方法代码如下:

protected final void fireTimeoutOccured() {
    final Enumeration<TimeoutObserver> e = observers.elements();
    while (e.hasMoreElements()) {
        // 逐个通知观察者 TimeoutObserver 的 timeoutOccured 方法
        e.nextElement().timeoutOccured(this);
    }
}
学习必须与实干相结合。 —— 泰戈尔
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号