Kinesis에서받은 각 트랜잭션에 대해 람다 함수를 실행하는 트리거가 있습니다. 생산자가 PutRecordsRequest() 메소드를 통해 여러 트랜잭션을 전송 중입니다. 람다 함수는 다음과 같습니다.S3로 올바르게 형식화 된 JSON을 작성하여 Athena/Redshift에로드하십시오.
var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
var fhStreamName = "transactions";
function writeToS3(jsonString,firehoseStreamName){
console.log("Writing to S3 : " + jsonString)
// Prepare storage to postings firehose stream...
var params = {
DeliveryStreamName: firehoseStreamName,
Record: {
Data: jsonString
}
};
// Store data!
firehose.putRecord(params, function(err, data) {
if (err) {
// This needs to be fired to Kinesis in the future...
console.log(err, err.stack);
}
else{
console.log(data);
}
});
}
function processEvent(event) {
// Convert data object because this is all that we need
var buf = new Buffer(event, "base64");
// Convert to actual string which is readable
var jsonString = buf.toString("utf8");
return jsonString;
}
exports.handler = function(event, context) {
var result = "";
// Loop events and register to firehose...
for(var i=0; i<event.Records.length; i++){
result = result + processEvent(event.Records[i].kinesis.data,fhStreamName);
}
writeToS3(result,fhStreamName);
context.done();
};
그러나 트랜잭션을 쓰면 S3에서 그들은 JSON 배열로 기록되지 않습니다. 다음은 예입니다
{
"userName" : "val1",
"betID" : "val2",
"anotherID" : val3
}{
"userName" : "val4",
"anotherID" : "val5",
"productID" : val6,
}
이 형식이 데이터가 아테나 또는 Redshift에 직접로드 할 수있다, 또는 유효한 배열에 있어야합니까? 여기서 http://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-copy-from-json.html은 여전히 Redshift에로드 할 수 있어야합니다. 여기 아테나에서 테이블을 만들 때 사용되는 특성이 있습니다
...
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://asgaard-data/data/'
가 어떻게이 데이터를로드 할 수 있습니다를 조회 할 수 있도록?
그리고 (JSON 1 in liner)는 firehose를 먹이는 람다 식에서 처리됩니까? – Mez