2017-12-15 24 views
1

카프카 주제는 A입니다. 주제 의 데이터카프카에서 최신 값을 얻으십시오

형식은 다음과 같습니다

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000} 
{ id : 2, name:confluent, created_at:2017-09-28 22:00:00.000} 
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000} 
{ id : 4, name:apache, created_at:2017-09-28 24:41:00.000} 

지금 소비자 측면에서 내가 한 시간 창의 최신 데이터를 얻을 것을 의미합니다 내가 기반으로 주제에서 최신 값을 얻을 필요가 1 시간마다

created_at 내 예상 출력은 다음과 같습니다

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000} 
{ id : 3, name:kafka, created_at:2017-09-28 24:42:00.000} 

나는이 ksql에 의해 해결하지만 확실하지 메신저 할 수 있다고 생각합니다. 도와주세요.

감사합니다.

+0

당신의 열쇠는 무엇입니까? –

+0

키가 메시지 1, 메시지 2 등일 수 있다고 생각하십시오. 위에 언급 된 값은 – shakeel

+1

차갑습니다. Kafka Streams는 키를 기반으로하는 모든 작업을 집계/그룹화하므로 키를 염두에 두어야합니다. –

답변

3

예, KSQL을 사용할 수 있습니다. 다음보십시오 :

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAT) WITH (kafka_topic = 'topic_name', value_format = 'JSON');

CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) AS creted_at FROM s1 WINDOW TUMBLING (size 1 hour) GROUP BY id, name;

이 결과는 리눅스 타임 스탬프 형식으로 created_at 시간이있을 것이다. 새 쿼리에서 TIMESTAMPTOSTRING udf를 사용하여 원하는 형식으로 변경할 수 있습니다. 문제가 있으면 알려주세요.

+0

응답 해 주셔서 감사합니다. 1 시간 창을 10 분으로 줄일 수 있습니까? 성능 문제가 있습니까? – shakeel

+0

물론,'(size 10 minutes)'을 사용할 수 있습니다. 중요한 성능 문제가 없어야합니다. – Hojjat

+0

답장을 보내 주셔서 감사합니다. 질문 하나가 ksql에서 데이터를 메모리 또는 디스크에 저장합니까? – shakeel