在 Java IO 中,管道(Pipe)是一种用于在同一 JVM 中不同线程之间进行通信的机制,它可以实现线程之间的数据传输。因此,管道也可以是数据的来源或目的地。如下图:
注意:Java IO 中,你不能使用管道与不同 JVM(不同进程)中的线程通信。Java 中的管道概念不同于 Unix / Linux 中的管道概念,在 Unix / Linux 中,运行在不同地址空间的两个进程可以通过管道进行通信。在 Java 中,通信双方必须运行在同一个进程中(即同一个 JVM),而且应该是不同的线程。
使用 Java IO 创建管道是通过 PipedOutputStream 和 PipedInputStream 类完成的。一个 PipedInputStream 应连接到一个 PipedOutputStream。一个线程写入 PipedOutputStream 的数据可由另一个线程从连接的 PipedInputStream 读取。
下面是一个如何将管道输入流连接到管道输出流的简单示例:
package com.hxstrive.java_io.demo01; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; /** * 管道流示例 * @author hxstrive.com */ public class PipeDemo1 { public static void main(String[] args) throws IOException { // 创建管道流对象 final PipedOutputStream output = new PipedOutputStream(); final PipedInputStream input = new PipedInputStream(output); // 线程1,向管道中写入数据 Thread thread1 = new Thread(new Runnable() { @Override public void run() { try { output.write("Hello world, pipe!".getBytes()); output.close(); } catch (IOException e) { e.printStackTrace(); } } }); // 线程2,从管道中读取数据 Thread thread2 = new Thread(new Runnable() { @Override public void run() { try { int data = input.read(); while(data != -1){ System.out.print((char) data); data = input.read(); } // 输出:Hello world, pipe! input.close(); } catch (IOException e) { e.printStackTrace(); } } }); // 启动线程 thread1.start(); thread2.start(); } }
你还可以使用两个管道流的 connect() 方法将它们连接起来。PipedInputStream 和 PipedOutputStream 都有一个 connect() 方法,可以将其中一个连接到另一个。例如:
// 创建管道流对象 final PipedOutputStream output = new PipedOutputStream(); final PipedInputStream input = new PipedInputStream(); input.connect(output);
或者
// 创建管道流对象 final PipedOutputStream output = new PipedOutputStream(); final PipedInputStream input = new PipedInputStream(); output.connect(input);
管道是线程间通信的工具,用于实现不同线程之间的数据传输和协作。
一定要记住,在使用两个连接的管道流时,应将一个流传递给一个线程,另一个流传递给另一个线程。管道流上的 read() 和 write() 调用是阻塞性的,这意味着如果尝试使用同一线程同时调用 read() 和 write(),可能会导致线程自身陷入死锁。
以下是管道和线程的关系:
(1)管道是线程间通信的桥梁:管道允许一个线程(生产者)向另一个线程(消费者)发送数据,实现线程间的同步或异步通信。例如,在生产者-消费者模型中,生产者线程将数据写入管道,消费者线程从管道读取数据,两者通过管道解耦。
(2)传统 I/O 管道与线程的协作:传统管道(PipedInputStream/PipedOutputStream)的读写操作是阻塞的。当管道中无数据时,读操作会阻塞,直到有数据写入。当管道已满时,写操作会阻塞,直到数据被读取。注意:管道本身是线程安全的,但需要确保读写操作由不同线程完成,否则可能导致死锁(如同一线程同时读写)。
(3)NIO 管道与线程的高效协作:NIO 管道(Pipe)通过通道(SinkChannel/SourceChannel)和缓冲区(ByteBuffer)支持非阻塞操作。注意,可与选择器(Selector)配合,在单个线程中管理多个管道的读写,适合高并发场景。NIO 的内容将在 Java NIO 教程详细介绍
在同一个 JVM 中,线程可以通过管道以外的许多其他方式进行通信。例如:
BlockingQueue 是 Java 并发包中的一部分,它能在线程间安全地传递数据。BlockingQueue 提供了阻塞操作,在队列为空时会阻塞取操作,在队列满时会阻塞放操作。
示例代码:
package com.hxstrive.java_io.pipe; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * BlockingQueue示例 * @author hxstrive.com */ public class BlockingQueueExample { public static void main(String[] args) { // 用于线程间通信 final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // 线程1 new Thread(() -> { try { // 发送数据 for (int i = 0; i < 10; i++) { queue.put(i); System.out.println("Produced: " + i); Thread.sleep(100); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); // 线程2 new Thread(() -> { try { // 接收数据 while (true) { Integer item = queue.take(); System.out.println("Consumed: " + item); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); } }
运行结果如下:
Produced: 0 Consumed: 0 Produced: 1 ... Consumed: 8 Produced: 9 Consumed: 9
SynchronousQueue 是一种特殊的 BlockingQueue,它的容量为零,每个插入操作必须等待另一个线程的移除操作,反之亦然。这在需要线程间同步数据交换的场景下很有用。
示例代码:
package com.hxstrive.java_io.pipe; import java.util.concurrent.SynchronousQueue; /** * SynchronousQueue示例 * @author hxstrive.com */ public class SynchronousQueueExample { public static void main(String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(); // 线程1 new Thread(() -> { try { // 生产数据 for (int i = 0; i < 10; i++) { queue.put(i); System.out.println("Produced: " + i); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); // 线程2 new Thread(() -> { try { // 消费数据 for (int i = 0; i < 10; i++) { Integer item = queue.take(); System.out.println("Consumed: " + item); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); } }
CompletableFuture 可用于异步编程,能在线程间传递计算结果。
示例代码:
package com.hxstrive.java_io.pipe; import java.util.concurrent.CompletableFuture; public class CompletableFutureExample { public static void main(String[] args) { // 利用 CompletableFuture 实现了异步任务的执行,并在任务完成后处理其结果。 // CompletableFuture 是 Java 8 引入的一个强大工具,用于简化异步编程。 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); // 模拟耗时 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Hello, World!"; }); // thenAccept 是 CompletableFuture 的一个方法,用于在异步任务完成后执行一个操作 future.thenAccept(result -> System.out.println("Received: " + result)); // 输出: // Received: Hello, World! // 等待任务完成 try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
事实上,线程更经常交换的是完整对象,而不是原始字节数据。但是,如果需要在线程间交换原始字节数据,Java IO 的管道也是一种可能的方式。