diff --git a/doctor_visits/delphi_doctor_visits/config.py b/doctor_visits/delphi_doctor_visits/config.py
index 78e49d094..92c65422c 100644
--- a/doctor_visits/delphi_doctor_visits/config.py
+++ b/doctor_visits/delphi_doctor_visits/config.py
@@ -24,13 +24,36 @@ class Config:
     GEO_COL = "PatCountyFIPS"
     AGE_COL = "PatAgeGroup"
     HRR_COLS = ["Pat HRR Name", "Pat HRR ID"]
-    ID_COLS = [DATE_COL] + [GEO_COL] + [AGE_COL] + HRR_COLS
-    FILT_COLS = ID_COLS + COUNT_COLS
-    DTYPES = {"ServiceDate": str, "PatCountyFIPS": str,
-              "Denominator": int, "Flu1": int,
-              "Covid_like": int, "Flu_like": int,
-              "Mixed": int, "PatAgeGroup": str,
-              "Pat HRR Name": str, "Pat HRR ID": float}
+    # as of 2020-05-11, input file expected to have 10 columns
+    # id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name
+    # value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed
+    ID_COLS = [DATE_COL] + [GEO_COL] + HRR_COLS + [AGE_COL]
+    # drop HRR columns - unused for now since we assign HRRs by FIPS
+    FILT_COLS = [DATE_COL] + [GEO_COL] + [AGE_COL] + COUNT_COLS
+    DTYPES = {
+        "ServiceDate": str,
+        "PatCountyFIPS": str,
+        "Denominator": int,
+        "Flu1": int,
+        "Covid_like": int,
+        "Flu_like": int,
+        "Mixed": int,
+        "PatAgeGroup": str,
+        "Pat HRR Name": str,
+        "Pat HRR ID": float,
+        "servicedate": str,
+        "patCountyFIPS": str,
+        "patAgeGroup": str,
+        "patHRRname": str,
+        "patHRRid": float,
+    }
+    DEVIANT_COLS_MAP = {
+        "servicedate": "ServiceDate",
+        "patCountyFIPS": "PatCountyFIPS",
+        "patHRRname": "Pat HRR Name",
+        "patAgeGroup": "PatAgeGroup",
+        "patHRRid": "Pat HRR ID",
+    }
 
     SMOOTHER_BANDWIDTH = 100  # bandwidth for the linear left Gaussian filter
     MAX_BACKFILL_WINDOW = 7  # maximum number of days used to average a backfill correction
diff --git a/doctor_visits/delphi_doctor_visits/modify_claims_drops.py b/doctor_visits/delphi_doctor_visits/modify_claims_drops.py
deleted file mode 100644
index 3be9393e4..000000000
--- a/doctor_visits/delphi_doctor_visits/modify_claims_drops.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/usr/bin/env python3
-
-"""Modify the drops.
-
-Drops are expected to be numbered as:
-
-../EDI_AGG_INPATIENT/EDI_AGG_INPATIENT_1_07052020_1456.csv.gz
-../EDI_AGG_INPATIENT/EDI_AGG_INPATIENT_2_07052020_1456.csv.gz
-... etc.
-"""
-# third party
-import numpy as np
-import pandas as pd
-
-
-def modify_and_write(f, logger, test_mode=False):
-    """
-    Modify drops given a folder path.
-
-    Will rename necessary columns in the input files, and check the number of
-    columns and duplications.
-
-    Args:
-        data_path: path to the file to be modified.
-        test_mode: Don't overwrite the drops if test_mode==True
-
-    """
-    filename = str(f)
-    out_path = f.parents[0] / f.name
-    dfs = pd.read_csv(f, dtype={"PatCountyFIPS": str,
-                                "patCountyFIPS": str})
-    if "servicedate" in dfs.columns:
-        dfs.rename(columns={"servicedate": "ServiceDate"}, inplace=True)
-    if "patCountyFIPS" in dfs.columns:
-        dfs.rename(columns={"patCountyFIPS": "PatCountyFIPS"}, inplace=True)
-    if "patHRRname" in dfs.columns:
-        dfs.rename(columns={"patHRRname": "Pat HRR Name"}, inplace=True)
-    if "patAgeGroup" in dfs.columns:
-        dfs.rename(columns={"patAgeGroup": "PatAgeGroup"}, inplace=True)
-    if "patHRRid" in dfs.columns:
-        dfs.rename(columns={"patHRRid": "Pat HRR ID"}, inplace=True)
-
-    assert np.sum(
-        dfs.duplicated(subset=["ServiceDate", "PatCountyFIPS",
-                               "Pat HRR Name", "PatAgeGroup"])) == 0, \
-        f'Duplication across drops in {filename}!'
-    assert dfs.shape[1] == 10, f'Wrong number of columns in {filename}'
-
-    if not test_mode:
-        dfs.to_csv(out_path, index=False)
-        logger.info("Wrote modified csv", filename=out_path)
-    return dfs
diff --git a/doctor_visits/delphi_doctor_visits/process_data.py b/doctor_visits/delphi_doctor_visits/process_data.py
new file mode 100644
index 000000000..28ad882ab
--- /dev/null
+++ b/doctor_visits/delphi_doctor_visits/process_data.py
@@ -0,0 +1,181 @@
+"""Module providing functions for processing and wrangling data."""
+
+from datetime import datetime
+from pathlib import Path
+
+import dask.dataframe as dd
+import numpy as np
+import pandas as pd
+
+from .config import Config
+
+
+def format_outname(prefix: str, se: bool, weekday: bool):
+    """
+    Build the output file name from the signal settings.
+
+    Parameters
+    ----------
+    prefix: obfuscated prefix, prepended to the output name when se is True
+    se: boolean to write out standard errors, if true, use an obfuscated name
+    weekday: boolean for weekday adjustments.
+             signals will be generated with weekday adjustments (True) or without
+             adjustments (False)
+
+    Returns
+    -------
+    out_name str
+    """
+    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
+    if se:
+        assert prefix is not None, "template has no obfuscated prefix"
+        out_name = prefix + "_" + out_name
+    return out_name
+
+
+def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
+    """
+    Format the dataframe and check for anomalies before writing results.
+
+    Parameters
+    ----------
+    df: dataframe from output from update_sensor
+    geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+    se: boolean to write out standard errors, if true, use an obfuscated name
+    logger: the structured logger
+
+    Returns
+    -------
+    filtered and formatted dataframe
+    """
+    # report in percentage
+    df["val"] = df["val"] * 100
+    df["se"] = df["se"] * 100
+
+    val_isnull = df["val"].isnull()
+    df_val_null = df[val_isnull]
+    assert len(df_val_null) == 0, "sensor value is nan, check pipeline"
+    df = df[~val_isnull]
+
+    se_too_high = df["se"] >= 5
+    df_se_too_high = df[se_too_high]
+    assert len(df_se_too_high) == 0, f"standard error suspiciously high! investigate {geo_id}"
+    df = df[~se_too_high]
+
+    sensor_too_high = df["val"] >= 90
+    df_sensor_too_high = df[sensor_too_high]
+    assert len(df_sensor_too_high) == 0, f"strangely high percentage! investigate {geo_id}"
+    df = df[~sensor_too_high]
+
+    if se:
+        valid_cond = (df["se"] > 0) & (df["val"] > 0)
+        invalid_df = df[~valid_cond]
+        if len(invalid_df) > 0:
+            logger.info("p=0, std_err=0 invalid")
+        df = df[valid_cond]
+    else:
+        df["se"] = np.NAN
+
+    df["direction"] = np.NAN
+    df["sample_size"] = np.NAN
+    return df
+
+
+def write_to_csv(
+    output_df: pd.DataFrame, prefix: str, geo_level: str, weekday: bool, se: bool, logger, output_path="."
+):
+    """
+    Write sensor values to csv.
+
+    Args:
+        output_df: dataframe containing sensor rates, standard errors, dates, and geo_id
+        prefix: obfuscated prefix, used in the output name when se is True
+        geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+        weekday: boolean for weekday adjustments.
+                 signals will be generated with weekday adjustments (True) or without
+                 adjustments (False)
+        se: boolean to write out standard errors, if true, use an obfuscated name
+        logger: the structured logger
+        output_path: outfile path to write the csv (default is current directory)
+    """
+    out_name = format_outname(prefix, se, weekday)
+
+    if se:
+        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
+
+    out_n = 0
+    for d in set(output_df["date"]):
+        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_level, out_name)
+        single_date_df = output_df[output_df["date"] == d]
+        with open(filename, "w") as outfile:
+            outfile.write("geo_id,val,se,direction,sample_size\n")
+
+            for line in single_date_df.itertuples():
+                geo_id = line.geo_id
+                sensor = 100 * line.val  # report percentages
+                se_val = 100 * line.se
+                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
+                assert sensor < 90, f"strangely high percentage {geo_level, sensor}"
+                if not np.isnan(se_val):
+                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_level}"
+
+                if se:
+                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
+                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
+                else:
+                    # for privacy reasons we will not report the standard error
+                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
+                out_n += 1
+    logger.debug(f"wrote {out_n} rows for {geo_level}")
+
+
+def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
+    """
+    Read the csv with Dask, filter unneeded data, then convert back into a pandas dataframe.
+
+    Parameters
+    ----------
+    filepath: path to the aggregated doctor-visits data
+    startdate: first sensor date (YYYY-mm-dd)
+    enddate: last sensor date (YYYY-mm-dd)
+    dropdate: data drop date (YYYY-mm-dd)
+    logger: the structured logger
+
+    Returns
+    -------
+    pandas dataframe of the filtered claims data
+    """
+    filepath = Path(filepath)
+    logger.info(f"Processing {filepath}")
+
+    ddata = dd.read_csv(
+        filepath,
+        compression="gzip",
+        dtype=Config.DTYPES,
+        blocksize=None,
+    )
+    # rename inconsistent column names to match config column names
+    ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
+    ddata = ddata[Config.FILT_COLS]
+
+    ddata = ddata.dropna()
+
+    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])
+
+    df = ddata.compute()
+
+    # aggregate age groups (so data is unique by service date and FIPS)
+    df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
+    assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"
+    assert (df[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"
+
+    # restrict to training start and end date
+    startdate = startdate - Config.DAY_SHIFT
+
+    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
+    assert startdate < enddate, "Start date >= end date"
+    assert enddate <= dropdate, "End date > drop date"
+
+    date_filter = (df[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (df[Config.DATE_COL] < dropdate)
+    df = df[date_filter]
+    logger.info(f"Done processing {filepath}")
+    return df
diff --git a/doctor_visits/delphi_doctor_visits/run.py b/doctor_visits/delphi_doctor_visits/run.py
index 2dccffc8c..5018cd346 100644
--- a/doctor_visits/delphi_doctor_visits/run.py
+++ b/doctor_visits/delphi_doctor_visits/run.py
@@ -13,11 +13,12 @@
 from delphi_utils import get_structured_logger
 
-# first party
-from .update_sensor import update_sensor, write_to_csv
 from .download_claims_ftp_files import download
-from .modify_claims_drops import modify_and_write
 from .get_latest_claims_name import get_latest_filename
+from .process_data import csv_to_df, write_to_csv
+
+# first party
+from .update_sensor import update_sensor
 
 
 def run_module(params, logger=None):  # pylint: disable=too-many-statements
@@ -63,9 +64,6 @@ def run_module(params, logger=None):  # pylint: disable=too-many-statements
     # find the latest files (these have timestamps)
     claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
-    # modify data
-    modify_and_write(claims_file, logger)
-
     ## get end date from input file
     # the filename is expected to be in the format:
     # "EDI_AGG_OUTPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz"
@@ -106,6 +104,15 @@ def run_module(params, logger=None):  # pylint: disable=too-many-statements
     ## geographies
     geos = ["state", "msa", "hrr", "county", "hhs", "nation"]
 
+    claims_df = csv_to_df(claims_file, startdate_dt, enddate_dt, dropdate_dt, logger)
+
+    ## print out other vars
+    logger.info("outpath:\t\t%s", export_dir)
+    logger.info("parallel:\t\t%s", params["indicator"]["parallel"])
+    logger.info("weekday:\t\t%s", params["indicator"]["weekday"])
+    logger.info("write se:\t\t%s", se)
+    logger.info("obfuscated prefix:\t%s", prefix)
+
     max_dates = []
     n_csv_export = []
     ## start generating
@@ -116,10 +123,10 @@ def run_module(params, logger=None):  # pylint: disable=too-many-statements
         else:
             logger.info("Starting with no adj", geo_type=geo)
         sensor = update_sensor(
-            filepath=claims_file,
-            startdate=startdate,
-            enddate=enddate,
-            dropdate=dropdate,
+            data=claims_df,
+            startdate=startdate_dt,
+            enddate=enddate_dt,
+            dropdate=dropdate_dt,
             geo=geo,
             parallel=params["indicator"]["parallel"],
             weekday=weekday,
@@ -129,13 +136,8 @@ def run_module(params, logger=None):  # pylint: disable=too-many-statements
         if sensor is None:
             logger.error("No sensors calculated, no output will be produced")
             continue
-        # write out results
-        out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
-        if params["indicator"]["se"]:
-            assert prefix is not None, "template has no obfuscated prefix"
-            out_name = prefix + "_" + out_name
 
-        write_to_csv(sensor, geo, se, out_name, logger, export_dir)
+        write_to_csv(sensor, prefix, geo, weekday, se, logger, export_dir)
         max_dates.append(sensor.date.max())
         n_csv_export.append(sensor.date.unique().shape[0])
     logger.debug("Wrote files", export_dir=export_dir)
diff --git a/doctor_visits/delphi_doctor_visits/update_sensor.py b/doctor_visits/delphi_doctor_visits/update_sensor.py
index 4cac1e81c..b4b0bbb58 100644
--- a/doctor_visits/delphi_doctor_visits/update_sensor.py
+++ b/doctor_visits/delphi_doctor_visits/update_sensor.py
@@ -7,9 +7,8 @@
  - 2020-04-30: Aaron Rumack (add megacounty code)
  - 2020-05-06: Aaron and Maria (weekday effects/other adjustments)
 """
-
 # standard packages
-from datetime import timedelta
+from datetime import datetime, timedelta
 from multiprocessing import Pool, cpu_count
 
 # third party
@@ -24,57 +23,21 @@ from .sensor import DoctorVisitsSensor
 
 
-def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, output_path="."):
-    """Write sensor values to csv.
-
-    Args:
-        output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
-        se: boolean to write out standard errors, if true, use an obfuscated name
-        out_name: name of the output file
-        output_path: outfile path to write the csv (default is current directory)
-    """
-    if se:
-        logger.info("WARNING: WRITING SEs", filename=out_name)
-
-    out_n = 0
-    for d in set(output_df["date"]):
-        filename = "%s/%s_%s_%s.csv" % (output_path,
-                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
-                                        geo_level,
-                                        out_name)
-        single_date_df = output_df[output_df["date"] == d]
-        with open(filename, "w") as outfile:
-            outfile.write("geo_id,val,se,direction,sample_size\n")
-
-            for line in single_date_df.itertuples():
-                geo_id = line.geo_id
-                sensor = 100 * line.val  # report percentages
-                se_val = 100 * line.se
-                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
-                assert sensor < 90, f"strangely high percentage {geo_id, sensor}"
-                if not np.isnan(se_val):
-                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_id}"
-
-                if se:
-                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
-                    outfile.write(
-                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
-                else:
-                    # for privacy reasons we will not report the standard error
-                    outfile.write(
-                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
-                out_n += 1
-    logger.debug("Wrote rows", num_rows=out_n, geo_type=geo_level)
-
-
 def update_sensor(
-    filepath, startdate, enddate, dropdate, geo, parallel,
-    weekday, se, logger
+    data: pd.DataFrame,
+    startdate: datetime,
+    enddate: datetime,
+    dropdate: datetime,
+    geo: str,
+    parallel: bool,
+    weekday: bool,
+    se: bool,
+    logger,
 ):
     """Generate sensor values.
 
     Args:
-        filepath: path to the aggregated doctor-visits data
+        data: dataframe of the cleaned claims file
         startdate: first sensor date (YYYY-mm-dd)
         enddate: last sensor date (YYYY-mm-dd)
         dropdate: data drop date (YYYY-mm-dd)
@@ -84,41 +47,9 @@ def update_sensor(
         se: boolean to write out standard errors, if true, use an obfuscated name
        logger: the structured logger
     """
-    # as of 2020-05-11, input file expected to have 10 columns
-    # id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name
-    # value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed
-    data = pd.read_csv(
-        filepath,
-        usecols=Config.FILT_COLS,
-        dtype=Config.DTYPES,
-        parse_dates=[Config.DATE_COL],
-    )
-    assert (
-        np.sum(data.duplicated(subset=Config.ID_COLS)) == 0
-    ), "Duplicated data! Check the input file"
-
-    # drop HRR columns - unused for now since we assign HRRs by FIPS
-    data.drop(columns=Config.HRR_COLS, inplace=True)
-    data.dropna(inplace=True)  # drop rows with any missing entries
-
-    # aggregate age groups (so data is unique by service date and FIPS)
-    data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
-    assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation"
-    assert (data[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"
-
-    ## collect dates
-    # restrict to training start and end date
     drange = lambda s, e: np.array([s + timedelta(days=x) for x in range((e - s).days)])
-    startdate = pd.to_datetime(startdate) - Config.DAY_SHIFT
-    burnindate = startdate - Config.DAY_SHIFT
-    enddate = pd.to_datetime(enddate)
-    dropdate = pd.to_datetime(dropdate)
-    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
-    assert startdate < enddate, "Start date >= end date"
-    assert enddate <= dropdate, "End date > drop date"
-    data = data[(data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & \
-                (data[Config.DATE_COL] < dropdate)]
     fit_dates = drange(Config.FIRST_DATA_DATE, dropdate)
+    burnindate = startdate - Config.DAY_SHIFT
     burn_in_dates = drange(burnindate, dropdate)
     sensor_dates = drange(startdate, enddate)
     # The ordering of sensor dates corresponds to the order of burn-in dates
diff --git a/doctor_visits/setup.py b/doctor_visits/setup.py
index 17d6fc9af..c3cc2edc8 100644
--- a/doctor_visits/setup.py
+++ b/doctor_visits/setup.py
@@ -11,6 +11,7 @@
     "pytest-cov",
     "pytest",
     "scikit-learn",
+    "dask>=2023.5.0,<2023.6.0",  # latest version with Python 3.8 support
     "cvxpy>=1.5",
     "scs<3.2.6", # TODO: remove this ; it is a cvxpy dependency, and the excluded version appears to break our jenkins build. see: https://github.com/cvxgrp/scs/issues/283
 ]
diff --git a/doctor_visits/tests/comparison/process_data/20200204_state_smoothed_cli.csv b/doctor_visits/tests/comparison/process_data/20200204_state_smoothed_cli.csv
new file mode 100644
index 000000000..72cba27f6
--- /dev/null
+++ b/doctor_visits/tests/comparison/process_data/20200204_state_smoothed_cli.csv
@@ -0,0 +1,52 @@
+geo_id,val,se,direction,sample_size
+ak,0.569287,NA,NA,NA
+al,0.328228,NA,NA,NA
+ar,0.370763,NA,NA,NA
+az,0.530073,NA,NA,NA
+ca,0.351530,NA,NA,NA
+co,0.401868,NA,NA,NA
+ct,0.601417,NA,NA,NA
+dc,0.878253,NA,NA,NA
+de,0.324467,NA,NA,NA
+fl,0.479217,NA,NA,NA
+ga,0.475930,NA,NA,NA
+hi,0.393773,NA,NA,NA
+ia,0.481491,NA,NA,NA
+id,0.445713,NA,NA,NA
+il,0.380958,NA,NA,NA
+in,0.357658,NA,NA,NA
+ks,0.365005,NA,NA,NA
+ky,0.368104,NA,NA,NA
+la,0.405224,NA,NA,NA
+ma,0.347109,NA,NA,NA
+md,0.478480,NA,NA,NA
+me,0.292373,NA,NA,NA
+mi,0.432469,NA,NA,NA
+mn,0.436532,NA,NA,NA
+mo,0.354799,NA,NA,NA
+ms,0.385404,NA,NA,NA
+mt,0.363729,NA,NA,NA
+nc,0.502467,NA,NA,NA
+nd,0.384162,NA,NA,NA
+ne,0.504449,NA,NA,NA
+nh,0.406304,NA,NA,NA
+nj,0.350642,NA,NA,NA
+nm,0.336862,NA,NA,NA
+nv,0.590539,NA,NA,NA
+ny,0.369274,NA,NA,NA
+oh,0.402905,NA,NA,NA
+ok,0.339027,NA,NA,NA
+or,0.421793,NA,NA,NA
+pa,0.342980,NA,NA,NA
+ri,0.353920,NA,NA,NA
+sc,0.321687,NA,NA,NA
+sd,0.508804,NA,NA,NA
+tn,0.454150,NA,NA,NA
+tx,0.358389,NA,NA,NA
+ut,0.488488,NA,NA,NA
+va,0.371326,NA,NA,NA
+vt,0.307760,NA,NA,NA
+wa,0.440772,NA,NA,NA
+wi,0.373994,NA,NA,NA
+wv,0.317498,NA,NA,NA
+wy,0.346961,NA,NA,NA
diff --git a/doctor_visits/tests/comparison/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl b/doctor_visits/tests/comparison/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl
new file mode 100644
index 000000000..76eb4e38c
Binary files /dev/null and b/doctor_visits/tests/comparison/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl differ
diff --git a/doctor_visits/tests/test_geomap.py b/doctor_visits/tests/test_geomap.py
index 02cb1e0f8..0b5913c8a 100644
--- a/doctor_visits/tests/test_geomap.py
+++ b/doctor_visits/tests/test_geomap.py
@@ -3,14 +3,14 @@
 from delphi_doctor_visits.geo_maps import GeoMaps
 from delphi_doctor_visits.config import Config
 
-CONFIG = Config()
 DATA = pd.read_csv(
     "test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
-    usecols=CONFIG.FILT_COLS,
-    dtype=CONFIG.DTYPES,
-    parse_dates=[CONFIG.DATE_COL],
+    dtype=Config.DTYPES,
     nrows=9,
 )
+DATA.rename(columns=Config.DEVIANT_COLS_MAP, inplace=True)
+DATA = DATA[Config.FILT_COLS]
+DATA[Config.DATE_COL] = DATA[Config.DATE_COL].apply(pd.to_datetime)
 
 
 GM = GeoMaps()
diff --git a/doctor_visits/tests/test_modify_claims_drops.py b/doctor_visits/tests/test_modify_claims_drops.py
deleted file mode 100644
index 8368c5148..000000000
--- a/doctor_visits/tests/test_modify_claims_drops.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# standard
-from unittest.mock import Mock
-from pathlib import Path
-
-import pandas as pd
-
-# third party
-from delphi_doctor_visits.modify_claims_drops import (modify_and_write)
-
-
-class TestDropsModification:
-
-    def test_modify_and_write(self):
-        data_path = Path('./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz')
-        logger = Mock()
-        df = modify_and_write(data_path, logger, test_mode=True)
-        expected_colnames = ['PatCountyFIPS', 'Pat HRR Name', 'Pat HRR ID', 'PatAgeGroup']
-
-        test_df = pd.read_csv(data_path, dtype={"PatCountyFIPS": str,
-                                                "patCountyFIPS": str})
-
-        assert set(expected_colnames).issubset(set(df.columns))
-        assert df.shape[0] == test_df.shape[0]
-        assert df.shape[1] == test_df.shape[1]
-        assert df.shape[1] == 10
diff --git a/doctor_visits/tests/test_process_data.py b/doctor_visits/tests/test_process_data.py
new file mode 100644
index 000000000..5abfc0db1
--- /dev/null
+++ b/doctor_visits/tests/test_process_data.py
@@ -0,0 +1,62 @@
+"""Tests for process_data.py."""
+from datetime import datetime
+import logging
+import os
+from pathlib import Path
+
+import pandas as pd
+
+from delphi_doctor_visits.process_data import csv_to_df, write_to_csv, format_outname
+
+TEST_LOGGER = logging.getLogger()
+
+
+class TestProcessData:
+    startdate = datetime(2020, 2, 4)
+    enddate = datetime(2020, 2, 5)
+    dropdate = datetime(2020, 2, 6)
+    geo = "state"
+    se = False
+    weekday = False
+    prefix = "wip_XXXXX"
+    filepath = "./test_data"
+    compare_path = "./comparison"
+
+    def test_csv_to_df(self):
+        actual = csv_to_df(
+            filepath=f"{self.filepath}/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
+            startdate=self.startdate,
+            enddate=self.enddate,
+            dropdate=self.dropdate,
+            logger=TEST_LOGGER,
+        )
+
+        columns = list(actual.columns)
+        expected = pd.read_pickle(f"{self.compare_path}/process_data/main_after_date_SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.pkl")
+        expected = expected.reset_index(drop=True)
+        expected = expected[columns]
+        pd.testing.assert_frame_equal(expected, actual)
+
+    def test_write_to_csv(self):
+        output_df = pd.read_csv(f"{self.compare_path}/update_sensor/all.csv", parse_dates=["date"])
+
+        write_to_csv(
+            output_df=output_df,
+            prefix=self.prefix,
+            geo_level=self.geo,
+            se=self.se,
+            weekday=self.weekday,
+            logger=TEST_LOGGER,
+            output_path=self.filepath
+        )
+
+        outname = format_outname(self.prefix, self.se, self.weekday)
+
+        files = list(Path(self.filepath).glob(f"*{outname}.csv"))
+
+        for f in files:
+            filename = f.name
+            actual = pd.read_csv(f)
+            expected = pd.read_csv(f"{self.compare_path}/process_data/{filename}")
+            pd.testing.assert_frame_equal(expected, actual)
+            os.remove(f)
+
diff --git a/doctor_visits/tests/test_update_sensor.py b/doctor_visits/tests/test_update_sensor.py
index ab74c1c90..1d107b31c 100644
--- a/doctor_visits/tests/test_update_sensor.py
+++ b/doctor_visits/tests/test_update_sensor.py
@@ -1,18 +1,27 @@
 """Tests for update_sensor.py."""
+from datetime import datetime
 import logging
 
 import pandas as pd
 
 from delphi_doctor_visits.update_sensor import update_sensor
+from delphi_doctor_visits.process_data import csv_to_df
 
 TEST_LOGGER = logging.getLogger()
 
 class TestUpdateSensor:
+    start_date = datetime(2020, 2, 4)
+    end_date = datetime(2020, 2, 5)
+    drop_date = datetime(2020, 2, 6)
+
     def test_update_sensor(self):
+        claims_df = csv_to_df(filepath="./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
+                              startdate=self.start_date, enddate=self.end_date, dropdate=self.drop_date,
+                              logger=TEST_LOGGER)
+
         actual = update_sensor(
-            filepath="./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
-            startdate="2020-02-04",
-            enddate="2020-02-05",
-            dropdate="2020-02-06",
+            data=claims_df,
+            startdate=self.start_date,
+            enddate=self.end_date,
+            dropdate=self.drop_date,
             geo="state",
             parallel=False,
             weekday=False,