创建线程的方式有两种,一种是实现 Runnable 接口,另一种是继承自 Thread ,但是这两种方式都有个缺点,那就是在任务执行完成之后无法获取返回结果,如果需要获取返回结果,那就需要来实现 Callable 接口。

JavaSE 5.0 开始引入了 CallableFuture 接口后就可以通过它们来构建带有返回结果的线程,在任务执行完成后就可以获取执行结果。

# Callable<V> 接口

先来看一下 Runnable 接口中的 run() 方法其返回值为 void , 当然就无法获取结果了。

Runnable.java
public interface Runnable {
    public abstract void run();
}

Callable 接口定义如下:

Callable.java
public interface Callable<V> {
    V call() throws Exception;
}

该接口声明了一个名为 call() 的方法,同时这个方法的返回值是 V , 无论是 Runnable 接口的实现类还是 Callable 接口的实现类,都可以被 ThreadPoolExecutorScheduledThreadPoolExecutor 来执行, ThreadPoolExecutorScheduledThreadExecutor 都实现了 ExecutorService 接口,因此 Callable 需要和 Executor 框架中的 ExecutorService 结合来使用,下面是 ExecutorService 提供的方法:

ExecutorService.java
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

# Future<V> 接口

Future 接口是用来获取异步计算结果的,简单来说就是对具体的 RunnableCallable 对象任务执行的结果进行获取,下面是 Future 接口中的方法。

Future.java
package java.util.concurrent;
public interface Future<V> {
    
    boolean cancel(boolean mayInterruptIfRunning);
    
    boolean isCancelled();
    boolean isDone();
  
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
  • get() :获取异步执行的结果,如果没有结果此方法会阻塞直到异步计算完成。
  • get(Long timeout,TimeUnit unit) :获取异步执行结果,如果没有结果,此方法会阻塞,但是会有时间限制,如果阻塞时间超过了设定的 timeout 该方法将会抛出异常。
  • isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true
  • isCanceller() :如果任务完成前被取消则返回 true
  • cancel(boolean mayInterruptIfRunning) :如果任务还没开始执行 cancel() 方法将返回 false , 如果任务已经启动,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功返回 true ,当任务已经启动并执行 cancle(false) 方法,将不会对正在执行的任务线程产生影响,此时返回 false ,当任务已经完成,执行 cancel() 方法将返回 falsemayInterruptIfRunning 参数表示是否中断执行中的线程。
  • Future 提供了三种功能:
    1. 能够中断执行中的任务。
    2. 判断任务是否执行完成。
    3. 获取任务执行完成后的结果。
  • 但是必需要明白 Future 只是一个结果,无法直接来创建对象,因此现在就需要其实现 Future 接口的 FutureTask 类了。

# FutureTask

先来看一下 FutureTask 的实现:

FutureTask.java
public class FutureTask<V> implements RunnableFuture<V> {
    // ...
}

FutureTask 类实现了一个 RunnableFuture 接口,先看一下 RunnableFuture 接口中的方法:

RunnableFuture.java
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
  • FutureTask 除了实现 Future 接口外还实现了 Runnable 接口,因此 FutureTask 也可以直接提交给 Executor 来执行,当然也可以调用线程直接执行 (FutureTask.run())
  • FutureTask.run() 执行时的三种状态:
    1. 未启动: FutureTask.run() 方法还没有被执行之前, FutureTask 处于未启动状态,当创建一个 FutureTask 而且没有执行 FutureTask.run() 方法前,这个 FutureTask 也是处于未启动状态。
    2. 已启动: FutureTask.run() 被执行的过程中, FutureTask 处于已启动状态。
    3. 已完成: FutureTask.run() 方法执行完正常结束或被取消或抛出异常结束, FutureTask 都处于完成状态。

pFC1AMT.png

下面是 FutureTask 的方法执行流程图。

pFC1eZ4.png

FutureTask 处于未启动或已启动状态时,如果此时我们执行 FutureTask.get() 方法将导致调用线程阻塞,当 FutureTask 处于已完成状态时,执行 FutureTask.get() 方法将导致线程立即返回结果或抛出异常。

FutureTask 处于未启动状态时,执行 FutureTask.cancel() 方法将导致此任务永远不会执行。

FutureTask 处于已启动状态时,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果任务取消成功, cancel() 方法将返回 true ,但如果执行 cancel(false) 方法将不会对正在执行的任务线程产生影响,此时 cancel() 方法返回 false , 当任务已经完成,执行 cancel() 方法将返回 false

FutureTask 的两种构造方法:

public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}

# FutureTask 的使用方式

现在对 CallableFutureFutureTask 有了一定得了解之后,那么它们到底有什么用?通过这样的方式去创建线程,最大的好处就是能够返回结果,如:有这么一个场景,现在需要计算一个数据,而这个数据的计算比较耗时,并且后面的程序也需要用到这个数据结果,那么这个时候 Callable 岂不是最好的选择。

现在可以开设一个线程去执行计算,而主线程继续做其它事情,而后面需要使用到这个数据时,再通过 Future 来获取就可以了。

CallableDemo.java
package top.rem.rain.demo3;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * @Author: LightRain
 * @Description: 使用 Callable+Future 获取执行结果
 * @DateTime: 2024-01-12 01:25
 * @Version:1.0
 **/
public class CallableDemo implements Callable<Integer> {
    private int sum;
    @Override
    public Integer call() throws Exception {
        System.out.println("Callable子线程开始计算啦!");
        Thread.sleep(2000);
        for (int i = 0; i < 5000; i++) {
            sum = sum + i;
        }
        System.out.println("Callable子线程计算结束!");
        return sum;
    }
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();
        // 创建 Callable 对象任务
        CallableDemo calTask = new CallableDemo();
        // 提交任务并获取执行结果
        Future<Integer> future = es.submit(calTask);
        // 关闭线程池
        es.shutdown();
        try {
            Thread.sleep(2000);
            System.out.println("主线程在执行其它任务");
            if (future.get() != null) {
                // 输出获取到的结果
                System.out.println("future.get()--> " + future.get());
            } else {
                // 输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");
    }
}
执行结果
Callable 子线程开始计算啦!
主线程在执行其它任务
Callable 子线程计算结束!
future.get ()--> 12497500
主线程在执行完成

使用 Callable+FutureTask 获取执行结果

CallableDemo.java
package top.rem.rain.demo3;
import java.util.concurrent.*;
/**
 * @Author: LightRain
 * @Description: 使用 Callable+FutureTask 获取执行结果
 * @DateTime: 2024-01-12 01:25
 * @Version:1.0
 **/
public class CallableDemo implements Callable<Integer> {
    private int sum;
    @Override
    public Integer call() throws Exception {
        System.out.println("Callable子线程开始计算啦!");
        Thread.sleep(2000);
        for (int i = 0; i < 5000; i++) {
            sum = sum + i;
        }
        System.out.println("Callable子线程计算结束!");
        return sum;
    }
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();
        // 创建 Callable 对象任务
        CallableDemo calTask = new CallableDemo();
        // 创建 FutureTask
        FutureTask<Integer> futureTask = new FutureTask<>(calTask);
        // 提交任务
        es.execute(futureTask);
        // 关闭线程池
        es.shutdown();
        try {
            Thread.sleep(2000);
            System.out.println("主线程在执行其它任务");
            if (futureTask.get() != null) {
                // 输出获取到的结果
                System.out.println("future.get()--> " + futureTask.get());
            } else {
                // 输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");
    }
}
执行结果
Callable 子线程开始计算啦!
主线程在执行其它任务
Callable 子线程计算结束!
future.get ()--> 12497500
主线程在执行完成