스트리밍과 함께 snappy-job 쉘에서 실행할 항아리를 만들려고합니다. 집계 함수가 있고 창에서 완벽하게 작동합니다. 하지만 각 키에 대해 하나의 값을 가진 테이블이 있어야합니다. github의 예제를 기반으로 항아리 파일을 만들고 이제는 SQL 명령을 넣는 데 문제가 있습니다. 통합에 대한Snappydata - sql put into jobserver 값을 집계하지 않습니다.
내 코드 :
val resultStream: SchemaDStream = snsc.registerCQ("select publisher, cast(sum(bid)as int) as bidCount from " +
"AggrStream window (duration 1 seconds, slide 1 seconds) group by publisher")
val conf = new ConnectionConfBuilder(snsc.snappySession).build()
resultStream.foreachDataFrame(df => {
df.write.insertInto("windowsAgg")
println("Data received in streaming window")
df.show()
println("Updating table updateTable")
val conn = ConnectionUtil.getConnection(conf)
val result = df.collect()
val stmt = conn.prepareStatement("put into updateTable (publisher, bidCount) values " +
"(?,?+(nvl((select bidCount from updateTable where publisher = ?),0)))")
result.foreach(row => {
println("row" + row)
val publisher = row.getString(0)
println("publisher " + publisher)
val bidCount = row.getInt(1)
println("bidcount : " + bidCount)
stmt.setString(1, publisher)
stmt.setInt(2, bidCount)
stmt.setString(3, publisher)
println("Prepared Statement after bind variables set: " + stmt.toString())
stmt.addBatch()
}
)
stmt.executeBatch()
conn.close()
})
snsc.start()
snsc.awaitTermination()
}
I 업데이트하거나 테이블 updateTable에 삽입해야하지만, 갱신시에 스트림에서 하나에 추가 한 현재의 값을 명령. 그리고 지금 :
내가 코드를 실행할 때 내가 볼 : updateTable에서 선택 다시
1488487984048,publisher333,adv1,web1,geo1,11,c1
과 :
select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |11
select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |10
가 그럼 난 카프카에 메시지를 보냈을
Bidcount 값을 추가하는 대신 덮어 씁니다. 내가 이따위-SQL에서 명령에 넣어 실행하면 하지만 완벽하게 작동 쉘 :
put into updateTable (publisher, bidcount) values ('publisher333',4+
(nvl((select bidCount from updateTable where publisher =
'publisher333'),0)));
1 row inserted/updated/deleted
snappy> select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |15
당신이이 경우 좀 도와 주 시겠어요? Mayby 누군가가 snappydata를 사용하여 값을 삽입하거나 업데이트 할 수있는 다른 솔루션을 가지고 있습니까?
고맙습니다.
죄송합니다. 오타입니다 ... 코드 예제를 변경했습니다. 나는 두 가지 경우를 처리해야한다 : 1. 테이블 updateTable에 새로운 행을 추가하고이 경우 값 bidCount는 kafka의 값과 같다. 사례 2 : 키가있는 행이 updateTable 테이블에 이미 있으므로 현재 값을 bidCount에 추가해야합니다. kafka 메시지에서 값을 – Tomtom
또한 updateTable에 기본 키가 정의되어 있습니까? –
예, 테이블 정의 : snsc.sql ("행을 사용하여 테이블 updateTable (게시자 varchar (32) NOT NULL PRIMARY KEY, bidCount int) 만들기") – Tomtom