1

I've written a web scraper and would like it to run as quickly as possible. The scraping isn't trivial; I scrape a few web-pages, gather links from them, scrape those, then gather links from those, then scrape those again (illustrated by the three different functions, showing the process to be multi-level). Since standard http requests in python are blocking, I did this using async/await using the httpx library, using a job queue and workers.

Here is the code I've written. I've removed some domain-specific code and imports just to make the code a bit shorter and more to-the-point. The functions may seem similar with the in-between code removed, but they are web-page specific, and, in fact, very different. They simply demonstrate that this is a recursive procedure.

@dataclass
class TaskSpec:
    func: Callable
    args: list = field(default_factory=list)
    kwargs: dict = field(default_factory=dict)


@dataclass
class TaskResult:
    result: Any
    new_tasks: list[TaskSpec] = field(default_factory=list)


async def get_page_1(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    
    result = ...

    ...

    new_jobs = []
    for row in html_table_rows:
        url = ...

        new_jobs.append(TaskSpec(get_page_2, (url, client)))

    return TaskResult(result, new_jobs)


async def get_page_2(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    
    result = ...

    ...

    new_jobs = []
    for row in html_table_rows:
        url = ...

        new_jobs.append(TaskSpec(get_page_3, (url, client)))

    return TaskResult(result, new_jobs)


async def get_page_3(url: str, client: httpx.AsyncClient):
    response = await client.get(url)
    content = BeautifulSoup(response.content, features="lxml")
    
    result = ...

    return TaskResult(result)


async def process_queue(queue: asyncio.Queue, result_set: list):
    while True:
        try:
            spec: TaskSpec = await queue.get()
            result: TaskResult = await spec.func(*spec.args, **spec.kwargs)

            for new_task in result.new_tasks:
                queue.put_nowait(new_task)

            result_set.append(result.result)

        # If the task failed, re-add the task to the queue
        except httpx.ReadTimeout:
            print("Request timed out. Re-adding task to queue...")
            queue.put_nowait(spec)

        except Exception as e:
            pass
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    result_set = []

    num_workers = 20
    httpx_limits = httpx.Limits(max_connections=num_workers)

    async with httpx.AsyncClient(timeout=10, limits=httpx_limits) as client:
        for url in urls:
            queue.put_nowait(TaskSpec(get_page_1, (url, client)))

        workers = []
        for i in range(num_workers):
            workers.append(asyncio.create_task(process_queue(queue, result_set)))

        # Wait for the queue to be emptied
        await queue.join()

    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

A few points regarding my code that I would love feedback on:

  1. The reason for the TaskSpec class is so that I can easily re-add the same job back to the queue. I get a good number of httpx.ReadTimeout errors, presumably due to my slow connection, or perhaps because the actual parsing takes a while, and some of the requests spend too long being left waiting. So sometimes, I need to re-queue a webpage scraping job, and this was the easiest way I could come up with. I had also tried to create task objects and re-add the coroutine back to the queue using task.get_coro(), but this didn't work since the coroutine was already executed. Also, this way, the actual parsing functions get_page_{1,3} don't actually need to know about the queue.

  2. I used 20 workers because increasing that number seemed to result in a lot of httpx.ReadTimeout errors. I don't know why that is. My guess was that maybe httpx was getting back the responses completely fine, but then it took my code too long to run, and return control to the event loop, so that they couldn't be handled withing the timeout period. This is pure speculation though. I would like to be able to increase the number of workers, so that hopefully, it would run faster, since the runtime of the entire script still seems to rely mainly on the strength of my internet connection.

  3. I am not happy that I have to pass the result_set list to process_queue. This is a very C-like way of doing things, and I don't like doing this in Python. However, I see no other way of having process_queue save results and then return them back to main(). This is because the workers don't actually know when the queue is empty. I wait for the queue to empty in main() using queue.join(), and then cancel the worker jobs. And if I cancel them, they can't return anything. Right?

This is my first time really playing around with async/await in Python, so there may be much better ways of doing this same thing, and I'd be glad to hear any feedback. I am also very interested if there is a better way to re-queue failed jobs for execution, and if anyone could shed some light on the httpx.ReadTimeout errors, I would appreciate that as well.

CLARIFICATION: To be clear, I am not asking whether the code itself is okay, but whether the structure of the scraper is okay and whether my usage of async/await is correct. I would use this structure for other scrapers as well, so the the web-page-specific code is irrelevant here. This would just serve as the backbone of the scraper.

0

2 Answers 2

2

The design you have chosen definitely works, though it is more reminiscent of traditional multithreading designs than of async designs. Your current design would also work perfectly well with a blocking HTTP library, assuming that your 20 process_queue() instances were each running in a separate thread (but see a note on queue termination below).

What I really like about your design is that you separate the domain-specific get_page_n() functions from the task management code. However, the task management might be unnecessarily complicated because you are thinking about coroutines as lightweight threads, when it is often more useful to think about futures: objects that eventually resolve to a value, and can be awaited.

First of all, I want to talk about managing work in progress limits. You currently achieve this by starting 20 process_queue() tasks that pull work from a queue. Instead, you could start any number of tasks that directly work on some item, and instead manage a WIP limit with a synchronization primitive like a semaphore. The work queue would become unnecessary, and the job of this function would move to managing retries and the returned new_tasks. For example:

async def run_with_retry(spec: TaskSpec, limit: Semaphore) -> TaskResult:
      while True:  # potentially retry this task
        try:
            async with limit:
                return await spec.func(*spec.args, **spec.kwargs)
        except httpx.ReadTimeout:
            print("Request timed out. Retrying later...")
            continue

async def process_task_spec(spec: TaskSpec, limit: Semaphore, result_set: list) -> None:
    result = await run_with_retry(spec, limit)

    result_set.append(result.result)

    new_tasks = [
        asyncio.create_task(process_task_spec(new_task, limit, result_set))
        for new_task in result.new_tasks
    ]
    await asyncio.gather(new_tasks)

async def main():
    result_set = []

    num_workers = 20
    httpx_limits = httpx.Limits(max_connections=num_workers)
    task_limits = asyncio.Semaphore(num_workers)

    async with httpx.AsyncClient(timeout=10, limits=httpx_limits) as client:
        initial_tasks = [
            process_task_spec(
                TaskSpec(get_page_1, (url, client)),
                task_limits,
                result_set,
            )
            for url in urls
        ]
        await asyncio.gather(initial_tasks)

Note that keeping a task queue around is still very sensible if you want to avoid this recursive structure.

The TaskSpec class was task like in your original design, being an object that will eventually be executed and resolve to a value. Instead of using one of Python's built-in executors, you created your own executor. But now, it is little more than a deferred function call – the class could be replaced with a Callable[[], Future[TaskResult]] type, and a call like:

TaskSpec(get_page_1, (url, client))

could be written as:

lambda: get_page_1(url, client)

In principle, it would also be possible to remove this lambda entirely, since get_page_1(url, client) is a coroutine. However, the coroutine would execute immediately up to the first await point. Thus, keeping one level of indirection could still be desirable, in order to control when the function starts executing – important for enforcing your WIP limits.

Next, we can get rid of your result_set if you want. In my revised design, the process_task_spec() function manages the execution of the task spec and of all subtasks. It could therefore be updated to return a list of results, which the main function can concatenate:

async def process_task_spec(spec: TaskSpec, limit: Semaphore) -> list:
    ...
    
    all_results = [result.result]

    new_tasks = [...]
    for sub_results in await asyncio.gather(new_tasks):
        all_results.extend(sub_results)
    
async def main():
    ...

        all_results = [*r for r in await asyncio.gather(initial_tasks)]

This will change the order of results from completion order in your design to task creation order in my design. This may or may not be relevant.

A note on exception handling: exceptions and concurrency are tricky. The asyncio.gather() function gives us two modes for dealing with exceptions:

  • The default modes fails the gather on the first exception, but without cancelling the other tasks. Thus, await asyncio.gather(tasks) may have to be written as:

    try:
      await asyncio.gather(tasks)
    except:
      for t in tasks:
        t.cancel()
      raise
    
  • Alternatively, you can set return_exceptions=True. This will also not cause any cancellation, but will return one result per task given to the gather. The result will either be the actual result, or the exception object. Essentially, this wraps ever task like this:

    async def return_exception(task):
      try:
        return await task
      except Exception as e:
        return e
    

    This can be appropriate if you later go through the results and aggregate/report exceptions, without wanting to quickly terminate the program.

A note on queue termination: In your question, you noted the problem that you have waited for the queue to empty, and then have to cancel your workers. In a multi-threaded design the typical approach would be to wait until the input queue is empty and all tasks have produced results, then sending a new input that causes orderly shutdown of the workers. In Python, this is often done by enqueueing None, and having the worker thread check whether the dequeued item is None.

Understanding timeouts. If you are having trouble with timing-related issues, the first step would be to gather useful statistics – how long does each task actually take to process? You can take the time just before and after the await spec.func(...). For investigating latency, it is common to use percentiles (e.g. 50%/median to represent “typical” latency, or 95% percentiles to represent “maximum” latencies). You could experiment to see how latency changes with different num_workers limits. It would be best to keep these metrics well below the 10s timeout you configured.

Timeouts could be caused by a number of reasons, such as rate-limiting on the server you are connecting to, package loss or bandwidth saturation on your connection, or by blocking operations in your coroutines. It's also worth noting that you must not have more active workers than available connections.

A note on scraping. If you are scraping other people's servers, good etiquette expects you to avoid excessive load. This means rate-limiting your requests, typically to one request every few seconds per target server. If you are crawling links, it is also important to respect robots.txt. Failure to follow such etiquette might trigger firewall rules against you.

You could manage such rate limits within the retry loop, but before the WIP-limited section. E.g. you might have a shared last_accessed dictionary that provides a timestamp for each domain. Access to the dictionary would be protected by a lock:

from contextlib import asynccontextmanager

async def run_with_retry(spec: TaskSpec, wip_limit: Semaphore, rate_limit: RateLimit) -> TaskResult:
    domain = get_domain(spec.url)
    while True:  # potentially retry this task
        try:
            async with rate_limit(domain), wip_limit:
                return await spec.func(*spec.args, **spec.kwargs)
        ...

class RateLimit:
  def __init__(self, delay_seconds: float) -> None:
    assert delay_seconds > 0.0
    # explict "None" indicates domain is currently processed by someone else
    self._last_accessed: dict[str, Optional[float]] = {}
    self._lock: asyncio.Lock()
    self._delay_seconds = delay_seconds

  @asynccontextmanager
  async def __call__(self, domain: str):
    while True:
      async with self._lock:
        last_accessed = self._last_accessed.get(domain, -math.inf)

        if last_accessed is None:
          # Domain currently processed by someone else:
          # must wait at least this long anyway
          sleep_for = self._delay_seconds
        else:
          sleep_for = last_accessed + self._delay_seconds - time.monotonic()

        if sleep_for <= 0:
          self._last_accessed[domain] = None  # mark domain as in-use
          break

      await asyncio.sleep(sleep_for)

    try:
      yield
    finally:
      async with self._lock:
        assert self._last_accessed[domain] is None
        self._last_accessed[domain] = time.monotonic()     

That design enforces a pause of at least delay_seconds between requests, effectively sequencing requests to the same domain. If you instead want to limit the average rate of requests (which may allow concurrent requests), the implementation can be simplified by getting rid of the None status. Instead of marking a domain as in-use, the entry would be updated to time.monotonic() before the yield, and no further bookkeeping would be needed after the yield.

1
  • This is a wonderful answer! I haven't worked that much with futures before, so a threading model is more intuitive to me. I think I understand it better now that you've walked me through it! Thank you very much! I like the recursive structure you've written. I don't think I need a queue now that I see what you've done here. The queue was there mainly for re-queueing failed jobs.
    – Pavlin
    Commented Oct 24, 2022 at 8:46
1

Note: this answer was originally posted on Code Review SE

get_page_{1,3} are almost identical. You can replace them with one function that takes depth as a parameter. You call it with depth=2 from main() and then do the recursion while decreasing depth by 1 on each step. If depth == 0 don't continue with the recursion.

Don't ever write

except Exception as e:
    pass

You are silencing every possible error that might happen in the try block. If something goes wrong in an unexpected way you will never know and it will be impossible to debug.

Deal with httpx.ReadTimeouts on a lower level, inside the get_page() method. Just retry getting the response until you get it (you can write a method for this). This will spare you from re-adding the task back into the queue.

The errors themselves are most likely caused by your limited internet speed while you're trying to load pages from 20 different workers, so this is the bottleneck for the program's performance.

Passing the result_set list to process_queue is totally fine.

2
  • Thanks for the response, but this doesn't really address my main questions: Is my code conceptually good? Regarding your comments, as I wrote, I removed a lot fo the domain code from get_page_{1,3}, so that the overall concept would be clearer. The complete functions are not similar. I caught Exception because I'm trying to get familiar with async/await, not write production-level code. I think that dealing with the ReadTimeout inside get_page() might be a good idea, but wouldn't that clog the workers in case I were requesting a page that wouldn't exist for instance?
    – Pavlin
    Commented Oct 21, 2022 at 20:44
  • @Pavlin if you're requesting a page that doesn't exist you'll get the same problem with the current approach (except you won't since you silence every possible exception anyway). Why did you write except Exception to begin with? Queue approach is good, yes.
    – QuasiStellar
    Commented Oct 21, 2022 at 21:24

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.