I've written a web scraper and would like it to run as quickly as possible. The scraping isn't trivial: I scrape a few web pages, gather links from them, scrape those, gather links from those, and then scrape those as well (illustrated by the three different functions, showing that the process is multi-level). Since standard HTTP requests in Python are blocking, I implemented this with async/await and the httpx library, using a job queue and workers.
Here is the code I've written. I've removed some domain-specific code and the imports just to make it shorter and more to the point. The functions may look similar with the in-between code removed, but they are web-page-specific and, in fact, very different. They simply demonstrate that this is a recursive procedure.
@dataclass
class TaskSpec:
    func: Callable
    args: list = field(default_factory=list)
    kwargs: dict = field(default_factory=dict)

@dataclass
class TaskResult:
    result: Any
    new_tasks: list[TaskSpec] = field(default_factory=list)

async def get_page_1(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    result = ...
    ...
    new_jobs = []
    for row in html_table_rows:
        url = ...
        new_jobs.append(TaskSpec(get_page_2, (url, client)))
    return TaskResult(result, new_jobs)

async def get_page_2(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    result = ...
    ...
    new_jobs = []
    for row in html_table_rows:
        url = ...
        new_jobs.append(TaskSpec(get_page_3, (url, client)))
    return TaskResult(result, new_jobs)

async def get_page_3(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    result = ...
    return TaskResult(result)

async def process_queue(queue: asyncio.Queue, result_set: list):
    # Worker: pull a TaskSpec, run it, and queue any follow-up tasks it produces.
    while True:
        try:
            spec: TaskSpec = await queue.get()
            result: TaskResult = await spec.func(*spec.args, **spec.kwargs)
            for new_task in result.new_tasks:
                queue.put_nowait(new_task)
            result_set.append(result.result)
        # If the task failed, re-add the task to the queue
        except httpx.ReadTimeout:
            print("Request timed out. Re-adding task to queue...")
            queue.put_nowait(spec)
        except Exception as e:
            pass
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    result_set = []
    num_workers = 20
    httpx_limits = httpx.Limits(max_connections=num_workers)
    async with httpx.AsyncClient(timeout=10, limits=httpx_limits) as client:
        # Seed the queue with the top-level pages
        for url in urls:
            queue.put_nowait(TaskSpec(get_page_1, (url, client)))

        workers = []
        for i in range(num_workers):
            workers.append(asyncio.create_task(process_queue(queue, result_set)))

        # Wait for the queue to be emptied
        await queue.join()

        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
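For reference, the imports and entry point that I removed look roughly like this; the urls list is just a placeholder for my real, domain-specific seed URLs:

# Roughly the scaffolding elided above; urls is a placeholder for the real
# seed list, which is domain-specific.
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable

import httpx
from bs4 import BeautifulSoup

urls = ["https://example.com/start"]  # placeholder seed URLs

if __name__ == "__main__":
    asyncio.run(main())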
A few points regarding my code that I would love feedback on:
- The reason for the TaskSpec class is that it lets me easily re-add the same job to the queue. I get a good number of httpx.ReadTimeout errors, presumably due to my slow connection, or perhaps because the actual parsing takes a while and some requests are left waiting too long. So sometimes I need to re-queue a web-page scraping job, and this was the easiest way I could come up with. I had also tried creating task objects and re-adding the coroutine to the queue with task.get_coro(), but that didn't work because the coroutine had already been executed. This way, the parsing functions get_page_1 through get_page_3 also don't need to know about the queue.

- I used 20 workers because increasing that number seemed to cause a lot of httpx.ReadTimeout errors, and I don't know why. My guess is that httpx receives the responses fine, but my code then takes too long to return control to the event loop, so the pending responses can't be handled within the timeout period. This is pure speculation, though (the thread-offloading sketch after this list is the workaround I have in mind if it turns out to be true). I would like to be able to increase the number of workers so the scraper runs faster, since the runtime of the whole script still seems to depend mainly on the speed of my internet connection.

- I am not happy that I have to pass the result_set list into process_queue. This is a very C-like way of doing things, and I don't like doing it in Python. However, I see no other way for process_queue to save results and hand them back to main(). The workers don't actually know when the queue is empty; I wait for the queue to empty in main() using queue.join() and then cancel the worker tasks. And if I cancel them, they can't return anything. Right? (The sentinel-based sketch after this list is the only alternative I've come up with so far.)
This is my first time really playing around with async/await in Python, so there may be much better ways of doing the same thing, and I'd be glad to hear any feedback. I am also very interested in better ways to re-queue failed jobs for execution (the retry-cap sketch below is as far as I've gotten on my own), and if anyone can shed some light on the httpx.ReadTimeout errors, I would appreciate that as well.
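On re-queueing specifically, the furthest I have gotten is bolting a retry counter onto TaskSpec so that a permanently failing URL cannot circle through the queue forever. The field names, the cap of 3, and the requeue_on_timeout helper are all made up for this sketch:

# Sketch: cap how many times a timed-out TaskSpec is re-queued.
# The retries/max_retries fields and the cap of 3 are arbitrary choices.
import asyncio
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class TaskSpec:
    func: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    retries: int = 0       # how many times this job has been re-queued
    max_retries: int = 3   # give up after this many timeouts

def requeue_on_timeout(spec: TaskSpec, queue: asyncio.Queue) -> None:
    # Called from the worker's httpx.ReadTimeout handler instead of a bare
    # queue.put_nowait(spec).
    if spec.retries < spec.max_retries:
        spec.retries += 1
        queue.put_nowait(spec)
    else:
        print(f"Giving up on {spec.args} after {spec.max_retries} timeouts")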
CLARIFICATION: To be clear, I am not asking whether the code itself is okay, but whether the structure of the scraper is okay and whether my usage of async/await is correct. I would use this structure for other scrapers as well, so the web-page-specific code is irrelevant here. This would just serve as the backbone of the scraper.