Java IO:管道(Pipe)

在 Java IO 中,管道(Pipe)是一种用于在同一 JVM 中不同线程之间进行通信的机制,它可以实现线程之间的数据传输。因此,管道也可以是数据的来源或目的地。如下图:

image.png

注意:Java IO 中,你不能使用管道与不同 JVM(不同进程)中的线程通信。Java 中的管道概念不同于 Unix / Linux 中的管道概念,在 Unix / Linux 中,运行在不同地址空间的两个进程可以通过管道进行通信。在 Java 中,通信双方必须运行在同一个进程中(即同一个 JVM),而且应该是不同的线程。

通过 Java IO 创建管道

使用 Java IO 创建管道是通过 PipedOutputStream PipedInputStream 类完成的。一个 PipedInputStream 应连接到一个 PipedOutputStream。一个线程写入 PipedOutputStream 的数据可由另一个线程从连接的 PipedInputStream 读取。

Java IO 管道示例

下面是一个如何将管道输入流连接到管道输出流的简单示例:

package com.hxstrive.java_io.demo01;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/**
 * 管道流示例
 * @author hxstrive.com
 */
public class PipeDemo1 {

    public static void main(String[] args) throws IOException {
        // 创建管道流对象
        final PipedOutputStream output = new PipedOutputStream();
        final PipedInputStream  input  = new PipedInputStream(output);

        // 线程1,向管道中写入数据
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    output.write("Hello world, pipe!".getBytes());
                    output.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        // 线程2,从管道中读取数据
        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    int data = input.read();
                    while(data != -1){
                        System.out.print((char) data);
                        data = input.read();
                    }
                    // 输出:Hello world, pipe!
                    input.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });

        // 启动线程
        thread1.start();
        thread2.start();
    }
}

你还可以使用两个管道流的 connect() 方法将它们连接起来。PipedInputStream 和 PipedOutputStream 都有一个 connect() 方法,可以将其中一个连接到另一个。例如:

// 创建管道流对象
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream  input  = new PipedInputStream();
input.connect(output);

或者

// 创建管道流对象
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream  input  = new PipedInputStream();
output.connect(input);

管道和线程

管道是线程间通信的工具,用于实现不同线程之间的数据传输和协作。

一定要记住,在使用两个连接的管道流时,应将一个流传递给一个线程,另一个流传递给另一个线程。管道流上的 read() 和 write() 调用是阻塞性的,这意味着如果尝试使用同一线程同时调用 read() 和 write(),可能会导致线程自身陷入死锁。

以下是管道和线程的关系:

(1)管道是线程间通信的桥梁:管道允许一个线程(生产者)向另一个线程(消费者)发送数据,实现线程间的同步或异步通信。例如,在生产者-消费者模型中,生产者线程将数据写入管道,消费者线程从管道读取数据,两者通过管道解耦。

(2)传统 I/O 管道与线程的协作:传统管道(PipedInputStream/PipedOutputStream)的读写操作是阻塞的。当管道中无数据时,读操作会阻塞,直到有数据写入。当管道已满时,写操作会阻塞,直到数据被读取。注意:管道本身是线程安全的,但需要确保读写操作由不同线程完成,否则可能导致死锁(如同一线程同时读写)。

(3)NIO 管道与线程的高效协作:NIO 管道(Pipe)通过通道(SinkChannel/SourceChannel)和缓冲区(ByteBuffer)支持非阻塞操作。注意,可与选择器(Selector)配合,在单个线程中管理多个管道的读写,适合高并发场景。NIO 的内容将在 Java NIO 教程详细介绍

管道替代方案

在同一个 JVM 中,线程可以通过管道以外的许多其他方式进行通信。例如:

使用 BlockingQueue

BlockingQueue 是 Java 并发包中的一部分,它能在线程间安全地传递数据。BlockingQueue 提供了阻塞操作,在队列为空时会阻塞取操作,在队列满时会阻塞放操作。

示例代码:

package com.hxstrive.java_io.pipe;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * BlockingQueue示例
 * @author hxstrive.com
 */
public class BlockingQueueExample {
    public static void main(String[] args) {
        // 用于线程间通信
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        // 线程1
        new Thread(() -> {
            try {
                // 发送数据
                for (int i = 0; i < 10; i++) {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // 线程2
        new Thread(() -> {
            try {
                // 接收数据
                while (true) {
                    Integer item = queue.take();
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

}

运行结果如下:

Produced: 0
Consumed: 0
Produced: 1
...
Consumed: 8
Produced: 9
Consumed: 9

使用 SynchronousQueue

SynchronousQueue 是一种特殊的 BlockingQueue,它的容量为零,每个插入操作必须等待另一个线程的移除操作,反之亦然。这在需要线程间同步数据交换的场景下很有用。

示例代码:

package com.hxstrive.java_io.pipe;

import java.util.concurrent.SynchronousQueue;

/**
 * SynchronousQueue示例
 * @author hxstrive.com
 */
public class SynchronousQueueExample {

    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        // 线程1
        new Thread(() -> {
            try {
                // 生产数据
                for (int i = 0; i < 10; i++) {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // 线程2
        new Thread(() -> {
            try {
                // 消费数据
                for (int i = 0; i < 10; i++) {
                    Integer item = queue.take();
                    System.out.println("Consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

}

使用 CompletableFuture

CompletableFuture 可用于异步编程,能在线程间传递计算结果。

示例代码:

package com.hxstrive.java_io.pipe;

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 利用 CompletableFuture 实现了异步任务的执行,并在任务完成后处理其结果。
        // CompletableFuture 是 Java 8 引入的一个强大工具,用于简化异步编程。
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // 模拟耗时
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello, World!";
        });

        // thenAccept 是 CompletableFuture 的一个方法,用于在异步任务完成后执行一个操作
        future.thenAccept(result -> System.out.println("Received: " + result));
        // 输出:
        // Received: Hello, World!

        // 等待任务完成
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

事实上,线程更经常交换的是完整对象,而不是原始字节数据。但是,如果需要在线程间交换原始字节数据,Java IO 的管道也是一种可能的方式。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号