2017-11-27 4 views
0

JDBC를 사용하여 spark에서 sqlserver 데이터에 대한 모든 dml 연산을 수행하고 싶지만 UPDATE 쿼리를 실행하는 동안 문제가 있습니다. 다음은 UPDATE 쿼리를 실행하는 동안 예외가 발생하는 것과 함께 실행에 사용 된 쿼리와 연결을 얻기 위해 사용되는 코드입니다. 이 문제를 극복하는 방법에 대한 도움이나 조언이 도움이 될 것입니다. 미리 감사드립니다. JDBC를 통해 스파크에서 업데이트 쿼리를 실행하는 방법

val jdbcDbTable = "dbName" 

val jdbcSqlConnStr = "jdbc:sqlserver://xxxx:portno;" + "user=xx;password=xxx;" 
val jdbcDF = sqlContext.read.format("jdbc") 
    .options(Map("driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
       "url" -> jdbcSqlConnStr, 
       "dbtable" -> jdbcDbTable)) 
    .load() 
jdbcDF.registerTempTable("customer1") 

val cust = sqlContext.sql("Select * from customer1") 
cust.show() 

문제이며, 그 선택 * 문은 적절한 결과를 반환하지만이 같은 업데이트 문을 수행 할 때

val upd = sqlContext.sql("update customer1 set C_NAME='newcustomer' " + 
     " where C_CustKey=1471774") 
upd.show() 

내가 얻을 오류 :

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'update' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0) 

== SQL == 
update customer1 set C_NAME='newcustomer' where C_CustKey=1471774 
^^^ 

답변

1

당신에게 JDBC를 통해 테이블을 갱신하지 않으면, spark 카탈로그 테이블을 갱신하려고합니다. Spark은 업데이트 문을 지원하지 않습니다.

JDBC 연결 (Spark 또는 Plain JDBC 사용)을 통해 쿼리를 실행 한 다음 데이터 프레임을 다시 가져올 수 있습니다.