2013-08-08 4 views
36

이 mongoskin 사용 (다음 문서로 이동하기 전에 콜백 대기), 나는 커서 반환이 같은 쿼리를 수행 할 수 있습니다 그러나순회 직렬

myCollection.find({}, function(err, resultCursor) { 
     resultCursor.each(function(err, result) { 

     } 
} 

을, 나는 싶습니다 각 문서에 대해 일부 비동기 함수를 호출하고, async.js 모듈의 eachSeries 구조와 유사하게 커서가 다시 호출 된 후에 만 ​​커서의 다음 항목으로 이동하십시오. 예 :

myCollection.find({}, function(err, resultCursor) { 
     resultCursor.each(function(err, result) { 

      externalAsyncFunction(result, function(err) { 
       //externalAsyncFunction completed - now want to move to next doc 
      }); 

     } 
} 

어떻게하면됩니까?

감사

UPDATE :이 큰 일괄 작업이며, 결과는 한 번에 메모리에 맞지 않을 수도로 toArray()를 사용하는 wan't하지 않는

.

+0

이동하기 전에 비동기 기능이 완료되기를 기다리면서 비동기 적으로 호출하는 이유는 무엇입니까? –

+0

@RotemHermon 나는 선택의 여지가 없어! 그것은 제 기능이 아니며 비동기입니다. (myAsyncFunction을 externalAsyncFunction으로 바꿉니다 ...) – UpTheCreek

+0

왜 toArray()를 사용하지 않고 그 결과를 반복하는 재귀 함수를 사용하고 있습니까? –

답변

45

toArray를 사용하여 모든 결과를 메모리에로드하지 않으려면 다음과 같이 커서를 사용하여 반복 할 수 있습니다.

myCollection.find({}, function(err, resultCursor) { 
    function processItem(err, item) { 
    if(item === null) { 
     return; // All done! 
    } 

    externalAsyncFunction(item, function(err) { 
     resultCursor.nextObject(processItem); 
    }); 

    } 

    resultCursor.nextObject(processItem); 
} 
+0

아아아, 티모시 감사합니다. – UpTheCreek

+11

큰 데이터 세트의 경우이 방법이 저에게 적합하지 않았습니다. 나는 "RangeError : 최대 호출 스택 크기 초과"를 얻는다. –

+0

@SoichiHayashi RangeError를 발생시키는 많은 것들이 있지만, 위의 예제는 던져서는 안된다. 어쩌면 별도의 질문으로 자세한 내용을 제공하면 문제가 어디에서 발생하는지 파악할 수 있습니다. –

0

결과를 Array으로 가져와 다음과 같은 재귀 함수를 사용하여 반복 할 수 있습니다.

myCollection.find({}).toArray(function (err, items) { 
    var count = items.length; 
    var fn = function() { 
     externalAsyncFuntion(items[count], function() { 
      count -= 1; 
      if (count) fn(); 
     }) 
    } 

    fn(); 
}); 
+0

죄송합니다. 의견에 대한 귀하의 질문에 답변하기에는 너무 느 렸습니다. 결과 집합이 너무 커서 toArray를 사용할 수 없습니다. – UpTheCreek

+0

오, 오케이. 그러면 다른 대답이 당신에게 적합합니다. –

+0

사실, 데이터가 커짐에 따라 파손될 수 있으므로이 패턴을 피하는 것이 좋습니다. –

1

async lib를 사용하면 다음과 같이 할 수 있습니다. 핵심은 현재 문서가 null인지 확인하는 것입니다. 있다면, 그것은 당신이 끝났음을 의미합니다.

async.series([ 
     function (cb) { 
      cursor.each(function (err, doc) { 
       if (err) { 
        cb(err); 
       } else if (doc === null) { 
        cb(); 
       } else { 
        console.log(doc); 
        array.push(doc); 
       } 
      }); 
     } 
    ], function (err) { 
     callback(err, array); 
    }); 
+0

Hi Antoine - 내가이 접근법에서 가지고있는 문제는 각 레코드를 비동기 적으로 처리해야하는 경우 커서 루프가 완료 될 때까지 대기 할 수있는 방법이 없다는 것입니다. (cursor.each는 '다음'콜백을 제공하지 않으므로 동기화 작업 만 가능합니다.) – UpTheCreek

-2

간단한 setTimeOut을 사용할 수 있습니다.

 import mongodb = require("mongodb"); 

     var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {}); 
     var db = new mongodb.Db('myDb', dbServer); 

     var util = require('util'); 
     var when = require('when'); //npm install when 

     var dbDefer = when.defer(); 
     db.open(function() { 
      console.log('db opened...'); 
      dbDefer.resolve(db); 
     }); 

     dbDefer.promise.then(function(db : mongodb.Db){ 
      db.collection('myCollection', function (error, dataCol){ 
       if(error) { 
        console.error(error); return; 
       } 

       var doneReading = when.defer(); 

       var processOneRecordAsync = function(record) : When.Promise{ 
        var result = when.defer(); 

        setTimeout (function() { 
         //simulate a variable-length operation 
         console.log(util.inspect(record)); 
         result.resolve('record processed'); 
        }, Math.random()*5); 

        return result.promise; 
       } 

       var runCursor = function (cursor : MongoCursor){ 
        cursor.next(function(error : any, record : any){ 
         if (error){ 
          console.log('an error occurred: ' + error); 
          return; 
         } 
         if (record){ 
          processOneRecordAsync(record).then(function(r){ 
           setTimeout(function() {runCursor(cursor)}, 1); 
          }); 
         } 
         else{ 
          //cursor up 
          doneReading.resolve('done reading data.'); 
         } 
        }); 
       } 

       dataCol.find({}, function(error, cursor : MongoCursor){ 
        if (!error) 
        { 
         setTimeout(function() {runCursor(cursor)}, 1); 
        } 
       }); 

       doneReading.promise.then(function(message : string){ 
        //message='done reading data' 
        console.log(message); 
       }); 
      }); 
     }); 
0

당신이 사용할 수있는 미래 : 이것은 (난 '때'모듈을 통해 약속을 사용하고 있지만, 그것은뿐만 아니라 그들없이 수행 할 수 있습니다) nodejs에 타이프 스크립트 실행의 예입니다

myCollection.find({}, function(err, resultCursor) { 
    resultCursor.count(Meteor.bindEnvironment(function(err,count){ 
     for(var i=0;i<count;i++) 
     { 
      var itemFuture=new Future(); 

      resultCursor.nextObject(function(err,item)){ 
       itemFuture.result(item); 
      } 

      var item=itemFuture.wait(); 
      //do what you want with the item, 
      //and continue with the loop if so 

     } 
    })); 
}); 
4

다른 사람이 약속의 방법을 찾고 있다면 (nextObject의 콜백 사용과 반대), 여기 있습니다. Node v4.2.2와 mongo driver v2.1.7을 사용하고 있습니다. 이 Cursor.forEach()의 asyncSeries 버전의 종류 :

function forEachSeries(cursor, iterator) { 
    return new Promise(function(resolve, reject) { 
    var count = 0; 
    function processDoc(doc) { 
     if (doc != null) { 
     count++; 
     return iterator(doc).then(function() { 
      return cursor.next().then(processDoc); 
     }); 
     } else { 
     resolve(count); 
     } 
    } 
    cursor.next().then(processDoc); 
    }); 
} 

이를 사용하려면, 커서 및 비동기 적으로 각 문서에서 작동 반복자를 통과 (당신이 Cursor.forEach에 대해서와 같은). 대부분의 mongodb 네이티브 드라이버 함수처럼 iterator는 약속을 반환해야합니다.

예, 컬렉션 test의 모든 문서를 업데이트하려고합니다.이것은 당신이 어떻게 할 것입니다 :

이 setImmediate 사용하여 큰 데이터 세트와 함께 작동
var theDb; 
MongoClient.connect(dbUrl).then(function(db) { 
    theDb = db;  // save it, we'll need to close the connection when done. 
    var cur = db.collection('test').find(); 

    return forEachSeries(cur, function(doc) { // this is the iterator 
    return db.collection('test').updateOne(
     {_id: doc._id}, 
     {$set: {updated: true}}  // or whatever else you need to change 
    ); 
    // updateOne returns a promise, if not supplied a callback. Just return it. 
    }); 
}) 
.then(function(count) { 
    console.log("All Done. Processed", count, "records"); 
    theDb.close(); 
}) 
+0

'forEachSeries'가 호출되는 곳을 보지 못했습니다. – chovy

+0

호출 스택 오버플로입니다. – chovy

6

: 사용

var cursor = collection.find({filter...}).cursor(); 

cursor.nextObject(function fn(err, item) { 
    if (err || !item) return; 

    setImmediate(fnAction, item, arg1, arg2, function() { 
     cursor.nextObject(fn); 
    }); 
}); 

function fnAction(item, arg1, arg2, callback) { 
    // Here you can do whatever you want to do with your item. 
    return callback(); 
} 
14

더 현대적인 접근 방식 async/await :

const cursor = db.collection("foo").find({}); 
while(await cursor.hasNext()) { 
    const doc = await cursor.next(); 
    // process doc here 
} 

노트 :

  • async iterators이 도착하면 더 간단하게 수행 할 일 수 있습니다.
  • 오류 검사를 위해 try/catch를 추가하고 싶을 것입니다.
  • contains 함수는 이므로 async이어야하며 코드는 (async function() { ... })()으로 묶어야합니다.
  • 원할 경우 while 루프의 끝에 await new Promise(resolve => setTimeout(resolve, 1000)); (1 초 동안 일시 중지)을 추가하여 문서를 차례로 처리한다는 것을 보여줍니다.
+0

완벽하게 일했습니다. 고마워요. 큰 데이터 세트가있는 함정이 있다면 어떤 아이디어가 있습니까? – FireBrand

+1

위대한, 이것은 단지 충돌하는 선택한 하나와는 달리 최고의 솔루션입니다 –