2013-07-31 5 views
21

저는 Java 8에서 CompletionStage/CompletableFuture를 사용하여 비동기 처리를 수행했습니다. 이는 잘 작동합니다. 그러나 때때로 무대에서 항목의 반복자/스트림의 비동기 처리를 수행하기를 원하며이를 수행하는 방법이없는 것 같습니다.비동기 처리를 위해 Java 8 Streams API를 사용할 수 있습니까?

특히 Stream.forEach()는 호출 후에 모든 항목이 처리되었음을 의미합니다. 나는 같은 일을 할 것이지만, 스트림이 비동기 스트리밍 결과에 의해 뒷받침되는 경우 대신 CompletionStage, 예컨대 :

CompletionStage<Void> done = stream.forEach(...); 
done.thenRun(...); 

으로 이것은 위의 코드 자체에서 완료를 기다리는 것보다 훨씬 더 좋을 것이다.

현재 Java 8 API로 어떻게 든 구성 할 수 있습니까? 해결 방법은 무엇입니까?

+0

안녕하세요! 내 대답을 업데이트했습니다. 만약 당신이 만족한다면 더 많은 평판을 얻으 려하고 있습니다.) –

+2

@ 리카르도 너무 늦을지도 모르지만 :) 나는 당신을 올바르게 이해한다면 당신이 원하는 것을 발견했습니다. http://www.reactive-streams.org/. https://github.com/reactor/reactor –

답변

20

내가 아는 한 스트림 API는 비동기 이벤트 처리를 지원하지 않습니다. Reactive Extensions for .NET과 같은 것을 원하고 Netflix에서 만든 RxJava이라는 Java 포트가 있습니다.

RxJava는지도 및 필터와 같은 Java 8 스트림과 동일한 많은 고급 작업을 지원하며 비동기식입니다.

업데이트 :이 작품에서 reactive streams 이니셔티브 지금과는 Flow 클래스 비록의 적어도 일부에 대한 지원이 포함됩니다 JDK 9 같습니다.

+19

사실, RxJava가 당신이 원하는 것입니다. Streams의 디자인 센터는 대부분 대기 시간없이 액세스 할 수있는 데이터 (데이터 구조 또는 함수 생성 중 하나)에 관한 것입니다. Rx의 디자인 센터는 비동기식으로 도착할 수있는 이벤트의 무한한 스트림에 관한 것입니다. –

7

마찬가지로 @KarolKrol은 (는) 당신에게 CompletableFuture의 스트림으로 알 수 있습니다.

cyclops-react이라는 CompletableFuture의 스트림 작업을 용이하게하기 위해 JDK8 스트림을 기반으로하는 라이브러리가 있습니다.

스트림을 작성하려면 cyclops-react의 유익한 Ike API를 사용하거나 간단한 반응 인 Stages을 사용할 수 있습니다.

+0

나는 요즘 투표하는 것을 잊는다 :) –

+0

upvotes를 위해 로비하는 대신에 @ KarolKról, 단순히 더 많은 질문에 대한 가능한 최상의 답을 써라. –

2

cyclops-react (전이 라이브러리의 저자입니다)는 스트림 처리를 위해 StreamUtils 클래스를 제공합니다. 제공하는 기능 중 하나는 FutureOperations이며 표준 스트림 터미널 작업에 대한 액세스를 제공하고 그 중 일부는 비틀어서 액세스합니다. Stream은 비동기 적으로 실행되고 결과는 CompletableFuture 내에 반환됩니다. .eg

cyclops-react FutureStreams가 비동기 적으로 데이터를 처리하도록 설계 아담으로

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) 
             .map(i->i+2) 
             .futureOperations(
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

는 지적 멋진 유창 API로, 스트림을 래핑과 같은 기능을 제공하는 편의적인 클래스 ReactiveSeq도 있습니다

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) 
             .map(i->i+2); 
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, 
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

(Futures와 Streams을 함께 섞어서) - 입출력 차단 (파일 읽기, DB 호출, 휴식 호출 등)과 관련된 멀티 스레드 작업에 특히 적합합니다.

1

스트림을 생성하고 각 요소를 CompletionStage으로 매핑하고 CompletionStage.thenCombine()을 사용하여 결과를 수집 할 수 있지만 결과 코드는 다음과 같이 간단하게 읽을 수 없습니다.

이 예는 쉽게 읽을 수있는 기능을 제공하기 위해 쉽게 변형 될 수 있습니다. 그러나 스트림의 reduce 또는 collect을 사용하려면 추가 코드가 필요합니다.

업데이트 : CompletableFuture.allOfCompletableFuture.join은 미래의 결과 모음을 미래의 결과 모음으로 변환하는 또 다른보다 읽기 쉬운 방법을 제공합니다.