2017-11-30 13 views
0

일부 변환 후에 일부 데이터가 포함 된 dask 데이터 프레임이 있습니다. 나는 그 데이터를 mysql 테이블에 다시 쓰고 싶다. 데이터 프레임을 db URL로 취하고 데이터 프레임을 데이터베이스에 다시 쓰는 함수를 구현했습니다. 데이터 프레임의 데이터를 최종적으로 편집해야하기 때문에 팬더 df.to_dict('record')을 사용하여 쓰기를 처리합니다. 내가 MySQL의에서 존경받는 테이블에 갈 때 나는 모든 열에서 같은 값으로 7702을 얻을Dask DataTrame 테이블에 쓸 DataFrame.map_partition()

from functools import partial 
partial_store_partition_to_db(store_partition_to_db db_url=url) 
dask_dataframe = dask_dataframe_data.map_partitions(partial_store_partition_to_db) 
all_records = dask_dataframe.compute() 

print len([record_dict for record_list in all_records for record_dict in record_list]] # Gives me 7700 

:

함수 내 DASK 코드에서 그

def store_partition_to_db(df, db_url): 
    from sqlalchemy import create_engine 
    from mymodels import DBTableBaseModel 

    records_dict = df.to_dict(records) 
    records_to_db = [] 
    for record in records_dict: 
     transformed_record = transform_record_some_how # transformed_record is a dictionary 
     records_to_db.append(transformed_record) 

    engine = create_engine(db_uri) 
    engine.execute(DBTableBaseModel.__table__.insert(), records_to_db) 

    return records_to_db 

처럼 보인다 1. all_records를 해당 값으로 필터링하려고하면 사전이 반환되지 않습니다. 전에이 상황을 만난 사람이 있습니까? dask를 사용하여 paritions에서 DB 쓰기를 어떻게 처리합니까?

PS : 나는 LocalCluster를 사용하고 DASK는

+0

'engine.execute' 대신에'to_sql' ('https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html)을 사용해 보셨습니까? – mdurant

답변

1

문제는 내가 map_partition 방법에 메타 정보를 제공하지 않았다이었다 분산 그것 때문에 내가 회전이 DB에 기록 된 푸 값으로 ataframe을 생성