개발 용 Windows 컴퓨터에 Kafka Streams 응용 프로그램을 쓰고 있습니다. 나는 단지 응용 프로그램을 실행할 때 다음 오류 얻을 카프카 스트림의 leftJoin
및 branch
기능을 사용하려고하면 : 그것은 카프카처럼 보인다Kafka 스트림으로 개발할 때 Lib에서 불완전한 LinkError가 발생하면 DB dll이 발생합니다.
Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can't find dependent libraries
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.Options.<clinit>(Options.java:22)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38)
at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75)
at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72)
at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
이 DLL을 찾을 수없는,하지만 기다릴 ... 난 자바 애플리케이션 개발하기!
무엇이 문제 일 수 있습니까? 그리고 내가 왜 filter
과 같은 간단한 스트리밍 작업을하려고 할 때이 오류가 과장되지 않는 이유는 무엇입니까?
업데이트 : 메시지가 브로커에 존재하는 경우에만
이 문제가 발생합니다. Kafka Streams 버전 0.10.2.1을 사용하고 있습니다.
이
public class KafkaStreamsMainClass {
private KafkaStreamsMainClass() {
}
public static void main(final String[] args) throws Exception {
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092");
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "schema-registry:8082");
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
KStreamBuilder builder = new KStreamBuilder();
KStream<GenericRecord, GenericRecord> sourceStream = builder.stream(SOURCE_TOPIC);
KStream<GenericRecord, GenericRecord> finishedFiltered = sourceStream
.filter((GenericRecord key, GenericRecord value) -> value.get("endTime") != null);
KStream<GenericRecord, GenericRecord>[] branchedStreams = sourceStream
.filter((GenericRecord key, GenericRecord value) -> value.get("endTime") == null)
.branch((GenericRecord key, GenericRecord value) -> value.get("firstField") != null,
(GenericRecord key, GenericRecord value) -> value.get("secondField") != null);
branchedStreams[0] = finishedFiltered.join(branchedStreams[0],
(GenericRecord value1, GenericRecord value2) -> {
return value1;
}, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));
branchedStreams[1] = finishedFiltered.join(branchedStreams[1],
(GenericRecord value1, GenericRecord value2) -> {
return value1;
}, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
throwable.printStackTrace();
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
내가 메이븐으로 다운로드
rocksdbjni-5.0.1.jar
아카이브를 열어이 문제를 제기 코드의 조각이며, 그것은
librocksdbjni-win64.dll
라이브러리가 포함되어 있습니다. 그것은 내부에서 RocksDB 바깥 쪽에서 라이브러리를 검색하려고하는 것 같습니다.
저는 Windows 7 컴퓨터에서 개발하고 있습니다.
혹시이 문제가 발생 했습니까?
이것은 이상합니다. 참고로, 나는 어제 Oracle JDK 1.8이 탑재 된 Windows 10에서 Confluent의 Kafka Streams 데모 어플리케이션 (https://github.com/confluentinc/examples)의'mvn test' suite를 실행 시켰습니다. (Kafka 중개인 Windows에서의 버그 = Kafka Streams와는 무관) 모든 것이 그대로 작동했습니다. 아마도 귀하의 환경 (Windows 버전, Java 버전 등), 사용중인 Kafka Stream의 정확한 버전 및 코드를 재현하기 쉽도록 더 자세하게 제공 할 수 있습니까? –
나는이 문제를 발견했다고 생각한다. 로컬 Maven 저장소에는 RocksDB, 버전 4.4.1 및 5.0.1의 두 가지 버전이 있습니다 (사용중인 Kafka Streams 0.10.2에서 사용되는 버전입니다). 4.4.1 버전을 삭제하고 문제가 없어졌습니다. 이상한 것은 Maven이 라이브러리의 이전 버전을 사용하고 있다는 것입니다. – gvdm
아니요, 문제가 다시 있습니다. 브로커에 메시지가 없어 (Kafka 삭제 작업에 의해 삭제 된) 메시지가 표시되지 않았습니다. 내 질문을 필수 정보로 업데이트합니다. – gvdm