2017-02-20 14 views
0

저는 노드 스트림에 익숙하지만 단일 파이프 단계로 많이 재사용하는 코드를 추상화하는 모범 사례에 어려움을 겪고 있습니다.노드 - 파이프 단계를 함수로 추상화

inputStream 
.pipe(csv.parse({columns:true}) 
.pipe(csv.transform(function(row) {return transform(row); })) 
.pipe(csv.stringify({header: true}) 
.pipe(outputStream); 

실제 작업 transform()에서 일어나는 :

여기에 오늘을 쓰고있어 무엇의 버전을 박탈합니다. 실제로 변경되는 유일한 사항은 inputStream, transform()outputStream입니다. 내가 말했듯이, 이것은 내가 실제로 사용하는 것의 버려진 버전이다. 나는 많은 오류 처리 및 각 파이프 단계에서 로깅, 궁극적으로 왜 내가 코드 추상화하려고 해요.

내가 작성 찾고 있어요 것은 하나의 파이프 단계는 같은입니다 :

inputStream 
.pipe(csvFunction(transform(row))) 
.pipe(outputStream); 

는 내가 이해 사투를 벌인거야 것은 스트림을 받아 하나의 함수로 그 파이프 단계를 설정하는 방법을이다 스트림을 반환합니다. 저는 through2와 같은 도서관을 보았습니다. 그러나 나는 그것이 내가 가려고하는 곳으로 가는지 어떻게 확신 할 수 없습니다.

답변

0

가 여기에 내가 함께가는 결국거야. 나는 through2 라이브러리와 streaming API of the csv library을 사용하여 내가 찾고 있던 파이프 함수를 생성했다. 내가 끝으로 이벤트 리스너를 제거 곳이 부분을 주목할 필요가

var csv = require('csv'); 
    through = require('through2'); 

module.exports = function(transformFunc) { 
    parser = csv.parse({columns:true, relax_column_count:true}), 
    transformer = csv.transform(function(row) { 
     return transformFunc(row); 
    }), 
    stringifier = csv.stringify({header: true}); 

    return through(function(chunk,enc,cb){ 
     var stream = this; 

      parser.on('data', function(data){ 
       transformer.write(data); 
      }); 

      transformer.on('data', function(data){ 
       stringifier.write(data); 
      }); 

      stringifier.on('data', function(data){ 
       stream.push(data); 
      }); 

      parser.write(chunk); 

      parser.removeAllListeners('data'); 
      transformer.removeAllListeners('data'); 
      stringifier.removeAllListeners('data'); 
      cb(); 
    }) 
} 

, 이것은 내가 너무 많은 이벤트 리스너를 생성 한 메모리 오류로 실행하기 때문이었다. 나는 처음에 once으로 이벤트를 듣고이 문제를 해결하려고 시도했으나 후속 청크가 읽히지 않게하여 다음 파이프 단계로 넘어갔습니다.

의견이나 추가 아이디어가 있으면 알려주세요.

2

는이 같은 PassThrough 클래스를 사용할 수 있습니다 :

var PassThrough = require('stream').PassThrough; 

var csvStream = new PassThrough(); 
csvStream.on('pipe', function (source) { 
    // undo piping of source 
    source.unpipe(this); 
    // build own pipe-line and store internally 
    this.combinedStream = 
    source.pipe(csv.parse({columns: true})) 
     .pipe(csv.transform(function (row) { 
     return transform(row); 
     })) 
     .pipe(csv.stringify({header: true})); 
}); 

csvStream.pipe = function (dest, options) { 
    // pipe internal combined stream to dest 
    return this.combinedStream.pipe(dest, options); 
}; 

inputStream 
    .pipe(csvStream) 
    .pipe(outputStream); 
+0

변환 (행) 참조를 스트림으로 어떻게 전달합니까? 변환 스트림을 사용하는 것이 더 좋지 않습니까? PassThrough는 실제로 스트림에서 작업하는 것보다 스트림을 모니터링하는 것이 더 낫다고 읽었습니다. – AdamPat

+0

스트림 처리 패키지 방법에 따라 다릅니다. 단순히 보이는 곳이나 참조로 넘겨 주면됩니다. 물론 변환 스트림에서 솔루션을 구현할 수도 있습니다 (https://nodejs.org/api/stream.html#stream_implementing_a_transform_stream). PassThrough 스트림으로 아이디어를 보여주는 것은 내가 찾은 가장 간단한 방법이었습니다. – Marc