Concurrent
Java 并发相关
这里记录 java 并发相关知识
CompletionService
功能
Callable+Future
可以实现多个task
并行执行,但是如果遇到前面的task
执行较慢时需要阻塞等待前面的task
执行完后面task
才能取得结果;CompletionService
的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了 ;- 只有一个实现类:
ExecutorCompletionService
;
应用场景
- 需要批量提交异步任务的时候(
CompletionService
将线程池Executor
和阻塞队列BlockingQueue
的功能融合在了一起,能够让批量异步任务的管理更简单); - 让异步任务的执行结果有序化:先执行完的先进入阻塞队列;
- 线程池隔离
:CompletionService
支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险;
ExecutorCompletionService
ExecutorCompletionService
是Java中的一个实用工具类,它实现了 CompletionService
接口,用于简化并发任务的管理和结果获取。它结合了线程池和阻塞队列,可以更方便地获取已完成任务的结果。
ExecutorCompletionService
的主要作用是将已完成的任务封装成 Future
对象,这样可以通过获取 Future
对象的方式获取任务的执行结果,而无需手动迭代任务列表来获取结果。
Future<V> submit(Callable<V> task)
: 提交一个Callable
类型任务,并返回该任务执行结果关联的Future
;Future<V> submit(Runnable task,V result):
提交一个Runnable
类型任务,并返回该任务执行结果关联的Future
; -Future<V> take()
: 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;Future<V> poll()
: 从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null
,不阻塞;Future<V> poll(long timeout, TimeUnit unit)
: 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null
;
Callable 与 Runnable 区别
Callable
和Runnable
接口是Java中用于定义并发任务的两个核心接口,它们之间有以下几个区别:
-
返回结果:
Runnable
接口的run()
方法没有返回值,而Callable
接口的call()
方法可以返回一个结果。 -
异常处理:
Runnable
接口的run()
方法不能抛出任何受检异常,只能通过捕获异常并在方法内部处理。而Callable
接口的call()
方法可以抛出受检异常,需要使用者进行适当的异常处理。 -
使用方式:
Runnable
通常用于执行一些没有返回结果的异步任务,而Callable
则通常用于执行需要返回结果的异步任务。 -
线程池的submit方法:线程池的
submit()
方法接受Runnable
任务时,返回一个Future
对象,该对象用于表示任务的异步执行结果,但该对象的get()
方法返回的结果始终为null
。当提交Callable
任务时,submit()
方法返回的Future
对象可以通过get()
方法获取任务的执行结果。
综上所述,Runnable
适用于不需要返回结果的简单异步任务,而Callable
适用于需要返回结果的复杂异步任务。在使用线程池时,可以根据任务的需求选择使用Runnable
或Callable
。
AbstractQueue 并发任务队列
AbstractQueue
是Java集合框架中的一个抽象类,实现了Queue
接口。它提供了部分Queue
接口的默认实现,使得实现队列的具体类可以更轻松地实现队列的基本功能。
AbstractQueue
类本身无法直接实例化,它被设计为供其他具体队列类继承和扩展使用。通过继承AbstractQueue
,具体的队列类可以利用其提供的默认实现来简化自身的实现。
AbstractQueue
类中的一些重要方法和概念包括:
offer(E e)
:将元素e插入队列。如果队列已满,offer
方法可以选择抛出异常或返回特定值。poll()
:移除并返回队列中的头部元素。如果队列为空,poll
方法可以选择返回null
或特定值。peek()
:返回队列中的头部元素,但不移除它。如果队列为空,peek
方法可以选择返回null
或特定值。size()
:返回队列中的元素数量。iterator()
:返回一个迭代器,用于遍历队列中的元素。AbstractQueue
还提供了一些模板方法,如remove()
、element()
等,这些方法是由具体队列类实现的。
通过继承AbstractQueue
,具体的队列类只需实现offer
、poll
、peek
和iterator
等抽象方法,并根据自身特点进行具体的实现。这样,具体队列类就可以获得默认实现的其他方法,减少了重复代码的编写。
需要注意的是,由于AbstractQueue
是一个抽象类,因此无法直接创建它的实例。如果想使用队列,可以使用它的具体实现类,如LinkedList
、ArrayBlockingQueue
等,或者自己实现一个继承自AbstractQueue
的子类。
常见继承此类实现的队列
一些常见实现了AbstractQueue
类的队列包括:
-
LinkedList
:LinkedList
实现了Deque
接口,而Deque
继承了Queue
接口,并通过继承AbstractQueue
提供了默认实现。LinkedList
是一个双向链表结构的队列,支持在队列头部和尾部进行元素的插入和删除操作。 -
ArrayBlockingQueue
:ArrayBlockingQueue
是一个基于数组的有界队列,它按照先进先出(FIFO)的顺序对元素进行存储和访问。它实现了BlockingQueue
接口,并通过继承AbstractQueue
提供了默认实现。 -
PriorityQueue
:PriorityQueue
是一个基于优先级的队列,它按照元素的自然顺序或者通过比较器进行排序。PriorityQueue
实现了Queue
接口,并通过继承AbstractQueue
提供了默认实现。它的特点是每次插入元素都会根据优先级进行排序。 -
ConcurrentLinkedQueue
:ConcurrentLinkedQueue
是一个无界的线程安全队列,它使用链表数据结构实现。ConcurrentLinkedQueue
实现了Queue
接口,并通过继承AbstractQueue
提供了默认实现。它支持高并发的队列操作,适用于多线程环境。
BlockingQueue 阻塞队列
BlockingQueue
是 Java 并发编程中常用的阻塞队列接口,它继承自 Queue
接口,并添加了一些阻塞操作,可以用于线程间的安全数据传输和同步。下面是几个常见的 BlockingQueue
实现类及其用法:
-
ArrayBlockingQueue
:ArrayBlockingQueue
是一个有界阻塞队列,它基于数组实现。在创建时需要指定队列的容量,容量固定不变。当队列已满时,生产者线程会被阻塞,直到队列有空闲位置。当队列为空时,消费者线程会被阻塞,直到队列有新的元素。使用ArrayBlockingQueue
可以控制任务的并发量和队列大小,适用于固定数量的任务处理场景。 -
LinkedBlockingQueue
:LinkedBlockingQueue
是一个可选有界或无界的阻塞队列,它基于链表实现。在创建时可以选择指定队列的容量,或者使用默认的无界容量。当队列为空时,消费者线程会被阻塞,直到队列有新的元素。当队列已满时,生产者线程会被阻塞,直到队列有空闲位置。使用LinkedBlockingQueue
可以适应不同的任务处理场景,可以根据需求设置有界或无界的队列。 -
PriorityBlockingQueue
:PriorityBlockingQueue
是一个支持优先级的阻塞队列,它基于优先级堆实现。元素按照优先级顺序被移除,具有最高优先级的元素总是被首先移除。当队列为空时,消费者线程会被阻塞,直到队列有新的元素。PriorityBlockingQueue
可以用于任务调度或根据优先级进行排序的场景。 -
SynchronousQueue
:SynchronousQueue
是一个没有存储元素的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。SynchronousQueue
适用于直接将任务交给消费者线程的场景,用于传递临时数据或控制信号。
使用这些 BlockingQueue
的基本用法如下:
// 创建一个 ArrayBlockingQueue 实例,指定容量
BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
// 创建一个 LinkedBlockingQueue 实例,使用默认容量
BlockingQueue<T> queue = new LinkedBlockingQueue<>();
// 创建一个 PriorityBlockingQueue 实例
BlockingQueue<T> queue = new PriorityBlockingQueue<>();
// 创建一个 SynchronousQueue 实例
BlockingQueue<T> queue = new SynchronousQueue<>();
阻塞队列申请拒绝策略
AbortPolicy(中止策略)
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardOldestPolicy (弃老策略)
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}