큰 데이터 문제로 작업 중이며 일부 동시성 및 비동기 문제에 봉착했습니다. 이 문제는 다음과 같다 :파일의 각 행에 대한 비동기 HTTP API 호출 - Python
1) 여러 큰 파일 (내가 concurrent.futures 모듈에서 ProcessPoolExecutor이 방법으로 사용하여 처리하고 ~ 4기가바이트 15 개까지 각 x)의 유무 :
def process(source):
files = os.list(source)
with ProcessPoolExecutor() as executor:
future_to_url = {executor.submit(process_individual_file, source, input_file):input_file for input_file in files}
for future in as_completed(future_to_url):
data = future.result()
2) 이제 각 파일, 나는 줄 단위로 가고 싶다. 특정 json을 만들기 위해 라인을 처리하고, 같은 2K jsons를 묶어서 응답을 얻으려는 요청을하는 API를 친다. 여기에 코드입니다 : 이제
def process_individual_file(source, input_file):
limit = 2000
with open(source+input_file) as sf:
for line in sf:
json_array.append(form_json(line))
limit -= 1
if limit == 0:
response = requests.post(API_URL, json=json_array)
#check response status here
limit = 2000
3) 문제, 정말 큰 것을 각 파일의 행 수 및 차단이 API 호출 및 응답 속도가 느린 비트 프로그램을 완료하는 데 시간의 엄청난 금액을하고있다.
4) API 호출을 수행 할 때 2000의 다음 일괄 처리를 계속 처리 할 수 있도록 API 호출 비동기을 작성하는 것이 좋습니다.
5) 지금까지 시도한 것 : asyncio을 사용하여 이것을 구현하려고했지만 거기에서 일련의 미래 작업을 수집하고 이벤트 루프를 사용하여 완료 될 때까지 기다릴 필요가 있습니다. 이런 식으로 뭔가 : 원함을 시작하기 전에 모든 작업을 수집 기다립니다이 간접적으로 이전과 동일하기 때문에
async def process_individual_file(source, input_file):
tasks = []
limit = 2000
with open(source+input_file) as sf:
for line in sf:
json_array.append(form_json(line))
limit -= 1
if limit == 0:
tasks.append(asyncio.ensure_future(call_api(json_array)))
limit = 2000
await asyncio.wait(tasks)
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(process_individual_file(source, input_file))
ioloop.close()
6) 난 정말이 이해하고 있지 않다. 누군가가이 문제의 올바른 아키텍처가되어야하는 것을 도와 줄 수 있습니까? 모든 작업을 수집하지 않고 다음 번 배치를 병렬 처리 할 수있는 기능을 사용하지 않고 API를 비동기식으로 호출하려면 어떻게해야합니까? 그것은 그들을 를 시작하기 전에 모든 작업을 수집 기다립니다이 간접적으로 이전과 동일하기 때문에
감사 @Mikhail Gerasimov 날을 수정하십시오. 실제로 나는 그것을 정확하게 이해하지 못했고 예상 결과가 나오지 않아 붙어있었습니다. 당신이 말하는 것은 의미가 있지만 또 하나의 의구심은 파일 크기 때문에 수백만 건의 요청이있을 것이기 때문에 앞으로는 모든 것을 저장할 수있는 불편 함이 있습니다. 명부. –
여기 제가 한동안 더 많은 기사를 읽었습니다. [링크] (https://hackernoon.com/controlling-python-async-creep-ec0a0f4b79ba). 다른 스레드에서 이벤트 루프를 트리거하고 해당 스레드에 내 미래를 위임하고 API 응답에서 콜백을 트리거 할 수있는 것처럼 보입니다. 지금 같은 POC에서 일하고 있습니다. 나 또한 당신의 제안을 시도하고 결과로 돌아 가자. 고마워요. :) –
@ShubhamPatil 많은 병렬 요청을 피하는 방법을 보여주는 답변이 업데이트되었습니다. –