일부 변환 후에 일부 데이터가 포함 된 dask 데이터 프레임이 있습니다. 나는 그 데이터를 mysql 테이블에 다시 쓰고 싶다. 데이터 프레임을 db URL로 취하고 데이터 프레임을 데이터베이스에 다시 쓰는 함수를 구현했습니다. 데이터 프레임의 데이터를 최종적으로 편집해야하기 때문에 팬더 df.to_dict('record')
을 사용하여 쓰기를 처리합니다. 내가 MySQL의에서 존경받는 테이블에 갈 때 나는 모든 열에서 같은 값으로 7702을 얻을Dask DataTrame 테이블에 쓸 DataFrame.map_partition()
from functools import partial
partial_store_partition_to_db(store_partition_to_db db_url=url)
dask_dataframe = dask_dataframe_data.map_partitions(partial_store_partition_to_db)
all_records = dask_dataframe.compute()
print len([record_dict for record_list in all_records for record_dict in record_list]] # Gives me 7700
:
함수 내 DASK 코드에서 그
def store_partition_to_db(df, db_url):
from sqlalchemy import create_engine
from mymodels import DBTableBaseModel
records_dict = df.to_dict(records)
records_to_db = []
for record in records_dict:
transformed_record = transform_record_some_how # transformed_record is a dictionary
records_to_db.append(transformed_record)
engine = create_engine(db_uri)
engine.execute(DBTableBaseModel.__table__.insert(), records_to_db)
return records_to_db
처럼 보인다 1. all_records를 해당 값으로 필터링하려고하면 사전이 반환되지 않습니다. 전에이 상황을 만난 사람이 있습니까? dask를 사용하여 paritions에서 DB 쓰기를 어떻게 처리합니까?
PS : 나는 LocalCluster를 사용하고 DASK는
'engine.execute' 대신에'to_sql' ('https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html)을 사용해 보셨습니까? – mdurant