0

spark에서 DataFramesUPDATE 명령을 구현하려고합니다. 하지만이 오류가 발생했습니다. 수행해야 할 것에 대한 제안을하십시오. spark-sql & DataFrames에서 UPDATE 명령을 사용하는 방법

17/01/19 11:49:39 INFO Replace$: query --> UPDATE temp SET c2 = REPLACE(c2,"i","a"); 
17/01/19 11:49:39 ERROR Main$: [1.1] failure: ``with'' expected but identifier UPDATE found 

UPDATE temp SET c2 = REPLACE(c2,"i","a"); 
^ 
java.lang.RuntimeException: [1.1] failure: ``with'' expected but identifier UPDATE found 

UPDATE temp SET c2 = REPLACE(c2,"i","a"); 

다음은 프로그램을

object Replace extends SparkPipelineJob{ 
    val logger = LoggerFactory.getLogger(getClass) 
    protected implicit val jsonFormats: Formats = DefaultFormats 

    def createSetCondition(colTypeMap:List[(String,DataType)], pattern:String, replacement:String):String = { 
    val res = colTypeMap map { 
     case (c,t) => 
     if(t == StringType) 
      c+" = REPLACE(" + c + ",\"" + pattern + "\",\"" + replacement + "\")" 
     else 
      c+" = REPLACE(" + c + "," + pattern + "," + replacement + ")" 
    } 
    return res.mkString(" , ") 
    } 

    override def execute(dataFrames: List[DataFrame], sc: SparkContext, sqlContext: SQLContext, params: String, productId: Int) : List[DataFrame] = { 
    import sqlContext.implicits._ 

    val replaceData = ((parse(params)).extractOpt[ReplaceDataSchema]).get 
    logger.info(s"Replace-replaceData --> ${replaceData}") 

    val (inputDf, (columnsMap, colTypeMap)) = (dataFrames(0), LoadInput.colMaps(dataFrames(0))) 

    val tableName = Constants.TEMP_TABLE 
    inputDf.registerTempTable(tableName) 

    val colMap = replaceData.colName map { 
     x => (x,colTypeMap.get(x).get) 
    } 
    logger.info(s"colMap --> ${colMap}") 

    val setCondition = createSetCondition(colMap,replaceData.input,replaceData.output) 
    val query = "UPDATE "+tableName+" SET "+setCondition+";" 
    logger.info(s"query --> ${query}") 

    val outputDf = sqlContext.sql(query) 
    List(outputDf) 
    } 
} 

을 일부 추가 정보입니다.

17/01/19 11:49:39 INFO Replace$: Replace-replaceData --> ReplaceDataSchema(List(SchemaDetectData(s3n://fakepath/data37.csv,None,None)),List(c2),i,a) 
17/01/19 11:49:39 INFO Replace$: colMap --> List((c2,StringType)) 

data37.csv

c1 c2 
90 nine 

필요한 경우 추가 정보를 요청하십시오.

답변

0

Spark SQL은 UPDATE 쿼리를 지원하지 않습니다. 데이터를 "수정"하려면 새 테이블을 만들려면 SELECT :

SELECT * REPLACE(c2, 'i', 'a') AS c2 FROM table