1

Kinesis에서받은 각 트랜잭션에 대해 람다 함수를 실행하는 트리거가 있습니다. 생산자가 PutRecordsRequest() 메소드를 통해 여러 트랜잭션을 전송 중입니다. 람다 함수는 다음과 같습니다.S3로 올바르게 형식화 된 JSON을 작성하여 Athena/Redshift에로드하십시오.

var AWS = require('aws-sdk'); 
var firehose = new AWS.Firehose(); 
var fhStreamName = "transactions"; 

function writeToS3(jsonString,firehoseStreamName){ 

    console.log("Writing to S3 : " + jsonString) 

    // Prepare storage to postings firehose stream... 
    var params = { 
     DeliveryStreamName: firehoseStreamName, 
     Record: { 
      Data: jsonString 
     } 
    }; 

    // Store data! 
    firehose.putRecord(params, function(err, data) { 
     if (err) { 

      // This needs to be fired to Kinesis in the future... 
      console.log(err, err.stack); 
     } 
     else{ 
      console.log(data);    
     } 
    }); 
} 

function processEvent(event) { 

    // Convert data object because this is all that we need 
    var buf = new Buffer(event, "base64"); 

    // Convert to actual string which is readable 
    var jsonString = buf.toString("utf8"); 

    return jsonString; 
} 

exports.handler = function(event, context) { 

    var result = ""; 

    // Loop events and register to firehose... 
    for(var i=0; i<event.Records.length; i++){ 
     result = result + processEvent(event.Records[i].kinesis.data,fhStreamName); 
    } 

    writeToS3(result,fhStreamName); 

    context.done(); 
}; 

그러나 트랜잭션을 쓰면 S3에서 그들은 JSON 배열로 기록되지 않습니다. 다음은 예입니다

{ 
    "userName" : "val1", 
    "betID" : "val2", 
    "anotherID" : val3 
}{ 
    "userName" : "val4", 
    "anotherID" : "val5", 
    "productID" : val6, 
} 

이 형식이 데이터가 아테나 또는 Redshift에 직접로드 할 수있다, 또는 유효한 배열에 있어야합니까? 여기서 http://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-copy-from-json.html은 여전히 ​​Redshift에로드 할 수 있어야합니다. 여기 아테나에서 테이블을 만들 때 사용되는 특성이 있습니다

...

ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES (
    'serialization.format' = '1' 
) LOCATION 's3://asgaard-data/data/' 

가 어떻게이 데이터를로드 할 수 있습니다를 조회 할 수 있도록?

답변

0

동일한 문제가있는 다른 사람들에게 이것은 내 문제를 해결 한 코드이며 데이터는 이제 Athena, Redshift 등의 형식이 올바로 지정되었습니다. 노드에 있습니다. 데이터는 생산자로부터 Kinesis로 전송되며, 각 트랜잭션에는 트리거가 있으며 트랜잭션은 하나씩 처리되어 결국 S3에 쓰는 Firehose로 전송됩니다.

var AWS = require('aws-sdk'); 
var firehose = new AWS.Firehose(); 
var fhStreamName = "transactions"; 

function processEvent(event,firehoseStreamName) { 

    // Convert data object because this is all that we need 
    var buf = new Buffer(event, "base64"); 

    // Convert to actual string which is readable 
    var jsonString = buf.toString("utf8"); 

    // Prepare storage to postings firehose stream... 
    var params = { 
     DeliveryStreamName: firehoseStreamName, 
     Record: { 
      Data: jsonString.replace(/\n|\r/g, "") + "\n" 
     } 
    }; 

    console.log("Writing : " + params.Record.Data) 

    // Store data! 
    firehose.putRecord(params, function(err, data) { 
     if (err) { 

      // This needs to be fired to Kinesis in the future... 
      console.log(err, err.stack); 
     } 
     else{ 
      //console.log(data);    
     } 
    }); 
} 

exports.handler = function(event, context) { 

    // Loop events and register to firehose... 
    for(var i=0; i<event.Records.length; i++){ 
     processEvent(event.Records[i].kinesis.data,fhStreamName); 
    } 

    context.done(); 
}; 
4

는 아테나의 경우, JSON 기록은 한 줄에 하나의 객체가 될 필요가 : 결과 파일이 잘 형성 JSON 객체 자체가 없기 때문에

{ "userName" : "val1", "betID" : "val2", "anotherID" : val3 } 
{ "userName" : "val4", "anotherID" : "val5", "productID" : val6 } 

이 직관적 보일 수 있지만,이 줄 바꿈으로 구분 텍스트는 Athena, Hive 및 유사한 처리 도구에 유용합니다. Redshift에는 동일한 구조가 적용될 것으로 생각되지만 Redshift에는 더 많은 옵션이 있습니다.

+0

그리고 (JSON 1 in liner)는 firehose를 먹이는 람다 식에서 처리됩니까? – Mez