Spark SQL을 사용하여 신기원에서 UTC 밀리 초로 저장된 날짜를 검색하는 방법에 대한 해결책을 찾지 못했습니다.Spark SQL - 에포크에서 UTC millis로 저장된 날짜를 선택하는 방법은 무엇입니까?
이|-- dateCreated: struct (nullable = true)
||-- $date: long (nullable = true)
전체 스키마는 다음과 같다 : 나는이되는 NoSQL 데이터 소스 (MongoDB의에서 JSON)에서 당긴 스키마는 목표 날짜가
scala> accEvt.printSchema
root
|-- _id: struct (nullable = true)
| |-- $oid: string (nullable = true)
|-- appId: integer (nullable = true)
|-- cId: long (nullable = true)
|-- data: struct (nullable = true)
| |-- expires: struct (nullable = true)
| | |-- $date: long (nullable = true)
| |-- metadata: struct (nullable = true)
| | |-- another key: string (nullable = true)
| | |-- class: string (nullable = true)
| | |-- field: string (nullable = true)
| | |-- flavors: string (nullable = true)
| | |-- foo: string (nullable = true)
| | |-- location1: string (nullable = true)
| | |-- location2: string (nullable = true)
| | |-- test: string (nullable = true)
| | |-- testKey: string (nullable = true)
| | |-- testKey2: string (nullable = true)
|-- dateCreated: struct (nullable = true)
| |-- $date: long (nullable = true)
|-- id: integer (nullable = true)
|-- originationDate: struct (nullable = true)
| |-- $date: long (nullable = true)
|-- processedDate: struct (nullable = true)
| |-- $date: long (nullable = true)
|-- receivedDate: struct (nullable = true)
| |-- $date: long (nullable = true)
를 내 목표는 함께 쿼리를 작성하는 것입니다
SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1]
내 처리까지하고있다 :
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = [email protected]
scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json")
...
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s
accEvt: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:103
scala> accEvt.registerAsTable("accomplishmentEvent")
의 라인
내가 제대로하지 수있는 부두가 날짜를 추론 내 SELECT 문을 형성하는 방법은 지금
scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println)
...
[74475]
을 (이 시점에서 다음베이스 라인 쿼리는 성공적으로 실행). 예를 들어, 다음은 w/o 오류를 실행하지만 모든 레코드의 개수보다는 0을 반환합니다 (74475).
scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println)
...
[0]
가 나는 또한 같은 몇 가지면을 시도 : 권장
scala> val now = new java.util.Date()
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014
scala> val today = now.getTime
today: Long = 1414613115743
scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000)
thirtydaysago: Long = 1416316083039
scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println)
을, 나는 작동하는지 확인하기 위해 명명 된 필드를 선택했습니다. 그래서 :
scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println)
반환 : 오류의
scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println)
결과 :
java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true)))
...
을
[[1376318850033]]
[[1376319429590]]
[[1376320804289]]
[[1376320832835]]
[[1376320832960]]
[[1376320835554]]
[[1376320914480]]
[[41899]]
[[1376321109341]]
[[1376321121469]]
는 그런 시도하고 내가 시도 작업 날짜의 어떤 종류를 얻을 수 확장
필드 이름 앞에 0을 붙입니다. 오류의 다른 종류의으로도 제안 결과 : 분명히
scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println)
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found
select actualConsumerId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5
나는이 방법으로 저장된 날짜를 선택하는 방법을 받고 있지 않다 - 그 누구도 날이 틈을 작성 도와 드릴까요?
저는 Scala와 Spark의 최신 버전입니다. 초보적인 질문이라면 용서해주십시오.하지만 포럼과 Spark 설명서에서 검색 결과가 비어 있습니다.
감사합니다.
에 그것을 만들 시간이 걸릴 수있는 것 문제가있는 부분을 분명히 밝혀주십시오. 검색어에 올바른 필터 표현식 (예 : WHERE 절)을 구성하고 있습니까? 또한, 아마도'RDD'에 데이터를 가져 오는 방법을 알아 냈을 것입니다 :'RDD'의 타입은 무엇입니까? –
나는 내 게시물에 더 많은 세부 사항을 추가했다. 내가 한 일과 그 일이 나를 피하는 것이 무엇인지를보다 정확하게 설명했다. 고맙습니다. – reverend
시도해 볼 두 가지 사항 : (1) accEvt.printSchema()의 전체 출력을 질문에 추가하십시오 (실제로는 맨 위의 스 니펫입니까?) (2) 오히려 특정 필드 (들)를 선택하십시오 *를 사용하여 필드 이름 지정이 작동하는지 확인하십시오. JSON이 평평하지 않은 것처럼 보입니다. 스 니펫에 표시된 필드가 dataCreated. $ data로 처리되어야하는지 궁금합니다. Spark SQL 문서의 JSON 예제는 ** 평평합니다. –