모든 필드가 varchar 유형 인 하나의 Oracle 테이블에서 데이터를 읽고 유사한 필드가있는 다른 Oracle 테이블에 저장하는 것이 좋습니다.하지만 이상적으로는 올바른 데이터 유형. 이 작업은 java에서만 수행해야합니다.Apache Spark with Java, Oracle에서 Varchar2의 날짜 유형으로 변환하지 못함
create table employeeStr (
name varchar2(50),
empid varchar2(50),
age varchar2(50),
salary varchar2(50),
dt_joined varchar2(50));
을 아래 표에 기록 :
내 자바 코드는 다음과 같습니다create table employeeNorm (
name varchar2(50),
empid number,
age number(3,0),
salary number(10,2),
dt_joined date);
:
SparkSession sparkSession =
SparkSession.builder().master("local[*]").appName("HandlingOracleDataTypes").getOrCreate();
SQLContext sqlContext = sparkSession.sqlContext();
sqlContext.udf().register("toDate", new UDF1<String, java.sql.Date>() {
@Override
public java.sql.Date call(String dateStr) throws Exception {
Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr);
return new java.sql.Date(date.getTime());
}
}, DataTypes.DateType);
sqlContext.udf().register("toDate2", new UDF1<String, Date>() {
@Override
public Date call(String dateStr) throws Exception {
Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr);
return date;
}
}, DataTypes.DateType);
sqlContext.udf().register("toDate3", new UDF1<String, String>() {
@Override
public String call(String dateStr) throws Exception {
Date date = new SimpleDateFormat("yyyyMMdd").parse(dateStr);
return new SimpleDateFormat("dd-MMM-yyyy").format(date);
}
}, DataTypes.StringType);
Properties connectionProperties = new Properties();
connectionProperties.put("user", "<username>");
connectionProperties.put("password", "<password>");
String jdbcUrl = "<jdbcurl>";
Dataset<Row> employeeStrDS = sparkSession.read().jdbc(jdbcUrl, "employeeStr", connectionProperties);
employeeStrDS.show();
employeeStrDS.printSchema();
employeeStrDS.withColumn("empid",employeeStrDS.col("empid").cast(DataTypes.IntegerType));
employeeStrDS.withColumn("age",employeeStrDS.col("age").cast(DataTypes.IntegerType));
employeeStrDS.withColumn("salary",employeeStrDS.col("salary").cast(DataTypes.FloatType));
//employeeStrDS.withColumn("dt_joined",employeeStrDS.col("dt_joined").cast(DataTypes.DateType));
//employeeStrDS.registerTempTable("abc");
//sqlContext.sql("select toDate(dt_joined) from abc").show();
employeeStrDS.withColumn("dt_joined", functions.callUDF("toDate3", employeeStrDS.col("dt_joined")));
//employeeStrDS.printSchema();
employeeStrDS.write().mode(SaveMode.Append).jdbc(jdbcUrl, "employeeNorm", connectionProperties);
나는 "dt_joined 제거하면 그래서 아래 표에서 데이터 집합을 읽을 수 "테이블과 코드의 열,이 코드는 작동하지만 그림에"dt_joined "열을 가져 오면 아무 것도 작동하지 않습니다. 코드에 언급 된 3 개의 UDF를 모두 시도했지만 예외가 발생할 때마다 예외가 발생합니다. 이에 대한 해결책을 제안하십시오.
Caused by: java.sql.BatchUpdateException: ORA-01861: literal does not match format string
at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:12296)
at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:246)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLDataException: ORA-01861: literal does not match format string
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1059)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:522)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:257)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:587)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:225)
업데이트 : 실제 시나리오는 스파크 코드, 임팔라에서 데이터를 읽어 dataframes를 생성합니다. 임팔라 테이블에는 모든 열이 문자열로 있습니다. 그래서 기본적으로 스키마와 함께 데이터 프레임을 모든 문자열로 이상적인 데이터 유형을 가진 Oracle 테이블로 저장합니다.
필자는'spark-shell'을 사용하고 Oracle 테이블에 단 하나의 행만 저장하는 코드를 작성하도록 제안합니다. 그것으로 당신은 형식이 맞는지 아닌지를 알 수 있습니다 (최소한). –