만들기 비동기 포스트 HTTP 요청 aiohttp
모듈을 수행 할 수 있습니다
import asyncio
import aiohttp
async def post(url, json):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=json) as resp:
return await resp.json()
async def main():
res = await post('http://httpbin.org/post', {'test': 'object'})
print(res['json']) # {'test': 'object'}
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
당신이 스트림을 요청해야한다 배포 성공을 추적하는 /v2/events
을 사용하려면 (API doc 참조). 이 asynchronous iteration의로 aiohttp
달성 할 수있다 : 당신은 그냥 비동기 당신이 필요로하는 이벤트를 기다리는 내용 한 줄 한 줄을 읽어, 예를 들면 : 당신이 /v2/deployments
를 사용하려면
import asyncio
import aiohttp
async def stream(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
async for line in resp.content:
yield line
async def main():
async for line in stream('http://httpbin.org/stream/10'):
print(line) # check if line contains event you need
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
당신이 정기적으로 일부 지연 대기 요청해야합니다 asyncio.sleep
을 사용하십시오. 이 경우 사용자의 기능이 차단되지 않습니다.
import asyncio
import aiohttp
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
async def main():
while True:
# Make request to see if deplayment finished:
res = await get('http://httpbin.org/get')
print(res['origin']) # break if ok here
# Async wait before next try:
await asyncio.sleep(3)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
감사합니다. 나는 내 질문을 충분히 명확하게 표현하지 않은 것이 두렵다. 내가 데프 (응용 프로그램)를 배포 '비동기 아닌 다른 방법이 있는지 알고 싶습니다 : deployment_id = 포스트 (응용 프로그램)을 기다리고 가 deployment_event을 기다리고 있습니다 (deployment_id가)' 어쨌든 경쟁 조건을 가지고 . – Karsten