# Python aiohttp rate limit
An HTTP rate limit is usually a cap on the number of requests within a time window, and sometimes also a cap on the number of concurrent requests.
## Max requests in a limited time period
```python
from aiolimiter import AsyncLimiter

RATE_LIMIT_IN_SECOND = 20

# Allow at most 20 requests per 1.0-second time period
rate_limit = AsyncLimiter(RATE_LIMIT_IN_SECOND, 1.0)

async with rate_limit:
    await my_aiohttp_request()
```
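To see what the limiter does, here is a minimal, self-contained sketch (no HTTP involved; the `task` coroutine and the count of 50 are made up for illustration). Running it should show roughly 20 task starts per second:

```python
import asyncio
import time

from aiolimiter import AsyncLimiter

rate_limit = AsyncLimiter(20, 1.0)  # at most 20 acquisitions per second

async def task(i, started_at):
    # Each task waits its turn at the limiter before "starting"
    async with rate_limit:
        print(f"task {i} started at t={time.monotonic() - started_at:.2f}s")

async def main():
    started_at = time.monotonic()
    await asyncio.gather(*(task(i, started_at) for i in range(50)))

asyncio.run(main())
```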
## Max concurrent requests
Official doc: Limiting connection pool size
```python
import asyncio

import aiohttp

MAX_CONCURRENT = 10

async def main():
    # The default limit is 100
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
    async with aiohttp.ClientSession(connector=connector) as session:
        await my_aiohttp_request(session)

if __name__ == "__main__":
    asyncio.run(main())
```
Note that the `connector` object from `aiohttp.TCPConnector(limit=MAX_CONCURRENT)` must be created inside an async function, since it needs a running event loop.
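As a side note, `TCPConnector` also accepts a `limit_per_host` argument if you want a per-host cap instead of a global one. And if you only need to cap your own in-flight requests rather than the whole connection pool, an `asyncio.Semaphore` is a common alternative. Below is a minimal sketch; the `fetch` helper and the request count of 30 are illustrative, not part of aiohttp:

```python
import asyncio

import aiohttp

MAX_CONCURRENT = 10

async def fetch(session, semaphore, url):
    # Only MAX_CONCURRENT coroutines can be inside this block at once
    async with semaphore:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()

async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch(session, semaphore, "https://httpbin.org/get") for _ in range(30))
        )
        print(f"fetched {len(results)} responses")

asyncio.run(main())
```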
## Example
We can borrow the official example on asyncio queues.
The example below sends GET requests to https://httpbin.org/get with a rate limit of 20 requests per second and at most 10 concurrent requests.
```python
import asyncio
import random
import time

import aiohttp
from aiolimiter import AsyncLimiter

MAX_CONCURRENT = 10
RATE_LIMIT_IN_SECOND = 20

rate_limit = AsyncLimiter(RATE_LIMIT_IN_SECOND, 1.0)

async def my_aiohttp_request(session, name):
    async with session.get("https://httpbin.org/get") as response:
        response.raise_for_status()
        json_response = await response.json()
        print(f"{name} finished aiohttp request with response: {json_response}")
        # do something with the response here

async def worker(name, queue, session):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        async with rate_limit:
            await my_aiohttp_request(session, name)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f"{name} has slept for {sleep_for:.2f} seconds")

async def main():
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Create a queue that we will use to store our "workload".
        queue = asyncio.Queue()

        # Generate random timings and put them into the queue.
        total_sleep_time = 0
        for _ in range(20):
            sleep_for = random.uniform(0.05, 1.0)
            total_sleep_time += sleep_for
            queue.put_nowait(sleep_for)

        # Create MAX_CONCURRENT worker tasks to process the queue concurrently.
        tasks = [
            asyncio.create_task(worker(f"worker-{idx}", queue, session))
            for idx in range(MAX_CONCURRENT)
        ]

        # Wait until the queue is fully processed.
        started_at = time.monotonic()
        await queue.join()
        total_slept_for = time.monotonic() - started_at

        # Cancel our worker tasks.
        for task in tasks:
            task.cancel()

        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)

        print("====")
        print(f"{MAX_CONCURRENT} workers ran in parallel for {total_slept_for:.2f} seconds")
        print(f"total expected sleep time: {total_sleep_time:.2f} seconds")

asyncio.run(main())
```
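Note how the two mechanisms compose independently here: the `AsyncLimiter` caps how often requests may start (20 per second), while the `TCPConnector` caps how many connections may be in flight at once (10). Whichever is stricter for your workload becomes the effective limit.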