2017-12-29 49 views
1

Java 스파크 응용 프로그램의 형식화 된 데이터 집합의 필터 및 맵에서 람다 함수를 사용하는 데 문제가 있습니다.Spark CSV - 실제 매개 변수에 적용 가능한 생성자/메서드가 없습니다.

나는 아래의 클래스를 사용하고 2.2.0를 촉발하고이 런타임 오류를

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)" 

을 얻고있다. 샘플 데이터와 전체 예는 https://gitlab.com/opencell/test-bigdata

Dataset<CDR> cdr = spark 
      .read() 
      .format("csv") 
      .option("header", "true") 
      .option("inferSchema", "true") 
      .option("delimiter", ";") 
      .csv("CDR_SAMPLE.csv") 
      .as(Encoders.bean(CDR.class)); 

    long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count(); 

    System.out.println("validated entries :" + v); 

CDR 파일 정의에서 사용할 수 gitlab link

편집

val cdrCSVSchema = StructType(Array(
    StructField("timestamp", DataTypes.TimestampType), 
    StructField("quantity", DataTypes.DoubleType), 
    StructField("access", DataTypes.StringType), 
    StructField("param1", DataTypes.StringType), 
    StructField("param2", DataTypes.StringType), 
    StructField("param3", DataTypes.StringType), 
    StructField("param4", DataTypes.StringType), 
    StructField("param5", DataTypes.StringType), 
    StructField("param6", DataTypes.StringType), 
    StructField("param7", DataTypes.StringType), 
    StructField("param8", DataTypes.StringType), 
    StructField("param9", DataTypes.StringType), 
    StructField("dateParam1", DataTypes.TimestampType), 
    StructField("dateParam2", DataTypes.TimestampType), 
    StructField("dateParam3", DataTypes.TimestampType), 
    StructField("dateParam4", DataTypes.TimestampType), 
    StructField("dateParam5", DataTypes.TimestampType), 
    StructField("decimalParam1", DataTypes.DoubleType), 
    StructField("decimalParam2", DataTypes.DoubleType), 
    StructField("decimalParam3", DataTypes.DoubleType), 
    StructField("decimalParam4", DataTypes.DoubleType), 
    StructField("decimalParam5", DataTypes.DoubleType), 
    StructField("extraParam", DataTypes.StringType))) 

이고 나는 CSV 문서를로드하려면이 명령을 사용

val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv") 

한 다음 인코딩 및 실행 람다 함수에이 명령을 시도하지만, 난 여전히 오류

을 얻고있다
cdr.as[CDR].filter(c => c.timestamp != null).show 

답변

0

TL, 입력 데이터 세트 (대한의 유형을 추론하는 값을 가지고 있지 않기 때문에 DR 명시 적으로 스키마를 정의 java.sql.Date 필드).

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count 

(그것은 스칼라와 나 '(내부 행 형식에서 불필요한 직렬화를 피하기 위해 아마도 해결하고 정직하게 내가 권하고 싶습니다) 해결책이 될 수 형식화되지 않은 데이터 집합 API를 사용하여 케이스를 들어

, 자바를 가정 운동으로 번역하면서 떠나는 것).

inferSchema 옵션을 사용하는 경우 대부분의 입력란을 사용할 수 없으므로 CDR_SAMPLE.csv 형식의 문자열 (대부분의 특정 유형을 추론 할 수있는 값이없는 경우 기본 유형)을 만드는 파일에서 문제가 발생합니다.

그러면 java.sql.Date 유형의 필드 (dateParam1까지 dateParam5, 유형 String)가됩니다. 즉 dateParam1dateParam5 관심 분야는, 모든 문자열 것을

import org.opencell.spark.model.CDR 
import org.apache.spark.sql.Encoders 
implicit val cdrEnc = Encoders.bean(classOf[CDR]) 
val cdrs = spark.read. 
    option("inferSchema", "true"). 
    option("delimiter", ";"). 
    option("header", true). 
    csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv") 
scala> cdrs.printSchema 
root 
|-- timestamp: timestamp (nullable = true) 
|-- quantity: integer (nullable = true) 
|-- access: string (nullable = true) 
|-- param1: string (nullable = true) 
|-- param2: string (nullable = true) 
|-- param3: string (nullable = true) 
|-- param4: string (nullable = true) 
|-- param5: string (nullable = true) 
|-- param6: string (nullable = true) 
|-- param7: string (nullable = true) 
|-- param8: string (nullable = true) 
|-- param9: string (nullable = true) 
|-- dateParam1: string (nullable = true) 
|-- dateParam2: string (nullable = true) 
|-- dateParam3: string (nullable = true) 
|-- dateParam4: string (nullable = true) 
|-- dateParam5: string (nullable = true) 
|-- decimalParam1: string (nullable = true) 
|-- decimalParam2: string (nullable = true) 
|-- decimalParam3: string (nullable = true) 
|-- decimalParam4: string (nullable = true) 
|-- decimalParam5: string (nullable = true) 
|-- extraParam: string (nullable = true) 

참고.

|-- dateParam1: string (nullable = true) 
|-- dateParam2: string (nullable = true) 
|-- dateParam3: string (nullable = true) 
|-- dateParam4: string (nullable = true) 
|-- dateParam5: string (nullable = true) 

는 "척"문제의 표면은 말한다 CDR 클래스에 정의 된 필드의 유형이 인코더를 사용하여 다른 : 문제의 근본 원인이다

private Date dateParam1; 
private Date dateParam2; 
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

합니다. Spark이 수업에서 추론 할 수있는 것의 차이점이 있습니다. 당신이 주장하기 때문에 변환없이 코드는 ... 작동했지만 한 것

cdrs.as[CDR]. // <-- HERE is the issue = types don't match 
    filter(cdr => cdr.timestamp != null). 
    show // <-- trigger conversion 

그것은하지 않습니다 정말 아무리 당신이 filter 운영자에 액세스 어떤 필드. 문제는 잘못된 실행 (및 전체 단계 Java 코드 생성)으로 이어지는 변환이 발생한다는 것입니다.

형식 유추에 사용할 값이없는 데이터 집합을 inferSchema에게 요청한 이후로 Spark에서 많은 것을 할 수 있을지 의심 스럽습니다. 가장 좋은 방법은 명시 적으로 스키마를 정의하고 schema(...) 연산자를 사용하여 스키마를 설정하는 것입니다.

+1

제 편집 내용을 확인하십시오. 동일한 오류가 발생했습니다 ... –