2016-12-05 6 views
2

Spark Dataset을 기존 Postgresql 테이블에 쓰려고합니다 (열 유형과 같은 테이블 메타 데이터를 변경할 수 없음). 이 표의 열 중 하나는 HStore 유형이며 문제의 원인입니다. 나는 (탈출 할 때 빈 문자열을 제공 여기에 원래지도가 비어) 쓰기 시작할 때Spark Dataset을 사용하여 PostgreSQL hstore를 작성하는 방법

나는 다음과 같은 예외를 참조하십시오

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause. 
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136) 
    at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419) 
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308) 
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004) 
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187) 
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212) 
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351) 
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying 

이것은 어떻게 내가 그 일을 해요 :

def escapePgHstore[A, B](hmap: Map[A, B]) = { 
    hmap.map{case(key, value) => s""" "${key}"=>${value} """}.mkString(",") 
} 
... 
val props = new Properties() 
props.put("user", "xxxxxxx") 
props.put("password", "xxxxxxx") 

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column")) 
    .drop("original_column") 
    .coalesce(1).write 
    .mode(org.apache.spark.sql.SaveMode.Append) 
    .option("driver", "org.postgresql.Driver") 
    .jdbc(jdbcUrl, hashedTablePartName, props) 

내가 escapePgHstore 나는 다음과 같은 오류를 참조하여 문자열에지도 [문자열, 긴]에서 original_column 탈출하지 않는 경우 :

java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint> 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292) 
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.App$class.main(App.scala:76) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
,691을

스파크가 유효한 hstore 데이터 유형을 작성하는 올바른 방법은 무엇입니까 ??

답변

2

필자는 postgres가 적절한 유형의 열을 추측하도록했습니다. official documentation에 설명 된대로 연결 문자열에서 stringtypeunspecified으로 설정하십시오.

props.put("stringtype", "unspecified") 

이제 완벽하게 작동합니다!

+1

이 나를 위해 큰 일! 당신은 저에게 **** 시간의 부하를 덜어 줬습니다. 그리고 이것은 제가 주제에서 찾을 수있는 유일한 정보였습니다. 즉, 한 가지 더 중요한 부분을 발견했습니다. 작성중인'hstore' 열은 이미 존재해야합니다. Spark이 사용하고있는'SaveMode'가 "덮어 쓰기"로 설정되어 있다면, Postgres는 텍스트를'hstore' 열로 파싱하려고 시도하지 않습니다. Spark은 Postgres에게'text' 칼럼임을 알려줍니다. – mtrewartha

0

HSTORE JSON 및 JSONB 열이있는 Postgres 테이블에 데이터 프레임을 쓰는 데 사용하는 코드입니다. 따라서 Postgres에서 생성 된 Spark Dataframe에서 생성 할 수없는 복잡한 데이터 유형은 일반적으로 옵션 또는 데이터 프레임에서 SQL 로의 쓰기 기능으로 설정하려는 속성에서 stringtype="unspecified"을 지정해야합니다.

다음은 write() 기능을 사용하여 PostgreSQL의 테이블에 스파크 Dataframe를 작성하는 예입니다

dataframe.write.format('jdbc').options(driver=driver,user=username,password=password, url=target_database_url,dbtable=table, stringtype="unspecified").mode("append").save()