MongoDB에있는 내 컬렉션의 모든 문서에는 동일한 필드가 있습니다. 내 목표는 파이썬에 pandas.DataFrame
또는 dask.DataFrame
으로로드하는 것입니다.MongoDB에서 파이썬으로 데이터로드 병렬화
필자는 병렬화하여 로딩 절차를 가속화하고 싶습니다. 제 계획은 여러 프로세스 나 스레드를 생성하는 것입니다. 각 프로세스는 컬렉션의 청크를로드 한 다음이 청크를 병합합니다.
어떻게하면 MongoDB에서 올바르게 수행 할 수 있습니까?
PostgreSQL에서 비슷한 접근 방식을 시도했습니다. 나의 초기 아이디어는 SKIP
과 LIMIT
을 SQL 쿼리에 사용했다. 각 특정 쿼리에 대해 열린 각 커서가 처음부터 데이터 테이블 읽기를 시작하고 지정된 양의 행을 건너 뛰기 때문에 실패했습니다. 그래서 추가 열을 만들고 레코드 번호를 포함하고 쿼리에서이 숫자의 범위를 지정해야했습니다.
반대로 MongoDB는 각 문서에 고유 한 ObjectID를 할당합니다. 그러나 ObjectID 하나를 다른 것으로부터 뺄 수 없다는 것을 발견했습니다. 순서 지정 작업과 비교할 수 있습니다 (작음, 큼 및 동일).
pymongo
또한 count
, limit
과 같이 내 작업에 유용한 것처럼 보이는 인덱싱 작업을 지원하는 몇 가지 메서드가있는 커서 개체를 반환합니다.
Spark 용 MongoDB 커넥터는 어떻게 든이 작업을 수행합니다. 불행히도, 나는 스칼라에 익숙하지 않아서 어떻게하는지 알아내는 것이 어렵습니다.
Mongo에서 Python으로 데이터를 병렬로로드하는 올바른 방법은 무엇입니까?
지금까지, 나는 다음과 같은 솔루션에 왔어요 : dask.dataframe.from_delayed
내부적 효율적으로 단일 스레드에서 모든 모음을로드 통과 발전기 목록을 만듭니다처럼
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
# import other modules.
collection = get_mongo_collection()
cursor = collection.find({ })
def process_document(in_doc):
out_doc = # process doc keys and values
return pd.DataFrame(out_doc)
df = dd.from_delayed((delayed(process_document)(d) for d in cursor))
그러나, 그것은 보인다.
업데이트. 나는 , 그 skip
방법은 pymongo.Cursor
의 컬렉션의 시작부터 PostgreSQL으로 시작합니다. 동일한 페이지에서 응용 프로그램의 페이지 매김 논리를 사용하는 것이 좋습니다. 지금까지 찾은 솔루션은 이것을 위해 _id
을 사용합니다. 그러나 그들은 또한 마지막으로 표시된 _id
을 저장합니다. 이는 단일 스레드에서 작동 함을 의미합니다.
업데이트 2. 공식 MongoDb Spark 커넥터에서 파티셔너 코드를 찾았습니다. https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32
처음에는이 파티셔너는 컬렉션의 모든 문서에서 키 필드를 읽고 값 범위를 계산합니다.
업데이트 3 : 나의 불완전한 해결책.직접 호출 할 때
def process_document(in_doc, other_arg):
# custom processing of incoming records
return out_doc
def compute_id_ranges(collection, query, partition_size=50):
cur = collection.find(query, {'_id': 1}).sort('_id', pymongo.ASCENDING)
id_ranges = [cur[0]['_id']]
count = 1
for r in cur:
count += 1
if count > partition_size:
id_ranges.append(r['_id'])
count = 0
id_ranges.append(r['_id'])
return zip(id_ranges[:len(id_ranges)-1], id_ranges[1: ])
def load_chunk(id_pair, collection, query={}, projection=None):
q = query
q.update({"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}})
cur = collection.find(q, projection)
return pd.DataFrame([process_document(d, other_arg) for d in cur])
def parallel_load(*args, **kwargs):
collection = kwargs['collection']
query = kwargs.get('query', {})
projection = kwargs.get('projection', None)
id_ranges = compute_id_ranges(collection, query)
dfs = [ delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges ]
df = dd.from_delayed(dfs)
return df
collection = connect_to_mongo_and_return_collection_object(credentials)
# df = parallel_load(collection=collection)
id_ranges = compute_id_ranges(collection)
dedf = delayed(load_chunk)(id_ranges[0], collection)
load_chunk
완벽하게 실행 : 예외를 제기 무엇
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***)
81 return expr, {}
82 if isinstance(expr, (Iterator, list, tuple, set)):
---> 83 args, dasks = unzip((to_task_dask(e) for e in expr), 2)
84 args = list(args)
85 dsk = sharedict.merge(*dasks)
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
2342
2343 def __next__(self):
-> 2344 raise TypeError("'Collection' object is not iterable")
2345
2346 next = __next__
TypeError: 'Collection' object is not iterable
: DASK 잘못 Collection
개체를 치료하는 것 때문에 작동하지 않습니다
는 pymongo에서 예외를 가져옵니다 . 그러나 위에서 언급 한 예외 인 delayed(load_chunk)(blah-blah-blah)
이 실패합니다.
내가 당신의 직관이 바로 여기 당신이 원하는 생각에서 오류가 발생하지 않습니다 : 코드는 파이썬 3를 위해 작성되었습니다 데이터 세트의 다른 세그먼트를 얻는 여러 문법 쿼리를 구성한 다음 dask.delayed를 사용하여 병렬로로드합니다. 결국 dask.dataframe.from_delayed를 사용하여 데이터 프레임과 같은 일부 dask 컬렉션을 생성합니다. 누락 된 부분은 페이지 매김이라고 생각합니다. 더 자세한 정보를 원한다면 몽고 개발자에게 ping을 할 것입니다. – MRocklin
두 개의 미리 정의 된'_id' 사이에'_id'가있는 문서 덩어리를로드하는 함수를 작성했습니다. 'def load_chunk (id_pair, collection, query = {}, projection = None)' 이 함수를'delayed'에 래핑하면 Mongo 컬렉션을 반복하려고 시도하고 컬렉션이 반복 가능하지 않다는 Exception을 얻습니다. 'dfs = delayed (load_chunk) (id_pair, collection, query, projection) ' 죄송합니다. 지금 재현 할 수있는 예는 없습니다. – wl2776