Commit 593279b

aligned preprocessing to match current & rollback write for consistent result
1 parent 4ddd5a0 commit 593279b

1 file changed: 47 additions, 25 deletions

doctor_visits/delphi_doctor_visits/process_data.py

```diff
@@ -81,32 +81,54 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     return df
 
 
-def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se: bool, logger, output_path="."):
+def write_to_csv(
+    output_df: pd.DataFrame, prefix: str, geo_level: str, weekday: bool, se: bool, logger, output_path="."
+):
     """
     Write sensor values to csv.
 
     Args:
         output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
-        geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+        geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
         se: boolean to write out standard errors, if true, use an obfuscated name
         out_name: name of the output file
         output_path: outfile path to write the csv (default is current directory)
     """
-    out_name = format_outname(prefix, se, weekday)
-    filtered_df = format_df(output_df, geo_id, se, logger)
+    # out_name = format_outname(prefix, se, weekday)
 
+    # write out results
+    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
     if se:
-        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
+        assert prefix is not None, "template has no obfuscated prefix"
+        out_name = prefix + "_" + out_name
 
-    dates = set(list(output_df["date"]))
-    grouped = filtered_df.groupby("date")
-    for d in dates:
-        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_id, out_name)
-        single_date_df = grouped.get_group(d)
-        single_date_df = single_date_df.drop(columns=["date"])
-        single_date_df.to_csv(filename, index=False, na_rep="NA")
+    if se:
+        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
 
-        logger.debug(f"wrote {len(single_date_df)} rows for {geo_id}")
+    out_n = 0
+    for d in set(output_df["date"]):
+        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_level, out_name)
+        single_date_df = output_df[output_df["date"] == d]
+        with open(filename, "w") as outfile:
+            outfile.write("geo_id,val,se,direction,sample_size\n")
+
+            for line in single_date_df.itertuples():
+                geo_id = line.geo_id
+                sensor = 100 * line.val  # report percentages
+                se_val = 100 * line.se
+                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
+                assert sensor < 90, f"strangely high percentage {geo_level, sensor}"
+                if not np.isnan(se_val):
+                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_level}"
+
+                if se:
+                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
+                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
+                else:
+                    # for privacy reasons we will not report the standard error
+                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
+                out_n += 1
+    logger.debug(f"wrote {out_n} rows for {geo_level}")
 
 
 def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
```
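
Not part of the commit: a minimal usage sketch of the rolled-back writer on a hypothetical toy frame, assuming Config.DAY_SHIFT is a one-day timedelta and that the output directory already exists.

```python
# Hypothetical usage sketch (toy data; assumes Config.DAY_SHIFT is a
# one-day timedelta and ./receiving already exists).
import logging

import pandas as pd

from delphi_doctor_visits.process_data import write_to_csv

logger = logging.getLogger("delphi_doctor_visits")

# Toy frame with the columns the writer reads via itertuples().
toy = pd.DataFrame(
    {
        "geo_id": ["ak", "al"],
        "val": [0.042, 0.013],  # rates in [0, 1]; scaled to percentages on write
        "se": [0.0004, 0.0002],
        "date": pd.to_datetime(["2020-05-01", "2020-05-01"]),
    }
)

# weekday=True selects "smoothed_adj_cli"; se=False writes "NA" for the
# standard error, so no obfuscated prefix is required.
write_to_csv(toy, prefix=None, geo_level="state", weekday=True, se=False,
             logger=logger, output_path="./receiving")
# -> ./receiving/20200502_state_smoothed_adj_cli.csv  (given a one-day shift)
```
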
```diff
@@ -131,29 +153,29 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
         dtype=Config.DTYPES,
         blocksize=None,
     )
-
-    ddata = ddata.dropna()
     # rename inconsistent column names to match config column names
     ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
-
     ddata = ddata[Config.FILT_COLS]
-    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])
-
-    # restrict to training start and end date
-    startdate = startdate - Config.DAY_SHIFT
 
-    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
-    assert startdate < enddate, "Start date >= end date"
-    assert enddate <= dropdate, "End date > drop date"
+    ddata = ddata.dropna()
 
-    date_filter = (ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate)
+    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])
 
-    df = ddata[date_filter].compute()
+    df = ddata.compute()
 
     # aggregate age groups (so data is unique by service date and FIPS)
     df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
     assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"
     assert (df[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"
 
+    # restrict to training start and end date
+    startdate = startdate - Config.DAY_SHIFT
+
+    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
+    assert startdate < enddate, "Start date >= end date"
+    assert enddate <= dropdate, "End date > drop date"
+
+    date_filter = (df[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (df[Config.DATE_COL] < dropdate)
+    df = df[date_filter]
     logger.info(f"Done processing {filepath}")
     return df
```
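
Not part of the commit: a pandas-only sketch of the reordered flow, using "ServiceDate" and "PatCountyFIPS" as hypothetical stand-ins for Config.DATE_COL and Config.GEO_COL, showing that the date window is now applied after the age-group aggregation rather than on the dask frame.

```python
# Hypothetical sketch of the new ordering ("ServiceDate" / "PatCountyFIPS"
# stand in for Config.DATE_COL / Config.GEO_COL).
import pandas as pd

df = pd.DataFrame(
    {
        "ServiceDate": pd.to_datetime(["2020-02-01", "2020-02-01", "2020-02-10"]),
        "PatCountyFIPS": ["01001", "01001", "01001"],
        "Denominator": [10, 5, 7],
    }
)

# aggregate age groups first: the two 2020-02-01 rows collapse into one
df = df.groupby(["ServiceDate", "PatCountyFIPS"]).sum(numeric_only=True).reset_index()

# only then restrict to the training window, mirroring the post-compute filter
first_data_date = pd.Timestamp("2020-01-01")
dropdate = pd.Timestamp("2020-02-05")
df = df[(df["ServiceDate"] >= first_data_date) & (df["ServiceDate"] < dropdate)]
# one aggregated row remains: 2020-02-01 with Denominator == 15
```
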
