2014-10-29 4 views
1

Spark SQL을 사용하여 신기원에서 UTC 밀리 초로 저장된 날짜를 검색하는 방법에 대한 해결책을 찾지 못했습니다.Spark SQL - 에포크에서 UTC millis로 저장된 날짜를 선택하는 방법은 무엇입니까?

|-- dateCreated: struct (nullable = true) 

||-- $date: long (nullable = true) 

전체 스키마는 다음과 같다 : 나는이되는 NoSQL 데이터 소스 (MongoDB의에서 JSON)에서 당긴 스키마는 목표 날짜가

scala> accEvt.printSchema 
root 
|-- _id: struct (nullable = true) 
| |-- $oid: string (nullable = true) 
|-- appId: integer (nullable = true) 
|-- cId: long (nullable = true) 
|-- data: struct (nullable = true) 
| |-- expires: struct (nullable = true) 
| | |-- $date: long (nullable = true) 
| |-- metadata: struct (nullable = true) 
| | |-- another key: string (nullable = true) 
| | |-- class: string (nullable = true) 
| | |-- field: string (nullable = true) 
| | |-- flavors: string (nullable = true) 
| | |-- foo: string (nullable = true) 
| | |-- location1: string (nullable = true) 
| | |-- location2: string (nullable = true) 
| | |-- test: string (nullable = true) 
| | |-- testKey: string (nullable = true) 
| | |-- testKey2: string (nullable = true) 
|-- dateCreated: struct (nullable = true) 
| |-- $date: long (nullable = true) 
|-- id: integer (nullable = true) 
|-- originationDate: struct (nullable = true) 
| |-- $date: long (nullable = true) 
|-- processedDate: struct (nullable = true) 
| |-- $date: long (nullable = true) 
|-- receivedDate: struct (nullable = true) 
| |-- $date: long (nullable = true) 

를 내 목표는 함께 쿼리를 작성하는 것입니다

SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1] 

내 처리까지하고있다 :

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
sqlContext: org.apache.spark.sql.SQLContext = [email protected] 

scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json") 

... 
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s 
accEvt: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[6] at RDD at SchemaRDD.scala:103 

scala> accEvt.registerAsTable("accomplishmentEvent") 
의 라인

내가 제대로하지 수있는 부두가 날짜를 추론 내 SELECT 문을 형성하는 방법은 지금

scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println) 
... 
[74475] 

을 (이 시점에서 다음베이스 라인 쿼리는 성공적으로 실행). 예를 들어, 다음은 w/o 오류를 실행하지만 모든 레코드의 개수보다는 0을 반환합니다 (74475).

scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println) 
... 
[0] 

가 나는 또한 같은 몇 가지면을 시도 : 권장

scala> val now = new java.util.Date() 
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014 

scala> val today = now.getTime 
today: Long = 1414613115743 

scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000) 
thirtydaysago: Long = 1416316083039 


scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println) 

을, 나는 작동하는지 확인하기 위해 명명 된 필드를 선택했습니다. 그래서 :

scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println) 

반환 : 오류의

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println) 

결과 :

java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true))) 
... 
[[1376318850033]] 
[[1376319429590]] 
[[1376320804289]] 
[[1376320832835]] 
[[1376320832960]] 
[[1376320835554]] 
[[1376320914480]] 
[[41899]] 
[[1376321109341]] 
[[1376321121469]] 

는 그런 시도하고 내가 시도 작업 날짜의 어떤 종류를 얻을 수 확장

필드 이름 앞에 0을 붙입니다. 오류의 다른 종류의으로도 제안 결과 : 분명히

scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println) 
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found 

select actualConsumerId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5 

나는이 방법으로 저장된 날짜를 선택하는 방법을 받고 있지 않다 - 그 누구도 날이 틈을 작성 도와 드릴까요?

저는 Scala와 Spark의 최신 버전입니다. 초보적인 질문이라면 용서해주십시오.하지만 포럼과 Spark 설명서에서 검색 결과가 비어 있습니다.

감사합니다.

+0

에 그것을 만들 시간이 걸릴 수있는 것 문제가있는 부분을 분명히 밝혀주십시오. 검색어에 올바른 필터 표현식 (예 : WHERE 절)을 구성하고 있습니까? 또한, 아마도'RDD'에 데이터를 가져 오는 방법을 알아 냈을 것입니다 :'RDD'의 타입은 무엇입니까? –

+0

나는 내 게시물에 더 많은 세부 사항을 추가했다. 내가 한 일과 그 일이 나를 피하는 것이 무엇인지를보다 정확하게 설명했다. 고맙습니다. – reverend

+0

시도해 볼 두 가지 사항 : (1) accEvt.printSchema()의 전체 출력을 질문에 추가하십시오 (실제로는 맨 위의 스 니펫입니까?) (2) 오히려 특정 필드 (들)를 선택하십시오 *를 사용하여 필드 이름 지정이 작동하는지 확인하십시오. JSON이 평평하지 않은 것처럼 보입니다. 스 니펫에 표시된 필드가 dataCreated. $ data로 처리되어야하는지 궁금합니다. Spark SQL 문서의 JSON 예제는 ** 평평합니다. –

답변

1

JSON이 플랫하지 않으므로 dateCreated.$date과 같이 정규화 된 이름을 사용하여 최상위 수준 아래의 입력란을 처리해야합니다. 특정 날짜 입력란은 모두 long 유형이므로 수치 비교가 필요하며이를 수행하는 데 올바른 길을 걷는 것처럼 보입니다.

또 다른 문제는 필드 이름에 "$"문자가 있고 Spark SQL에서 해당 문자를 쿼리 할 수 ​​없다는 것입니다.한 가지 해결책은 SchemaRDD으로 JSON을 직접 읽는 대신에 (먼저 수행 한 것처럼) RDD[String]으로 읽은 다음 map 메서드를 사용하여 선택한 스칼라 문자열 조작을 수행 한 다음 SQLContextjsonRDD 메서드를 사용하여 SchemaRDD을 만듭니다.

val lines = sc.textFile(...) 
// you may want something less naive than global replacement of all "$" chars 
val linesFixed = lines.map(s => s.replaceAllLiterally("$", "")) 
val accEvt = sqlContext.jsonRDD(linesFixed) 

나는 이것을 Spark 1.1.0으로 테스트했습니다.

는 참고로, 스파크 SQL에서 인용 능력의 부족 this bug report 아마도 다른 사람에서 언급 된, 그리고 수정 최근 checked in 것을 보인다,하지만 당신을 릴리스

+0

그래서 원래의 JSON에서 mongo 내보내기가 내 날짜 개체를 중첩 된 $ 날짜 필드 이름으로 변환했습니다. 그래서 나는 당신의 아이디어를 이끌어 내고 '$ date'-> 'date'를 변환 한 다음, 예를 들어''processedDate.date "'를 사용하여 long 값에 대해 질의 할 수있었습니다. 나는 밀리 스로 날짜를 변환하고 범위를 수행하기 위해 쿼리에 주입해야하지만, 그렇게하지 않으면 날짜를 저장하는 데 드는 비용이라고 생각합니다. 이제는 몽고 수출이하는 일을 살펴보고 전략을 발전시켜야합니다. 모든 도움을 주셔서 감사합니다! – reverend

+0

명심해야 할 점은 데이터를 통과하는 동안 다른 번역을 할 수 있다는 것입니다. 에포크 밀리 초를 다른 스칼라 날짜 형식 (이 사이트의 질문/답변에있는 많은 정보)으로 변환하는 것은 매우 쉽습니다. 문제는 스파크 SQL이 실제로 처리 할 수있는 날짜의 종류를 파악해야 할 필요가 있다는 것입니다. OTOH, Spark SQL이 당신이 원하는 것을 처리 할 수 ​​없다는 사실이 밝혀지면 곧바로 Spark을 사용하여 RDD를 걸러 낼 수 있습니다. –