2017-05-19 6 views
4

MongoDB에있는 내 컬렉션의 모든 문서에는 동일한 필드가 있습니다. 내 목표는 파이썬에 pandas.DataFrame 또는 dask.DataFrame으로로드하는 것입니다.MongoDB에서 파이썬으로 데이터로드 병렬화

필자는 병렬화하여 로딩 절차를 가속화하고 싶습니다. 제 계획은 여러 프로세스 나 스레드를 생성하는 것입니다. 각 프로세스는 컬렉션의 청크를로드 한 다음이 청크를 병합합니다.

어떻게하면 MongoDB에서 올바르게 수행 할 수 있습니까?

PostgreSQL에서 비슷한 접근 방식을 시도했습니다. 나의 초기 아이디어는 SKIPLIMIT을 SQL 쿼리에 사용했다. 각 특정 쿼리에 대해 열린 각 커서가 처음부터 데이터 테이블 읽기를 시작하고 지정된 양의 행을 건너 뛰기 때문에 실패했습니다. 그래서 추가 열을 만들고 레코드 번호를 포함하고 쿼리에서이 숫자의 범위를 지정해야했습니다.

반대로 MongoDB는 각 문서에 고유 한 ObjectID를 할당합니다. 그러나 ObjectID 하나를 다른 것으로부터 뺄 수 없다는 것을 발견했습니다. 순서 지정 작업과 비교할 수 있습니다 (작음, 큼 및 동일).

pymongo 또한 count, limit과 같이 내 작업에 유용한 것처럼 보이는 인덱싱 작업을 지원하는 몇 가지 메서드가있는 커서 개체를 반환합니다.

Spark 용 MongoDB 커넥터는 어떻게 든이 작업을 수행합니다. 불행히도, 나는 스칼라에 익숙하지 않아서 어떻게하는지 알아내는 것이 어렵습니다.

Mongo에서 Python으로 데이터를 병렬로로드하는 올바른 방법은 무엇입니까?

지금까지, 나는 다음과 같은 솔루션에 왔어요 : dask.dataframe.from_delayed 내부적 효율적으로 단일 스레드에서 모든 모음을로드 통과 발전기 목록을 만듭니다처럼

import pandas as pd 
import dask.dataframe as dd 
from dask.delayed import delayed 

# import other modules. 

collection = get_mongo_collection() 
cursor = collection.find({ }) 

def process_document(in_doc): 
    out_doc = # process doc keys and values 
    return pd.DataFrame(out_doc) 

df = dd.from_delayed((delayed(process_document)(d) for d in cursor)) 

그러나, 그것은 보인다.

업데이트. 나는 , 그 skip 방법은 pymongo.Cursor의 컬렉션의 시작부터 PostgreSQL으로 시작합니다. 동일한 페이지에서 응용 프로그램의 페이지 매김 논리를 사용하는 것이 좋습니다. 지금까지 찾은 솔루션은 이것을 위해 _id을 사용합니다. 그러나 그들은 또한 마지막으로 표시된 _id을 저장합니다. 이는 단일 스레드에서 작동 함을 의미합니다.

업데이트 2. 공식 MongoDb Spark 커넥터에서 파티셔너 코드를 찾았습니다. https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32

처음에는이 파티셔너는 컬렉션의 모든 문서에서 키 필드를 읽고 값 범위를 계산합니다.

업데이트 3 : 나의 불완전한 해결책.직접 호출 할 때

def process_document(in_doc, other_arg): 
    # custom processing of incoming records 
    return out_doc 

def compute_id_ranges(collection, query, partition_size=50): 
    cur = collection.find(query, {'_id': 1}).sort('_id', pymongo.ASCENDING) 
    id_ranges = [cur[0]['_id']] 
    count = 1 
    for r in cur: 
     count += 1 
     if count > partition_size: 
      id_ranges.append(r['_id']) 
      count = 0 
    id_ranges.append(r['_id']) 
    return zip(id_ranges[:len(id_ranges)-1], id_ranges[1: ])  


def load_chunk(id_pair, collection, query={}, projection=None): 
    q = query 
    q.update({"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}}) 
    cur = collection.find(q, projection) 

    return pd.DataFrame([process_document(d, other_arg) for d in cur]) 


def parallel_load(*args, **kwargs): 
    collection = kwargs['collection'] 
    query = kwargs.get('query', {}) 
    projection = kwargs.get('projection', None) 

    id_ranges = compute_id_ranges(collection, query) 

    dfs = [ delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges ] 
    df = dd.from_delayed(dfs) 
    return df 

collection = connect_to_mongo_and_return_collection_object(credentials) 

# df = parallel_load(collection=collection) 

id_ranges = compute_id_ranges(collection) 
dedf = delayed(load_chunk)(id_ranges[0], collection) 

load_chunk 완벽하게 실행 : 예외를 제기 무엇

/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***) 
    81   return expr, {} 
    82  if isinstance(expr, (Iterator, list, tuple, set)): 
---> 83   args, dasks = unzip((to_task_dask(e) for e in expr), 2) 
    84   args = list(args) 
    85   dsk = sharedict.merge(*dasks) 

/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self) 
    2342 
    2343  def __next__(self): 
-> 2344   raise TypeError("'Collection' object is not iterable") 
    2345 
    2346  next = __next__ 

TypeError: 'Collection' object is not iterable 

: DASK 잘못 Collection 개체를 치료하는 것 때문에 작동하지 않습니다

는 pymongo에서 예외를 가져옵니다 . 그러나 위에서 언급 한 예외 인 delayed(load_chunk)(blah-blah-blah)이 실패합니다.

+0

내가 당신의 직관이 바로 여기 당신이 원하는 생각에서 오류가 발생하지 않습니다 : 코드는 파이썬 3를 위해 작성되었습니다 데이터 세트의 다른 세그먼트를 얻는 여러 문법 쿼리를 구성한 다음 dask.delayed를 사용하여 병렬로로드합니다. 결국 dask.dataframe.from_delayed를 사용하여 데이터 프레임과 같은 일부 dask 컬렉션을 생성합니다. 누락 된 부분은 페이지 매김이라고 생각합니다. 더 자세한 정보를 원한다면 몽고 개발자에게 ping을 할 것입니다. – MRocklin

+0

두 개의 미리 정의 된'_id' 사이에'_id'가있는 문서 덩어리를로드하는 함수를 작성했습니다. 'def load_chunk (id_pair, collection, query = {}, projection = None)' 이 함수를'delayed'에 래핑하면 Mongo 컬렉션을 반복하려고 시도하고 컬렉션이 반복 가능하지 않다는 Exception을 얻습니다. 'dfs = delayed (load_chunk) (id_pair, collection, query, projection) ' 죄송합니다. 지금 재현 할 수있는 예는 없습니다. – wl2776

답변

0

"thery're rulez, 망을 읽기":)

pymongo.Collection이 커서의 목록을 반환 방법 parallel_scan 있습니다.

업데이트. 컬렉션이 너무 자주 변경되지 않고 쿼리가 항상 동일한 경우 (이 경우)이 함수는 작업을 수행 할 수 있습니다. 쿼리 결과를 여러 컬렉션에 저장하고 병렬 검색을 실행할 수 있습니다.

+0

아직 없습니다. 몇 가지 제한 사항이 여전히 존재합니다. https://groups.google.com/d/msg/mongodb-user/8qshghoR4WU/9d3Chf7fFAAJ - 동일한 아이디어 – wl2776

1

필자는 pymongo 병렬 처리를 조사 중이었고 이것이 저에게 효과적이었습니다. 내 겸손한 게이밍 랩톱으로 4 천만 건의 문서를 처리하는 데 거의 100 분이 걸렸습니다. CPU를 100 % 활용하여 AC를 켜야했습니다.

데이터베이스를 분할하고 프로세스에 할당 된 배치를 건너 뛰기 기능을 사용했습니다.

import multiprocessing 
from pymongo import MongoClient 

def your_function(something): 
    <...> 
    return result 

def process_cursor(skip_n,limit_n): 
    print('Starting process',skip_n//limit_n,'...') 
    collection = MongoClient().<db_name>.<collection_name> 
    cursor = collection.find({}).skip(skip_n).limit(limit_n) 
    for doc in cursor:   
     <do your magic> 
     # for example: 
     result = your_function(doc['your_field'] # do some processing on each document 
     # update that document by adding the result into a new field 
     collection.update_one({'_id': doc['_id']}, {'$set': {'<new_field_eg>': result} }) 

    print('Completed process',skip_n//limit_n,'...') 


if __name__ == '__main__': 
    n_cores = 7    # number of splits (logical cores of the CPU-1) 
    collection_size = 40126904 # your collection size 
    batch_size = round(collection_size/n_cores+0.5) 
    skips = range(0, n_cores*batch_size, batch_size) 

    processes = [ multiprocessing.Process(target=process_cursor, args=(skip_n,batch_size)) for skip_n in skips] 

    for process in processes: 
     process.start() 

    for process in processes: 
     process.join() 

마지막 분할 나머지 문서보다 더 큰 제한이있을 것이다, 그러나 그것은

+0

매우 비효율적 인 솔루션입니다. 각 커서는 수집 시작부터 시작하여 일부 레코드 만 삭제합니다. 그것은 Mongo 매뉴얼에서 말해진다. – wl2776

+1

@ wl2776 사실, 나는이 요지를 매뉴얼에 언급했다. 그러나 건너 뛰기는 프로세스가 시작될 때 ** 한 번 ** 완료되며, 최악의 경우 (마지막 배치)에는 약 15 초가 소요되어 3 천 5 백만 건의 레코드를 건너 뜁니다. 건너 뛴 후에 커서는 그 지점부터 계속됩니다. 총 처리 시간 (100 분)과 비교할 때 건너 뛸 시간은 40 밀리 콜렉션에서는 무시할 수 있습니다. 성능 측면에서 각 레코드에 대한 30 줄의 정규식 명령을 사용하여 분당 평균 400,000 문서 또는 초당 6600 문서를 처리했습니다 –