42.23. Estimation of COVID-19 pandemic#

42.23.1. Loading data#

We will use data on COVID-19 infected individuals, provided by the Center for Systems Science and Engineering (CSSE) at Johns Hopkins University. Dataset is available in this GitHub Repository.

import pytest
import ipytest
import unittest
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pandas.testing import assert_frame_equal
from pandas.testing import assert_series_equal

ipytest.autoconfig()
plt.rcParams["figure.figsize"] = (10, 3)  # make figures larger

We can load the most recent data directly from GitHub using pd.read_csv. If for some reason the data is not available, you can always use the copy available locally in the data folder - just uncomment the line below that defines base_url:

# base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/"  # loading from Internet
base_url = "../../assets/data/estimation-covid-19/"  # loading from disk
infected_dataset_url = base_url + "time_series_covid19_confirmed_global.csv"
recovered_dataset_url = base_url + "time_series_covid19_recovered_global.csv"
deaths_dataset_url = base_url + "time_series_covid19_deaths_global.csv"
countries_dataset_url = base_url + "UID_ISO_FIPS_LookUp_Table.csv"

Let’s now load the data for infected individuals and see how the data looks like:

infected = pd.read_csv(infected_dataset_url)
infected.head()

We can see that each row of the table defines the number of infected individuals for each country and/or province, and columns correspond to dates. Similar tables can be loaded for other data, such as number of recovered and number of deaths.

recovered = pd.read_csv(recovered_dataset_url)
deaths = pd.read_csv(deaths_dataset_url)

42.23.2. Making sense of the data#

From the table above the role of province column is not clear. Let’s see the different values that are present in Province/State column:

infected["Province/State"].value_counts()

From the names we can deduce that countries like Australia and China have more detailed breakdown by provinces. Let’s look for information on China to see the example:

def column_filter(df, column_name, column_value):
    """
    Filters a pandas DataFrame based on a column value.

    Returns:
        pandas.DataFrame: The filtered DataFrame.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if column_name not in df.columns:
        raise Exception(f"{column_name} does not exist in df")
    return df[df[_______] == ______]

column_filter(infected, "Country/Region", "China")
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {"numbers": [1, 2, 3, 4, 5], "bools": [False, False, True, True, True]}
    )


class TestColumnFilter(unittest.TestCase):
    def test_column_filter_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_result = pd.DataFrame({"numbers": [3], "bools": [True]})

        # act
        result = column_filter(test_df, "numbers", 3)

        # assert
        assert result.reset_index(drop=True).equals(expected_result)
        
    def test_column_filter_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            column_filter(None, "numbers", 3)
    
    def test_column_filter_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            column_filter(pd.DataFrame(), "numbers", 3)
    
    def test_column_filter_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            column_filter(1, "numbers", 3)
    
    def test_column_filter_with_invalid_column_name_type(self):
        #assign
        test_df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            column_filter(test_df, 123, 3)

    def test_column_filter_with_empty_column_name(self):
        #assign
        test_df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            column_filter(test_df, "", 3)
    
    def test_column_filter_with_none_column_name(self):
        #assign
        test_df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            column_filter(test_df, None, 3)
👩‍💻 Hint

You can consider to fill column_name and column_value.

42.23.3. Pre-processing the data#

We are not interested in breaking countries down to further territories, thus we would first get rid of this breakdown and add information on all territories together, to get info for the whole country. This can be done using groupby:

def groupby_sum(df, column_name):
    """
    Groups a column in a Pandas DataFrame and computes the sum of the values in each group.

    Returns:
        pd.DataFrame: A Pandas DataFrame containing the groupby and sum results.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if column_name not in df.columns:
        raise Exception("Column does not exist.")
    # Group and aggregate data
    return df.________

# Group and sum infected cases by country/region
infected = ______
# Group and sum recovered cases by country/region
recovered = ______
# Group and sum deaths cases by country/region
deaths = ______

infected.head()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame({"c1": [1, 1, 1, 2, 2], "c2": [6, 7, 8, 9, 10]})


class TestGroupbySum(unittest.TestCase):
    def test_groupby_sum_happy_case(self):
        # assign
        test_df = create_test_df()
        expect_result = pd.DataFrame(data=[[21], [19]], index=[1, 2], columns=["c2"])

        # act
        actual_result = groupby_sum(test_df, "c1")

        # assert
        assert_frame_equal(actual_result, expect_result, check_names=False)

    def test_groupby_sum_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            groupby_sum(None, "c1")
    
    def test_groupby_sum_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            groupby_sum(pd.DataFrame(), "c1")
    
    def test_groupby_sum_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            groupby_sum(123, "c1")

    def test_groupby_sum_with_invalid_column_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            groupby_sum(test_df, "c100")
    
    def test_groupby_sum_with_invalid_column_name_type(self):
        #assign
        test_df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            filter(test_df, 123)

    def test_groupby_sum_with_empty_column_name(self):
        #assign
        test_df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            filter(test_df, "")
    
    def test_groupby_sum_with_none_column_name(self):
        #assign
        test_df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            filter(test_df, None)
👩‍💻 Hint

You can consider to use pandas.DataFrame.groupby() and aggregation function sum().

You can see that due to using groupby all DataFrames are now indexed by Country/Region. We can thus access the data for a specific country by using .loc:|

def plot_infected_vs_recovered(column_name):
    infected.loc[column_name][2:].plot()
    recovered.loc[column_name][2:].plot()
    plt.show()

plot_infected_vs_recovered("US")

Note how we use [2:] to remove first two elements of a sequence that contain geolocation of a country. We can also drop those two columns altogether:

def drop_columns(df, columns):
    """
    Drops the specified columns from a Pandas DataFrame.
    
    Returns:
        df after dropping
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if columns is None or not isinstance(columns, list) or len(columns) == 0:
        raise Exception("columns is not a valid list")
    if not set(columns).issubset(set(df.columns)):
        raise Exception("columns contains invalid column names")
    return df._________

# Dropping the "Lat" and "Long" columns from infected, recovered, deaths DataFrame.
______
______
______
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {
            "c1": [1, 2, 3, 4, 5],
            "c2": [6, 7, 8, 9, 10],
            "c3": [11, 12, 13, 14, 15],
            "c4": [16, 17, 18, 19, 20],
        }
    )

class TestDropColumns(unittest.TestCase):
    def test_drop_columns_with_empty_df(self):
        # act
        with pytest.raises(Exception):
            drop_columns(pd.DataFrame(), "c1")

    def test_drop_columns_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_result = pd.DataFrame(
            {
                "c3": [11, 12, 13, 14, 15],
                "c4": [16, 17, 18, 19, 20],
            }
        )
        # act
        drop_columns(test_df, ["c1", "c2"])

        # assert
        assert_frame_equal(test_df, expected_result)

    def test_drop_columns_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            drop_columns(None, "c1")

    def test_drop_columns_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            drop_columns(123, "c1")
    
    def test_drop_columns_with_none_columns(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            drop_columns(test_df, None)
    
    def test_drop_columns_with_empty_columns(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            drop_columns(test_df, [])
    
    def test_drop_columns_with_invalid_columns_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            drop_columns(test_df, 123)
    
    def test_drop_columns_with_invalid_columns_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            drop_columns(test_df, ["c1", "c100"])

    def test_drop_columns_with_invalid_columns_input(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            drop_columns(test_df, "c1000")
👩‍💻 Hint

You can consider to use pandas.DataFrame.drop(columns=coulumns, inplace=True).

42.23.4. Investigating the data#

Let’s now switch to investigating a specific country. Let’s create a frame that contains the data on infections indexed by date:

def mkframe(infected_df, recovered_df, deaths_df, index_name):
    """
    This function creates a new DataFrame by merging three input DataFrames and 
    converting the index to datetime format.

    Returns:
        pandas.DataFrame: A new DataFrame containing columns for infected, recovered, and deaths, 
        with the index converted to datetime format.
    """
    if infected_df is None or not isinstance(infected_df, pd.DataFrame) or infected_df.empty:
        raise Exception("invalid infected_df")
    if recovered_df is None or not isinstance(recovered_df, pd.DataFrame) or recovered_df.empty:
        raise Exception("invalid recovered_df")
    if deaths_df is None or not isinstance(deaths_df, pd.DataFrame) or deaths_df.empty:
        raise Exception("invalid deaths_df")
    if not isinstance(index_name, str) or index_name is None or not index_name.strip():
        raise Exception("column_name is not a valid string")    
    if index_name not in infected_df.index:
        raise Exception(f"{index_name} does not exist in {infected_df}")
    if index_name not in recovered_df.index:
        raise Exception(f"{index_name} does not exist in {recovered_df}")
    if index_name not in deaths_df.index:
        raise Exception(f"{index_name} does not exist in {deaths_df}")
    df = pd.DataFrame(
        {
            # Select the row with index_name from three DataFrames
            "infected": infected_df.______,
            "recovered": recovered_df.______,
            "deaths": deaths_df.______,
        }
    )
    df.index = pd.to_datetime(df.index)
    return df

# Merge the three DataFrame infected, recovered, and deaths into a new DataFrame
# and use the "US" column as the index of the new DataFrame.
df = ______
Check result by executing below... 📝
%%ipytest -qq

def create_test_df_1():
    return pd.DataFrame(
        data=[[2, 5, 9], [3, 4, 10], [9, 9, 8]],
        columns=["1/22/20", "1/23/20", "1/24/20"],
        index=["US", "UK", "FR"],
    )

def create_test_df_2():
    return pd.DataFrame(
        data=[[9, 9, 8], [2, 5, 9], [3, 4, 10]],
        columns=["1/22/20", "1/23/20", "1/24/20"],
        index=["US", "UK", "FR"],
    )

def create_test_df_3():
    return pd.DataFrame(
        data=[[3, 4, 10], [9, 9, 8], [2, 5, 9]],
        columns=["1/22/20", "1/23/20", "1/24/20"],
        index=["US", "UK", "FR"],
    )

class TestMkframe(unittest.TestCase):
    def test_mkframe_happy_case(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()
        expected_result = pd.DataFrame(
            data=[[2, 9, 3], [5, 9, 4], [9, 8, 10]],
            columns=["infected", "recovered", "deaths"],
            index=["2020-01-22", "2020-01-23", "2020-01-24"],
        )
        expected_result.index = pd.to_datetime(expected_result.index)

        # act
        test_df = mkframe(test_df_1, test_df_2, test_df_3, "US")

        # assert
        assert_frame_equal(test_df, expected_result)

    def test_mkframe_with_none_df_1(self):
        # assign
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(None, test_df_2, test_df_3, "US")

    def test_mkframe_with_none_df_2(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, None, test_df_3, "US")

    def test_mkframe_with_none_df_3(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, None, "US")
    
    def test_mkframe_with_empty_df_1(self):
        # assign
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(pd.DataFrame(), test_df_2, test_df_3, "US")
    
    def test_mkframe_with_empty_df_2(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, pd.DataFrame(), test_df_3, "US")
    
    def test_mkframe_with_empty_df_3(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, pd.DataFrame(), "US")
    
    def test_mkframe_with_invalid_df_1_type(self):
        # assign
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(123, test_df_2, test_df_3, "US")
    
    def test_mkframe_with_invalid_df_2_type(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, 123, test_df_3, "US")
    
    def test_mkframe_with_invalid_df_1_type(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, 123, "US")

    def test_mkframe_with_invalid_column_name(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, test_df_3, "China")
    
    def test_mkframe_with_empty_column_name(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, test_df_3, "")
    
    def test_mkframe_with_none_column_name(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, test_df_3, None)

    def test_mkframe_with_invalid_column_type(self):
        # assign
        test_df_1 = create_test_df_1()
        test_df_2 = create_test_df_2()
        test_df_3 = create_test_df_3()

        # act & assert
        with pytest.raises(Exception):
            mkframe(test_df_1, test_df_2, test_df_3, 123)
👩‍💻 Hint

You can consider to use pandas.DataFrame.loc[].

df.plot()
plt.show()

Now let’s compute the number of new infected people each day. This will allow us to see the speed at which pandemic progresses. The easiest day to do it is to use diff:

def append_diff_column(df, new_column, column_to_diff):
    """
    Append a new column to a dataframe, where the values in the new column are calculated as the difference
    between consecutive values in an existing column.

    Returns:
        pandas.Series: The newly created column containing the differences between consecutive values
        in the original column.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if column_to_diff not in df.columns:
        raise Exception("column_name_to_diff not exist in df")
    if new_column is None or not isinstance(new_column, str) or not new_column.strip():
        raise Exception("new_column is not a valid string")
    if column_to_diff is None or not isinstance(column_to_diff, str) or not column_to_diff.strip():
        raise Exception("column_to_diff is not a valid string")
    # The values in the new_column are calculated as the difference between consecutive values in column_to_diff
    df[new_column] = df[______].______
    return df[new_column]

# Add a new column "ninfected" diffed by "infected" column to the DataFrame "df", and display the plot
______.plot()
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {
            "date": [
                "2022-01-01",
                "2022-01-02",
                "2022-01-03",
                "2022-01-04",
                "2022-01-05",
                "2022-01-06",
            ],
            "column1": [1, 2, 4, 6, 9, 13],
            "column2": [1, 3, 6, 10, 15, 21],
        }
    )


class TestAppendDiffColumn(unittest.TestCase):
    def test_append_diff_column_happy_case(self):
        # assign
        df = create_test_df()
        expected_result = pd.Series(
            [np.nan, 1.0, 2.0, 2.0, 3.0, 4.0], name="new_column"
        )

        # act
        actual_result = append_diff_column(
            df, "new_column", "column1"
        )

        # assert
        assert_series_equal(actual_result, expected_result)

    def test_append_diff_column_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                pd.DataFrame(), "new_column", "column_to_diff"
            )

    def test_append_diff_column_with_none_df(self):
        # act
        with pytest.raises(Exception):
            append_diff_column(
                None, "new_column", "column1"
            )

    def test_append_diff_column_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                "invalid_df", "new_column", "column_to_diff"
            )
    
    def test_append_diff_column_with_invalid_new_column_type(self):
        # assign
        df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, 123, "column_to_diff"
            )   

    def test_append_diff_column_with_none_new_column(self):
        # assign
        df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, None, "column_to_diff"
            ) 
    
    def test_append_diff_column_with_empty_new_column(self):
        # assign
        df = create_test_df()
        
        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, "", "column_to_diff"
            ) 

    def test_append_diff_column_with_none_column_to_diff(
        self,
    ):
        # assign
        df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, "new_column", None
            )

    def test_append_diff_column_with_empty_column_to_diff(
        self,
    ):
        # assign
        df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, "new_column", ""
            )

    def test_append_diff_column_with_invalid_column_to_diff_name(
        self,
    ):
        # assign
        df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, "new_column", "invalid_column"
            )

    def test_append_diff_column_with_invalid_column_to_diff_type(
        self,
    ):
        # assign
        df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            append_diff_column(
                df, "new_column", 123
            )
👩‍💻 Hint

You can consider to use pandas.DataFrame.diff().

We can see high fluctuations in data. Let’s look closer at one of the months:

def filter_ninfected_by_year_and_month(df, year, month):
    """
    Filter a DataFrame by year and month, and return a column.

    Returns:
        pandas.Series: A Series object containing the filtered "ninfected" column.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if year is None or not isinstance(year, int) or year < 0:
        raise Exception("invalid year")
    if month is None or not isinstance(month, int) or month > 13 or month < 0:
        raise Exception("invalid month")
    return df[______ & ______]["ninfected"]

filter_ninfected_by_year_and_month(df, 2020, 7).plot()
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    test_df = pd.DataFrame(
        data=[[2, 9, 3, None], [5, 9, 4, 3], [9, 8, 10, 4]],
        columns=["infected", "recovered", "deaths", "ninfected"],
        index=["2020-01-22", "2020-01-23", "2020-01-24"],
    )
    test_df.index = pd.to_datetime(test_df.index)
    return test_df


class TestFilterNinfectedByYearAndMonth(unittest.TestCase):
    def test_filter_ninfected_by_year_and_month_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_result = pd.Series(
            [None, 3, 4],
            index=pd.to_datetime(["2020-01-22", "2020-01-23", "2020-01-24"]),
            name="ninfected",
        )

        # act
        result = filter_ninfected_by_year_and_month(test_df, 2020, 1)

        # assert
        assert result.equals(expected_result)

    def test_filter_ninfected_by_year_and_month_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(None, 2020, 1)

    def test_filter_ninfected_by_year_and_month_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(pd.DataFrame, 2020, 1)
    
    def test_filter_ninfected_by_year_and_month_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(123, 2020, 1)

    def test_filter_ninfected_by_year_and_month_with_none_year(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(test_df, None, 1)
    
    def test_filter_ninfected_by_year_and_month_with_invalid_year_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(test_df, "invalid_year_type", 1)

    def test_filter_ninfected_by_year_and_month_with_invalid_year_number(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(test_df, -10000, 1)

    def test_filter_ninfected_by_year_and_month_with_none_month(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(test_df, 2020, None)
    
    def test_filter_ninfected_by_year_and_month_with_invalid_month_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(test_df, 2020, "invalid_month_type")

    def test_filter_ninfected_by_year_and_month_with_invalid_year_number(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            filter_ninfected_by_year_and_month(test_df, 2020, 10000)

    
👩‍💻 Hint

You can consider to use pandas.DataFrame.index.

It clearly looks like there are weekly fluctuations in data. Because we want to be able to see the trends, it makes sense to smooth out the curve by computing running average (i.e. for each day we will compute the average value of the previous several days):

def get_rolling_window(df, column, window):
    """
    Returns a rolling window object of the specified column with the specified window size.
    
    Returns:
        A rolling window object of the specified column with the specified window size.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if column not in df.columns:
        raise Exception("invalid column")
    if window is None or not isinstance(window, int) or window <= 0 or window >= len(df.index):
        raise Exception("invalid window")
    # Calculate the moving average
    return ______

# Calculate the rolling window with a window size of 7 on the 'ninfected' column, 
# then calculate the mean
df["ninfav"] = ______
df["ninfav"].plot()
plt.show()
Check result by executing below... 📝
%%ipytest -qq

class TestGetRollingWindow(unittest.TestCase):
    def test_get_rolling_window_happy_case(self):
        # assign
        test_df = pd.DataFrame({
            'a': [1, 2, 3, 4, 5],
            'b': [5, 4, 3, 2, 1]
        })

        # act
        result = get_rolling_window(test_df, 'a', 3)

        # assert
        assert isinstance(result, pd.core.window.Rolling)

    def test_get_rolling_window_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(None, 'a', 3)

    def test_get_rolling_window_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(pd.DataFrame(), 'a', 3)

    def test_get_rolling_window_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(123, 'a', 3)

    def test_get_rolling_window_with_none_column(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, None, 3)

    def test_get_rolling_window_with_invalid_column_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, 123, 3)

    def test_get_rolling_window_with_invalid_column_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, 'c', 3)
    
    def test_get_rolling_window_with_empty_column(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, "", 3)

    def test_get_rolling_window_with_none_window(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, 'a', None)

    def test_get_rolling_window_with_invalid_window_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, "infected", "invalid_window_type")

    def test_get_rolling_window_with_negative_window(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, "infected", -10)
👩‍💻 Hint

You can consider to select a column and use pandas.DataFrame.rolling(window).

In order to be able to compare several countries, we might want to take the country’s population into account, and compare the percentage of infected individuals with respect to country’s population. In order to get country’s population, let’s load the dataset of countries:

countries = pd.read_csv(countries_dataset_url)
countries

Because this dataset contains information on both countries and provinces, to get the population of the whole country we need to be a little bit clever:

def filter_by_country_region(df, countries_and_region):
    """
    Filter the DataFrame by the given countries_and_region name and return rows with NaN Province_State.

    Returns:
        pandas DataFrame: the filtered DataFrame
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if countries_and_region not in df["Country_Region"].unique():
        raise Exception("countries_and_region name is wrong.")
    # Missing values are checked and processed quickly.
    return df[
        (df["Country_Region"] == ______) & df["Province_State"].______
    ]

filter_by_country_region(countries, "US")
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {
            "Country_Region": ["US", "US", "UK", "FR", "JP"],
            "Province_State": [None, "California", None, None, "Tokyo"],
            "Confirmed": [100, 50, 70, 80, 90],
            "Deaths": [10, 5, 7, 8, 9],
            "Recovered": [20, 10, 14, 16, 18],
        }
    )


class TestFilterByCountryRegion(unittest.TestCase):
    def test_filter_by_country_region_happy_case(self):
        # assign

        test_df = create_test_df()
        expected_result = pd.DataFrame(
            {
                "Country_Region": ["US"],
                "Province_State": [None],
                "Confirmed": [100],
                "Deaths": [10],
                "Recovered": [20],
            }
        )

        # act
        actual_result = filter_by_country_region(test_df, "US")

        # assert
        assert_frame_equal(expected_result, actual_result)

    def test_filter_by_country_region_without_None_Province_State(self):
        # arrange
        test_df = create_test_df()

        # act
        result = filter_by_country_region(test_df, "JP")

        # assert
        assert result.empty

    def test_filter_by_country_region_with_wrong_country_region_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with self.assertRaises(Exception):
            filter_by_country_region(test_df, "Wrong_name")

    def test_filter_by_country_region_with_none_df(self):
        # act & assert
        with self.assertRaises(Exception):
            filter_by_country_region(None, "US")

    def test_filter_by_country_region_with_empty_df(self):
        # act & assert
        with self.assertRaises(Exception):
            filter_by_country_region(pd.DataFrame(), "US")

    def test_filter_by_country_region_with_none_country_region_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with self.assertRaises(Exception):
            filter_by_country_region(test_df, None)

    def test_filter_by_country_region_with_empty_country_region_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with self.assertRaises(Exception):
            filter_by_country_region(test_df, "")

    def test_filter_by_country_region_with_invalid_country_region_name_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with self.assertRaises(Exception):
            filter_by_country_region(test_df, 123)
👩‍💻 Hint

You can consider to select a certain column and use pandas.DataFrame.isna().

def get_pinfected(df):
    """
    Computes the percentage of infected people in a given DataFrame `df`.

    Returns:
        pandas.Series: A new Series containing the percentage of infected people in the input DataFrame.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    pop = ______(countries, "US")["Population"].______
    return df["infected"] * 100 / pop

df["pinfected"] = get_pinfected(df)
df["pinfected"].plot(figsize=(10, 3))
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {
            "Country_Region": ["US", "US", "Canada", "Canada"],
            "Province_State": ["California", "New York", "Ontario", "Quebec"],
            "Population": [10000, 20000, 30000, 40000],
            "infected": [1000, 2000, 3000, 4000],
        }
    )


class TestGetPinfected(unittest.TestCase):
    def test_get_pinfected_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_result = pd.Series(
            [
                0.00030352119521741776,
                0.0006070423904348355,
                0.0009105635856522532,
                0.001214084780869671,
            ],
            name="infected",
        )

        # act
        actual_result = get_pinfected(test_df)

        # assert
        assert_series_equal(expected_result, actual_result, rtol=1e-3)

    def test_get_pinfected_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_pinfected(None)

    def test_get_pinfected_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_pinfected(pd.DataFrame())
    
    def test_get_pinfected_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            get_pinfected(123)
👩‍💻 Hint

You can consider to use the function defined before filter_by_country_region() and usepandas.DataFrame.iloc[] to select the first series number.

42.23.5. Computing \(R_t\)#

To see how infectious is the disease, we look at the basic reproduction number \(R_0\), which indicated the number of people that an infected person would further infect. When \(R_0\) is more than 1, the epidemic is likely to spread.

\(R_0\) is a property of the disease itself, and does not take into account some protective measures that people may take to slow down the pandemic. During the pandemic progression, we can estimate the reproduction number \(R_t\) at any given time \(t\). It has been shown that this number can be roughly estimated by taking a window of 8 days, and computing $\(R_t=\frac{I_{t-7}+I_{t-6}+I_{t-5}+I_{t-4}}{I_{t-3}+I_{t-2}+I_{t-1}+I_t}\)\( where \)I_t\( is the number of newly infected individuals on day \)t$.

Let’s compute \(R_t\) for our pandemic data. To do this, we will take a rolling window of 8 ninfected values, and apply the function to compute the ratio above:

def get_rt(df, column_name, window):
    """
    Calculate the Rt value of a given column in a DataFrame, using a rolling window.

    Returns:
        pandas.Series: A series containing the calculated Rt values.
    """ 
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if column_name not in df.columns:
        raise Exception("invalid column")
    if window is None or not isinstance(window, int) or window <= 0 or window >= len(df.index):
        raise Exception("invalid window")
    # Calculate Rt using a rolling window and a lambda function to sum the values
    # from the fourth day of the window onwards, and divide by the sum of the values
    # up to the third day of the window.
    df["Rt"] = get_rolling_window(df, column_name, window).apply(
        ______ x: x[4:].______ / x[:4].______
    )
    return df["Rt"]

get_rt(df, "ninfected", 8)
df["Rt"].plot()
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {
            "date": pd.date_range("2022-01-01", periods=18),
            "infected": [
                10,
                15,
                20,
                30,
                35,
                40,
                45,
                50,
                55,
                60,
                70,
                80,
                90,
                100,
                110,
                120,
                130,
                140,
            ],
        }
    )


class TestGetRt(unittest.TestCase):
    def test_get_rt_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_output = pd.Series(
            [
                None,
                None,
                None,
                None,
                None,
                None,
                None,
                2.2666666666666666,
                1.9,
                1.68,
                1.5666666666666667,
                1.5588235294117647,
                1.5789473684210527,
                1.619047619047619,
                1.6170212765957446,
                1.5849056603773586,
                1.5333333333333334,
                1.4705882352941178,
            ],
            dtype=np.float64,
        )

        # act
        result = get_rt(test_df, "infected", 8)

        # assert
        assert_series_equal(
            result, expected_output, rtol=0.001, check_dtype=False, check_names=False
        )

    def test_get_rt_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(None, 'a', 3)

    def test_get_rt_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(pd.DataFrame(), 'a', 3)

    def test_get_rt_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(123, 'a', 3)

    def test_get_rt_with_none_column(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, None, 3)

    def test_get_rt_with_invalid_column_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, 123, 3)

    def test_get_rt_with_invalid_column_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, 'c', 3)
    
    def test_get_rt_with_empty_column(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, "", 3)

    def test_get_rt_with_none_window(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, 'a', None)

    def test_get_rt_with_invalid_window_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, "infected", "invalid_window_type")

    def test_get_rt_with_negative_window(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_rolling_window(test_df, "infected", -10)
👩‍💻 Hint

You can consider to use lambda and sum().

You can see that there are some gaps in the graph. Those can be caused by either NaN, if inf values being present in the dataset. inf may be caused by division by 0, and NaN can indicate missing data, or no data available to compute the result (like in the very beginning of our frame, where rolling window of width 8 is not yet available). To make the graph nicer, we need to fill those values using replace and fillna function.

Let’s further look at the beginning of the pandemic. We will also limit the y-axis values to show only values below 6, in order to see better, and draw horizontal line at 1.

def rt_with_na_filled(df):
    """
    Calculate Rt with NA filled.
    
    Returns:
        A pandas Series object that contains Rt values with missing values (NaN) filled using the last non-missing value.
    """
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    # Filter out the data after May 1st 2020, replace infinite values with NaN, 
    # and fill the missing values using the last non-missing value.
    return (
        df[df.index < "2020-05-01"]["Rt"].______(np.inf, np.nan).______(method="pad")
    )

ax = rt_with_na_filled(df).plot(figsize=(10, 3))
ax.set_ylim([0, 6])
ax.axhline(1, linestyle="--", color="red")
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    return pd.DataFrame(
        {"Rt": [1.5, np.inf, 1.2, np.inf]},
        index=pd.to_datetime(["2020-04-29", "2020-04-30", "2020-05-01", "2020-05-02"]),
    )


class TestRtWithNaFilled(unittest.TestCase):
    def test_rt_with_na_filled_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_result = pd.Series(
            [1.5, 1.5], index=pd.to_datetime(["2020-04-29", "2020-04-30"]), name="Rt"
        )
        # act
        result = rt_with_na_filled(test_df)

        # assert
        pd.testing.assert_series_equal(result, expected_result)

    def test_rt_with_na_filled_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            rt_with_na_filled(None)

    def test_rt_with_na_filled_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            rt_with_na_filled(pd.DataFrame())
    
    def test_rt_with_na_filled_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            rt_with_na_filled(123)
👩‍💻 Hint

You can consider to use pandas.DataFrame.replace().

Another interesting indicator of the pandemic is the derivative, or daily difference in new cases. It allows us to see clearly when pandemic is increasing or declining.

def get_df_column_diff(df, column_name):
    if df is None or not isinstance(df, pd.DataFrame) or df.empty:
        raise Exception("df is not a valid DataFrame")
    if column_name not in df.columns:
        raise Exception("invalid column")
    # Calculate the difference between the current and the previous row's values for the given column
    return df[column_name].______

diff_series = get_df_column_diff(df, "ninfected")
diff_series.plot()
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
    test_df = pd.DataFrame(
        {
            "date": [
                "2022-01-01",
                "2022-01-02",
                "2022-01-03",
                "2022-01-04",
                "2022-01-05",
                "2022-01-06",
            ],
            "ninfected": [100, 110, 120, 130, 140, 150],
        }
    )
    test_df["date"] = pd.to_datetime(test_df["date"])
    test_df.set_index("date", inplace=True)
    return test_df


class TestGetDfColumnDiff(unittest.TestCase):
    def test_get_df_column_diff_happy_case(Self):
        # assign
        test_df = create_test_df()
        expected_diff = pd.Series(
            [None, 10, 10, 10, 10, 10],
            index=pd.to_datetime(
                [
                    "2022-01-01",
                    "2022-01-02",
                    "2022-01-03",
                    "2022-01-04",
                    "2022-01-05",
                    "2022-01-06",
                ]
            ),
        )

        # act
        column_diff = get_df_column_diff(test_df, "ninfected")

        # assert
        assert_series_equal(
            column_diff, expected_diff, check_dtype=False, check_names=False
        )

    def test_get_df_column_diff_with_none_df(Self):
        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(None, "ninfected")

    def test_get_df_column_diff_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(pd.DataFrame(), "ninfected")
    
    def test_get_df_column_diff_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(123, "ninfected")

    def test_get_df_column_diff_with_invalid_column_name(Self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(test_df, "invalid_column_name")
    
    def test_get_df_column_diff_with_none_column_name(Self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(test_df, None)

    def test_get_df_column_diff_with_none_column_type(Self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(test_df, 123)
    
    def test_get_df_column_diff_with_invalid_column_name(Self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(test_df, "invalid_column_name")
    
    def test_get_df_column_diff_with_empty_column(Self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_df_column_diff(test_df, "")
👩‍💻 Hint

You can consider to use pandas.DataFrame.diff()

Given the fact that there are a lot of fluctuations in data caused by reporting, it makes sense to smooth the curve by running rolling average to get the overall picture. Let’s again focus on the first months of the pandemic:

def get_smoothed_ax(df, column_name, datetime, window):
    """
    Returns a rolling mean of the diff of a column in a DataFrame up to a specific datetime.
   
    Returns:
        pandas Series with the smoothed values
    """
    if df is None:
        raise Exception("df cannot be None")
    if df.empty:
        raise Exception("df cannot be empty")
    if column_name not in df.columns:
        raise Exception("column not exist")
    # Filter the DataFrame to only include rows up to the datetime
    df_filtered = df[______]
    df_diff = df_filtered[column_name].diff()
    # Calculate the rolling mean of the diff
    df_rolling_mean = df_diff.rolling(window).______
    return df_rolling_mean

df_rolling_mean = get_smoothed_ax(df, "ninfected", "2020-06-01", 7)
ax = df_rolling_mean.plot()
ax.axhline(0, linestyle="-.", color="red")
plt.show()
Check result by executing below... 📝
%%ipytest -qq

def create_test_df():
        test_df = pd.DataFrame(
            data=[[2, 9, 3, None], [5, 9, 4, 3], [9, 8, 10, 4]],
            columns=["infected", "recovered", "deaths", "ninfected"],
            index=["2020-01-22", "2020-01-23", "2020-01-24"],
        )
        test_df.index = pd.to_datetime(test_df.index)
        return test_df

class TestGetSmoothedAx(unittest.TestCase):
    def test_get_smoothed_ax_happy_case(self):
        # assign
        test_df = create_test_df()
        expected_result = pd.Series([None, None, 1], index=test_df.index[0:], name="ninfected")

        # act
        result = get_smoothed_ax(test_df, "ninfected", "2020-01-25", 1)

        # assert
        assert result.equals(expected_result)

    def test_get_smoothed_ax_with_none_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(None, "ninfected", "2020-01-24", 2)

    def test_get_smoothed_ax_with_empty_df(self):
        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(pd.DataFrame, "ninfected", "2020-01-24", 2)
    
    def test_get_smoothed_ax_with_invalid_df_type(self):
        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(123, "ninfected", "2020-01-24", 2)

    def test_get_smoothed_ax_with_none_column_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, None, "2020-01-24", 2)

    def test_get_smoothed_ax_with_invalid_column_name_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, 123, "2020-01-24", 2)
    
    def test_get_smoothed_ax_with_nonexistent_column(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, "nonexistent_column", "2020-01-24", 2)

    def test_get_smoothed_ax_with_empty_column_name(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, "", "2020-01-24", 2)

    def test_get_smoothed_ax_with_invalid_window_type(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, "ninfected", "2020-01-24", "invalid_window_type")
    
    def test_get_smoothed_ax_with_none_window(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, "ninfected", "2020-01-24", None)
    
    def test_get_smoothed_ax_with_invalid_window_number(self):
        # assign
        test_df = create_test_df()

        # act & assert
        with pytest.raises(Exception):
            get_smoothed_ax(test_df, "ninfected", "2020-01-24", -1)
👩‍💻 Hint

You can consider to use pandas.DataFrame.index and mean().

42.23.6. Challenge#

Now it is time for you to play more with the code and data! Here are a few suggestions you can experiment with:

  • See the spread of the pandemic in different countries.

  • Plot \(R_t\) graphs for several countries on one plot for comparison, or make several plots side-by-side

  • See how the number of deaths and recoveries correlate with number of infected cases.

  • Try to find out how long a typical disease lasts by visually correlating infection rate and deaths rate and looking for some anomalies. You may need to look at different countries to find that out.

  • Calculate the fatality rate and how it changes over time. You may want to take into account the length of the disease in days to shift one time series before doing calculations

42.23.7. References#

You may look at further studies of COVID epidemic spread in the following publications:

42.23.8. Acknowledgments#

Thanks to Microsoft for creating the open-source course Data Science for Beginners. It inspires the majority of the content in this chapter.