2017-12-13

Adding rows to a MySQL table with the Spark SQL DataFrame write method

I am a beginner with Apache Spark SQL. Below is my Spark SQL application code and its query result.

SparkSession spark = SparkSession.builder().appName("Spark SQL Test") 
       .master("local[*]").getOrCreate();  

Properties connectionProperties = new Properties(); 
connectionProperties.put("driver", "com.mysql.jdbc.Driver"); 
connectionProperties.put("url", "jdbc:mysql://localhost:3306/test"); 
connectionProperties.put("user", "root"); 
connectionProperties.put("password", "password"); 

Dataset<Row> jdbcDF = spark.read().jdbc(connectionProperties.getProperty("url"), "family", connectionProperties); 
jdbcDF.show(); 
jdbcDF.printSchema(); 

And the result is:

+------+----------+--------+-------+ 
|EMP_ID|EMP_PASSWD|EMP_NAME|EMP_AGE| 
+------+----------+--------+-------+ 
| jina|  bbb| mother|  45| 
|joseph|  aaa| father|  50| 
|julian|  ccc|  son |  20| 
+------+----------+--------+-------+ 


root 
|-- EMP_ID: string (nullable = false) 
|-- EMP_PASSWD: string (nullable = false) 
|-- EMP_NAME: string (nullable = false) 
|-- EMP_AGE: integer (nullable = false) 

Now I want to add another row to the family table. First, I use a SQL statement like the one below:

Dataset<Row> appendSql = spark.sql("INSERT INTO family VALUES('jane' , 'ddd', 'daughter' , 15)"); 

That gives me the desired result. Next, I try to do the same with the DataFrame write method:

List<Object> appendData = Arrays.asList("julia", "eee", "grand mother", new Integer(70)); 
spark.createDataFrame(appendData, Object.class) 
    .write().mode(SaveMode.Append) 
    .jdbc(connectionProperties.getProperty("url"), "family", connectionProperties); 

But the above line throws the following exception:

java.sql.BatchUpdateException: Field 'EMP_ID' doesn't have a default value 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425) 
    at com.mysql.jdbc.Util.getInstance(Util.java:408) 
    at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163) 
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1778) 
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262) 
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:641) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.sql.SQLException: Field 'EMP_ID' doesn't have a default value 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) 
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973) 
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909) 
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527) 
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680) 
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484) 
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858) 
    at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079) 
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756) 
    ... 15 more 

I would rather not use the StructField interface, since the data types of the columns are simple. The result I want is the table below:

+------+----------+--------+-------+ 
|EMP_ID|EMP_PASSWD|EMP_NAME|EMP_AGE| 
+------+----------+--------+-------+ 
| jane|  ddd|daughter|  15| 
| jina|  bbb| mother|  45| 
|joseph|  aaa| father|  50| 
|julian|  ccc|  son|  20| 
+------+----------+--------+-------+ 

That is what I expect from the write method. What am I doing wrong?

Answer


The following code solved it:

List<String> appendData = new ArrayList<String>(); 
appendData.add("julia,eee,grandmother 2,70"); 
appendData.add("jane,ddd,daughter ,15"); 

Dataset<Row> df_1 = spark.createDataset(appendData, Encoders.STRING()).toDF(); 
Dataset<Row> df_2 = df_1.selectExpr( 
    "split(value, ',')[0] as EMP_ID", 
    "split(value, ',')[1] as EMP_PASSWD", 
    "split(value, ',')[2] as EMP_NAME", 
    "split(value, ',')[3] as EMP_AGE"); 

df_2.show(); 

df_2.write().mode(SaveMode.Append).jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);
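A note on the fix above: the split() expressions produce string columns only, so EMP_AGE is written as a string and MySQL is left to coerce it. A sketch of an alternative that also avoids StructField but keeps the integer type is to use a plain JavaBean with createDataFrame (the BeanAppendExample and Family classes below are my own illustration, not from the original post):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BeanAppendExample {

    // JavaBean mirroring the family table; bean properties become column names.
    public static class Family implements Serializable {
        private String EMP_ID;
        private String EMP_PASSWD;
        private String EMP_NAME;
        private Integer EMP_AGE;

        public Family() { }

        public Family(String id, String passwd, String name, Integer age) {
            EMP_ID = id;
            EMP_PASSWD = passwd;
            EMP_NAME = name;
            EMP_AGE = age;
        }

        public String getEMP_ID() { return EMP_ID; }
        public void setEMP_ID(String v) { EMP_ID = v; }
        public String getEMP_PASSWD() { return EMP_PASSWD; }
        public void setEMP_PASSWD(String v) { EMP_PASSWD = v; }
        public String getEMP_NAME() { return EMP_NAME; }
        public void setEMP_NAME(String v) { EMP_NAME = v; }
        public Integer getEMP_AGE() { return EMP_AGE; }
        public void setEMP_AGE(Integer v) { EMP_AGE = v; }
    }

    // Builds the rows to append as a typed DataFrame (EMP_AGE stays integer).
    public static Dataset<Row> createAppendDF(SparkSession spark) {
        List<Family> rows = Arrays.asList(
                new Family("julia", "eee", "grand mother", 70),
                new Family("jane", "ddd", "daughter", 15));
        return spark.createDataFrame(rows, Family.class);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Bean Append Sketch").master("local[*]").getOrCreate();

        Dataset<Row> df = createAppendDF(spark);
        df.printSchema();

        // Same append as above, assuming the same connectionProperties setup:
        // df.write().mode(SaveMode.Append)
        //   .jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);

        spark.stop();
    }
}
```

One caveat: bean-derived columns come out in alphabetical order, so this relies on Spark matching JDBC columns by name when appending to an existing table; it is worth verifying that behavior against your Spark version before relying on it.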