2017-11-30 6 views
0

단일 문자열 필드를 추출하려는 Avro 객체의 DataStream이있는 간단한 프로그램이 있습니다. DataStreamTable으로 변환하고 간단한 투영법으로 쿼리를 실행합니다. (accepted_cohort_id, ADMIN_ID, after_submission : - :Apache Flink : InvalidProgramException : 테이블 프로그램을 컴파일 할 수 없습니다.

2017년 11월 29일 16시 7분 36초 출처 :>에서 사용자 정의 소스 나 프로그램을 실행하면

val kinesisConsumer = new FlinkKinesisConsumer(streamName, new UnifiedEventDeserializationSchema, consumerConfig) 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
implicit val typeInfo = TypeInformation.of(classOf[UnifiedEvent]) 
val kinesisStream = env.addSource(kinesisConsumer) 
val tableEnv = TableEnvironment.getTableEnvironment(env) 
tableEnv.registerDataStream("table1", kinesisStream); 
val query = "SELECT nd_key FROM table1" 
val result = tableEnv.sql(query) 
tableEnv.toAppendStream[org.apache.avro.util.Utf8](result).print() 
env.execute() 

, 나는 다음과 같은 예외를 얻을 , anonymous_id, APPLICATION_ID, atom_key, bd_group_key, biz_geo, braavos_purchase_id, 카테고리 cohort_id, concept_key, concept_rank, 컨텍스트 context_campaign, context_experiment, COUPON_CODE, course_key, course_rank, cta_destination, cta_location, cta_message,,745, amount_paidcta_type, 통화, decision_group_id, device_browser, device_os, device_os_version, DEVICE_TYPE, 기간, evaluation_id, EVENT_TYPE, fin_geo, in_collaboration_with, lab_id, lab_rank, 레이블, lesson_key, lesson_rank, 로케일, max_pause_duration, 메시지, MESSAGE_ID, module_key, module_rank , nd_key, nd_unit_id, nd_unit_rank, new_cohort_id, notification_id, num_concepts_completed, num_interactions가 num_lessons_completed, old_cohort_id, PART_KEY, part_rank, pause_duration, pause_reason, payment_plan, payment_provider, points_earned, points_possible, 가격, price_sheet, PRODUCT_KEY, 제품 유형 provider_charge_id, provider_refund_id, quiz_type, referrer, ref und_amount, requested_cohort_id 결과, scholarship_group_key, SEARCH_TERM, skill_level, subscription_id, suspension_length, suspension_reason, 기술, 타임 스탬프, total_concepts, total_lessons, total_time_sec, 유형, unenroll_reason, USER_ID, user_locale, USER_RESPONSE, 변형, 버전, workspace_id, workspace_session , workspace_type) -> select : (nd_key) -> to : Utf8 -> 싱크 : 이름이 (5/8) FAILED로 전환됨 org.apache.flink.api.common.InvalidProgramException : 테이블 프로그램 을 컴파일 할 수 없습니다. 이것은 버그입니다. 문제를 제기하십시오. 에서 : org.apache.flink.table.runtime.CRowOutputMapRunner.compile (33 CRowOutputMapRunner.scala)에서 : org.apache.flink.table.codegen.Compiler $의 class.compile (36 Compiler.scala)에서 에서 org.apache.flink.api.common.functions.util.FunctionUtils.openFunction (FunctionUtils.java:36)에서 : org.apache.flink.table.runtime.CRowOutputMapRunner.open (48 CRowOutputMapRunner.scala) org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators (StreamTask.java:376) 에서 에서 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open (AbstractUdfStreamOperator.java:111) org.apache.flink.s treaming.runtime.tasks.StreamTask.invoke (StreamTask.java:253) (org.apache.flink.runtime.taskmanager.Task.run (Task.java:702) at ) java.lang.Thread.run (Thread. java : 748) 원인 : org.codehaus.commons.compiler.CompileException : 행 790, 열 15 : "java.lang.CharSequence"유형 에서 "org.apache.avro.util"유형으로 지정 변환을 수행 할 수 없습니다. org.codehaus.janino.UnitCompiler.assignmentConversion에서 org.codehaus.janino.UnitCompiler.compileError (UnitCompiler.java:11672) (UnitCompiler.java:10528) 조직에서 에서 UTF8 ".codehaus.janino.UnitCompiler.compile2 org.codehaus.janino.UnitCompiler $ 6.visitLocalVariableDeclarationStatement에서 (UnitCompiler.java:2534) org.codehaus.janino.UnitCompiler.access $ 2600 (UnitCompiler.java:212) 에서 (UnitCompiler. 자바 : 1459) org.codehaus.janino.UnitCompiler $ 6.visitLocalVariableDeclarationStatement (UnitCompiler.java:1443) 에서 org.codehaus에서 org.codehaus.janino.Java $ LocalVariableDeclarationStatement.accept (Java.java:3348) 에서 org.codehaus.janino.UnitCompiler.compile 에서 .janino.UnitCompiler.compile (UnitCompiler.java:1443)에서 org.codehaus.janino.UnitCompiler.compileStatements (UnitCompiler.java:1523) org.codehaus 에서 org.codehaus.janino.UnitCompiler.compileDeclaredMethods (UnitCompiler.java:1286)에서 org.codehaus.janino.UnitCompiler.compileDeclaredMethods (UnitCompiler.java:1313)에서 (UnitCompiler.java:3052) .janino.UnitCompiler.compile2 (UnitCompiler.java:785) org.codehaus.janino.UnitCompiler.compile2 (UnitCompiler.java:436)에서 $ 400 org.codehaus.janino.UnitCompiler.access (UnitCompiler.java:212) 에서 org.codehaus.janino.UnitCompiler $ 2.visitPackageMemberClassDeclaration (UnitCompiler.java:385) 에서 org.codehaus.janino.UnitCompiler $ 2.visitPackageMemberClassDeclaration (UnitCompiler.java:390) 에서 org.codehaus.janino.UnitCompiler.compileUnit에서 org.codehaus.janino.UnitCompiler.compile에서org.codehaus.janino.Java $ PackageMemberClassDeclaration.accept (Java.java:1405) (UnitCompiler.java:385) (org.codehaus.janino 에서 org.codehaus.janino.SimpleCompiler.compileToClassLoader (SimpleCompiler.java:446)에서 org.codehaus.janino.SimpleCompiler.cook에서 UnitCompiler.java:357) (SimpleCompiler.java:234) .SimpleCompiler.cook org.codehaus.commons.compiler.Cookable.cook (Cookable.java:80)에서 org.codehaus.janino.SimpleCompiler.cook (SimpleCompiler.java:204)에서 (SimpleCompiler.java:213)에서에 org.codehaus.commons.compiler.Cookable.cook (Cookable.java:75) $org.apache.flink.table.codegen.Compiler class.compile (Compiler.scala 33) ... 8

답변

0

이상의 오브젝트의 nd_key 브로 유형의 필드가 처리되고, 예 java.lang.CharSequence SQL 쿼리에 의해. 쿼리의 결과가 DataStream[Utf8]로 변환 할 수 야해 가야 toAppendStream[org.apache.avro.util.Utf8]를 호출하여

, 당신은 요청합니다. 그러나 FLINK 자동 CharSequenceUtf8로 변환 할 수 없습니다.

toAppendStream[java.lang.CharSequence]toAppendStream[org.apache.avro.util.Utf8]을 변경하려고합니다.

FLINK 어떤 버전을 사용하고 있습니까?