"""Functionality that extends on what the base StatsCan api returns in some way.
Todo
----
Function to delete tables
Extend getChangedCubeList with a function that returns all tables updated
within a date range
"""
import json
import pathlib
import zipfile
import h5py
import numpy as np
import pandas as pd
import requests
from tqdm import tqdm
from stats_can.helpers import parse_tables
from stats_can.helpers import parse_vectors
from stats_can.scwds import get_bulk_vector_data_by_range
from stats_can.scwds import get_code_sets
from stats_can.scwds import get_cube_metadata
from stats_can.scwds import get_data_from_vectors_and_latest_n_periods
from stats_can.scwds import get_full_table_download
from stats_can.scwds import get_series_info_from_vector
def get_tables_for_vectors(vectors):
    """Get a dict mapping vectors to their tables.

    Parameters
    ----------
    vectors : list of str or str
        Vectors to find tables for

    Returns
    -------
    tables_list : dict
        keys for each vector number return that vector's table, plus an
        'all_tables' key with a list of unique tables used by the vectors
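
    Examples
    --------
    A minimal sketch; the vector ID is a placeholder and the call requires
    network access to the StatsCan WDS API:

    >>> get_tables_for_vectors("v41690973")  # doctest: +SKIP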
"""
v_json = get_series_info_from_vector(vectors)
vectors = [j["vectorId"] for j in v_json]
tables_list = {j["vectorId"]: str(j["productId"]) for j in v_json}
tables_list["all_tables"] = []
for vector in vectors:
if tables_list[vector] not in tables_list["all_tables"]:
tables_list["all_tables"].append(tables_list[vector])
return tables_list
def table_subsets_from_vectors(vectors):
    """Get a dict mapping tables to their vectors.

    Parameters
    ----------
    vectors : list of str or str
        Vectors to find tables for

    Returns
    -------
    tables_dict : dict
        keys for each table used by the vectors, matched to a list of vectors
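
    Examples
    --------
    A minimal sketch; the vector IDs are placeholders and the call requires
    network access:

    >>> table_subsets_from_vectors(["v41690973", "v41690974"])  # doctest: +SKIP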
"""
start_tables_dict = get_tables_for_vectors(vectors)
tables_dict = {t: [] for t in start_tables_dict["all_tables"]}
vecs = list(start_tables_dict.keys())[:-1] # all but the all_tables key
for vec in vecs:
tables_dict[start_tables_dict[vec]].append(vec)
return tables_dict
def download_tables(tables, path=None, csv=True):
    """Download a json file and zip of data for a list of tables to path.

    Parameters
    ----------
    tables : list of str
        tables to be downloaded
    path : str or path object, default None (current directory)
        Where to download the table and json
    csv : boolean, default True
        download in CSV format, SDMX if False

    Returns
    -------
    downloaded : list
        list of tables that were downloaded
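
    Examples
    --------
    A minimal sketch; the table number is a placeholder and the call
    downloads files over the network:

    >>> download_tables(["18100004"], path="/tmp")  # doctest: +SKIP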
"""
path = pathlib.Path(path) if path else pathlib.Path()
metas = get_cube_metadata(tables)
for meta in metas:
product_id = meta["productId"]
zip_url = get_full_table_download(product_id, csv=csv)
zip_file_name = product_id + ("-eng.zip" if csv else ".zip")
json_file_name = product_id + ".json"
zip_file = path / zip_file_name
json_file = path / json_file_name
# Thanks http://evanhahn.com/python-requests-library-useragent/
response = requests.get(zip_url, stream=True, headers={"user-agent": None})
progress_bar = tqdm(
desc=zip_file_name,
total=int(response.headers.get("content-length", 0)),
unit="B",
unit_scale=True,
)
# Thanks https://bit.ly/2sPYPYw
with open(json_file, "w") as outfile:
json.dump(meta, outfile)
with open(zip_file, "wb") as handle:
for chunk in response.iter_content(chunk_size=512):
if chunk: # filter out keep-alive new chunks
handle.write(chunk)
progress_bar.update(len(chunk))
progress_bar.close()
return [meta["productId"] for meta in metas]
def zip_update_tables(path=None, csv=True):
    """Check local json, update zips of outdated tables.

    Grabs the json files in path, checks them against the metadata on
    StatsCan, and grabs updated tables where there have been changes.
    There isn't actually a "last modified date" in the metadata, so this
    compares the latest reference period instead. Almost all data changes
    will at least include incremental releases, so this should capture
    the desired updates.

    Parameters
    ----------
    path : str or pathlib.Path, default None
        where to look for tables to update
    csv : boolean, default True
        Downloads updates in CSV format by default, SDMX if False

    Returns
    -------
    update_table_list : list
        list of the tables that were updated
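
    Examples
    --------
    A minimal sketch; assumes previously downloaded tables and their json
    metadata live in /tmp:

    >>> zip_update_tables(path="/tmp")  # doctest: +SKIP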
"""
local_jsons = list_zipped_tables(path=path)
tables = [j["productId"] for j in local_jsons]
remote_jsons = get_cube_metadata(tables)
update_table_list = [
local["productId"]
for local, remote in zip(local_jsons, remote_jsons)
if local["cubeEndDate"] != remote["cubeEndDate"]
]
download_tables(update_table_list, path, csv=csv)
return update_table_list
def zip_table_to_dataframe(table, path=None):
    """Read a StatsCan table into a pandas DataFrame.

    If a zip file of the table does not exist in path, downloads it.

    Parameters
    ----------
    table : str
        the table to load to dataframe from zipped csv
    path : str or pathlib.Path, default None (current working directory)
        where to download the tables or load them

    Returns
    -------
    df : pandas.DataFrame
        the table as a dataframe
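
    Examples
    --------
    A minimal sketch; the table number is a placeholder, and the zip is
    downloaded first if it isn't already in path:

    >>> df = zip_table_to_dataframe("18100004", path="/tmp")  # doctest: +SKIP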
"""
path = pathlib.Path(path) if path else pathlib.Path()
# Parse tables returns a list, can only do one table at a time here though
table = parse_tables(table)[0]
table_zip = table + "-eng.zip"
table_zip = path / table_zip
if not table_zip.is_file():
download_tables([table], path)
csv_file = table + ".csv"
with zipfile.ZipFile(table_zip) as myzip:
with myzip.open(csv_file) as myfile:
col_names = pd.read_csv(myfile, nrows=0).columns
# reopen the file or it misses the first row
with myzip.open(csv_file) as myfile:
types_dict = {"VALUE": float}
types_dict.update({col: str for col in col_names if col not in types_dict})
df = pd.read_csv(myfile, dtype=types_dict)
possible_cats = [
"GEO",
"DGUID",
"STATUS",
"SYMBOL",
"TERMINATED",
"DECIMALS",
"UOM",
"UOM_ID",
"SCALAR_FACTOR",
"SCALAR_ID",
"VECTOR",
"COORDINATE",
"Wages",
"National Occupational Classification for Statistics (NOC-S)",
"Supplementary unemployment rates",
"Sex",
"Age group",
"Labour force characteristics",
"Statistics",
"Data type",
"Job permanency",
"Union coverage",
"Educational attainment",
]
actual_cats = [col for col in possible_cats if col in col_names]
df[actual_cats] = df[actual_cats].astype("category")
df["REF_DATE"] = pd.to_datetime(df["REF_DATE"], errors="ignore")
return df
def list_zipped_tables(path=None):
    """List StatsCan tables available locally.

    Defaults to looking for zipped CSVs in the current working directory.

    Parameters
    ----------
    path : str or path, default None
        Where to look for zipped tables

    Returns
    -------
    tables : list
        list of available tables json data
"""
# Find json files
path = pathlib.Path(path) if path else pathlib.Path.cwd()
jsons = path.glob("*.json")
tables = []
for j in jsons:
try:
with open(j) as json_file:
result = json.load(json_file)
if "productId" in result:
tables.append(result)
        except ValueError as e:
            print("failed to read json file " + str(j))
            print(e)
return tables
def tables_to_h5(tables, h5file="stats_can.h5", path=None):
    """Take tables and their metadata and put them in an hdf5 file.

    Parameters
    ----------
    tables : list of str
        tables to add to the h5file
    h5file : str, default stats_can.h5
        name of the h5file to store the tables in
    path : str or path, default None (current working directory)
        path to the h5file

    Returns
    -------
    tables : list
        list of tables loaded into the file
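
    Examples
    --------
    A minimal sketch with a placeholder table number; the table is
    downloaded first if no local copy exists:

    >>> tables_to_h5(["18100004"], path="/tmp")  # doctest: +SKIP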
"""
path = pathlib.Path(path) if path else pathlib.Path()
h5file = path / h5file
tables = parse_tables(tables)
path = h5file.parent
for table in tables:
hkey = "table_" + table
jkey = "json_" + table
zip_file = table + "-eng.zip"
json_file = table + ".json"
zip_file = path / zip_file
json_file = path / json_file
if not json_file.is_file():
download_tables([table], path)
df = zip_table_to_dataframe(table, path=path)
with open(json_file) as f_name:
df_json = json.load(f_name)
with pd.HDFStore(h5file, "a") as store:
store.put(key=hkey, value=df, format="table", complevel=1)
with h5py.File(h5file, "a") as hfile:
if jkey in hfile.keys():
del hfile[jkey]
hfile.create_dataset(jkey, data=json.dumps(df_json))
zip_file.unlink()
json_file.unlink()
return tables
def table_from_h5(table, h5file="stats_can.h5", path=None):
    """Read a table from h5 to a dataframe.

    Parameters
    ----------
    table : str
        name of the table to read
    h5file : str, default stats_can.h5
        name of the h5file to retrieve the table from
    path : str or path, default None (current working directory)
        path to the h5file

    Returns
    -------
    df : pd.DataFrame
        table in dataframe format
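
    Examples
    --------
    A minimal sketch with a placeholder table number; falls back to
    downloading the table if it isn't already in the h5 file:

    >>> df = table_from_h5("18100004", path="/tmp")  # doctest: +SKIP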
"""
path = pathlib.Path(path) if path else pathlib.Path()
table = "table_" + parse_tables(table)[0]
h5 = path / h5file
try:
with pd.HDFStore(h5, "r") as store:
df = pd.read_hdf(store, key=table)
except (KeyError, OSError):
print("Downloading and loading " + table)
tables_to_h5(tables=table, h5file=h5file, path=path)
with pd.HDFStore(h5, "r") as store:
df = pd.read_hdf(store, key=table)
return df
def list_h5_tables(path=None, h5file="stats_can.h5"):
    """Return a list of metadata for StatsCan tables from an hdf5 file.

    Parameters
    ----------
    path : str or path, default None (current working directory)
        path to the h5 file
    h5file : str, default stats_can.h5
        name of the h5file to read table data from

    Returns
    -------
    jsons : list
        list of available tables json data
"""
keys = h5_included_keys(h5file=h5file, path=path)
tables = parse_tables([k for k in keys if k.startswith("json_")])
return metadata_from_h5(tables, h5file=h5file, path=path)
def list_downloaded_tables(path=None, h5file="stats_can.h5"):
    """Return a list of metadata for locally downloaded StatsCan tables.

    Wrapper for list_zipped_tables and list_h5_tables.

    Parameters
    ----------
    path : str or path, default None (current working directory)
        path to the h5 file
    h5file : str, default stats_can.h5
        name of the h5file to read table data from, set to None to list
        zipped tables instead

    Returns
    -------
    jsons : list
        list of available tables json data
"""
if h5file:
jsons = list_h5_tables(path=path, h5file=h5file)
else:
jsons = list_zipped_tables(path=path)
return jsons
def h5_update_tables(h5file="stats_can.h5", path=None, tables=None):
    """Update any stats_can tables contained in an h5 file.

    Parameters
    ----------
    h5file : str, default stats_can.h5
        name of the h5file storing the tables
    path : str or path, default None (current working directory)
        path to the h5file
    tables : str or list of str, optional, default None
        If included will only update the subset of tables already in the file
        and in the tables parameter

    Returns
    -------
    update_table_list : list of str
        List of tables that were updated
"""
if tables:
local_jsons = metadata_from_h5(tables, h5file=h5file, path=path)
else:
h5 = pathlib.Path(path) / h5file if path else pathlib.Path(h5file)
with h5py.File(h5, "r") as f:
keys = [key for key in f.keys() if key.startswith("json")]
local_jsons = [json.loads(f[key][()]) for key in keys]
tables = [j["productId"] for j in local_jsons]
remote_jsons = get_cube_metadata(tables)
update_table_list = [
local["productId"]
for local, remote in zip(local_jsons, remote_jsons)
if local["cubeEndDate"] != remote["cubeEndDate"]
]
tables_to_h5(update_table_list, h5file=h5file, path=path)
return update_table_list
def update_tables(path=None, h5file="stats_can.h5", tables=None, csv=True):
    """Update downloaded tables where required.

    Reads local metadata, either from json files or stored in an h5 file,
    compares it to the metadata on the StatsCan website, and downloads
    those tables that don't have matching metadata.
    Just a wrapper for the zip_update_tables and h5_update_tables functions.

    Parameters
    ----------
    path : str or path, default None
        Path where local tables are stored, assumes current directory if None
    h5file : str, default 'stats_can.h5'
        Name of the h5 file storing StatsCan tables, set to None for zips
    tables : list of str, default None
        For hdf5 only, update only the subset of tables that are both in the
        file and specified by this argument, None means update all tables
    csv : boolean, default True
        If updating zips this determines whether to update zipped CSV or SDMX

    Returns
    -------
    update_table_list : list of str
        list of updated tables
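
    Examples
    --------
    A minimal sketch; updates every table stored in stats_can.h5 in the
    current directory:

    >>> update_tables()  # doctest: +SKIP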
"""
if h5file:
return h5_update_tables(h5file=h5file, path=path, tables=tables)
else:
return zip_update_tables(path=path, csv=csv)
def h5_included_keys(h5file="stats_can.h5", path=None):
    """Return a list of keys in an h5 file.

    Parameters
    ----------
    h5file : str, default stats_can.h5
        name of the h5file to list keys from
    path : str or path, default None (current working directory)
        path to the h5file

    Returns
    -------
    keys : list
        list of keys in the hdf5 file
"""
h5file = pathlib.Path(path) / h5file if path else pathlib.Path(h5file)
with h5py.File(h5file, "r") as f:
        keys = list(f.keys())
return keys
def delete_tables(tables, path=None, h5file="stats_can.h5", csv=True):
    """Delete downloaded tables.

    Parameters
    ----------
    tables : list
        list of tables to delete
    path : str or path object, default None
        where to look for the tables to delete
    h5file : str, default stats_can.h5
        h5file to remove tables from, set to None to remove zips
    csv : boolean, default True
        if h5file is None this specifies whether to delete zipped csv or SDMX

    Returns
    -------
    to_delete : list
        list of deleted tables
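
    Examples
    --------
    A minimal sketch with a placeholder table number; removes the table and
    its metadata from stats_can.h5 in the current directory:

    >>> delete_tables(["18100004"])  # doctest: +SKIP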
"""
path = pathlib.Path(path) if path else pathlib.Path()
clean_tables = parse_tables(tables)
available_tables_jsons = list_downloaded_tables(path=path, h5file=h5file)
available_tables = [j["productId"] for j in available_tables_jsons]
to_delete = [t for t in clean_tables if t in available_tables]
if h5file:
keys_to_del = []
for td in to_delete:
json_to_del = "json_" + td
tbl_to_del = "table_" + td
keys_to_del.append(json_to_del)
keys_to_del.append(tbl_to_del)
h5file = path / h5file
with h5py.File(h5file, "a") as f:
for k in keys_to_del:
del f[k]
else:
files_to_del = []
for td in to_delete:
json_to_del = td + ".json"
zip_to_del = td + ("-eng.zip" if csv else ".zip")
files_to_del.append(zip_to_del)
files_to_del.append(json_to_del)
for file in files_to_del:
p = path / file
if p.is_file():
p.unlink()
return to_delete
def table_to_df(table, path=None, h5file="stats_can.h5"):
    """Read a table to a dataframe.

    Wrapper for table_from_h5 and zip_table_to_dataframe.

    Parameters
    ----------
    table : str
        name of the table to read
    path : str or path, default None (current working directory)
        path to the table data
    h5file : str, default stats_can.h5
        name of the h5file to retrieve the table from, None for zip

    Returns
    -------
    df : pd.DataFrame
        table in dataframe format
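
    Examples
    --------
    A minimal sketch with a placeholder table number; reads from
    stats_can.h5, or from a zipped CSV when h5file is None:

    >>> df = table_to_df("18100004")  # doctest: +SKIP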
"""
if h5file:
df = table_from_h5(table=table, h5file=h5file, path=path)
else:
df = zip_table_to_dataframe(table=table, path=path)
return df
def vectors_to_df(vectors, periods=1, start_release_date=None, end_release_date=None):
    """Get a DataFrame of vectors with n periods of data or over a range of release dates.

    Wrapper on the get_bulk_vector_data_by_range and
    get_data_from_vectors_and_latest_n_periods functions to turn the
    resulting list of JSONs into a DataFrame.

    Parameters
    ----------
    vectors : str or list of str
        vector numbers to get info for
    periods : int, default 1
        number of periods to retrieve data for
    start_release_date : datetime.date, default None
        start release date for the data
    end_release_date : datetime.date, default None
        end release date for the data

    Returns
    -------
    df : DataFrame
        vectors as columns and ref_date as the index (not release date)
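
    Examples
    --------
    Two minimal sketches with a placeholder vector ID (both require network
    access): the first pulls the latest 5 periods, the second pulls data
    released within a date range:

    >>> df = vectors_to_df(["v41690973"], periods=5)  # doctest: +SKIP
    >>> import datetime
    >>> df = vectors_to_df(
    ...     ["v41690973"],
    ...     start_release_date=datetime.date(2020, 1, 1),
    ...     end_release_date=datetime.date(2020, 12, 31),
    ... )  # doctest: +SKIP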
"""
df = pd.DataFrame()
    if start_release_date is None or end_release_date is None:
start_list = get_data_from_vectors_and_latest_n_periods(vectors, periods)
else:
start_list = get_bulk_vector_data_by_range(
vectors, start_release_date, end_release_date
)
for vec in start_list:
name = "v" + str(vec["vectorId"])
# If there's no data for the series just skip it
if not vec["vectorDataPoint"]:
continue
ser = (
pd.DataFrame(vec["vectorDataPoint"])
.assign(refPer=lambda x: pd.to_datetime(x["refPer"], errors="ignore"))
.set_index("refPer")
.rename(columns={"value": name})
.filter([name])
)
df = pd.concat([df, ser], axis=1, sort=True)
return df
def vectors_to_df_local(vectors, path=None, start_date=None, h5file="stats_can.h5"):
    """Make a dataframe with vector columns indexed on date from local data.

    Parameters
    ----------
    vectors : list
        list of vectors to be read in
    path : str or path object, default None
        path to StatsCan tables
    start_date : datetime, optional, default None
        optional earliest reference date to include
    h5file : str, default stats_can.h5
        if specified will extract dataframes from an hdf5 file instead of
        zipped csv tables

    Returns
    -------
    final_df : pandas.DataFrame
        DataFrame of the vectors
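
    Examples
    --------
    A minimal sketch with a placeholder vector ID; any table the vector
    belongs to is added to the h5 file first if it isn't there already:

    >>> df = vectors_to_df_local(["v41690973"], path="/tmp")  # doctest: +SKIP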
"""
# Preserve an initial copy of the list for ordering, parsed and then
# converted to string for consistency in naming
vectors_ordered = parse_vectors(vectors)
vectors_ordered = ["v" + str(v) for v in vectors_ordered]
table_vec_dict = table_subsets_from_vectors(vectors)
tables = list(table_vec_dict.keys())
tables_dfs = {}
columns = ["REF_DATE", "VECTOR", "VALUE"]
if h5file:
meta = metadata_from_h5(tables, h5file=h5file, path=path)
existing_tables = [t["productId"] for t in meta]
to_download_tables = list(set(tables) - set(existing_tables))
tables_to_h5(to_download_tables, h5file=h5file, path=path)
for table in tables:
if h5file:
tables_dfs[table] = table_from_h5(table, h5file=h5file, path=path)[columns]
else:
tables_dfs[table] = zip_table_to_dataframe(table, path)[columns]
df = tables_dfs[table] # save me some typing
vec_list = ["v" + str(v) for v in table_vec_dict[table]]
df = df[df["VECTOR"].isin(vec_list)]
if start_date is not None:
start_date = np.datetime64(start_date)
df = df[df["REF_DATE"] >= start_date]
df = df.pivot(index="REF_DATE", columns="VECTOR", values="VALUE")
df.columns = list(df.columns) # remove categorical index
tables_dfs[table] = df
final_df = tables_dfs[tables[0]]
for table in tables[1:]:
final_df = pd.merge(
final_df, tables_dfs[table], how="outer", left_index=True, right_index=True
)
final_df = final_df[vectors_ordered]
return final_df
def code_sets_to_df_dict():
    """Get all code sets.

    Code sets provide additional metadata to describe information.
    They are grouped into scales, frequencies, symbols, etc. and
    returned as a dictionary of dataframes.

    Returns
    -------
    codes_df_lookup : dict of pandas.DataFrame
        dictionary of dataframes, one per code set
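
    Examples
    --------
    A minimal sketch (requires network access); the key names come from the
    StatsCan code sets endpoint, e.g. a "scalar" code set is assumed here:

    >>> code_dict = code_sets_to_df_dict()  # doctest: +SKIP
    >>> code_dict["scalar"]  # doctest: +SKIP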
"""
codes = get_code_sets()
# Packs each code group in a dataframe for better lookup via dictionary
codes_df_lookup = {key: pd.DataFrame(codes[key]) for key in codes.keys()}
return codes_df_lookup