2017-12-07 20 views
0

SQL 쿼리를 실행하고 출력 DF를 사전으로 변환하여 mongo에 대량 삽입하기 위해 pyspark와 함께 Snappydata를 사용하고 있습니다. 나는 스파크 DF를 사전으로 변환하는 것을 테스트하기 위해 많은 비슷한 질문을했다.pespark에서 사전을 만드는 가장 빠른 방법 DF

현재 map(lambda row: row.asDict(), x.collect())이 방법을 사용하여 bulk DF를 사전으로 변환합니다. 그리고 10K 기록을 위해 2-3 초가 걸립니다.

x = snappySession.sql("select * from test") 
df = map(lambda row: row.asDict(), x.collect()) 
db.collection.insert_many(df) 

가 빨리 방법이 있나요 :

나는 내 생각을 impliment 방법을 아래에 언급 한?

답변

0

내가 foreachPartition를 사용하는 것이 좋습니다 것 :

insert_to_mongo
(snappySession 
    .sql("select * from test") 
    .foreachPartition(insert_to_mongo)) 

: 내가 직접 MongoDB를에 보내는 DF 알고 있어요

def insert_to_mongo(rows): 
    client = ... 
    db = ... 
    db.collection.insert_many((row.asDict() for row in rows)) 
+0

코드를 확인 했습니까? 실행 했습니까? 그것은 나를주는 오류'AttributeError : 'itertools.chain'개체에 속성이 없습니다 'asDict'' – techie95

0

Spark에서 직접 Mongo에 글을 쓸 수 있는지 여부는 가장 좋은 방법입니다. 이것은 분산 방식으로 스파크의 모든 사전을 생성합니다

x = snappySession.sql("select * from test") 
dictionary_rdd = x.rdd.map(lambda row: row.asDict()) 

for d in dictionary_rdd.toLocalIterator(): 
    db.collection.insert_many(d) 

: 그 실패

, 당신은이 방법을 사용할 수 있습니다. 행은 드라이버로 반환되고 한 번에 한 행씩 Mongo에 삽입되므로 메모리가 부족하지 않게됩니다.

+0

. 주어진 [documentation] (https://docs.mongodb.com/spark-connector/master/python-api/)에는 db 인증이 없습니다. 이것이 내가이 방법을 선택한 이유입니다. – techie95

+0

@ShaikRizwana 나는 대답을 – Anake

+0

고맙다. @ Anake하지만 그것의 복용은 거의 12-15 초. 다른 방법으로 제안합니까? – techie95