Split covid_hosp into daily & timeseries tables #1126
base: dev
Changes from 41 commits
@@ -22,8 +22,7 @@ def __init__(self,
         table_name=None,
         hhs_dataset_id=None,
         columns_and_types=None,
-        key_columns=None,
-        additional_fields=None):
+        key_columns=None):
     """Create a new Database object.

     Parameters
@@ -37,22 +36,18 @@ def __init__(self,
     columns_and_types : tuple[str, str, Callable]
       List of 3-tuples of (CSV header name, SQL column name, data type) for
       all the columns in the CSV file.
-    additional_fields : tuple[str]
-      List of 2-tuples of (value, SQL column name) for additional fields to include
-      at the end of the row which are not present in the CSV data.
     """

     self.connection = connection
     self.table_name = table_name
     self.hhs_dataset_id = hhs_dataset_id
-    self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' else \
+    self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' or table_name == "covid_hosp_state_daily" else \
       'publication_date'

Review comment: this line is getting a little gnarly, but we will fix it with the other yaml-schema changes, so you can leave it for now.

     self.columns_and_types = {
       c.csv_name: c
       for c in (columns_and_types if columns_and_types is not None else [])
     }
     self.key_columns = key_columns if key_columns is not None else []
-    self.additional_fields = additional_fields if additional_fields is not None else []

   @classmethod
   def logger(database_class):
@@ -184,9 +179,9 @@ def nan_safe_dtype(dtype, value):
     for csv_name in self.key_columns:
       dataframe.loc[:, csv_name] = dataframe[csv_name].map(self.columns_and_types[csv_name].dtype)

-    num_columns = 2 + len(dataframe_columns_and_types) + len(self.additional_fields)
+    num_columns = 2 + len(dataframe_columns_and_types)
     value_placeholders = ', '.join(['%s'] * num_columns)
-    columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types + self.additional_fields)
+    columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types)
     sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \
       f'VALUES ({value_placeholders})'
     id_and_publication_date = (0, publication_date)
@@ -200,8 +195,7 @@ def nan_safe_dtype(dtype, value):
       for c in dataframe_columns_and_types:
         values.append(nan_safe_dtype(c.dtype, row[c.csv_name]))
       many_values.append(id_and_publication_date +
-                         tuple(values) +
-                         tuple(i.csv_name for i in self.additional_fields))
+                         tuple(values))
       n += 1
     # insert in batches because one at a time is slow and all at once makes
     # the connection drop :(
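For orientation, the f-strings above assemble a parameterized INSERT of roughly the following shape. This is an illustrative sketch only: the table name and signal column below are placeholders, the real column list comes from columns_and_types, and the publication column is either `issue` or `publication_date` depending on the table.

-- Hypothetical rendering of the INSERT built above (placeholder names, not the literal covid_hosp schema).
-- The %s placeholders are filled per row by the Python DB driver; the batching mentioned in the
-- comment above reuses this one statement for many rows at a time.
INSERT INTO `covid_hosp_state_daily`
  (`id`, `issue`, `state`, `date`, `some_signal_column`)
VALUES
  (%s, %s, %s, %s, %s);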
@@ -0,0 +1,24 @@ (new migration file)
-- 1. Add new state_daily table mirroring state_timeseries table

CREATE TABLE `covid_hosp_state_daily` (
  -- for uniqueness
  PRIMARY KEY (`id`),
  -- for fast lookup of most recent issue for a given state and date
  UNIQUE KEY `issue_by_state_and_date` (`state`, `date`, `issue`),
  -- for fast lookup of a time-series for a given state and issue
  KEY `date_by_issue_and_state` (`issue`, `state`, `date`),
  -- for fast lookup of all states for a given date and issue
  KEY `state_by_issue_and_date` (`issue`, `date`, `state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
SELECT * FROM covid_hosp_state_timeseries WHERE record_type='D';
-- AUTO_INCREMENT is not preserved by `CREATE TABLE ... SELECT`; re-add it
ALTER TABLE covid_hosp_state_daily MODIFY id INT NOT NULL AUTO_INCREMENT;

-- 2. Remove data with incorrect record_type from timeseries table (D records were moved to daily)

DELETE FROM `covid_hosp_state_timeseries` WHERE record_type='D';

-- 3. Remove the record_type column from both tables

ALTER TABLE `covid_hosp_state_daily` DROP COLUMN record_type;
ALTER TABLE `covid_hosp_state_timeseries` DROP COLUMN record_type;
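As a hedged aside (not part of the migration itself), checks along these lines could confirm after the script runs that the split behaved as intended, assuming the schema above:

-- Illustrative post-migration sanity checks:
SELECT COUNT(*) FROM `covid_hosp_state_daily`;        -- all former record_type='D' rows
SELECT COUNT(*) FROM `covid_hosp_state_timeseries`;   -- should now hold only former 'T' rows
SHOW CREATE TABLE `covid_hosp_state_daily`;           -- confirm `id` regained AUTO_INCREMENT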
@@ -152,19 +152,65 @@ def handle():
   q.where_integers("date", dates)
   q.where_strings("state", states)

+  # These queries prioritize the daily value if there is both a time series and daily value for a given issue/date/state.
+  # Further details: https://github.com/cmu-delphi/delphi-epidata/pull/336/files#diff-097d4969fdc9ac1f722809e85f3dc59ad371b66011861a50d15fcc605839c63dR364-R368
   if issues is not None:
Review comment (attached to the line above, with a suggested change): does this work? If so, it's a little easier to comprehend; same for the [...]

Follow-up review comment: more specifically -- do we always want to treat falsy values of [...]? We don't encourage users to submit 0 as an issue, but that doesn't mean they couldn't do it.
+    # Filter for all matching issues
     q.where_integers("issue", issues)
-    # final query using specific issues
-    query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} WHERE {q.conditions_clause}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
-  elif as_of is not None:
-    sub_condition_asof = "(issue <= :as_of)"
-    q.params["as_of"] = as_of
-    query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state ORDER BY issue DESC, record_type) `row` FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof}) SELECT {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
+
+    # Get all issues matching the conditions from daily & timeseries
+    union_subquery = f'''
+    (
+      SELECT *, 'D' as record_type FROM `covid_hosp_state_daily` {q.alias} WHERE {q.conditions_clause}
+      UNION ALL
+      SELECT *, 'T' as record_type FROM `covid_hosp_state_timeseries` {q.alias} WHERE {q.conditions_clause}
+    ) c'''
+
+    # Prioritize rows with record_type='D' for each issue/date/state group
+    query = f'''
+      SELECT {q.fields_clause} FROM (
+        SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY issue, date, state ORDER BY record_type) `row`
+        FROM {union_subquery}
+      ) {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}
+    '''
   else:
-    # final query using most recent issues
-    subquery = f"(SELECT max(`issue`) `max_issue`, `date`, `state` FROM {q.table} WHERE {q.conditions_clause} GROUP BY `date`, `state`) x"
-    condition = f"x.`max_issue` = {q.alias}.`issue` AND x.`date` = {q.alias}.`date` AND x.`state` = {q.alias}.`state`"
-    query = f"WITH c as (SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY date, state, issue ORDER BY record_type) `row` FROM {q.table} JOIN {subquery} ON {condition}) select {q.fields_clause} FROM {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}"
+    # Filter for most recent issues
+    cond_clause = q.conditions_clause
+    if as_of is not None:
+      # ...Filter for most recent issues before a given as_of
+      cond_clause += " AND (issue <= :as_of)"
+      q.params["as_of"] = as_of
+
+    join_condition = f"{q.alias}.state = x.state AND {q.alias}.date = x.date AND {q.alias}.issue = x.max_issue"
+
+    # Get the rows from the daily & timeseries tables with the highest issue value within each state/date group
+    join_daily = f'''
+      SELECT {q.fields_clause}, 'D' AS record_type FROM `covid_hosp_state_daily` {q.alias}
+      INNER JOIN (
+        SELECT {q.alias}.state, {q.alias}.date, MAX({q.alias}.issue) AS max_issue
+        FROM `covid_hosp_state_daily` {q.alias}
+        WHERE {cond_clause}
+        GROUP BY {q.alias}.state, {q.alias}.date
+      ) x
+      ON {join_condition}
+    '''
+    join_timeseries = f'''
+      SELECT {q.fields_clause}, 'T' AS record_type FROM `covid_hosp_state_timeseries` {q.alias}
+      INNER JOIN (
+        SELECT {q.alias}.state, {q.alias}.date, MAX(issue) AS max_issue
+        FROM `covid_hosp_state_timeseries` {q.alias}
+        WHERE {cond_clause}
+        GROUP BY {q.alias}.state, {q.alias}.date
+      ) x
+      ON {join_condition}
+    '''
+
+    # Combine daily & timeseries queries, getting the combined latest issues (and prioritizing rows with record_type='D' in a tie)
+    query = f'''
+      SELECT {q.fields_clause} FROM (
+        SELECT {q.fields_clause}, ROW_NUMBER() OVER (PARTITION BY state, date ORDER BY issue DESC, record_type) `row`
+        FROM ({join_daily} UNION ALL {join_timeseries}) {q.alias}
+      ) {q.alias} WHERE `row` = 1 ORDER BY {q.order_clause}
+    '''

   # send query
   return execute_query(query, q.params, fields_string, fields_int, fields_float)
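To make the f-strings above concrete, the else branch assembles SQL of roughly this shape once the dynamic pieces are substituted. The values below are made-up examples rather than actual rendered output: assume fields_clause is "c.state, c.date, c.issue", the alias is "c", and the only condition is a single state filter.

-- Hypothetical rendering of the "most recent issues" query:
SELECT c.state, c.date, c.issue FROM (
  SELECT c.state, c.date, c.issue,
         ROW_NUMBER() OVER (PARTITION BY state, date ORDER BY issue DESC, record_type) `row`
  FROM (
    SELECT c.state, c.date, c.issue, 'D' AS record_type
      FROM `covid_hosp_state_daily` c
      INNER JOIN (SELECT c.state, c.date, MAX(c.issue) AS max_issue
                    FROM `covid_hosp_state_daily` c
                   WHERE c.state = 'PA'
                   GROUP BY c.state, c.date) x
        ON c.state = x.state AND c.date = x.date AND c.issue = x.max_issue
    UNION ALL
    SELECT c.state, c.date, c.issue, 'T' AS record_type
      FROM `covid_hosp_state_timeseries` c
      INNER JOIN (SELECT c.state, c.date, MAX(issue) AS max_issue
                    FROM `covid_hosp_state_timeseries` c
                   WHERE c.state = 'PA'
                   GROUP BY c.state, c.date) x
        ON c.state = x.state AND c.date = x.date AND c.issue = x.max_issue
  ) c
) c WHERE `row` = 1 ORDER BY c.state, c.date, c.issue;

Ordering the window by issue DESC, record_type yields the newest issue per state/date pair and, on a tie, lets the 'D' row from the daily table win over the 'T' row from the timeseries table, since 'D' sorts before 'T'.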