Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - await asyncio.gather() is hanging

I'm working on a script that sends more than 1M lines of CSV data to an API endpoint, and I'd like to do as much of this asynchronously as possible. The API has a rate limit of 100K requests per minute.
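For a sense of scale, here is a back-of-the-envelope lower bound on run time. It is a sketch that uses the 1,322,501 lines noted in the script's docstring, the API's 100K/minute limit, and the Throttler(rate_limit=1500, period=1) setting from the script, and it ignores network latency entirely:

```python
# Back-of-the-envelope lower bound on run time, ignoring network latency.
TOTAL_ROWS = 1_322_501        # line count from the script's docstring
API_LIMIT_PER_MIN = 100_000   # the API's stated rate limit
THROTTLE_PER_SEC = 1_500      # Throttler(rate_limit=1500, period=1) in the script


def min_minutes(rows: int, per_minute: float) -> float:
    """Minutes needed to send `rows` requests at a steady `per_minute` rate."""
    return rows / per_minute


api_bound = min_minutes(TOTAL_ROWS, API_LIMIT_PER_MIN)
throttle_bound = min_minutes(TOTAL_ROWS, THROTTLE_PER_SEC * 60)

print(f"At the API limit: {api_bound:.1f} min")     # → At the API limit: 13.2 min
print(f"At the throttle: {throttle_bound:.1f} min")  # → At the throttle: 14.7 min
```

Even at full speed, sending every row takes roughly a quarter of an hour, so the final gather cannot return (and "Complete!" cannot print) in much less time than that.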

Problem:

When the script reaches await asyncio.gather(*tasks), it seems to just hang. I've waited for 5 minutes and... nothing.
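asyncio.gather() returns only after every awaitable passed to it has finished, so the line after it gives no sign of progress until the very end. The sketch below is self-contained; fake_request is a hypothetical stand-in for the HTTP call (it just sleeps). It contrasts gather with asyncio.as_completed(), which hands back each result as soon as that particular coroutine finishes:

```python
import asyncio


async def fake_request(i: int) -> int:
    """Hypothetical stand-in for an HTTP POST: sleeps briefly, returns its index."""
    await asyncio.sleep(0.01 * (i % 5))
    return i


async def with_gather(n: int) -> list[int]:
    # Nothing comes back until all n coroutines are done; results keep input order.
    return await asyncio.gather(*(fake_request(i) for i in range(n)))


async def with_as_completed(n: int) -> list[int]:
    done = []
    # Each result is available as soon as its own coroutine finishes,
    # so progress can be reported incrementally.
    for fut in asyncio.as_completed([fake_request(i) for i in range(n)]):
        done.append(await fut)
        if len(done) % 10 == 0:
            print(f"{len(done)}/{n} finished")
    return done


if __name__ == "__main__":
    print(asyncio.run(with_gather(20))[:5])  # → [0, 1, 2, 3, 4]
    print(sorted(asyncio.run(with_as_completed(20))) == list(range(20)))
```

Note that gather preserves input order in its result list, while as_completed yields results in completion order.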

Questions:

  • Why is await asyncio.gather() hanging for so long?
  • Am I trying to shove too many requests into a single await asyncio.gather() statement or should it be able to handle this volume without any issues?
  • I'm trying to use asyncio_throttle to keep the requests per minute under 100K. Is this a good approach?
  • Is there a more efficient way to do this that I should be considering?
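Regarding the second and fourth questions: one common alternative to a single gather over roughly 1.3M coroutines is to send the rows in fixed-size batches, so only one batch of coroutines exists at a time. This is a minimal sketch, not the script's code: send_row is a hypothetical placeholder for the real POST, and BATCH_SIZE = 1000 is an assumed value.

```python
import asyncio
from itertools import islice
from typing import Iterable, Iterator

BATCH_SIZE = 1000  # assumed value; tune against the API's rate limit


def batched(rows: Iterable, size: int) -> Iterator[list]:
    """Yield successive lists of up to `size` items from `rows`, lazily."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk


async def send_row(row) -> int:
    """Hypothetical stand-in for the real POST; returns a fake HTTP status."""
    await asyncio.sleep(0)
    return 202


async def send_all(rows: Iterable, size: int = BATCH_SIZE) -> int:
    sent = 0
    for chunk in batched(rows, size):
        # Only `size` coroutines exist at once, instead of one per CSV line.
        statuses = await asyncio.gather(*(send_row(r) for r in chunk))
        sent += len(statuses)
        print(f"sent {sent} rows")
    return sent


if __name__ == "__main__":
    fake_rows = ([str(i)] for i in range(2500))
    asyncio.run(send_all(fake_rows))
```

Because batched consumes its input lazily, the csv.reader object could be passed straight in as rows. The trade-off is that each batch waits for its slowest request before the next batch starts, which a worker pool avoids.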

I appreciate the help!

```python
import os
import csv
import json
import asyncio
import aiohttp
from asyncio_throttle import Throttler

'''
500 unique timestamps
1322501 lines
timeStamp, appName, hostName, cpu_idle, mem_util, txnCount, errCount

[{
  "common": {
      "key": "value"
  },
  "metrics": [{
      "key": "value"
  }]
}]
'''

inputFile = os.environ['FILENAME']
apiKey = os.environ['API_KEY']
url = os.environ['API_ENDPOINT']
# Metric name -> CSV column index
metrics = {'cpu_idle': 3, 'mem_util': 4, 'transaction_count': 5, 'error_count': 6}


async def sendIt(session, payload, throttler, line_count):
    d = [payload]
    headers = {'Content-Type': 'application/json', 'Api-Key': apiKey}
    # Wait for a throttler slot, then POST this row's payload
    async with throttler:
        async with session.post(url, data=json.dumps(d), headers=headers) as response:
            print(str(line_count) + ' --- ' + str(response))


async def createPayload(row):
    # Build one payload (format shown in the docstring above) from a CSV row
    common = {'app.name': row[1], 'host.name': row[2]}
    metricslist = [
        {'name': name, 'type': 'gauge', 'value': float(row[col])}
        for name, col in metrics.items()
    ]
    return {
        'common': {'timestamp': int(row[0]), 'attributes': common},
        'metrics': metricslist,
    }


async def main():
    tasks = []
    # At most 1500 requests per second (90,000 per minute), under the API's 100K/min limit
    throttler = Throttler(rate_limit=1500, period=1)
    conn = aiohttp.TCPConnector(limit=1500)

    with open(inputFile, newline='') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader)  # skip the header row
        async with aiohttp.ClientSession(connector=conn) as session:
            line_count = 0
            for row in csv_reader:
                line_count += 1
                print('Appending row ' + str(line_count))
                # sendIt(...) only creates a coroutine object; nothing is sent until gather() schedules it
                tasks.append(sendIt(session, await createPayload(row), throttler, line_count))

            print('Here we go...')
            await asyncio.gather(*tasks)
            print('Complete!')

asyncio.run(main())
```
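Regarding throttling: another common pattern is a fixed pool of worker tasks that pull rows from a bounded asyncio.Queue, combined with a limiter that spaces out request start times. This is a hypothetical sketch, not the asyncio_throttle API: IntervalLimiter, post_row, and run_pipeline are illustrative names, post_row stands in for the real session.post(...) call, and the default rate (1500/s, matching the script's throttler) and worker count are assumptions.

```python
import asyncio
import time


class IntervalLimiter:
    """Hypothetical helper: spaces acquisitions at least 1/rate seconds apart."""

    def __init__(self, rate: float):
        self._interval = 1.0 / rate
        self._next_start = time.monotonic()
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # The lock makes callers take turns, so each one claims the next start slot.
        async with self._lock:
            delay = self._next_start - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            self._next_start = max(self._next_start, time.monotonic()) + self._interval


async def post_row(row) -> int:
    """Hypothetical stand-in for session.post(...); returns a fake status code."""
    await asyncio.sleep(0.001)
    return 202


async def worker(queue: asyncio.Queue, limiter: IntervalLimiter, results: list) -> None:
    while True:
        row = await queue.get()
        try:
            await limiter.wait()
            results.append(await post_row(row))
        except Exception as exc:  # keep the worker alive and record the failure
            results.append(exc)
        finally:
            queue.task_done()


async def run_pipeline(rows, rate: float = 1500, n_workers: int = 100) -> list:
    # Create the limiter (and its Lock) inside the running event loop.
    limiter = IntervalLimiter(rate)
    # Bounded queue: put() blocks when full, so rows are read only as fast as they are sent.
    queue: asyncio.Queue = asyncio.Queue(maxsize=n_workers * 2)
    results: list = []
    workers = [asyncio.create_task(worker(queue, limiter, results)) for _ in range(n_workers)]
    for row in rows:
        await queue.put(row)
    await queue.join()  # returns once task_done() has been called for every row
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    statuses = asyncio.run(run_pipeline(range(3000)))
    print(len(statuses), "rows processed")
```

In the real script, rows could be the csv.reader and post_row would build the payload and call session.post(...) on a shared aiohttp.ClientSession. Since queue.put() blocks while the queue is full, rows are read only as fast as the workers consume them, and at most n_workers requests are in flight at any moment.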

Question from: https://stackoverflow.com/questions/65863930/await-asyncio-gather-is-hanging
