0

모든 필드가 varchar 유형 인 하나의 Oracle 테이블에서 데이터를 읽고 유사한 필드가있는 다른 Oracle 테이블에 저장하는 것이 좋습니다.하지만 이상적으로는 올바른 데이터 유형. 이 작업은 java에서만 수행해야합니다.Apache Spark with Java, Oracle에서 Varchar2의 날짜 유형으로 변환하지 못함

create table employeeStr (
name varchar2(50), 
empid varchar2(50), 
age varchar2(50), 
salary varchar2(50), 
dt_joined varchar2(50)); 

을 아래 표에 기록 :

내 자바 코드는 다음과 같습니다
create table employeeNorm (
name varchar2(50), 
empid number, 
age number(3,0), 
salary number(10,2), 
dt_joined date); 

:

SparkSession sparkSession = 
     SparkSession.builder().master("local[*]").appName("HandlingOracleDataTypes").getOrCreate(); 

SQLContext sqlContext = sparkSession.sqlContext(); 

sqlContext.udf().register("toDate", new UDF1<String, java.sql.Date>() { 
    @Override 
    public java.sql.Date call(String dateStr) throws Exception { 
     Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr); 
     return new java.sql.Date(date.getTime()); 
    } 
}, DataTypes.DateType); 

sqlContext.udf().register("toDate2", new UDF1<String, Date>() { 
    @Override 
    public Date call(String dateStr) throws Exception { 
     Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr); 
     return date; 
    } 
}, DataTypes.DateType); 

sqlContext.udf().register("toDate3", new UDF1<String, String>() { 
    @Override 
    public String call(String dateStr) throws Exception { 
     Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr); 
     return new SimpleDateFormat("dd-MMM-yyyy").format(date); 
    } 
}, DataTypes.StringType); 

Properties connectionProperties = new Properties(); 
connectionProperties.put("user", "<username>"); 
connectionProperties.put("password", "<password>"); 

String jdbcUrl = "<jdbcurl>"; 

Dataset<Row> employeeStrDS = sparkSession.read().jdbc(jdbcUrl, "employeeStr", connectionProperties); 

employeeStrDS.show(); 
employeeStrDS.printSchema(); 

employeeStrDS.withColumn("empid",employeeStrDS.col("empid").cast(DataTypes.IntegerType)); 
employeeStrDS.withColumn("age",employeeStrDS.col("age").cast(DataTypes.IntegerType)); 
employeeStrDS.withColumn("salary",employeeStrDS.col("salary").cast(DataTypes.FloatType)); 
//employeeStrDS.withColumn("dt_joined",employeeStrDS.col("dt_joined").cast(DataTypes.DateType)); 
//employeeStrDS.registerTempTable("abc"); 
//sqlContext.sql("select toDate(dt_joined) from abc").show(); 

employeeStrDS.withColumn("dt_joined", functions.callUDF("toDate3", employeeStrDS.col("dt_joined"))); 
//employeeStrDS.printSchema(); 
employeeStrDS.write().mode(SaveMode.Append).jdbc(jdbcUrl, "employeeNorm", connectionProperties); 

나는 "dt_joined 제거하면 그래서 아래 표에서 데이터 집합을 읽을 수 "테이블과 코드의 열,이 코드는 작동하지만 그림에"dt_joined "열을 가져 오면 아무 것도 작동하지 않습니다. 코드에 언급 된 3 개의 UDF를 모두 시도했지만 예외가 발생할 때마다 예외가 발생합니다. 이에 대한 해결책을 제안하십시오.

Caused by: java.sql.BatchUpdateException: ORA-01861: literal does not match format string 

    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:12296) 
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:246) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.sql.SQLDataException: ORA-01861: literal does not match format string 

    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450) 
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399) 
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059) 
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522) 
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257) 
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587) 
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225) 

업데이트 : 실제 시나리오는 스파크 코드, 임팔라에서 데이터를 읽어 dataframes를 생성합니다. 임팔라 테이블에는 모든 열이 문자열로 있습니다. 그래서 기본적으로 스키마와 함께 데이터 프레임을 모든 문자열로 이상적인 데이터 유형을 가진 Oracle 테이블로 저장합니다.

+0

필자는'spark-shell'을 사용하고 Oracle 테이블에 단 하나의 행만 저장하는 코드를 작성하도록 제안합니다. 그것으로 당신은 형식이 맞는지 아닌지를 알 수 있습니다 (최소한). –

답변

2

Oracle DB에서 Spark Cluster로 모든 데이터를 가져온 다음 Oracle DB로 다시 가져올 필요가 있기 때문에이 경우 Spark을 사용하면 이익을 얻을 수 있다고 생각하지 않습니다. SQL을 사용하면 모든 것을 Oracle DB 내부에서 수행 할 수 있습니다. 당신이해야 할 모든 (오라클 DB 측) 다음과 같은 SQL 문을 실행하는 것입니다

insert into employeeNorm 
select name, empid, age, salary, to_date(dt_joined, 'yyyy-mm-dd') 
from employeeStr; 

commit; 

하면 해당 날짜 형식 'yyyy-mm-dd'을 교체해야을 - ... 자세한 내용은 아래

참고를 참조하십시오 date/time format in Oracle's to_date() function은 표준 UNIX 형식과 호환되지 않습니다. employeeStr이 모든 것을 너무 큰 경우 sqlplus를 사용하여, 자바 등

PS -

Oracle  UNIX 
------  ---- 
YYYY  %Y 
YY   %y 
MM   %m 
DD   %d 
HH24  %H 
MI   %M 
SS   %S 

그것은 어떻게 그 문을 실행하는 방법에이야 : 여기

는 최소한의 매핑 한 거래에서 BULK INSERT in chunks을 고려해야합니다.

+0

업데이트를 참조하십시오. 실제 시나리오는 Impala DB에서 작성되므로 Spark 만 사용해야합니다. – abhihello123

+0

@ abhihello123, 작은 재현 가능한 샘플 데이터 세트를 제공하십시오. – MaxU