0

개발 용 Windows 컴퓨터에 Kafka Streams 응용 프로그램을 쓰고 있습니다. 나는 단지 응용 프로그램을 실행할 때 다음 오류 얻을 카프카 스트림의 leftJoinbranch 기능을 사용하려고하면 : 그것은 카프카처럼 보인다Kafka 스트림으로 개발할 때 Lib에서 불완전한 LinkError가 발생하면 DB dll이 발생합니다.

Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can't find dependent libraries 
    at java.lang.ClassLoader$NativeLibrary.load(Native Method) 
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941) 
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824) 
    at java.lang.Runtime.load0(Runtime.java:809) 
    at java.lang.System.load(System.java:1086) 
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78) 
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56) 
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64) 
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35) 
    at org.rocksdb.Options.<clinit>(Options.java:22) 
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115) 
    at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38) 
    at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75) 
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72) 
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) 
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) 
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) 
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101) 
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) 
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) 
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) 
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) 
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) 

이 DLL을 찾을 수없는,하지만 기다릴 ... 난 자바 애플리케이션 개발하기!

무엇이 문제 일 수 있습니까? 그리고 내가 왜 filter과 같은 간단한 스트리밍 작업을하려고 할 때이 오류가 과장되지 않는 이유는 무엇입니까?

업데이트 : 메시지가 브로커에 존재하는 경우에만

이 문제가 발생합니다. Kafka Streams 버전 0.10.2.1을 사용하고 있습니다.

public class KafkaStreamsMainClass { 

    private KafkaStreamsMainClass() { 
    } 

    public static void main(final String[] args) throws Exception { 
     Properties streamsConfiguration = new Properties(); 
     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams"); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092"); 
     streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "schema-registry:8082"); 
     streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 
     streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 
     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); 
     KStreamBuilder builder = new KStreamBuilder(); 
     KStream<GenericRecord, GenericRecord> sourceStream = builder.stream(SOURCE_TOPIC); 

     KStream<GenericRecord, GenericRecord> finishedFiltered = sourceStream 
       .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") != null); 

     KStream<GenericRecord, GenericRecord>[] branchedStreams = sourceStream 
       .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") == null) 
       .branch((GenericRecord key, GenericRecord value) -> value.get("firstField") != null, 
         (GenericRecord key, GenericRecord value) -> value.get("secondField") != null); 

     branchedStreams[0] = finishedFiltered.join(branchedStreams[0], 
       (GenericRecord value1, GenericRecord value2) -> { 
        return value1; 
       }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2))); 

     branchedStreams[1] = finishedFiltered.join(branchedStreams[1], 
       (GenericRecord value1, GenericRecord value2) -> { 
        return value1; 
       }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2))); 

     KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 
     streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> { 
      throwable.printStackTrace(); 
     }); 
     streams.start(); 

     Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 

} 

내가 메이븐으로 다운로드 rocksdbjni-5.0.1.jar 아카이브를 열어이 문제를 제기 코드의 조각이며, 그것은 librocksdbjni-win64.dll 라이브러리가 포함되어 있습니다. 그것은 내부에서 RocksDB 바깥 쪽에서 라이브러리를 검색하려고하는 것 같습니다.

저는 Windows 7 컴퓨터에서 개발하고 있습니다.

혹시이 문제가 발생 했습니까?

+0

이것은 이상합니다. 참고로, 나는 어제 Oracle JDK 1.8이 탑재 된 Windows 10에서 Confluent의 Kafka Streams 데모 어플리케이션 (https://github.com/confluentinc/examples)의'mvn test' suite를 실행 시켰습니다. (Kafka 중개인 Windows에서의 버그 = Kafka Streams와는 무관) 모든 것이 그대로 작동했습니다. 아마도 귀하의 환경 (Windows 버전, Java 버전 등), 사용중인 Kafka Stream의 정확한 버전 및 코드를 재현하기 쉽도록 더 자세하게 제공 할 수 있습니까? –

+1

나는이 문제를 발견했다고 생각한다. 로컬 Maven 저장소에는 RocksDB, 버전 4.4.1 및 5.0.1의 두 가지 버전이 있습니다 (사용중인 Kafka Streams 0.10.2에서 사용되는 버전입니다). 4.4.1 버전을 삭제하고 문제가 없어졌습니다. 이상한 것은 Maven이 라이브러리의 이전 버전을 사용하고 있다는 것입니다. – gvdm

+0

아니요, 문제가 다시 있습니다. 브로커에 메시지가 없어 (Kafka 삭제 작업에 의해 삭제 된) 메시지가 표시되지 않았습니다. 내 질문을 필수 정보로 업데이트합니다. – gvdm

답변

1

kafka-streams 프로젝트를 최신 릴리스 버전 1.0.0으로 업데이트했습니다.

이 패치는 패치를 한 후이 패치 버전을 내부 Artifactory 서버에 업로드 한 후 this 버그가 있습니다. Windows와 Linux 모두에서 kafka-streams 에이전트를 실행할 수있었습니다. 다음 버전 1.0.1 및 1.1.0에는이 버그 수정이 포함되어 있으므로이 버전 중 하나가 출시되면 패치 된 버전 대신 해당 버전으로 전환됩니다.

요약하자면, 카프카 친구들은이 버그를 1.0.0 릴리스로 해결했습니다.

0

rocksdb dll이 의존하는 일부 기본 라이브러리가 누락되었습니다. https://github.com/facebook/rocksdb/issues/1302

+0

네, 분명하지만 Maven Java 콘솔 애플리케이션이 외부 DLL 라이브러리를 사용하려고하는 이유는 무엇입니까? 그것은 classpath에서 물건을 사용해야합니다! – gvdm

+0

안녕하세요 @ Nicholas. 나는 1.0.0으로 업그레이드하는 문제를 해결했다. 답변에있는 추가 답변 https://stackoverflow.com/questions/43742423/unsatisfiedlinkerror-on-lib-rocks-db-dll-when-developing-with-kafka-streams#48340251. 지원해 주셔서 감사합니다. – gvdm

3

최근에이 문제도 발생했습니다.

  1. C:\Users\[your_user]\AppData\Local\Temp 폴더에서 모든 librocksdbjni[...].dll 파일을 삭제 : 나는 두 단계로이 문제를 해결 할 수 있었다.
  2. 프로젝트에서 rocksdb에 대한 받는다는 종속성을 추가, 이것은 나를 위해 작동 : https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/5.0.1

당신의 카프카 스트림 응용 프로그램을 컴파일하고 실행합니다. 그것은 작동해야합니다!

+0

안녕하세요 @ David, 응답 주셔서 감사합니다. 이 솔루션은 Linux 개발 환경에서도 작동합니까, 아니면 그러한 환경에서 클래스 경로를 망칠 수 있습니까? – gvdm

+1

안녕하세요. @ gvdm, 죄송 합니다만 실제로는 Windows에서만 개발할 수있는 권한이 있기 때문에 모르겠습니다. –

+0

Ok @ David, 가능한 한 빨리 시도하고 알려 드리겠습니다. – gvdm