2017-12-24 48 views
1

큰 데이터 문제로 작업 중이며 일부 동시성 및 비동기 문제에 봉착했습니다. 이 문제는 다음과 같다 :파일의 각 행에 대한 비동기 HTTP API 호출 - Python

1) 여러 큰 파일 (내가 concurrent.futures 모듈에서 ProcessPoolExecutor이 방법으로 사용하여 처리하고 ~ 4기가바이트 15 개까지 각 x)의 유무 :

def process(source): 
    files = os.list(source) 
    with ProcessPoolExecutor() as executor: 
     future_to_url = {executor.submit(process_individual_file, source, input_file):input_file for input_file in files} 
     for future in as_completed(future_to_url): 
      data = future.result() 

2) 이제 각 파일, 나는 줄 단위로 가고 싶다. 특정 json을 만들기 위해 라인을 처리하고, 같은 2K jsons를 묶어서 응답을 얻으려는 요청을하는 API를 친다. 여기에 코드입니다 : 이제

def process_individual_file(source, input_file): 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       response = requests.post(API_URL, json=json_array) 
       #check response status here 
       limit = 2000 

3) 문제, 정말 큰 것을 각 파일의 행 수 및 차단이 API 호출 및 응답 속도가 느린 비트 프로그램을 완료하는 데 시간의 엄청난 금액을하고있다.

4) API 호출을 수행 할 때 2000의 다음 일괄 처리를 계속 처리 할 수 ​​있도록 API 호출 비동기을 작성하는 것이 좋습니다.

5) 지금까지 시도한 것 : asyncio을 사용하여 이것을 구현하려고했지만 거기에서 일련의 미래 작업을 수집하고 이벤트 루프를 사용하여 완료 될 때까지 기다릴 필요가 있습니다. 이런 식으로 뭔가 : 원함을 시작하기 전에 모든 작업을 수집 기다립니다이 간접적으로 이전과 동일하기 때문에

async def process_individual_file(source, input_file): 
    tasks = [] 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       tasks.append(asyncio.ensure_future(call_api(json_array))) 
       limit = 2000 

    await asyncio.wait(tasks) 

ioloop = asyncio.get_event_loop() 
ioloop.run_until_complete(process_individual_file(source, input_file)) 
ioloop.close() 

6) 난 정말이 이해하고 있지 않다. 누군가가이 문제의 올바른 아키텍처가되어야하는 것을 도와 줄 수 있습니까? 모든 작업을 수집하지 않고 다음 번 배치를 병렬 처리 할 수있는 기능을 사용하지 않고 API를 비동기식으로 호출하려면 어떻게해야합니까? 그것은 그들을 를 시작하기 전에 모든 작업을 수집 기다립니다이 간접적으로 이전과 동일하기 때문에

답변

1

난 정말이 이해하고 있지 않다.

아니요, 여기에 잘못되었습니다. asyncio.Taskasyncio.ensure_future으로 만들면 즉시 call_api 코 루틴을 실행하기 시작합니다. 이 얼마나 asyncio 일에 작업 :

import asyncio 


async def test(i): 
    print(f'{i} started') 
    await asyncio.sleep(i) 


async def main(): 
    tasks = [ 
     asyncio.ensure_future(test(i)) 
     for i 
     in range(3) 
    ] 

    await asyncio.sleep(0) 
    print('At this moment tasks are already started') 

    await asyncio.wait(tasks) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 

출력 : 당신의 접근 방식과

0 started 
1 started 
2 started 
At this moment tasks are already started 

문제 process_individual_file 실제로 비동기되지 않는 것입니다 : 그것은 제어를 반환하지 않고 CPU 관련 작업의 많은 양의 작업을 수행하여 asyncio 이벤트 루프. 그것은 문제입니다 - 기능을 차단하는 이벤트 루프는 불가능한 작업을 실행합니다.

async def process_individual_file(source, input_file): 
    tasks = [] 
    limit = 2000 
    with open(source+input_file) as sf: 
     for line in sf: 
      await asyncio.sleep(0) # Return control to event loop to allow it execute tasks 

      json_array.append(form_json(line)) 
      limit -= 1 

      if limit == 0: 
       tasks.append(asyncio.ensure_future(call_api(json_array))) 
       limit = 2000 

    await asyncio.wait(tasks) 

: -

난 당신이 사용할 수 있다고 생각 아주 간단하지만 효과적인 솔루션은 각 라인을 읽고 이벤트 루프 수동 예를 들어, process_individual_file을 실행 몇 시간 후에 asyncio.sleep(0)를 사용하여 제어를 반환하는 것입니다 UPD :

요청의 수백만 할 것보다 더있을 것입니다 따라서 나는 0,123,516 그들 모두를위한 미래의 객체를 저장하는 데 불편한 느낌 오전목록

의미가 있습니다. 백만 개의 병렬 네트워크 요청을 실행하면 아무 일도 일어나지 않습니다. 이 경우에 한계를 설정하는 일반적인 방법은 asyncio.Semaphore과 같은 동기화 프리미티브를 사용하는 것입니다.

나는 발전기가 파일에서 json_array을 얻도록 조언하고, 새로운 작업을 추가하기 전에 Semaphore을 획득하고 작업 준비에 풀어 놓는다. 많은 병렬 실행 작업으로부터 보호되는 깨끗한 코드를 얻을 수 있습니다.

은 다음과 같이 다음과 같이 표시됩니다

def get_json_array(input_file): 
    json_array = [] 
    limit = 2000 

    with open(input_file) as sf: 
     for line in sf: 
      json_array.append(form_json(line)) 

      limit -= 1 
      if limit == 0: 
       yield json_array # generator will allow split file-reading logic from adding tasks 

       json_array = [] 
       limit = 2000 


sem = asyncio.Semaphore(50) # don't allow more than 50 parallel requests 

async def process_individual_file(input_file): 
    for json_array in get_json_array(input_file): 
     await sem.acquire() # file reading wouldn't resume until there's some place for newer tasks 
     task = asyncio.ensure_future(call_api(json_array)) 
     task.add_done_callback(lambda t: sem.release()) # on task done - free place for next tasks 
     task.add_done_callback(lambda t: print(t.result())) # print result on some call_api done 
+0

감사 @Mikhail Gerasimov 날을 수정하십시오. 실제로 나는 그것을 정확하게 이해하지 못했고 예상 결과가 나오지 않아 붙어있었습니다. 당신이 말하는 것은 의미가 있지만 또 하나의 의구심은 파일 크기 때문에 수백만 건의 요청이있을 것이기 때문에 앞으로는 모든 것을 저장할 수있는 불편 함이 있습니다. 명부. –

+0

여기 제가 한동안 더 많은 기사를 읽었습니다. [링크] (https://hackernoon.com/controlling-python-async-creep-ec0a0f4b79ba). 다른 스레드에서 이벤트 루프를 트리거하고 해당 스레드에 내 미래를 위임하고 API 응답에서 콜백을 트리거 할 수있는 것처럼 보입니다. 지금 같은 POC에서 일하고 있습니다. 나 또한 당신의 제안을 시도하고 결과로 돌아 가자. 고마워요. :) –

+0

@ShubhamPatil 많은 병렬 요청을 피하는 방법을 보여주는 답변이 업데이트되었습니다. –