2017-10-24 12 views
0

스트리밍과 함께 snappy-job 쉘에서 실행할 항아리를 만들려고합니다. 집계 함수가 있고 창에서 완벽하게 작동합니다. 하지만 각 키에 대해 하나의 값을 가진 테이블이 있어야합니다. github의 예제를 기반으로 항아리 파일을 만들고 이제는 SQL 명령을 넣는 데 문제가 있습니다. 통합에 대한Snappydata - sql put into jobserver 값을 집계하지 않습니다.

내 코드 :

val resultStream: SchemaDStream = snsc.registerCQ("select publisher, cast(sum(bid)as int) as bidCount from " + 
    "AggrStream window (duration 1 seconds, slide 1 seconds) group by publisher") 

val conf = new ConnectionConfBuilder(snsc.snappySession).build() 

resultStream.foreachDataFrame(df => { 

    df.write.insertInto("windowsAgg") 

    println("Data received in streaming window") 
    df.show() 

    println("Updating table updateTable") 
    val conn = ConnectionUtil.getConnection(conf) 
    val result = df.collect() 

    val stmt = conn.prepareStatement("put into updateTable (publisher, bidCount) values " + 
    "(?,?+(nvl((select bidCount from updateTable where publisher = ?),0)))") 

    result.foreach(row => { 
     println("row" + row) 
     val publisher = row.getString(0) 
     println("publisher " + publisher) 
     val bidCount = row.getInt(1) 
     println("bidcount : " + bidCount) 

     stmt.setString(1, publisher) 
     stmt.setInt(2, bidCount) 
     stmt.setString(3, publisher) 

     println("Prepared Statement after bind variables set: " + stmt.toString()) 

     stmt.addBatch() 
    } 
    ) 
    stmt.executeBatch() 
    conn.close() 
}) 

snsc.start() 
snsc.awaitTermination() 
} 

I 업데이트하거나 테이블 updateTable에 삽입해야하지만, 갱신시에 스트림에서 하나에 추가 한 현재의 값을 명령. 그리고 지금 :

내가 코드를 실행할 때 내가 볼 : updateTable에서 선택 다시

1488487984048,publisher333,adv1,web1,geo1,11,c1 

과 :

select * from updateTable; 
PUBLISHER      |BIDCOUNT 
-------------------------------------------- 
publisher333     |11 

select * from updateTable; 
PUBLISHER      |BIDCOUNT 
-------------------------------------------- 
publisher333     |10 

가 그럼 난 카프카에 메시지를 보냈을

Bidcount 값을 추가하는 대신 덮어 씁니다. 내가 이따위-SQL에서 명령에 넣어 실행하면 하지만 완벽하게 작동 쉘 :

put into updateTable (publisher, bidcount) values ('publisher333',4+ 
(nvl((select bidCount from updateTable where publisher = 
'publisher333'),0))); 
1 row inserted/updated/deleted 
snappy> select * from updateTable; 
PUBLISHER      |BIDCOUNT 
-------------------------------------------- 
publisher333     |15 

당신이이 경우 좀 도와 주 시겠어요? Mayby 누군가가 snappydata를 사용하여 값을 삽입하거나 업데이트 할 수있는 다른 솔루션을 가지고 있습니까?

고맙습니다.

답변

0

bidCount 값은 스트리밍의 경우 tomi_update 테이블에서 읽지 만 snappy-sql의 경우 updateTable에서 읽습니다. 이것은 의도적입니까? 두 경우 모두 updateTable을 사용하고 싶습니까?

+0

죄송합니다. 오타입니다 ... 코드 예제를 변경했습니다. 나는 두 가지 경우를 처리해야한다 : 1. 테이블 updateTable에 새로운 행을 추가하고이 경우 값 bidCount는 kafka의 값과 같다. 사례 2 : 키가있는 행이 updateTable 테이블에 이미 있으므로 현재 값을 bidCount에 추가해야합니다. kafka 메시지에서 값을 – Tomtom

+0

또한 updateTable에 기본 키가 정의되어 있습니까? –

+0

예, 테이블 정의 : snsc.sql ("행을 사용하여 테이블 updateTable (게시자 varchar (32) NOT NULL PRIMARY KEY, bidCount int) 만들기") – Tomtom