Cassandra와 함께 Flink 스트리밍을 실험하면서 MapFunction에서 INSERT 문을 생성 할 때 흥미로운 문제가 발생했습니다. DataStream<Insert>
을 사용하면 혼란 스럽습니다 RuntimeException
이 나에게 던졌습니다. 그러나 대신 DataStream<Statement>
을 사용하면 실행 코드에서 Insert
인스턴스를 사용하더라도 예상대로 작동합니다.Cassandra와 함께 Flink의 MapFunction을 "삽입"하지만 "명령문"과 함께 사용하지 않는 경우 RuntimeException
시행 착오를 통해 해결책을 찾았지만 (여전히 DataStream<Statement>
사용),이 문제의 원인에 대해서는 여전히 혼란 스럽습니다. 의도적입니까, 버그입니까? 나는 인터넷 검색으로 어떤 설명도 찾을 수 없었으므로, 무슨 일이 일어나고 있는지 아는 사람이 있는지 여기에서 물을 수도있다.
예상 출력 (DataStream<Statement>
사용) : (DataStream<Insert>
사용)
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-638132790]
01/17/2017 15:57:42 Job execution switched to status RUNNING.
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to RUNNING
INSERT INTO tablename (name,age) VALUES ('Test Nameson',27);
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to FINISHED
01/17/2017 15:57:42 Job execution switched to status FINISHED.
오류 출력 :
Exception in thread "main" java.lang.RuntimeException: The field private java.util.List com.datastax.driver.core.querybuilder.BuiltStatement.values is already contained in the hierarchy of the class com.datastax.driver.core.querybuilder.BuiltStatement.Please use unique field names through your classes hierarchy
at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1762)
at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1683)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1580)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1479)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:737)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:565)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:366)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:305)
at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:120)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:506)
at se.hiq.bjornper.testenv.cassandra.SOCassandraQueryTest.main(SOCassandraQueryTest.java:51)
코드 예제 (두 개의 다른 경우에 주석 코드를 전환) :
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
public class SOCassandraQueryTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Map<String, Object>> myDataStream = env.addSource(new RichSourceFunction<Map<String, Object>>() {
@Override
public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
Map<String, Object> map = new HashMap<String, Object>();
map.put("name", "Test Nameson");
map.put("age", 27);
ctx.collect(map);
}
@Override
public void cancel() {
}
});
/* Works just fine */
DataStream<Statement> debugDatastream = myDataStream.map(new MapFunction<Map<String, Object>, Statement>() {
@Override
public Statement map(Map<String, Object> datarow) throws Exception {
Insert insert = QueryBuilder.insertInto("tablename");
for (Entry<String, Object> e : datarow.entrySet()) {
insert.value(e.getKey(), e.getValue());
}
return insert;
}
});
/* Throws RuntimeException if using "Insert" instead of "Statement" */
// DataStream<Insert> debugDatastream = myDataStream.map(new MapFunction<Map<String, Object>, Insert>() {
//
// @Override
// public Insert map(Map<String, Object> datarow) throws Exception {
// Insert insert = QueryBuilder.insertInto("tablename");
//
// for (Entry<String, Object> e : datarow.entrySet()) {
// insert.value(e.getKey(), e.getValue());
// }
// return insert;
// }
// });
debugDatastream.print();
env.execute("CassandraQueryTest");
}
}
을
환경 :
- 자바 8
- FLINK 1.1.3 (이 받는다는 패키지에서 Cassabdra 드라이버)
- 이클립스 IDE
내가이 버그에 대한 PR을 열었습니다 :
나는이 문서 페이지가 FLINK의 직렬화 스택의 작동 방식을 우리가 설명했던 가장 가까운 생각 https://github.com/apache/flink/pull/3154 – twalthr