2016-12-26 8 views
0

포크 프로세스에서 무한 데이터 스트림이 있습니다. 이 스트림이 모듈에 의해 처리되기를 원한다. 그리고 때로는이 스트림에서 다른 모듈에 의해 처리되도록 데이터를 복제하고 싶다. (예를 들어 데이터 스트림을 모니터링한다.하지만 흥미로운 일이 발생하면 다음 n 바이트를 추가 조사).NodeJS 스트림 분할

는 그럼 다음과 같은 시나리오를 가정하자 : 나는 프로그램을 시작하고 내가 다른 스트림 리더에 의해 1 초 동안 같은 데이터를 처리 할

  • 2 초 나중에 읽을 스트림을 소모 시작

    1. 시간이 지나면 두 번째 소비자를 닫고 싶지만 원래 소비자는 변경하지 않아야합니다. 여기

  • 이에 대한 코드입니다 : 여기

    var stream = process.stdout; 
    
    stream.pipe(detector); // Using the first consumer 
    
    function startAnotherConsumer() { 
        stream2 = new PassThrough(); 
        stream.pipe(stream2); 
    
        // use stream2 somewhere else 
    } 
    
    function stopAnotherConsumer() { 
        stream.unpipe(stream2); 
    } 
    

    내 문제는 stream2를 unpiping 것은 그것을 폐쇄하지 않는다는 것입니다. 나는 unpipe 명령 후 stream.end()를 호출 할 경우, 다음 오류와 충돌 : 심지어 두 번째 스트림에서 플러시 할 버퍼를 돕기 위해 소스 스트림을 일시 중지하려

    events.js:160 
         throw er; // Unhandled 'error' event 
        ^
    
    Error: write after end 
        at writeAfterEnd (_stream_writable.js:192:12) 
        at PassThrough.Writable.write (_stream_writable.js:243:5) 
        at Socket.ondata (_stream_readable.js:555:20) 
        at emitOne (events.js:101:20) 
        at Socket.emit (events.js:188:7) 
        at readableAddChunk (_stream_readable.js:176:18) 
        at Socket.Readable.push (_stream_readable.js:134:10) 
        at Pipe.onread (net.js:548:20) 
    

    했지만 작동 중 하나를하지 않았다 :

    function stopAnotherConsumer() { 
        stream.pause(); 
        stream2.once('unpipe', function() { 
         stream.resume(); 
         stream2.end(); 
        }); 
        stream.unpipe(stream2); 
    } 
    

    이전과 같은 오류 (after after write).

    어떻게 문제를 해결할 수 있습니까? 내 의도는 한 지점에서 스트리밍 된 데이터를 복제 한 다음 잠시 후 두 번째 스트림을 닫는 것입니다.

    Note: I tried to use this answer to make it work.

    답변

    0

    답변이 없으므로 필자는 (패치 워크) 솔루션을 게시합니다. 누군가에게 더 나은 사람이있을 경우에는 다시 돌려 놓지 마십시오.

    새로운 스트림 :

    const Writable = require('stream').Writable; 
    const Transform = require('stream').Transform; 
    
    class DuplicatorStream extends Transform { 
        constructor(options) { 
         super(options); 
    
         this.otherStream = null; 
        } 
    
        attachStream(stream) { 
         if (!stream instanceof Writable) { 
          throw new Error('DuplicatorStream argument is not a writeable stream!'); 
         } 
    
         if (this.otherStream) { 
          throw new Error('A stream is already attached!'); 
         } 
    
         this.otherStream = stream; 
         this.emit('attach', stream); 
        } 
    
        detachStream() { 
         if (!this.otherStream) { 
          throw new Error('No stream to detach!'); 
         } 
    
         let stream = this.otherStream; 
         this.otherStream = null; 
         this.emit('detach', stream); 
        } 
    
        _transform(chunk, encoding, callback) { 
         if (this.otherStream) { 
          this.otherStream.write(chunk); 
         } 
    
         callback(null, chunk); 
        } 
    } 
    
    module.exports = DuplicatorStream; 
    

    그리고 사용 :

    var stream = process.stdout; 
    var stream2; 
    
    duplicatorStream = new DuplicatorStream(); 
    stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain 
    duplicatorStream.pipe(detector); // Using the first consumer 
    
    function startAnotherConsumer() { 
        stream2 = new stream.PassThrough(); 
        duplicatorStream.attachStream(stream2); 
    
        // use stream2 somewhere else 
    } 
    
    function stopAnotherConsumer() { 
        duplicatorStream.once('detach', function() { 
         stream2.end(); 
        }); 
        duplicatorStream.detachStream(); 
    }