java中怎么实现异步处理

本篇文章给大家分享的是有关java中怎么实现异步处理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

创新互联主要从事网站制作、成都网站制作、网页设计、企业做网站、公司建网站等业务。立足成都服务清水,十载网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220

1.DeferredResult 加线程池 (DeferredResult 提供了超时、错误处理,功能非常完善,再加上多线程处理请求效果很不错)

2.新开个定时任务线程池 定时轮询当前任务列表 超时就停止(需要自己维护任务列表)Hystrix就是这种方案

3.JDK9 可以采用CompletableFuture orTimeout、completeOnTimeout 方法处理 前者抛出异常后者返回默认值

总结,其实线程池统一设置超时这个需求本身就是伪需求,线程执行任务时间本身就是参差不齐的,而且这个控制权应该交给Runable或Callable内部业务处理,不同的业务处理超时、异常、报警等各不相同。CompletableFuture、ListenableFuture 、DeferredResult 的功能相当丰富,建议在多线程处理的场景多使用这些api。

具体实现:

  1. DeferredResult 先建个工具类。调用方使用execute方法,传入new的DeferredResultDTO(DeferredResultDTO只有msgId,也可以自定义一些成员变量方便后期业务扩展使用)

然后在其他线程业务处理完设置结果,调用setResult方法,传入msgId相同的DeferredResultDTO和result对象

/**
 * DeferredResult 工具类
 *
 * @author tiancong
 * @date 2020/10/14 19:23
 */
@UtilityClass
@Slf4j
public class DeferredResultUtil {

    private Map>> taskMap = new ConcurrentHashMap<>(16);

    public DeferredResult> execute(DeferredResultDTO dto) {
        return execute(dto, 5000L);
    }

    public DeferredResult> execute(DeferredResultDTO dto, Long time) {
        if (taskMap.containsKey(dto)) {
            throw new BusinessException(String.format("msgId=%s 已经存在,请勿重发消息", dto.getMsgId()));
        }
        DeferredResult> deferredResult = new DeferredResult<>(time);
        deferredResult.onError((e) -> {
            taskMap.remove(dto);
            log.info("处理失败 ", e);
            deferredResult.setResult(ResultVoUtil.fail("处理失败"));
        });
        deferredResult.onTimeout(() -> {
            taskMap.remove(dto);
            if (dto.getType().equals(DeferredResultTypeEnum.CLOTHES_DETECTION)) {
                ExamController.getCURRENT_STUDENT().remove(dto.getMsgId());
            }
            deferredResult.setResult(ResultVoUtil.fail("请求超时,请联系工作人员!"));
        });
        taskMap.putIfAbsent(dto, deferredResult);
        return deferredResult;
    }

    public void setResult(DeferredResultDTO dto, ResultVO resultVO) {
        if (taskMap.containsKey(dto)) {
            DeferredResult> deferredResult = taskMap.get(dto);
            deferredResult.setResult(resultVO);
            taskMap.remove(dto);
        } else {
            log.error("ERROR 未找到该消息msgId:{}", dto.getMsgId());
        }
    }
}

2.   新开个定时任务线程池 定时轮询当前任务列表

/**
 * @author tiancong
 * @date 2021/4/10 11:06
 */
@Slf4j
public class T {

    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(
                    2,
                    r -> {
                        Thread thread = new Thread(r);
                        thread.setName("failAfter-%d");
                        thread.setDaemon(true);
                        return thread;
                    });
    private static int timeCount;

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
        executorService.setCorePoolSize(4);
        executorService.setQueueCapacity(10);
        executorService.setMaxPoolSize(100);
        executorService.initialize();
//        executorService.setAwaitTerminationSeconds(5);
//        executorService.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS);
        executorService.setWaitForTasksToCompleteOnShutdown(true);


        Random random = new Random();

        long start = System.currentTimeMillis();
        List> asyncResultList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ListenableFuture asyncResult = executorService.submitListenable(() -> {
                int r = random.nextInt(10);
                log.info("{} 开始睡{}s", Thread.currentThread().getName(), r);
                TimeUnit.SECONDS.sleep(r);
                log.info("{} 干完了 {}s", Thread.currentThread().getName(), r);
                //throw new RuntimeException("出现异常");
                return true;
            });

            asyncResult.addCallback(data -> {
                try {
                    // 休息3毫秒模拟获取到执行结果后的操作
                    TimeUnit.MILLISECONDS.sleep(3);
                    log.info("{} 收到结果:{}", Thread.currentThread().getName(), data);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, ex -> log.info("**异常信息**", ex));

            asyncResultList.add(asyncResult);
        }

        System.out.println(String.format("总结耗时:%s ms", System.currentTimeMillis() - start));

        // 守护进程 定时轮询 终止超时的任务
        scheduler.scheduleAtFixedRate(() -> {
            // 模拟守护进程 终止超过6s的任务
            timeCount++;
            if (timeCount > 6) {
                for (ListenableFuture future : asyncResultList) {
                    if (!future.isDone()) {
                        log.error("future 因超时终止任务,{}", future);
                        future.cancel(true);
                    }
                }
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);

    }
}

额外补充:

CompletableFuture实现了CompletionStage接口,里面很多丰富的异步编程接口。

applyToEither方法是哪个先完成,就apply哪一个结果(但是两个任务都会最终走完)

/**
 * @author tiancong
 * @date 2021/4/10 11:06
 */
@Slf4j
public class T {

    public static void main(String[] args) throws InterruptedException {

//        CompletableFuture responseFuture = within(
//                createTaskSupplier("5"), 3000, TimeUnit.MILLISECONDS);
//        responseFuture
//                .thenAccept(T::send)
//                .exceptionally(throwable -> {
//                    log.error("Unrecoverable error", throwable);
//                    return null;
//                });
//

        // 注意 exceptionally是new 的CompletableFuture
        CompletableFuture timeoutCompletableFuture = timeoutAfter(1000, TimeUnit.MILLISECONDS).exceptionally(xxx -> "超时");


        // 异步任务超时、异常处理
        List collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
//                .map(x -> within(
//                        createTaskSupplier(x), 3000, TimeUnit.MILLISECONDS)
//                        .thenAccept(T::send)
//                        .exceptionally(throwable -> {
//                            log.error("Unrecoverable error", throwable);
//                            return null;
//                        }))
                .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
                        , timeoutCompletableFuture))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
//                .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
//                , oneSecondTimeout).join())
//                .collect(Collectors.toList());
        System.out.println("-------结束------");
        System.out.println(collect.toString());

    }

    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(
                    2,
                    r -> {
                        Thread thread = new Thread(r);
                        thread.setName("failAfter-%d");
                        thread.setDaemon(true);
                        return thread;
                    });

    private static String send(String s) {
        log.info("最终结果是{}", s);
        return s;
    }

    private static CompletableFuture createTaskSupplier(String x) {
        return CompletableFuture.supplyAsync(getStringSupplier(x))
                .exceptionally(Throwable::getMessage);
    }

    private static Supplier getStringSupplier(String text) {
        return () -> {
            System.out.println("开始 " + text);
            if ("1".equals(text)) {
                throw new RuntimeException("运行时错误");
            }
            try {
                if ("5".equals(text)) {
                    TimeUnit.SECONDS.sleep(5);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束 " + text);
            return text + "号";
        };
    }


    private static  CompletableFuture within(CompletableFuture future, long timeout, TimeUnit unit) {
        final CompletableFuture timeoutFuture = timeoutAfter(timeout, unit);
        // 哪个先完成 就apply哪一个结果 这是一个关键的API
        return future.applyToEither(timeoutFuture, Function.identity());
    }

    private static  CompletableFuture timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture result = new CompletableFuture();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        scheduler.schedule(() -> result.completeExceptionally(new TimeoutException("超时:" + timeout)), timeout, unit);
//        return CompletableFuture.supplyAsync(()-> (T)"另一个分支任务");
        return result;
    }

}

以上就是java中怎么实现异步处理,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。


当前文章:java中怎么实现异步处理
文章源于:http://hbruida.cn/article/pchdeg.html