STAT 39000: Project 2 — Spring 2022

Motivation: In the previous project, we very slowly started to learn about asynchronous vs. synchronous programming. Mostly, you just had to describe scenarios, say whether they were synchronous or asynchronous, and explain things at a high level. In this project, we will dig into some asynchronous code and learn the very basics.

Context: This is the first project in a series of three projects that explore sync vs. async, parallelism, and concurrency.

Scope: Python, coroutines, tasks

Learning Objectives
  • Understand the difference between synchronous and asynchronous programming.

  • Identify, create, and await coroutines.

  • Properly identify the order in which asynchronous code is executed.

  • Utilize 1 or more synchronizing primitives to ensure that asynchronous code is executed in the correct order.

Make sure to read about and use the template found here, and read the important information about project submissions here.

Dataset(s)

The following questions will use the following dataset(s):

  • /depot/datamine/data/noaa/*.csv

Questions

Question 1

In the original version of the previous project, I gave you the following scenario.

You have 2 reports to write, and 2 wooden pencils. 1 sharpened pencil will write 1/2 of 1 report. You have a helper that is willing to sharpen 1 pencil at a time, for you, and that helper is able to sharpen a pencil in the time it takes to write 1/2 of 1 report. You can assume that you start with 2 sharpened pencils.

— the-examples-book.com

I then asked, in question (4), for you to write asynchronous Python code that simulates the scenario. In addition, I asked you to write the amount of time proper asynchronous code would take to run. While the first part of the question was unfair to ask (yet), the second part was not.

In this asynchronous example, the author could start with the first sharpened pencil and write 1/2 of the report in 5 seconds. Next, hand the first pencil off to the assistant to help sharpen it. While that is happening, use the second pencil to write the second half of the first report. Next, receive the first (now sharpened) pencil back from the assistant and hand the second pencil to the assistant to be sharpened. While the assistant was sharpening the second pencil, you would write the first half of the second report. The assistant would return the (now sharpened) second pencil back to you to finish the second report. This process would (in theory) take 20 seconds as the assistant would be sharpening pencils at the same time you are writing the report. As an effect, you could exclude the 4 seconds it takes to sharpen both pencils once, from our synchronous solution of 24 seconds.
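The arithmetic behind the 24 and 20 second figures can be checked with a few lines of Python. This is only a back-of-the-envelope sketch: the variable names are made up for illustration, and the durations are the 5 and 2 second values used by the simulation code in this project.

```python
# Durations (in seconds) taken from the scenario; the names are illustrative.
WRITE_HALF_REPORT = 5
SHARPEN_PENCIL = 2

halves_to_write = 4      # 2 reports, each written in 2 halves
pencils_to_sharpen = 2   # each pencil is re-sharpened once

# Synchronous: every step waits for the previous one to finish.
sync_total = (halves_to_write * WRITE_HALF_REPORT
              + pencils_to_sharpen * SHARPEN_PENCIL)

# Asynchronous: each sharpening (2 s) fits inside a writing step (5 s),
# so only the writing time remains.
async_total = halves_to_write * WRITE_HALF_REPORT

print(sync_total, async_total)  # 24 20
```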

In this project we will examine how to write asynchronous code that simulates the scenario, in a variety of ways that will teach you how to write asynchronous code. At the end of the project, you will write your own asynchronous code that will speed up a web scraping task. Let’s get started!

First things first. A few extremely astute students noticed an issue when trying to run async code in Jupyter Lab. Jupyter Lab already has its own event loop running, which causes problems if you try to run your own event loop. To get around this, we can use a package that automatically nests our event loops, so things work mostly as we would expect.

import asyncio
import nest_asyncio
nest_asyncio.apply()

asyncio.run(simulate_story())
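To see the kind of problem nest_asyncio works around, the sketch below calls asyncio.run from inside a coroutine that is already running on an event loop, which is roughly the situation inside a Jupyter cell. It assumes plain asyncio with nest_asyncio not applied, and inner and outer are hypothetical names used only for this illustration.

```python
import asyncio

async def inner():
    return "done"

async def outer():
    # An event loop is already running here, much like inside a Jupyter cell.
    coro = inner()
    try:
        asyncio.run(coro)  # trying to start a second event loop
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"

message = asyncio.run(outer())
print(message)
```

Without nest_asyncio, the inner call is refused with a RuntimeError saying that asyncio.run cannot be called from a running event loop.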

Fill in the skeleton code below to simulate the scenario. Use only the provided functions, sharpen_pencil, and write_half_report, and the await keyword.

async def sharpen_pencil():
    await asyncio.sleep(2)

async def write_half_report():
    await asyncio.sleep(5)

async def simulate_story():
    # Write first half of report with first pencil

    # Hand pencil off to assistant to sharpen

    # Write second half of report with second pencil

    # Hand second pencil off to assistant to sharpen
    # Take first (now sharpened) pencil back from assistant

    # Write the first half of second report with first pencil

    # Take second (now sharpened) pencil back from assistant
    # and write the second half of the second report

Run the simulation in a new cell as follows.

%%time

import asyncio
import nest_asyncio

nest_asyncio.apply()

asyncio.run(simulate_story())

How long does your code take to run? Does it take the expected 20 seconds? If you have an idea why or why not, please try to explain. Otherwise, just say "I don’t know".

Items to submit
  • Code used to solve this problem.

  • Output from running the code.

Question 2

If you don’t have any experience writing asynchronous code, this might be pretty confusing! That’s okay, it is much easier to get confused writing asynchronous code than it is to write synchronous code.

Let’s break it down. First, the asyncio.run function takes care of the bulk of the work. It starts the event loop, finalizes asynchronous generators, and closes the threadpool. All you need to take from it is "it takes care of a lot of ugly magic".

Any function defined with async def is an asynchronous function. Calling an async function produces a coroutine. A coroutine is a function whose progress can be paused and resumed at will.

For example, if you called the following async function, it will not execute, but rather it will just create a coroutine object.

async def foo():
    await asyncio.sleep(5)
    print("Hello")

foo()
Output
<coroutine object foo at 0x7f8b8b8b9b50>

To actually run the coroutine, you need to call the asyncio.run function.

asyncio.run(foo())
Output
Hello

Of course, it doesn’t make sense to call asyncio.run for each and every coroutine you create. It makes more sense to spin up the event loop once and handle the processes while it is running.

%%time

import asyncio
import nest_asyncio
nest_asyncio.apply()

async def foo():
    await asyncio.sleep(5)
    print("Hello")

async def bar():
    await asyncio.sleep(2)
    print("World")

async def main():
    await foo()
    await bar()

asyncio.run(main())

Run the code. What is the output?

Let’s take a step back. Why is asynchronous code useful? What do our asyncio.sleep calls represent? One of the slowest parts of a program is waiting for I/O (input/output). It takes time to wait for the operating system and hardware. If you are doing a lot of I/O in your program, you could take advantage of that waiting time and perform other operations in the meantime! This is what the asyncio.sleep calls represent: I/O!

Any program where the I/O speed limits the speed of the program is called I/O bound. Any program where the speed is limited by how fast the CPU can process the instructions is called CPU bound. Async programming can drastically speed up I/O-bound software!
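The difference can be made concrete with a small timing sketch. It uses asyncio.gather, which is explained later in this question, to start two coroutines together. As an assumption for illustration, time.sleep stands in for CPU-bound work, because it holds on to the event loop in the same way a long computation would. The function names and the shortened 0.2 second durations are made up for this example.

```python
import asyncio
import time

async def non_blocking_io():
    # asyncio.sleep hands control back to the event loop while "waiting"
    await asyncio.sleep(0.2)

async def blocking_work():
    # time.sleep never yields, so nothing else can run in the meantime;
    # here it is a stand-in for CPU-bound work
    time.sleep(0.2)

async def timed(make_coroutine):
    start = time.perf_counter()
    await asyncio.gather(make_coroutine(), make_coroutine())
    return time.perf_counter() - start

overlapped = asyncio.run(timed(non_blocking_io))
serialized = asyncio.run(timed(blocking_work))
print(f"non-blocking: {overlapped:.2f}s, blocking: {serialized:.2f}s")
```

The two non-blocking waits should overlap and finish in roughly 0.2 seconds, while the two blocking calls run back to back and take roughly 0.4 seconds.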

Okay, back to the code from above. What is the output? You may have expected foo to run, and then, while foo is "doing some I/O" (sleeping), bar to run. Then, after a total of 5 seconds, you may have expected "World Hello" to be printed. While foo is sleeping, bar runs, gets done in 2 seconds, then control goes back to foo, which finishes in another 3 seconds, right? Nope.

What happens is that when we await foo, Python suspends the execution of main until foo is done. Then it resumes execution of main and suspends it again until bar is done, for a total time of approximately 7 seconds. We want both coroutines to run concurrently, not one at a time! How do we fix it? The easiest way would be to use asyncio.gather.

%%time

import asyncio
import nest_asyncio

nest_asyncio.apply()

async def foo():
    await asyncio.sleep(5)
    print("Hello")

async def bar():
    await asyncio.sleep(2)
    print("World")

async def main():
    await asyncio.gather(foo(), bar())

asyncio.run(main())

asyncio.gather takes one or more awaitable objects as positional arguments and runs them concurrently, scheduling any coroutines among them as tasks. Running the code above should work as expected, and run in approximately 5 seconds. While foo is sleeping, bar runs and completes, so we gain 2 seconds compared with awaiting them one after the other.
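One detail that is easy to miss is that asyncio.gather also collects return values, and it returns them in the order the awaitables were passed in, not the order in which they finished. A minimal sketch, with hypothetical slow and fast coroutines and shortened sleep times:

```python
import asyncio

async def slow():
    await asyncio.sleep(0.2)
    return "slow"

async def fast():
    await asyncio.sleep(0.1)
    return "fast"

async def main():
    # fast() finishes first, but the results follow the argument order
    return await asyncio.gather(slow(), fast())

results = asyncio.run(main())
print(results)  # ['slow', 'fast']
```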

What is a task? You can read about tasks here. A task is an object that runs a coroutine. The easiest way to create a task is to use the asyncio.create_task function. For example, if instead of awaiting both foo and bar, we scheduled foo as a task, you would get mostly the same result as if you used asyncio.gather.

%%time

import asyncio
import nest_asyncio

nest_asyncio.apply()

async def foo():
    await asyncio.sleep(5)
    print("Hello")

async def bar():
    await asyncio.sleep(2)
    print("World")

async def main():
    asyncio.create_task(foo())
    await bar()

asyncio.run(main())

As you can see, "World" prints in a couple of seconds, and 3 seconds later "Hello" prints, for a total execution time of 5 seconds. With that being said, something is odd with our output.

Output
World
CPU times: user 2.57 ms, sys: 1.06 ms, total: 3.63 ms
Wall time: 2 s
Hello

It says that it executed in 2 seconds, not 5. In addition, "Hello" prints after Jupyter says our execution completed. Why? Well, if you read here, you will see that asyncio.create_task takes a coroutine (in our case the output from foo()), and schedules it as a task in the event loop returned by asyncio.get_running_loop(). This is the critical part: it is scheduling the coroutine created by foo() to run on the same event loop that Jupyter Lab is running on, so even though our call to asyncio.run finished, foo ran to completion instead of being cancelled as soon as main returned! To observe this, open a terminal and run the following code to launch a Python interpreter:

module use /scratch/brown/kamstut/tdm/opt/modulefiles
module load python/f2021-s2022-py3.9.6
python3

Then, in the Python interpreter, run the following.

You may need to type it out manually.

import asyncio

async def foo():
    await asyncio.sleep(5)
    print("Hello")

async def bar():
    await asyncio.sleep(2)
    print("World")

async def main():
    asyncio.create_task(foo())
    await bar()

asyncio.run(main())

As you can see, the output is not the same as when you run it from within the Jupyter notebook. Instead of:

Output
World
CPU times: user 2.57 ms, sys: 1.06 ms, total: 3.63 ms
Wall time: 2 s
Hello

You should get:

Output
World

This is because this time, there is no confusion about which event loop to use when scheduling a task. Once we reach the end of main, the event loop is stopped and any tasks still scheduled are cancelled, even if they haven’t finished (like foo, in our example). If you wanted to modify main in order to wait for foo to complete, you could await the task after you await bar().
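The cancellation can be observed directly when the code runs as a plain Python script (outside Jupyter, without nest_asyncio). The sketch below is an illustration rather than part of the assignment: the log list and the shortened sleep times are made up, and foo catches asyncio.CancelledError only to record that the cancellation happened before re-raising it.

```python
import asyncio

log = []

async def foo():
    try:
        await asyncio.sleep(0.5)
        log.append("foo done")
    except asyncio.CancelledError:
        log.append("foo cancelled")
        raise  # let the cancellation proceed as usual

async def bar():
    await asyncio.sleep(0.1)
    log.append("bar done")

async def main():
    task = asyncio.create_task(foo())  # scheduled, but never awaited
    await bar()

asyncio.run(main())
print(log)  # ['bar done', 'foo cancelled']
```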

Note that this will work:

async def main():
    task = asyncio.create_task(foo())
    await bar()
    await task

But this will not:

async def main():
    task = asyncio.create_task(foo())
    await task
    await bar()

The reason is that as soon as you call await task, main is suspended until the task is complete, which prevents both coroutines from executing concurrently (and we miss out on our 2 second performance gain). If you wait to call await task until after await bar(), our task (foo) will continue to run concurrently as a task on our event loop alongside bar (and we get our 2 second performance gain). In addition, asyncio.run will wait until task is finished before terminating execution, because we awaited it at the very end.
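The effect of the two orderings can be measured. The following sketch assumes it is run as a plain Python script, and it scales the sleeps down to 0.5 and 0.2 seconds so that it finishes quickly. The names task_awaited_last, task_awaited_first, and elapsed are hypothetical helpers for this illustration.

```python
import asyncio
import time

async def foo():
    await asyncio.sleep(0.5)

async def bar():
    await asyncio.sleep(0.2)

async def task_awaited_last():
    task = asyncio.create_task(foo())
    await bar()   # foo keeps running in the background meanwhile
    await task

async def task_awaited_first():
    task = asyncio.create_task(foo())
    await task    # suspended here until foo is done
    await bar()

def elapsed(coroutine_function):
    start = time.perf_counter()
    asyncio.run(coroutine_function())
    return time.perf_counter() - start

concurrent = elapsed(task_awaited_last)
sequential = elapsed(task_awaited_first)
print(f"awaited last: {concurrent:.2f}s, awaited first: {sequential:.2f}s")
```

The first version should take roughly as long as foo alone (about 0.5 seconds), while the second takes roughly the sum of both sleeps (about 0.7 seconds).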

In the same way that asyncio.create_task schedules the coroutines as tasks on the event loop (immediately), so does asyncio.gather. In a previous example, we awaited our call to asyncio.gather.

%%time

import asyncio
import nest_asyncio

nest_asyncio.apply()

async def foo():
    await asyncio.sleep(5)
    print("Hello")

async def bar():
    await asyncio.sleep(2)
    print("World")

async def main():
    await asyncio.gather(foo(), bar())

asyncio.run(main())
Output
World
Hello
CPU times: user 3.41 ms, sys: 1.96 ms, total: 5.37 ms
Wall time: 5.01 s

This is critical; otherwise, main would return immediately and terminate before either foo or bar finished.

%%time

import asyncio
import nest_asyncio

nest_asyncio.apply()

async def foo():
    await asyncio.sleep(5)
    print("Hello")

async def bar():
    await asyncio.sleep(2)
    print("World")

async def main():
    asyncio.gather(foo(), bar())

asyncio.run(main())
Output
CPU times: user 432 µs, sys: 0 ns, total: 432 µs
Wall time: 443 µs
World
Hello

As you can see, since we did not await our asyncio.gather call, main ran and finished immediately. The only reason "World" and "Hello" printed is that they finished running on the event loop that Jupyter uses instead of the loop we created using our call to asyncio.run. If you were to run the code from a Python interpreter instead of from Jupyter Lab, neither "World" nor "Hello" would print.

I know this is a lot to take in for a single question. If you aren’t quite following at this point, I’d highly encourage you to post questions in Piazza before continuing, or to reread things until they start to make sense.

Modify your simulate_story function from question (1) so that sharpen_pencil runs concurrently with write_half_report, and the total execution time is about 20 seconds.

Some important notes to keep in mind:

  • Make sure that the "rules" are still followed. You can still only write one half of a report (one quarter of the total) at a time.

  • Make sure that your code awaits what needs to be awaited — even if technically those tasks would execute prior to simulate_story finishing.

Items to submit
  • Code used to solve this problem.

  • Output from running the code.

Question 3

That last question was quite a bit to take in! It is ok if it hasn’t all clicked! I’d encourage you to post questions in Piazza, and continue to mess around with simple async examples until it makes more sense. It will help us explain things better and improve things for the next group of students!

There are a couple of straightforward ways you could solve the previous question (well, technically there are even more). One way involves queuing up the sharpen_pencil coroutines as tasks that run concurrently, and awaiting them at the end. The other involves using asyncio.gather to queue up select write_half_report and sharpen_pencil tasks to run concurrently, and awaiting them.

While both of these methods do a great job simulating our simple story, there may be instances where a greater amount of control may be needed. In such circumstances, the Python synchronization primitives may be useful!

Read about the Event primitive, in particular. This primitive allows us to notify one or more async tasks that something has happened. This is particularly useful if you want some async code to wait for other async code to run before continuing on. Cool, how does it work? Let’s say I want to yell, but before I yell, I want the megaphone to be ready.

First, create an Event object that represents the thing we are waiting for.

import asyncio

async def yell(words, wait_for):
    print(f"{words.upper()}")

# create an event
megaphone_ready = asyncio.Event()

To wait until the event has occurred, you just need to await the coroutine created by calling the event’s wait() method. In our case, the event is passed to yell as wait_for, so we can add await wait_for.wait() before we yell in the yell function.

async def yell(words, wait_for):
    await wait_for.wait()
    print(f"{words.upper()}")

By default, an Event starts out unset (its internal flag is False) since the event has not occurred. The yell task will continue to await our event until the event is marked as set. To mark our event as set, we would use the set method.

import asyncio

async def yell(words, wait_for):
    await wait_for.wait()
    print(f"{words.upper()}")

async def main():
    megaphone_ready = asyncio.Event() # by default, it is not ready

    # create our yell task. Remember, tasks are immediately scheduled
    # on the event loop to run. At this point, the await wait_for.wait()
    # part of our yell function will prevent the task from moving
    # forward to the print statement until the event is set.
    yell_task = asyncio.create_task(yell("Hello", megaphone_ready))

    # let's say we have to dust off the megaphone for it to be ready
    # and it takes 1 second to do so
    await asyncio.sleep(1)

    # now, since we've dusted off the megaphone, we can mark it as ready
    megaphone_ready.set()

    # at this point in time, the await wait_for.wait() part of our code
    # from the yell function will be complete, and the yell function
    # will move on to the print statement and actually yell

    # Finally, we want to await our yell_task so that it finishes.
    # If our yell_task weren't a simple print statement, and took a few seconds
    # to finish, this await would be necessary to keep main from returning
    # (and the task from being cancelled) before the task is done.
    await yell_task

asyncio.run(main())
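The megaphone example has a single task waiting on the event, but one Event can release several waiting tasks at once. The sketch below illustrates that, along with the is_set method for checking an event’s state. The names waiter and door_open, and the log list, are hypothetical and exist only for this example.

```python
import asyncio

async def waiter(name, event, log):
    await event.wait()   # suspended until the event is set
    log.append(name)

async def main():
    log = []
    door_open = asyncio.Event()
    state_before = door_open.is_set()   # False: nothing has happened yet

    tasks = [
        asyncio.create_task(waiter("first", door_open, log)),
        asyncio.create_task(waiter("second", door_open, log)),
    ]

    await asyncio.sleep(0.1)   # both waiters are now suspended on the event
    blocked = list(log)        # still empty

    door_open.set()            # wakes every task waiting on the event
    await asyncio.gather(*tasks)
    return state_before, blocked, door_open.is_set(), sorted(log)

result = asyncio.run(main())
print(result)  # (False, [], True, ['first', 'second'])
```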

Consider each of the following as a separate event:

  • Writing the first quarter of the report

  • Writing the second quarter of the report

  • Writing the third quarter of the report

  • Writing the fourth quarter of the report

  • Sharpening the first pencil

  • Sharpening the second pencil

Use the Event primitive to make our code run as intended, concurrently. Use the following code as a skeleton for your solution. Do not modify the code, just make additions.

%%time

import asyncio
import nest_asyncio

nest_asyncio.apply()

async def write_quarter(current_event, events_to_wait_for = None):
    # TODO: if events_to_wait_for is not None
    # loop through the events and await them

    await asyncio.sleep(5)

    # TODO: at this point, the essay quarter has
    # been written and we should mark the current
    # event as set


async def sharpen_pencil(current_event, events_to_wait_for = None):
    # TODO: if events_to_wait_for is not None
    # loop through the events and await them

    await asyncio.sleep(2)

    # TODO: at this point, the pencil has
    # been sharpened and we should mark the current
    # event as set


async def simulate_story():

    # TODO: declare each of the 6 events in our story

    # TODO: add each function call to a list of tasks
    # to be run concurrently. Should be something similar to
    # tasks = [write_quarter(something, [something,]), ...]
    tasks = []

    await asyncio.gather(*tasks)

asyncio.run(simulate_story())

The current_event is passed so we can mark it as set once the event has occurred.

The events_to_wait_for is passed so we can await them before continuing. This ensures that we don’t try to sharpen the first pencil until after we’ve written the first quarter of the essay, and that we don’t write the third quarter of the essay until after the first pencil has been sharpened.

The code you will add to write_quarter will be identical to the code you will add to sharpen_pencil.

The events_to_wait_for is expected to be iterable (a list). Make sure you pass a single event in a list if you only have one event to wait for.

It should take about 20 seconds to run.

Items to submit
  • Code used to solve this problem.

  • Output from running the code.

Question 4

While it is certainly useful to have some experience with async programming in Python, the context in which most data scientists will deal with it is writing APIs using something like fastapi, where a deep knowledge of async programming isn’t really needed.

What will be pretty common is the need to speed up code. One of the primary ways to do that is to parallelize your code.

In the previous project, in question (5), you described an operation that you could do to the entire flights dataset (/depot/datamine/data/flights/subset). In this situation, where you have a collection of neatly formatted datasets, a good first step would be to write a function that accepts two paths as arguments. The first path could be the absolute path to the dataset to be processed. The second path could be the absolute path of the intermediate output file. Then, the function could process the dataset and output the intermediate calculations.

For example, let’s say you wanted to count how many flights are in the dataset as a whole. Then, you could write a function to read in a dataset, count the flights, and output a file containing the number of flights. This would be easily parallelizable because you could process each of the files individually, in parallel, and at the very end, sum up the counts in the output files.
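A minimal sketch of the flight-counting example might look like the following. It assumes each input file is a CSV with a single header row and one flight per remaining row. The function names, the column names, and the tiny temporary files are made up so the example can run anywhere; they are not the real flights data, and your own operation from question (5) will likely differ.

```python
import csv
import tempfile
from pathlib import Path

def count_rows(input_path, output_path):
    """Count the data rows in one CSV file and write the count to output_path."""
    with open(input_path, newline="") as infile:
        reader = csv.reader(infile)
        next(reader, None)  # skip the header row (assumed to be present)
        n_rows = sum(1 for _ in reader)
    Path(output_path).write_text(f"{n_rows}\n")

def combine_counts(output_paths):
    """Sum the intermediate counts produced by count_rows."""
    return sum(int(Path(p).read_text()) for p in output_paths)

# Small demonstration with made-up files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    inputs = []
    for i, n in enumerate([3, 5]):
        path = tmp / f"part{i}.csv"
        path.write_text("origin,dest\n" + "LAF,ORD\n" * n)
        inputs.append(path)
    outputs = [tmp / f"part{i}.count" for i in range(len(inputs))]
    for src, dst in zip(inputs, outputs):
        count_rows(src, dst)  # each call is independent of the others
    total = combine_counts(outputs)

print(total)  # 8
```

Because each call to count_rows reads one file and writes one file without touching shared state, the loop over the input files is the part that could later be run in parallel.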

Write a function that is "ready" to be parallelized, and that follows the operation you described in question (5) in the previous project. Test out the function on at least 2 of the datasets in /depot/datamine/data/flights/subset.

In the next project, we will parallelize and run some benchmarks.

Items to submit
  • Code used to solve this problem.

  • Output from running the code.

Please make sure to double check that your submission is complete, and contains all of your code and output before submitting. If you are on a spotty internet connection, it is recommended to download your submission after submitting it to make sure that what you think you submitted is what you actually submitted.

In addition, please review our submission guidelines before submitting your project.