2017-12-08 7 views
0

kafka 스트림 코드로 뭔가를 시도하고 데이터를 분할 한 후 1ms 동안 지연 또는 thread.sleep()를 추가하려고했습니다 .... 어떻게해야합니까? 나 한테 그렇게 해?kafka 스트림의 지연 기능

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> textlines = builder.stream("INTOPIC"); 
KStream<String, String> mstream = textlines 
    .mapValues(value -> value.replace("[","")); 
    .mapValues(value -> value.replace("]","")); 
    .mapValues(value -> value.replaceAll("\\},\\{" ,"\\}\\},\\{\\{")) 
    .flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); 
mstream.to("OUTTOPIC"); 
KafkaStreams streams = new KafkaStreams(builder, config); 
streams.start(); 

그래서 .flatmapvalues ​​문 뒤에 나는 그래서이 내 문이 될 수 1ms의에 대한 Thread.sleep를()를 추가 할 필요가 ...?

+0

수면이 필요한 이유는 무엇입니까? (이것은 실제로 XY 문제입니까?) – cowbert

답변

0

달성하고자하는 것이 확실하지 않지만 처리 속도를 늦추고 싶으십니까? 사용 코드에 잠을 자면 충분합니다. 이를 위해 람다 식은 실제 결과를 반환하기 전에 "sleep"을 호출해야합니다. 또는 .foreach() 또는 peek() 전화를 추가하여 거기에서 잠을 자면됩니다.

+0

정확히 어떤 유스 케이스가 처리 속도를 늦추 시나요? 이것이 종속성/경쟁 조건 문제라면 대신 NiFi와 같은 조건에서 처리를 조건부로 수행하는 것이 좋습니다. – cowbert

+0

답장을 보내 주셔서 감사합니다. 그래서 저는 모든 배열 [{ ""json data "}, {"json data "}, {"json data "}]의 스트림을 가져올 것입니다. 이것을 나눌거야. 그래서 배열 안의 레코드를 나눌 수있는 위의 코드를 작성했습니다 ..... 이제는이 레코드를 하나씩 다른 레코드로 묶어서 프로세스를 느리게 할 계획입니까, 아니면 가장 좋은 솔루션이 될까요? 이러한 기록은 스트림 리액터가 다른 하나를 받아들이는 합류 카프카 (kluka)에서 원자로로 이동해야합니다. 내 코드와 같은 묶음으로 데이터를 받아 들일 수 없습니다. –

+0

언급 한 바와 같이. 당신은'flatMap' 후에'foreach()'를 할 수 있고'foreach()'안에서 잠을 자면 –