跳转至

Concurrent

Java 并发相关

这里记录 java 并发相关知识

CompletionService

功能

  • Callable+Future 可以实现多个 task 并行执行,但是如果遇到前面的 task 执行较慢时需要阻塞等待前面的 task 执行完后面 task 才能取得结果;
  • CompletionService 的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了 ;
  • 只有一个实现类:ExecutorCompletionService;

应用场景

  • 需要批量提交异步任务的时候( CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单);
  • 让异步任务的执行结果有序化:先执行完的先进入阻塞队列;
  • 线程池隔离:CompletionService 支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险;

ExecutorCompletionService

ExecutorCompletionService 是Java中的一个实用工具类,它实现了 CompletionService 接口,用于简化并发任务的管理和结果获取。它结合了线程池和阻塞队列,可以更方便地获取已完成任务的结果。

ExecutorCompletionService 的主要作用是将已完成的任务封装成 Future 对象,这样可以通过获取 Future 对象的方式获取任务的执行结果,而无需手动迭代任务列表来获取结果。

  • Future<V> submit(Callable<V> task): 提交一个 Callable 类型任务,并返回该任务执行结果关联的Future
  • Future<V> submit(Runnable task,V result): 提交一个Runnable类型任务,并返回该任务执行结果关联的Future; -Future<V> take(): 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞,直到有任务完成;
  • Future<V> poll(): 从内部阻塞队列中获取并移除第一个执行完成的任务,获取不到则返回null,不阻塞;
  • Future<V> poll(long timeout, TimeUnit unit): 从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间为timeout,获取不到则返回null

Callable 与 Runnable 区别

CallableRunnable接口是Java中用于定义并发任务的两个核心接口,它们之间有以下几个区别:

  1. 返回结果:Runnable接口的run()方法没有返回值,而Callable接口的call()方法可以返回一个结果。

  2. 异常处理:Runnable接口的run()方法不能抛出任何受检异常,只能通过捕获异常并在方法内部处理。而Callable接口的call()方法可以抛出受检异常,需要使用者进行适当的异常处理。

  3. 使用方式:Runnable通常用于执行一些没有返回结果的异步任务,而Callable则通常用于执行需要返回结果的异步任务。

  4. 线程池的submit方法:线程池的submit()方法接受Runnable任务时,返回一个Future对象,该对象用于表示任务的异步执行结果,但该对象的get()方法返回的结果始终为null。当提交Callable任务时,submit()方法返回的Future对象可以通过get()方法获取任务的执行结果。

综上所述,Runnable适用于不需要返回结果的简单异步任务,而Callable适用于需要返回结果的复杂异步任务。在使用线程池时,可以根据任务的需求选择使用RunnableCallable

AbstractQueue 并发任务队列

AbstractQueue是Java集合框架中的一个抽象类,实现了Queue接口。它提供了部分Queue接口的默认实现,使得实现队列的具体类可以更轻松地实现队列的基本功能。

AbstractQueue类本身无法直接实例化,它被设计为供其他具体队列类继承和扩展使用。通过继承AbstractQueue,具体的队列类可以利用其提供的默认实现来简化自身的实现。

AbstractQueue类中的一些重要方法和概念包括:

  • offer(E e):将元素e插入队列。如果队列已满,offer方法可以选择抛出异常或返回特定值。
  • poll():移除并返回队列中的头部元素。如果队列为空,poll方法可以选择返回null或特定值。
  • peek():返回队列中的头部元素,但不移除它。如果队列为空,peek方法可以选择返回null或特定值。
  • size():返回队列中的元素数量。
  • iterator():返回一个迭代器,用于遍历队列中的元素。
  • AbstractQueue还提供了一些模板方法,如remove()element()等,这些方法是由具体队列类实现的。

通过继承AbstractQueue,具体的队列类只需实现offerpollpeekiterator等抽象方法,并根据自身特点进行具体的实现。这样,具体队列类就可以获得默认实现的其他方法,减少了重复代码的编写。

需要注意的是,由于AbstractQueue是一个抽象类,因此无法直接创建它的实例。如果想使用队列,可以使用它的具体实现类,如LinkedListArrayBlockingQueue等,或者自己实现一个继承自AbstractQueue的子类。

常见继承此类实现的队列

一些常见实现了AbstractQueue类的队列包括:

  1. LinkedListLinkedList实现了Deque接口,而Deque继承了Queue接口,并通过继承AbstractQueue提供了默认实现。LinkedList是一个双向链表结构的队列,支持在队列头部和尾部进行元素的插入和删除操作。

  2. ArrayBlockingQueueArrayBlockingQueue是一个基于数组的有界队列,它按照先进先出(FIFO)的顺序对元素进行存储和访问。它实现了BlockingQueue接口,并通过继承AbstractQueue提供了默认实现。

  3. PriorityQueuePriorityQueue是一个基于优先级的队列,它按照元素的自然顺序或者通过比较器进行排序。PriorityQueue实现了Queue接口,并通过继承AbstractQueue提供了默认实现。它的特点是每次插入元素都会根据优先级进行排序。

  4. ConcurrentLinkedQueueConcurrentLinkedQueue是一个无界的线程安全队列,它使用链表数据结构实现。ConcurrentLinkedQueue实现了Queue接口,并通过继承AbstractQueue提供了默认实现。它支持高并发的队列操作,适用于多线程环境。

BlockingQueue 阻塞队列

BlockingQueue 是 Java 并发编程中常用的阻塞队列接口,它继承自 Queue 接口,并添加了一些阻塞操作,可以用于线程间的安全数据传输和同步。下面是几个常见的 BlockingQueue 实现类及其用法:

  1. ArrayBlockingQueueArrayBlockingQueue 是一个有界阻塞队列,它基于数组实现。在创建时需要指定队列的容量,容量固定不变。当队列已满时,生产者线程会被阻塞,直到队列有空闲位置。当队列为空时,消费者线程会被阻塞,直到队列有新的元素。使用 ArrayBlockingQueue 可以控制任务的并发量和队列大小,适用于固定数量的任务处理场景。

  2. LinkedBlockingQueueLinkedBlockingQueue 是一个可选有界或无界的阻塞队列,它基于链表实现。在创建时可以选择指定队列的容量,或者使用默认的无界容量。当队列为空时,消费者线程会被阻塞,直到队列有新的元素。当队列已满时,生产者线程会被阻塞,直到队列有空闲位置。使用 LinkedBlockingQueue 可以适应不同的任务处理场景,可以根据需求设置有界或无界的队列。

  3. PriorityBlockingQueuePriorityBlockingQueue 是一个支持优先级的阻塞队列,它基于优先级堆实现。元素按照优先级顺序被移除,具有最高优先级的元素总是被首先移除。当队列为空时,消费者线程会被阻塞,直到队列有新的元素。PriorityBlockingQueue 可以用于任务调度或根据优先级进行排序的场景。

  4. SynchronousQueueSynchronousQueue 是一个没有存储元素的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。SynchronousQueue 适用于直接将任务交给消费者线程的场景,用于传递临时数据或控制信号。

使用这些 BlockingQueue 的基本用法如下:

// 创建一个 ArrayBlockingQueue 实例,指定容量
BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity); 
// 创建一个 LinkedBlockingQueue 实例,使用默认容量
BlockingQueue<T> queue = new LinkedBlockingQueue<>(); 
// 创建一个 PriorityBlockingQueue 实例
BlockingQueue<T> queue = new PriorityBlockingQueue<>(); 
// 创建一个 SynchronousQueue 实例
BlockingQueue<T> queue = new SynchronousQueue<>(); 

阻塞队列申请拒绝策略

AbortPolicy(中止策略)

public static class AbortPolicy implements RejectedExecutionHandler {
   /**
   * Creates an {@code AbortPolicy}.
   */
   public AbortPolicy() { }

   /**
   * Always throws RejectedExecutionException.
   *
   * @param r the runnable task requested to be executed
   * @param e the executor attempting to execute this task
   * @throws RejectedExecutionException always
   */
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
   }
}

DiscardOldestPolicy (弃老策略)

/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   /**
   * Creates a {@code DiscardOldestPolicy} for the given executor.
   */
   public DiscardOldestPolicy() { }

   /**
   * Obtains and ignores the next task that the executor
   * would otherwise execute, if one is immediately available,
   * and then retries execution of task r, unless the executor
   * is shut down, in which case task r is instead discarded.
   *
   * @param r the runnable task requested to be executed
   * @param e the executor attempting to execute this task
   */
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
      if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
      }
   }
}