CompletableFuture 异步异常处理记录

作者:old wang 发布时间: 2023-01-23 阅读量:4 评论数:0

在 Java 项目中,CompletableFuture 经常用于异步任务编排。

例如:

  • 异步发送消息;

  • 异步调用第三方接口;

  • 多个任务并行执行;

  • 接口中异步处理非核心逻辑;

  • 聚合多个异步任务结果。

但在使用 CompletableFuture 时,有一个很容易忽略的问题:

异步任务中发生异常,不一定会被主线程捕获。

如果没有正确处理,可能会出现一个现象:

异步任务已经执行失败,但接口仍然返回成功。

记录一下 CompletableFuture 中常见的异常处理方式,包括:

  • 不处理异常时会发生什么;

  • join() 如何抛出异常;

  • get() 如何抛出异常;

  • exceptionally() 的作用;

  • whenComplete() 的作用;

  • handle() 的作用;

  • 实际项目中应该如何选择。

一、自定义线程池

示例中先定义一个自定义线程池。

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolConfig {

    public static ThreadPoolExecutor getThreadPoolExecutor() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();

        return new ThreadPoolExecutor(
                availableProcessors,
                availableProcessors,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(9999),
                new ThreadFactoryBuilder()
                        .setNameFormat("custom-thread-pool-%d")
                        .build(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

然后在测试代码中使用:

public static final ThreadPoolExecutor CUSTOM_THREAD_POOL =
        ThreadPoolConfig.getThreadPoolExecutor();

这里不使用 CompletableFuture 默认的 ForkJoinPool.commonPool(),而是显式传入自定义线程池。

这样更方便控制线程数量、线程名称和拒绝策略,也更适合实际业务项目。

二、异步任务异常不会自动抛到主线程

先看一个最容易踩坑的例子。

@GetMapping("/asyncException")
public ResponseData<Object> asyncException() {
    try {
        try {
            CompletableFuture.runAsync(() -> {
                int i = 1 / 0;
            }, CUSTOM_THREAD_POOL);
        } catch (Exception e) {
            log.error("异常信息: {}", e.getMessage(), e);
            throw new BusinessException(e.getMessage());
        }

        return new ResponseData<>(
                StatusCodeEnum.SUCCESS_CODE.getStatusCode(),
                "操作成功"
        );
    } catch (Exception e) {
        return new ResponseData<>(
                StatusCodeEnum.ERROR_CODE.getStatusCode(),
                "操作失败:" + e.getMessage()
        );
    }
}

这段代码看起来外层已经写了 try-catch,但实际并不能捕获异步任务中的异常。

原因是:

CompletableFuture.runAsync(() -> {
    int i = 1 / 0;
}, CUSTOM_THREAD_POOL);

这段逻辑是在异步线程中执行的。

主线程只是把任务提交到了线程池,然后继续往下执行。

所以接口很可能直接返回:

操作成功

而异步任务里的异常不会被外层 try-catch 捕获。

这也是很多异步任务异常“静默失败”的原因。

三、使用 join() 获取异步异常

如果希望主线程感知异步任务的异常,可以调用 join()

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).join();
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

join() 会等待异步任务执行完成。

如果异步任务执行异常,join() 会把异常重新抛到当前线程。

因此外层 try-catch 可以捕获到异常。

需要注意的是,join() 抛出的通常是:

CompletionException

原始异常可以通过:

e.getCause()

获取。

例如:

catch (CompletionException e) {
    Throwable cause = e.getCause();
    log.error("异步任务异常", cause);
}

四、使用 get() 获取异步异常

除了 join(),也可以使用 get()

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

get() 同样会等待异步任务执行完成。

如果任务异常,会抛出:

ExecutionException

原始异常也需要通过:

e.getCause()

获取。

join() 相比,get() 的特点是:

  • 会抛受检异常;

  • 可以指定超时时间;

  • 适合需要控制最长等待时间的场景。

例如:

try {
    future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    log.error("异步任务执行超时", e);
} catch (ExecutionException e) {
    log.error("异步任务执行异常", e.getCause());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    log.error("等待异步任务时被中断", e);
}

实际项目中,如果使用 get(),建议使用带超时时间的版本,避免主线程无限阻塞。

五、exceptionally() 只能处理异步链路内的异常

exceptionally() 可以处理异步任务中的异常。

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).exceptionally(e -> {
        log.error("异步运行异常信息: {}", e.getMessage(), e);
        throw new BusinessException(e.getMessage());
    });
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

这里需要注意一个关键点:

exceptionally() 中的异常处理仍然属于 CompletableFuture 异步链路的一部分,不会自动抛到外层主线程。

所以这段代码的表现通常是:

  • exceptionally() 中可以打印异步异常日志;

  • 外层 try-catch 捕获不到异常;

  • 接口主流程可能仍然返回成功。

如果希望主线程感知最终结果,需要继续调用:

join()

或者:

get()

例如:

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).exceptionally(e -> {
        log.error("异步执行异常信息: {}", e.getMessage(), e);
        throw new BusinessException(e.getMessage());
    }).join();
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

这时,异步链路中的异常才会通过 join() 回到当前线程。

六、whenComplete() 可以观察结果和异常

whenComplete() 可以拿到异步任务的执行结果和异常信息。

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).whenComplete((result, ex) -> {
        if (ex != null) {
            log.error("异步执行异常信息: {}", ex.getMessage(), ex);
            throw new BusinessException(ex.getMessage());
        }
    });
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

whenComplete() 的特点是:

  • 可以拿到正常结果;

  • 也可以拿到异常;

  • 适合做日志记录、监控埋点、资源清理等动作;

  • 不会改变原来的结果。

但和 exceptionally() 一样,如果不调用 join()get(),外层主线程通常感知不到异常。

例如:

CompletableFuture.runAsync(() -> {
    int i = 1 / 0;
}, CUSTOM_THREAD_POOL).whenComplete((result, ex) -> {
    if (ex != null) {
        log.error("异步任务异常", ex);
    }
});

这段代码可以打印异步异常日志,但主线程不会因为这个异常自动失败。

如果需要主线程等待并感知异常,需要:

CompletableFuture.runAsync(() -> {
    int i = 1 / 0;
}, CUSTOM_THREAD_POOL).whenComplete((result, ex) -> {
    if (ex != null) {
        log.error("异步任务异常", ex);
    }
}).join();

七、handle() 可以转换结果

handle() 也可以拿到结果和异常。

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).handle((result, ex) -> {
        if (ex != null) {
            log.error("异步执行异常信息: {}", ex.getMessage(), ex);
            throw new BusinessException(ex.getMessage());
        }
        return null;
    });
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

handle()whenComplete() 的区别是:

handle() 可以返回一个新的结果。

也就是说,它不仅能观察异常,还能把异常转换成一个默认值。

例如:

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> {
            int i = 1 / 0;
            return "success";
        }, CUSTOM_THREAD_POOL)
        .handle((result, ex) -> {
            if (ex != null) {
                log.error("异步任务失败,返回默认值", ex);
                return "fallback";
            }
            return result;
        });

如果异步任务失败,最终 future 的结果会变成:

fallback

这适合一些允许降级的场景。

八、exceptionally、whenComplete、handle 的区别

这三个方法都能处理异常,但适用场景不同。

方法

是否能拿到正常结果

是否能拿到异常

是否能改变返回结果

常见用途

exceptionally()

异常兜底,返回默认值

whenComplete()

记录日志、监控、清理资源

handle()

统一处理成功和失败,并转换结果

简单理解:

  • 只关心异常兜底,用 exceptionally()

  • 只想记录结果或异常,不改变返回值,用 whenComplete()

  • 想根据成功或失败返回不同结果,用 handle()

九、需要让主线程知道异常时,要调用 get() 或 join()

如果业务要求是:

异步任务失败,接口也要返回失败。

那么只写:

CompletableFuture.runAsync(...).exceptionally(...);

是不够的。

因为这只是处理异步链路中的异常,不会让当前请求线程自动失败。

这种情况下需要调用:

join()

或者:

get()

例如:

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).exceptionally(e -> {
        log.error("异步执行异常信息: {}", e.getMessage(), e);
        throw new BusinessException(e.getMessage());
    }).join();
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

或者:

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL).exceptionally(e -> {
        log.error("异步执行异常信息: {}", e.getMessage(), e);
        throw new BusinessException(e.getMessage());
    }).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
    log.error("外层异常信息: {}", e.getMessage(), e);
    throw new BusinessException(e.getMessage());
}

这样异步链路中的异常最终会回到当前线程。

十、不需要主线程等待时,至少要记录异常

有些异步任务不要求影响主流程。

例如:

  • 异步写操作日志;

  • 异步发送通知;

  • 异步刷新缓存;

  • 异步上报埋点。

这种场景下,接口可以先返回成功,但异步任务失败不能完全无感。

至少应该记录异常日志。

例如:

CompletableFuture.runAsync(() -> {
    doSomething();
}, CUSTOM_THREAD_POOL).exceptionally(e -> {
    log.error("异步任务执行失败", e);
    return null;
});

这种写法不会阻塞主线程,也不会影响接口返回。

但如果异步任务失败,日志里能看到错误。

如果任务很重要,还可以在这里增加:

  • 告警;

  • 重试;

  • 失败任务落库;

  • MQ 补偿;

  • 业务状态标记。

十一、join() 和 get() 的区别

方法

异常类型

是否受检异常

是否支持超时

适用场景

join()

CompletionException

简洁获取结果

get()

ExecutionException

需要处理中断、超时等场景

如果只是简单等待结果,join() 写起来更简洁。

如果需要控制超时时间,推荐使用:

get(timeout, unit)

例如:

future.get(2, TimeUnit.SECONDS);

这样可以避免异步任务长期不返回导致请求线程一直阻塞。

十二、常见错误写法

1. 以为外层 try-catch 能捕获异步异常

错误示例:

try {
    CompletableFuture.runAsync(() -> {
        int i = 1 / 0;
    }, CUSTOM_THREAD_POOL);
} catch (Exception e) {
    log.error("捕获异常", e);
}

这个 catch 捕获不到异步线程中的异常。

2. 只写 exceptionally,却不关心结果

CompletableFuture.runAsync(() -> {
    int i = 1 / 0;
}, CUSTOM_THREAD_POOL).exceptionally(e -> {
    log.error("异步异常", e);
    throw new BusinessException(e.getMessage());
});

这里虽然处理了异常,但如果没有 join()get(),主线程不会感知这个异常。

3. get() 不设置超时时间

future.get();

如果异步任务长时间不结束,当前线程会一直阻塞。

更稳妥的方式是:

future.get(2, TimeUnit.SECONDS);

4. 异步任务完全不记录异常

CompletableFuture.runAsync(() -> {
    doSomething();
}, CUSTOM_THREAD_POOL);

这种写法最大的问题是:任务失败了也不容易发现。

十三、实践建议

1. 显式传入线程池

不要随手使用默认线程池。

推荐:

CompletableFuture.runAsync(task, CUSTOM_THREAD_POOL);

而不是:

CompletableFuture.runAsync(task);

这样线程资源更可控。

2. 判断异步任务是否影响主流程

如果异步任务失败需要影响接口返回,就调用:

join()

或:

get(timeout, unit)

如果异步任务失败不影响主流程,至少使用:

exceptionally()

或:

whenComplete()

记录异常。

3. get() 建议设置超时时间

如果使用 get(),建议优先使用:

get(timeout, unit)

避免接口被异步任务长期阻塞。

4. 异常日志中保留上下文

异步任务异常排查比同步任务更麻烦。

日志中建议带上:

  • 业务 ID;

  • 用户 ID;

  • traceId;

  • 任务类型;

  • 请求参数摘要。

否则异步任务失败后,很难反查来源。

结论

CompletableFuture 的异常不会自动回到主线程。

如果只写:

CompletableFuture.runAsync(...)

异步任务中即使发生异常,当前接口也可能继续返回成功。

常见处理方式如下:

  • 使用 join() 可以等待任务完成,并将异常以 CompletionException 抛出;

  • 使用 get() 可以等待任务完成,并将异常以 ExecutionException 抛出,还可以设置超时时间;

  • 使用 exceptionally() 可以处理异步链路中的异常;

  • 使用 whenComplete() 可以观察异步任务的执行结果和异常;

  • 使用 handle() 可以同时处理成功和失败,并转换最终结果。

评论