diff --git a/postprocessing/compare_comstock_to_ami.py.template b/postprocessing/compare_comstock_to_ami.py.template index 9e18c1fe2..00cea4e02 100644 --- a/postprocessing/compare_comstock_to_ami.py.template +++ b/postprocessing/compare_comstock_to_ami.py.template @@ -12,37 +12,101 @@ logger = logging.getLogger(__name__) def main(): # ComStock run comstock = cspp.ComStock( - s3_base_dir='eulp/comstock_core', - comstock_run_name='ami_comparison', - comstock_run_version='ami_comparison', - comstock_year=2018, - truth_data_version='v01', - buildstock_csv_name='buildstock.csv', - acceptable_failure_percentage=0.9, - drop_failed_runs=True, - color_hex='#0072B2', - skip_missing_columns=True, - athena_table_name='ami_comparison', - reload_from_csv=False, - include_upgrades=False - ) - - # CBECS + s3_base_dir='com-sdr', # If run not on S3, download results_up**.parquet manually + comstock_run_name='rtuadv_v11', # Name of the run on S3 + comstock_run_version='rtuadv_v11', # Use whatever you want to see in plot and folder names + comstock_year=2018, # Typically don't change this + athena_table_name=None, # Typically don't change this + truth_data_version='v01', # Typically don't change this + buildstock_csv_name='buildstock.csv', # Download buildstock.csv manually + acceptable_failure_percentage=0.25, # Can increase this when testing and high failure are OK + drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data + color_hex='#0072B2', # Color used to represent this run in plots + skip_missing_columns=True, # False if you want to ensure you have all data specified for export + reload_from_cache=False, # True if CSV already made and want faster reload times + include_upgrades=False, # False if not looking at upgrades + # output_dir = 's3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/comstock_amy2018_release_1' + ) + + + # Stock Estimation for Apportionment: + stock_estimate = cspp.Apportion( + stock_estimation_version='2025R3', # Only updated when a new stock estimate is published + truth_data_version='v01', # Typically don't change this + reload_from_cache=False # Set to "True" if you have already run apportionment and would like to keep consistant values between postprocessing runs. 
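+        # (Illustrative usage note) after the first pass creates the cached apportionment,
+        # rerunning with reload_from_cache=True reuses it so the sampled weights stay
+        # identical between postprocessing runs.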
+ ) + + # Scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment + base_sim_outs = comstock.get_sim_outs_for_upgrade(0) + comstock.create_allocated_weights(stock_estimate, base_sim_outs, reload_from_cache=False) + + # CBECS cbecs = cspp.CBECS( - cbecs_year = 2018, - truth_data_version='v01', - color_hex='#009E73', - reload_from_csv=False + cbecs_year=2018, # 2012 and 2018 currently available + truth_data_version='v01', # Typically don't change this + color_hex='#009E73', # Color used to represent CBECS in plots + reload_from_csv=True # True if CSV already made and want faster reload times ) - - # Scale ComStock run to CBECS 2018 AND remove non-ComStock buildings from CBECS + + # First scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment + # Then scale both ComStock runs to CBECS 2018 AND remove non-ComStock buildings from CBECS # This is how weights in the models are set to represent national energy consumption - comstock.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True) + base_sim_outs = comstock.get_sim_outs_for_upgrade(0) + alloc_wts = comstock.get_allocated_weights() + comstock.create_allocated_weights_scaled_to_cbecs(cbecs, base_sim_outs, alloc_wts, remove_non_comstock_bldg_types_from_cbecs=True) + comstock.create_allocated_weights_plus_util_bills_for_upgrade(0) + + + # county resolution, files by state and county + county_resolution = { + 'geo_top_dir': 'by_state_and_county', + 'partition_cols': { + comstock.STATE_ABBRV: 'state', + comstock.COUNTY_ID: 'county', + }, + 'aggregation_levels': [comstock.COUNTY_ID], + 'data_types': ['full'], + 'file_types': ['parquet'], + } + + # state level resolution, one single national file + state_resolution = { + 'geo_top_dir': 'national_by_state', + 'partition_cols': {}, + 'aggregation_levels': [[comstock.STATE_ABBRV, comstock.CZ_ASHRAE]], + 'data_types': ['full'], # other options: 'detailed', 'basic' **If using multiple options, order must go from more detailed to less detailed. + 'file_types': ['parquet'], # other options:'parquet' + } + + # specify the export level + # IMPORTANT: if making county level timeseries plots, must export county level data to S3. This does not occur automatically. 
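+    # (Illustrative) both dictionaries can be exported in one pass, e.g.
+    # geo_exports = [county_resolution, state_resolution]
+    # For county-level timeseries plots the county files must also be written to S3 (see the S3 export loop below).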
+ geo_exports = [county_resolution] #state_resolution + + for geo_export in geo_exports: + # write files locally as needed - usually not needed for AMI comparison plots + #comstock.export_metadata_and_annual_results_for_upgrade(upgrade_id=0, geo_exports=[geo_export]) + + # Also write to S3 if making timeseries plots + s3_dir = f"s3://{comstock.s3_base_dir}/{comstock.comstock_run_name}/{comstock.comstock_run_name}" + s3_output_dir = comstock.setup_fsspec_filesystem(s3_dir, aws_profile_name=None) + comstock.export_metadata_and_annual_results_for_upgrade(upgrade_id=0, geo_exports=[geo_export], output_dir=s3_output_dir) + + # write select results to S3 for Athena/Glue when needed for timeseries plots + s3_dir = f"s3://{comstock.s3_base_dir}/{comstock.comstock_run_name}/{comstock.comstock_run_name}" + database = "enduse" + crawler_name = comstock.comstock_run_name # used to set name of crawler, cannot include slashes + workgroup = "eulp" # Athena workgroup to use + glue_service_role = "service-role/AWSGlueServiceRole-default" + + # Export parquet files to S3 for Athena/Glue + comstock.create_sightglass_tables( + s3_location=f"{s3_dir}/metadata_and_annual_results_aggregates", + dataset_name=crawler_name, + database_name=database, + glue_service_role=glue_service_role) + comstock.fix_timeseries_tables(crawler_name, database) + comstock.create_views(crawler_name, database, workgroup) - # Export CBECS and ComStock data to wide and long formats for Tableau and to skip processing later - cbecs.export_to_csv_wide() # May comment this out after run once - comstock.export_to_csv_wide() # May comment this out after run once - # AMI ami = cspp.AMI( truth_data_version='v01', @@ -52,8 +116,8 @@ def main(): # comparison comparison = cspp.ComStockToAMIComparison(comstock, ami, make_comparison_plots=True) - comparison.export_plot_data_to_csv_wide() - + #comparison.export_plot_data_to_csv_wide() + # Code to execute the script if __name__ == "__main__": main() \ No newline at end of file diff --git a/postprocessing/compare_upgrades-test_timeseries_plots.py b/postprocessing/compare_upgrades-test_timeseries_plots.py new file mode 100644 index 000000000..078dc529b --- /dev/null +++ b/postprocessing/compare_upgrades-test_timeseries_plots.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging +import comstockpostproc as cspp + +logging.basicConfig(level='INFO') # Use DEBUG, INFO, or WARNING +logger = logging.getLogger(__name__) + +def main(): + # ComStock run + comstock = cspp.ComStock( + s3_base_dir='com-sdr', # If run not on S3, download results_up**.parquet manually + comstock_run_name='pump_v9', # Name of the run on S3 + comstock_run_version='pump_v9', # Use whatever you want to see in plot and folder names + comstock_year=2018, # Typically don't change this + athena_table_name=None, # Typically don't change this + truth_data_version='v01', # Typically don't change this + buildstock_csv_name='buildstock.csv', # Download buildstock.csv manually + acceptable_failure_percentage=0.05, # Can increase this when testing and high failure are OK + drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data + color_hex='#0072B2', # Color used to represent this run in plots + skip_missing_columns=True, # False if you want to ensure you have all data specified for export + reload_from_cache=False, # True if CSV already made and want faster reload times + include_upgrades=True, # False if not looking at upgrades + upgrade_ids_to_skip=[], # Use [1, 3] etc. 
to exclude certain upgrades + make_timeseries_plots=False, + timeseries_locations_to_plot={ + 'MN': 'Minnesota', # specify location (either county ID or state ID) and corresponding name for plots and folders. + #'MA':'Massachusetts', + #'OR': 'Oregon', + #'LA': 'Louisiana', + #'AZ': 'Arizona', + #'TN': 'Tennessee', + ('MA', 'NH', 'CT', 'VT', 'RI'): 'New England', # example of multiple states together - using tuples as keys + #'G4900350': 'Salt Lake City', + #'G2500250': 'Boston', # if specifying a county, you must export county level data to S3 + #'G4804530': 'Austin', + ('G2500250', 'G4804530'):'Baustin' # multiple counties together - using tuples as keys + }, + + upgrade_ids_for_comparison={} # Use {'':[0,1,2]}; add as many upgrade IDs as needed, but plots look strange over 5 + #output_dir = 's3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/comstock_amy2018_release_1' + ) + + # Stock Estimation for Apportionment: + stock_estimate = cspp.Apportion( + stock_estimation_version='2025R3', # Only updated when a new stock estimate is published + truth_data_version='v01', # Typically don't change this + reload_from_cache=False, # Set to "True" if you have already run apportionment and would like to keep consistant values between postprocessing runs. + #output_dir = 's3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/comstock_amy2018_release_1' + ) + + # Scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment + base_sim_outs = comstock.get_sim_outs_for_upgrade(0) + comstock.create_allocated_weights(stock_estimate, base_sim_outs, reload_from_cache=False) + + # CBECS + cbecs = cspp.CBECS( + cbecs_year=2018, # 2012 and 2018 currently available + truth_data_version='v01', # Typically don't change this + color_hex='#009E73', # Color used to represent CBECS in plots + reload_from_csv=False # True if CSV already made and want faster reload times + ) + + # Scale ComStock to CBECS 2018 AND remove non-ComStock buildings from CBECS + base_sim_outs = comstock.get_sim_outs_for_upgrade(0) + alloc_wts = comstock.get_allocated_weights() + comstock.create_allocated_weights_scaled_to_cbecs(cbecs, base_sim_outs, alloc_wts, remove_non_comstock_bldg_types_from_cbecs=True) + + # Add utility bills onto allocated weights + for upgrade_id in comstock.upgrade_ids_to_process: + # up_sim_outs = comstock.get_sim_outs_for_upgrade(upgrade_id) + # up_alloc_wts = comstock.get_allocated_weights_scaled_to_cbecs_for_upgrade(upgrade_id) + comstock.create_allocated_weights_plus_util_bills_for_upgrade(upgrade_id) + + # Specify geo exports + + # county resolution, files by state and county + county_resolution = { + 'geo_top_dir': 'by_state_and_county', + 'partition_cols': { + comstock.STATE_ABBRV: 'state', + comstock.COUNTY_ID: 'county', + }, + 'aggregation_levels': [comstock.COUNTY_ID], # , comstock.COUNTY_ID], # Full tract resolution (agg=in.nhgis_tract_gisjoin) + 'data_types': ['full'], + 'file_types': ['parquet'], + } + + # state level resolution, one single national file + state_resolution = { + 'geo_top_dir': 'national_by_state', + 'partition_cols': {}, + 'aggregation_levels': [[comstock.STATE_ABBRV, comstock.CZ_ASHRAE]], + 'data_types': ['full'], # other options: 'detailed', 'basic' **If using multiple options, order must go from more detailed to less detailed. 
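+        # e.g. (illustrative) 'data_types': ['detailed', 'basic'] keeps the more detailed export ahead of the less detailed one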
+ 'file_types': ['parquet'], # other options:'parquet' + } + + # specify the export level + # IMPORTANT: if making county level timeseries plots, must export county level data to S3. This does not occur automatically. + geo_exports = [county_resolution] #county_resolution + + for geo_export in geo_exports: + for upgrade_id in comstock.upgrade_ids_to_process: + #if upgrade_id == 0: + # continue + #comstock.export_metadata_and_annual_results_for_upgrade(upgrade_id, [geo_export]) + + # Also write to S3 if making timeseries plots + if comstock.make_timeseries_plots: # TODO: force geo exports to county data if couunty timeseries is requested. + s3_dir = f"s3://{comstock.s3_base_dir}/{comstock.comstock_run_name}/{comstock.comstock_run_name}" + s3_output_dir = comstock.setup_fsspec_filesystem(s3_dir, aws_profile_name=None) + comstock.export_metadata_and_annual_results_for_upgrade(upgrade_id=upgrade_id, geo_exports=[geo_export], output_dir=s3_output_dir) + + # write select results to S3 for Athena/Glue when needed for timeseries plots + if comstock.make_timeseries_plots: + s3_dir = f"s3://{comstock.s3_base_dir}/{comstock.comstock_run_name}/{comstock.comstock_run_name}" + database = "enduse" + crawler_name = comstock.comstock_run_name # used to set name of crawler, cannot include slashes + workgroup = "eulp" # Athena workgroup to use + glue_service_role = "service-role/AWSGlueServiceRole-default" + + # Export parquet files to S3 for Athena/Glue + comstock.create_sightglass_tables(s3_location=f"{s3_dir}/metadata_and_annual_results_aggregates", + dataset_name=crawler_name, + database_name=database, + glue_service_role=glue_service_role) + comstock.fix_timeseries_tables(crawler_name, database) + comstock.create_views(crawler_name, database, workgroup) + + # Create measure run comparisons; only use if run has measures + comparison = cspp.ComStockMeasureComparison(comstock, timeseries_locations_to_plot=comstock.timeseries_locations_to_plot, make_comparison_plots = comstock.make_comparison_plots, make_timeseries_plots = comstock.make_timeseries_plots) + + # Export dictionaries corresponding to the exported columns + #comstock.export_data_and_enumeration_dictionary() + +# Code to execute the script +if __name__=="__main__": + main() diff --git a/postprocessing/compare_upgrades.py.template b/postprocessing/compare_upgrades.py.template index 36618a804..43a7cd784 100644 --- a/postprocessing/compare_upgrades.py.template +++ b/postprocessing/compare_upgrades.py.template @@ -2,7 +2,6 @@ # -*- coding: utf-8 -*- import logging - import comstockpostproc as cspp logging.basicConfig(level='INFO') # Use DEBUG, INFO, or WARNING @@ -11,9 +10,9 @@ logger = logging.getLogger(__name__) def main(): # ComStock run comstock = cspp.ComStock( - s3_base_dir='com-sdr', # If run not on S3, download results_up**.parquet manually - comstock_run_name='sdr_2025_r3_combined', # Name of the run on S3 - comstock_run_version='sdr_2025_r3_combined', # Use whatever you want to see in plot and folder names + s3_base_dir='eulp/euss_com', # If run not on S3, download results_up**.parquet manually + comstock_run_name='cchpc_10k_3', # Name of the run on S3 + comstock_run_version='cchpc_10k_3', # Use whatever you want to see in plot and folder names comstock_year=2018, # Typically don't change this athena_table_name=None, # Typically don't change this truth_data_version='v01', # Typically don't change this @@ -22,18 +21,25 @@ def main(): drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data 
color_hex='#0072B2', # Color used to represent this run in plots skip_missing_columns=True, # False if you want to ensure you have all data specified for export - reload_from_cache=True, # True if CSV already made and want faster reload times + reload_from_cache=False, # True if CSV already made and want faster reload times include_upgrades=True, # False if not looking at upgrades upgrade_ids_to_skip=[], # Use [1, 3] etc. to exclude certain upgrades - make_timeseries_plots=False, - states={ - #'MN': 'Minnesota', # specify state to use for timeseries plots in dictionary format. State ID must correspond correctly. - 'MA':'Massachusetts', - #'OR': 'Oregon', - #'LA': 'Louisiana', - #'AZ': 'Arizona', - #'TN': 'Tennessee' - }, + make_timeseries_plots=True, + timeseries_locations_to_plot={ + 'MN': 'Minnesota', # specify location (either county ID or state ID) and corresponding name for plots and folders. + #'MA':'Massachusetts', + 'OH':'Ohio', + #'OR': 'Oregon', + #'LA': 'Louisiana', + 'AZ': 'Arizona', + #'TN': 'Tennessee', + #('MA', 'NH', 'CT', 'VT', 'RI'): 'New England', # example of multiple states together - using tuples as keys + #'G4900350': 'Salt Lake City', + #'G2500250': 'Boston', # if specifying a county, you must export county level data to S3 + #'G4804530': 'Austin', + #('G2500250', 'G4804530'):'Baustin' # multiple counties together - using tuples as keys + }, + upgrade_ids_for_comparison={} # Use {'':[0,1,2]}; add as many upgrade IDs as needed, but plots look strange over 5 #output_dir = 's3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/comstock_amy2018_release_1' ) @@ -42,26 +48,26 @@ def main(): stock_estimate = cspp.Apportion( stock_estimation_version='2025R3', # Only updated when a new stock estimate is published truth_data_version='v01', # Typically don't change this - reload_from_cache=True, # Set to "True" if you have already run apportionment and would like to keep consistant values between postprocessing runs. + reload_from_cache=False, # Set to "True" if you have already run apportionment and would like to keep consistent values between postprocessing runs. 
#output_dir = 's3://oedi-data-lake/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2025/comstock_amy2018_release_1' ) # Scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment base_sim_outs = comstock.get_sim_outs_for_upgrade(0) - comstock.create_allocated_weights(stock_estimate, base_sim_outs, reload_from_cache=True) + comstock.create_allocated_weights(stock_estimate, base_sim_outs, reload_from_cache=False) # CBECS cbecs = cspp.CBECS( cbecs_year=2018, # 2012 and 2018 currently available truth_data_version='v01', # Typically don't change this color_hex='#009E73', # Color used to represent CBECS in plots - reload_from_csv=True # True if CSV already made and want faster reload times + reload_from_csv=False # True if CSV already made and want faster reload times ) # Scale ComStock to CBECS 2018 AND remove non-ComStock buildings from CBECS base_sim_outs = comstock.get_sim_outs_for_upgrade(0) alloc_wts = comstock.get_allocated_weights() - comstock.create_allocated_weights_scaled_to_cbecs(cbecs, base_sim_outs, alloc_wts, remove_non_comstock_bldg_types_from_cbecs=False) + comstock.create_allocated_weights_scaled_to_cbecs(cbecs, base_sim_outs, alloc_wts, remove_non_comstock_bldg_types_from_cbecs=True) # Add utility bills onto allocated weights for upgrade_id in comstock.upgrade_ids_to_process: @@ -69,44 +75,67 @@ def main(): # up_alloc_wts = comstock.get_allocated_weights_scaled_to_cbecs_for_upgrade(upgrade_id) comstock.create_allocated_weights_plus_util_bills_for_upgrade(upgrade_id) - # Export metadata files - geo_exports = [ - #{'geo_top_dir': 'by_state_and_county', - # 'partition_cols': { - # comstock.STATE_ABBRV: 'state', - # comstock.COUNTY_ID: 'county', - # }, - # 'aggregation_levels': ['in.nhgis_tract_gisjoin'], # , comstock.COUNTY_ID], # Full tract resolution (agg=in.nhgis_tract_gisjoin) - # 'data_types': ['full', 'basic'], - # 'file_types': ['csv', 'parquet'], - #}, - {'geo_top_dir': 'national_by_state', - 'partition_cols': {}, - 'aggregation_levels': [[comstock.STATE_ABBRV, comstock.CZ_ASHRAE]], - 'data_types': ['full'], # other options: 'detailed', 'basic' **If using multiple options, order must go from more detailed to less detailed. - 'file_types': ['csv'], # other options:'parquet' - } - ] + # Specify geo exports + # county resolution, files by state and county + county_resolution = { + 'geo_top_dir': 'by_state_and_county', + 'partition_cols': { + comstock.STATE_ABBRV: 'state', + comstock.COUNTY_ID: 'county', + }, + 'aggregation_levels': [comstock.COUNTY_ID], # , comstock.COUNTY_ID], # Full tract resolution (agg=in.nhgis_tract_gisjoin) + 'data_types': ['full'], + 'file_types': ['parquet'], + } + + # state level resolution, one single national file + state_resolution = { + 'geo_top_dir': 'national_by_state', + 'partition_cols': {}, + 'aggregation_levels': [[comstock.STATE_ABBRV, comstock.CZ_ASHRAE]], + 'data_types': ['full'], # other options: 'detailed', 'basic' **If using multiple options, order must go from more detailed to less detailed. + 'file_types': ['parquet'], # other options:'parquet' + } + + # specify the export level + # IMPORTANT: if making county level timeseries plots, must export county level data to S3. This does not occur automatically. 
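+    # (Illustrative note) the active timeseries locations above are all states, so state_resolution is sufficient here;
+    # if a county ID such as 'G2500250' were enabled in timeseries_locations_to_plot, county_resolution would also
+    # need to be exported to S3; that export is not triggered automatically.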
+ geo_exports = [state_resolution] #state_resolution for geo_export in geo_exports: for upgrade_id in comstock.upgrade_ids_to_process: - # if upgrade_id == 0: - # continue + # if upgrade_id == 0: + # continue comstock.export_metadata_and_annual_results_for_upgrade(upgrade_id, [geo_export]) - ## Compare to CBECS - #comp = cspp.ComStockToCBECSComparison(cbecs_list=[cbecs], - # comstock_list=[comstock], - # upgrade_id='All', - # make_comparison_plots=True, - # make_hvac_plots=False) - #comp.export_to_csv_wide() # May comment this out after run once + # Also write to S3 if making timeseries plots + if comstock.make_timeseries_plots: #TODO: force geo exports to county data if county timeseries is requested. + s3_dir = f"s3://{comstock.s3_base_dir}/{comstock.comstock_run_name}/{comstock.comstock_run_name}" + s3_output_dir = comstock.setup_fsspec_filesystem(s3_dir, aws_profile_name=None) + comstock.export_metadata_and_annual_results_for_upgrade(upgrade_id=upgrade_id, geo_exports=[geo_export], output_dir=s3_output_dir) + + + # write select results to S3 for Athena/Glue when needed for timeseries plots + if comstock.make_timeseries_plots: + s3_dir = f"s3://{comstock.s3_base_dir}/{comstock.comstock_run_name}/{comstock.comstock_run_name}" + database = "enduse" + dataset_name = f"{comstock.comstock_run_name}/{comstock.comstock_run_name}" # name of the dir in s3_dir we want to make new tables and views for + crawler_name = comstock.comstock_run_name # used to set name of crawler, cannot include slashes + workgroup = "eulp" # Athena workgroup to use + glue_service_role = "service-role/AWSGlueServiceRole-default" + + # Export parquet files to S3 for Athena/Glue + comstock.create_sightglass_tables(s3_location=f"{s3_dir}/metadata_and_annual_results_aggregates", + dataset_name=crawler_name, + database_name=database, + glue_service_role=glue_service_role) + comstock.fix_timeseries_tables(crawler_name, database) + comstock.create_views(crawler_name, database, workgroup) # Create measure run comparisons; only use if run has measures - #comparison = cspp.ComStockMeasureComparison(comstock, states=comstock.states, make_comparison_plots = comstock.make_comparison_plots, make_timeseries_plots = comstock.make_timeseries_plots) + comparison = cspp.ComStockMeasureComparison(comstock, timeseries_locations_to_plot=comstock.timeseries_locations_to_plot, make_comparison_plots = comstock.make_comparison_plots, make_timeseries_plots = comstock.make_timeseries_plots) # Export dictionaries corresponding to the exported columns - comstock.export_data_and_enumeration_dictionary() + #comstock.export_data_and_enumeration_dictionary() # Code to execute the script if __name__=="__main__": diff --git a/postprocessing/comstockpostproc/comstock.py b/postprocessing/comstockpostproc/comstock.py index 25565692a..f25126b88 100644 --- a/postprocessing/comstockpostproc/comstock.py +++ b/postprocessing/comstockpostproc/comstock.py @@ -8,6 +8,12 @@ import re import shutil from functools import lru_cache +from fsspec.core import url_to_fs +from sqlalchemy.engine import create_engine +from joblib import Parallel, delayed +import sqlalchemy as sa +import time +from sqlalchemy_views import CreateView from pathlib import Path import boto3 @@ -24,6 +30,7 @@ from pathlib import Path from buildstock_query import BuildStockQuery +from .comstock_query_builder import ComStockQueryBuilder from comstockpostproc.ami import AMI from comstockpostproc.cbecs import CBECS from comstockpostproc.comstock_apportionment import Apportion @@ -45,7 +52,7 @@ class 
ComStock(NamingMixin, UnitsMixin, GasCorrectionModelMixin, S3UtilitiesMixi def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstock_year, athena_table_name, truth_data_version, buildstock_csv_name = 'buildstock.csv', acceptable_failure_percentage=0.01, drop_failed_runs=True, color_hex=NamingMixin.COLOR_COMSTOCK_BEFORE, weighted_energy_units='tbtu', weighted_demand_units='gw', weighted_ghg_units='co2e_mmt', weighted_utility_units='billion_usd', skip_missing_columns=False, - reload_from_cache=False, make_comparison_plots=True, make_timeseries_plots=True, include_upgrades=True, upgrade_ids_to_skip=[], states={}, upgrade_ids_for_comparison={}, rename_upgrades=False, output_dir=None, aws_profile_name=None): + reload_from_cache=False, make_comparison_plots=True, make_timeseries_plots=True, include_upgrades=True, upgrade_ids_to_skip=[], timeseries_locations_to_plot={}, upgrade_ids_for_comparison={}, rename_upgrades=False, output_dir=None, aws_profile_name=None): """ A class to load and transform ComStock data for export, analysis, and comparison. Args: @@ -59,6 +66,7 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc """ # Initialize members + self.s3_base_dir = s3_base_dir self.comstock_run_name = comstock_run_name self.comstock_run_version = comstock_run_version self.year = comstock_year @@ -97,7 +105,7 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc self.upgrade_ids_to_skip = upgrade_ids_to_skip self.upgrade_ids_for_comparison = upgrade_ids_for_comparison self.upgrade_ids_to_process = [] - self.states = states + self.timeseries_locations_to_plot = timeseries_locations_to_plot self.unweighted_weighted_map = {} self.cached_parquet = [] # List of parquet files to reload and export # TODO our current credential setup aren't playing well with this approach but does with the s3 ServiceResource @@ -374,27 +382,48 @@ def download_data(self): self.read_delimited_truth_data_file_from_S3(s3_file_path, ',') def download_timeseries_data_for_ami_comparison(self, ami, reload_from_csv=True, save_individual_regions=False): + + # Initialize Athena client + athena_client = BuildStockQuery(workgroup='eulp', + db_name='enduse', + table_name=self.comstock_run_name, + buildstock_type='comstock', + skip_reports=True, + metadata_table_suffix='_md_agg_by_state_and_county_vu', + ) + if reload_from_csv: file_name = f'Timeseries for AMI long.csv' - file_path = os.path.join(self.output_dir, file_name) + file_path = os.path.join(self.output_dir['fs_path'], file_name) if not os.path.exists(file_path): raise FileNotFoundError( f'Cannot find {file_path} to reload data, set reload_from_csv=False to create CSV.') logger.info(f'Reloading from CSV: {file_path}') self.ami_timeseries_data = pd.read_csv(file_path, low_memory=False, index_col='timestamp', parse_dates=True) else: + athena_end_uses = list(map(lambda x: self.END_USES_TIMESERIES_DICT[x], self.END_USES)) athena_end_uses.append('total_site_electricity_kwh') all_timeseries_df = pd.DataFrame() for region in ami.ami_region_map: - region_file_path_long = os.path.join(self.output_dir, region['source_name'] + '_building_type_timeseries_long.csv') + region_file_path_long = os.path.join(self.output_dir['fs_path'], region['source_name'] + '_building_type_timeseries_long.csv') if os.path.isfile(region_file_path_long) and reload_from_csv and save_individual_regions: logger.info(f"timeseries data in long format for {region['source_name']} already exists at {region_file_path_long}") continue + 
builder = ComStockQueryBuilder(self.comstock_run_name) + weight_view_table = f'{self.comstock_run_name}_md_agg_by_state_and_county_vu' + query = builder.get_timeseries_aggregation_query( + upgrade_id=0, + enduses=athena_end_uses, + group_by=[self.BLDG_TYPE, 'time'], + restrictions=[(self.COUNTY_ID, region['county_ids'])], + timestamp_grouping='hour', + weight_view_table=weight_view_table, + include_area_normalized_cols=True + ) + + ts_agg = athena_client.execute(query) - ts_agg = self.athena_client.agg.aggregate_timeseries(enduses=athena_end_uses, - group_by=['build_existing_model.building_type', 'time'], - restrict=[('build_existing_model.county_id', region['county_ids'])]) if ts_agg['time'].dtype == 'Int64': # Convert bigint to timestamp type if necessary ts_agg['time'] = pd.to_datetime(ts_agg['time']/1e9, unit='s') @@ -407,7 +436,7 @@ def download_timeseries_data_for_ami_comparison(self, ami, reload_from_csv=True, timeseries_df['region_name'] = region['source_name'] all_timeseries_df = pd.concat([all_timeseries_df, timeseries_df]) - data_path = os.path.join(self.output_dir, 'Timeseries for AMI long.csv') + data_path = os.path.join(self.output_dir['fs_path'], 'Timeseries for AMI long.csv') all_timeseries_df.to_csv(data_path, index=True) self.ami_timeseries_data = all_timeseries_df @@ -415,7 +444,7 @@ def convert_timeseries_to_long(self, agg_df, county_ids, output_name, save_indiv # rename columns agg_df = agg_df.set_index('time') agg_df = agg_df.rename(columns={ - "build_existing_model.building_type": "building_type", + self.BLDG_TYPE: "building_type", "electricity_exterior_lighting_kwh": "exterior_lighting", "electricity_interior_lighting_kwh": "interior_lighting", "electricity_interior_equipment_kwh": "interior_equipment", @@ -426,7 +455,18 @@ def convert_timeseries_to_long(self, agg_df, county_ids, output_name, save_indiv "electricity_cooling_kwh": "cooling", "electricity_heating_kwh": "heating", "electricity_refrigeration_kwh": "refrigeration", - "total_site_electricity_kwh": "total" + "total_site_electricity_kwh": "total", + "electricity_exterior_lighting_kwh_per_sf": "exterior_lighting_per_sf", + "electricity_interior_lighting_kwh_per_sf": "interior_lighting_per_sf", + "electricity_interior_equipment_kwh_per_sf": "interior_equipment_per_sf", + "electricity_water_systems_kwh_per_sf": "water_systems_per_sf", + "electricity_heat_recovery_kwh_per_sf": "heat_recovery_per_sf", + "electricity_fans_kwh_per_sf": "fans_per_sf", + "electricity_pumps_kwh_per_sf": "pumps_per_sf", + "electricity_cooling_kwh_per_sf": "cooling_per_sf", + "electricity_heating_kwh_per_sf": "heating_per_sf", + "electricity_refrigeration_kwh_per_sf": "refrigeration_per_sf", + "total_site_electricity_kwh_per_sf": "kwh_per_sf" }) # aggregate by hour @@ -448,8 +488,9 @@ def convert_timeseries_to_long(self, agg_df, county_ids, output_name, save_indiv agg_df = agg_df.set_index('timestamp') agg_df = agg_df[agg_df.index.dayofyear != 366] - # melt into long format - val_vars = [ + # melt into long format with both kwh and kwh_per_sf columns + # First melt the absolute values (kwh) + kwh_vars = [ 'exterior_lighting', 'interior_lighting', 'interior_equipment', @@ -462,50 +503,82 @@ def convert_timeseries_to_long(self, agg_df, county_ids, output_name, save_indiv 'refrigeration', 'total' ] - agg_df = pd.melt( + + kwh_df = pd.melt( agg_df.reset_index(), id_vars=[ 'timestamp', 'building_type', 'sample_count' ], - value_vars=val_vars, + value_vars=kwh_vars, var_name='enduse', value_name='kwh' ).set_index('timestamp') - agg_df = 
agg_df.rename(columns={'sample_count': 'bldg_count'}) - # size get data - # note that this is a Polars dataframe, not a Pandas dataframe - self.data = self.data.with_columns([pl.col('in.nhgis_county_gisjoin').cast(pl.Utf8)]) - # size_df = self.data.filter(self.data['in.nhgis_county_gisjoin'].is_in(county_ids)) - size_df = self.data.clone().filter(pl.col('in.nhgis_county_gisjoin').is_in(county_ids)) - size_df = size_df.select(['in.comstock_building_type', 'in.sqft', 'calc.weighted.sqft']).collect() + # Then melt the normalized values (kwh_per_sf) + kwh_per_sf_vars = [ + 'exterior_lighting_per_sf', + 'interior_lighting_per_sf', + 'interior_equipment_per_sf', + 'water_systems_per_sf', + 'heat_recovery_per_sf', + 'fans_per_sf', + 'pumps_per_sf', + 'cooling_per_sf', + 'heating_per_sf', + 'refrigeration_per_sf', + 'kwh_per_sf' + ] - # Cast columns to match dtype of lists - size_df = size_df.with_columns([ - pl.col('in.comstock_building_type').cast(pl.Utf8), - pl.col('in.sqft').cast(pl.Float64), - pl.col('calc.weighted.sqft').cast(pl.Float64) - ]) + # Map per_sf column names to base enduse names + per_sf_mapping = { + 'exterior_lighting_per_sf': 'exterior_lighting', + 'interior_lighting_per_sf': 'interior_lighting', + 'interior_equipment_per_sf': 'interior_equipment', + 'water_systems_per_sf': 'water_systems', + 'heat_recovery_per_sf': 'heat_recovery', + 'fans_per_sf': 'fans', + 'pumps_per_sf': 'pumps', + 'cooling_per_sf': 'cooling', + 'heating_per_sf': 'heating', + 'refrigeration_per_sf': 'refrigeration', + 'kwh_per_sf': 'total' + } - size_df = size_df.group_by('in.comstock_building_type').agg(pl.col(['in.sqft', 'calc.weighted.sqft']).sum()) - size_df = size_df.with_columns((pl.col('calc.weighted.sqft')/pl.col('in.sqft')).alias('weight')) - size_df = size_df.with_columns((pl.col('in.comstock_building_type').replace(self.BLDG_TYPE_TO_SNAKE_CASE, default=None)).alias('building_type')) - # file_path = os.path.join(self.output_dir, output_name + '_building_type_size.csv') - # size_df.write_csv(file_path) + kwh_per_sf_df = pd.melt( + agg_df.reset_index(), + id_vars=[ + 'timestamp', + 'building_type', + 'sample_count' + ], + value_vars=kwh_per_sf_vars, + var_name='enduse', + value_name='kwh_per_sf_value' + ).set_index('timestamp') + + # Map the per_sf enduse names to base names + kwh_per_sf_df['enduse'] = kwh_per_sf_df['enduse'].map(per_sf_mapping) - weight_dict = dict(zip(size_df['building_type'].to_list(), size_df['weight'].to_list())) - weight_size_dict = dict(zip(size_df['building_type'].to_list(), size_df['calc.weighted.sqft'].to_list())) - agg_df['kwh_weighted'] = agg_df['kwh'] * agg_df['building_type'].map(weight_dict) + # Rename the value column to the final name + kwh_per_sf_df = kwh_per_sf_df.rename(columns={'kwh_per_sf_value': 'kwh_per_sf'}) - # calculate kwh/sf - agg_df['kwh_per_sf'] = agg_df.apply(lambda row: row['kwh_weighted'] / weight_size_dict.get(row['building_type'], 1), axis=1) - agg_df = agg_df.drop(['kwh', 'kwh_weighted'], axis=1) + # Reset index so timestamp becomes a column for merging + kwh_df = kwh_df.reset_index() + kwh_per_sf_df = kwh_per_sf_df.reset_index() + + # Merge the two dataframes on timestamp, building_type, sample_count, and enduse + agg_df = kwh_df.merge( + kwh_per_sf_df, + on=['timestamp', 'building_type', 'sample_count', 'enduse'], + how='inner' + ).set_index('timestamp') + agg_df = agg_df.rename(columns={'sample_count': 'bldg_count'}) # save out long data format if save_individual_region: - output_file_path = os.path.join(self.output_dir, output_name + 
'_building_type_timeseries_long.csv') + output_file_path = os.path.join(self.output_dir['fs_path'], output_name + '_building_type_timeseries_long.csv') agg_df.to_csv(output_file_path, index=True) logger.info(f"Saved enduse timeseries in long format for {output_name} to {output_file_path}") @@ -522,9 +595,6 @@ def load_data(self, upgrade_id, acceptable_failure_percentage=0.01, drop_failed_ buildstock = pl.read_csv(os.path.join(self.data_dir, self.buildstock_file_name), infer_schema_length=50000) buildstock = buildstock.rename({'Building': 'sample_building_id'}) - - # if "sample_building_id" not in buildstock: - # raise Exception(f"the csv path is {os.path.join(self.data_dir, self.buildstock_file_name)}") buildstock_bldg_count = buildstock.shape[0] logger.debug(f'{buildstock_bldg_count} models in buildstock.csv') @@ -2703,7 +2773,11 @@ def create_weighted_aggregate_output(self, assert isinstance(wtd_agg_outs, pl.LazyFrame) return wtd_agg_outs - def export_metadata_and_annual_results_for_upgrade(self, upgrade_id, geo_exports, n_parallel=-1): + def export_metadata_and_annual_results_for_upgrade(self, upgrade_id, geo_exports, n_parallel=-1, output_dir=None): + # Use self.output_dir if output_dir is not specified + if output_dir is None: + output_dir = self.output_dir + # # Define the geographic partitions to export # geo_exports = [ # {'geo_top_dir': 'national', @@ -2769,23 +2843,23 @@ def export_metadata_and_annual_results_for_upgrade(self, upgrade_id, geo_exports first_geo_combos = first_geo_combos.sort(by=geo_col_names[0]) # Make a directory for the geography type - full_geo_dir = f"{self.output_dir['fs_path']}/metadata_and_annual_results/{geo_top_dir}" - self.output_dir['fs'].mkdirs(full_geo_dir, exist_ok=True) + full_geo_dir = f"{output_dir['fs_path']}/metadata_and_annual_results/{geo_top_dir}" + output_dir['fs'].mkdirs(full_geo_dir, exist_ok=True) # Make a directory for each data type X file type combo if None in aggregation_levels: for data_type in data_types: for file_type in file_types: - self.output_dir['fs'].mkdirs(f'{full_geo_dir}/{data_type}/{file_type}', exist_ok=True) + output_dir['fs'].mkdirs(f'{full_geo_dir}/{data_type}/{file_type}', exist_ok=True) # Make an aggregates directory for the geography type - full_geo_agg_dir = f"{self.output_dir['fs_path']}/metadata_and_annual_results_aggregates/{geo_top_dir}" - self.output_dir['fs'].mkdirs(full_geo_agg_dir, exist_ok=True) + full_geo_agg_dir = f"{output_dir['fs_path']}/metadata_and_annual_results_aggregates/{geo_top_dir}" + output_dir['fs'].mkdirs(full_geo_agg_dir, exist_ok=True) # Make a directory for each data type X file type combo for data_type in data_types: for file_type in file_types: - self.output_dir['fs'].mkdirs(f'{full_geo_agg_dir}/{data_type}/{file_type}', exist_ok=True) + output_dir['fs'].mkdirs(f'{full_geo_agg_dir}/{data_type}/{file_type}', exist_ok=True) # Builds a file path for each aggregate based on name, file type, and aggregation level def get_file_path(full_geo_agg_dir, full_geo_dir, geo_prefixes, geo_levels, file_type, aggregation_level): @@ -2796,7 +2870,7 @@ def get_file_path(full_geo_agg_dir, full_geo_dir, geo_prefixes, geo_levels, file geo_level_dir = f'{agg_level_dir}/{data_type}/{file_type}' if len(geo_levels) > 0: geo_level_dir = f'{geo_level_dir}/' + '/'.join(geo_levels) - self.output_dir['fs'].mkdirs(geo_level_dir, exist_ok=True) + output_dir['fs'].mkdirs(geo_level_dir, exist_ok=True) file_name = f'upgrade{upgrade_id}' # Add geography prefix to filename if len(geo_prefixes) > 0: @@ -2914,7 +2988,7 @@ 
def get_file_path(full_geo_agg_dir, full_geo_dir, geo_prefixes, geo_levels, file for file_type in file_types: file_path = get_file_path(full_geo_agg_dir, full_geo_dir, geo_prefixes, geo_levels, file_type, aggregation_level) logger.debug(f"Queuing {file_path}") - combo = (geo_data_for_data_type, self.output_dir, file_type, file_path) + combo = (geo_data_for_data_type, output_dir, file_type, file_path) combos_to_write.append(combo) # Write files in parallel @@ -2986,7 +3060,7 @@ def get_file_path(full_geo_agg_dir, full_geo_dir, geo_prefixes, geo_levels, file for file_type in file_types: file_path = get_file_path(full_geo_agg_dir, full_geo_dir, geo_prefixes, geo_levels, file_type, aggregation_level) logger.debug(f"Queuing {file_path}: n_cols = {n_cols:,}, n_rows = {n_rows:,}") - combo = (data_type_df, self.output_dir, file_type, file_path) + combo = (data_type_df, output_dir, file_type, file_path) combos_to_write.append(combo) # Write files in parallel @@ -4031,3 +4105,693 @@ def sightGlass_metadata_check(self, comstock_data: pl.LazyFrame): if err_log: raise ValueError(err_log) + + + @staticmethod + def create_sightglass_tables( + s3_location: str, + dataset_name: str, + database_name: str = "vizstock", + glue_service_role: str = "vizstock-glue-service-role", + ): + fs, fs_path = url_to_fs(s3_location) + + glue = boto3.client("glue", region_name="us-west-2") + + crawler_name_base = f"vizstock_{dataset_name}" + tbl_prefix_base = f"{dataset_name}" + + crawler_params = { + "Role": glue_service_role, + "DatabaseName": database_name, + "Targets": {}, + "SchemaChangePolicy": { + "UpdateBehavior": "UPDATE_IN_DATABASE", + "DeleteBehavior": "DELETE_FROM_DATABASE", + } + } + + # Crawl each metadata target separately to create a + # unique table name for each metadata folder + logger.info(f"Creating crawlers for metadata") + md_agg_paths = fs.ls(f"{s3_location}") #{fs_path}/metadata_and_annual_results_aggregates/ + md_paths = fs.ls(f"{fs_path}/metadata_and_annual_results/") + + for md_path in md_agg_paths + md_paths: + md_geog = md_path.split('/')[-1] + if '_aggregates' in md_path: + crawler_name = f'{crawler_name_base}_md_agg_{md_geog}' + tbl_prefix = f'{tbl_prefix_base}_md_agg_{md_geog}_' + else: + crawler_name = f'{crawler_name_base}_md_{md_geog}' + tbl_prefix = f'{tbl_prefix_base}_md_{md_geog}_' + + crawler_params["Name"] = crawler_name + crawler_params["TablePrefix"] = tbl_prefix + crawler_params["Targets"]["S3Targets"] = [{ + "Path": f"s3://{md_path}/full/parquet", + "SampleSize": 1 + }] + + try: + _ = glue.get_crawler(Name=crawler_name) + except glue.exceptions.EntityNotFoundException as ex: + logger.info(f"Creating Crawler {crawler_name}") + glue.create_crawler(**crawler_params) + else: + logger.info(f"Updating Crawler {crawler_name}") + glue.update_crawler(**crawler_params) + + logger.info(f"Running Crawler {crawler_name} on: {md_path}") + + expected_table_name = f"{tbl_prefix}{md_geog}" + logger.info(f"Expected Athena table: {expected_table_name} in database {database_name}") + + glue.start_crawler(Name=crawler_name) + time.sleep(10) + + crawler_info = glue.get_crawler(Name=crawler_name) + while crawler_info["Crawler"]["State"] != "READY": + time.sleep(10) + crawler_info = glue.get_crawler(Name=crawler_name) + logger.info(f"Crawler state: {crawler_info['Crawler']['State']}") + glue.delete_crawler(Name=crawler_name) + logger.info(f"Deleting Crawler {crawler_name}") + + return dataset_name + + @staticmethod + def fix_timeseries_tables(dataset_name: str, database_name: str = "vizstock"): + 
logger.info("Updating timeseries table schemas") + + glue = boto3.client("glue", region_name="us-west-2") + tbl_prefix = f"{dataset_name}_" + get_tables_kw = {"DatabaseName": database_name, "Expression": f"{tbl_prefix}*"} + tbls_resp = glue.get_tables(**get_tables_kw) + tbl_list = tbls_resp["TableList"] + + while "NextToken" in tbls_resp: + tbls_resp = glue.get_tables(NextToken=tbls_resp["NextToken"], **get_tables_kw) + tbl_list.extend(tbls_resp["TableList"]) + + for tbl in tbl_list: + + # Skip timeseries views that may exist, including ones created by this script + if tbl.get("TableType") == "VIRTUAL_VIEW": + continue + + table_name = tbl['Name'] + if not '_timeseries' in table_name: + continue + + # Check the dtype of the 'upgrade' column. + # Must be 'bigint' to match the metadata schema so joined queries work. + do_tbl_update = False + for part_key in tbl["PartitionKeys"]: + if part_key["Name"] == "upgrade" and part_key["Type"] != "bigint": + do_tbl_update = True + part_key["Type"] = "bigint" + + if not do_tbl_update: + logger.debug(f"Skipping {table_name} because it already has correct partition dtypes") + continue + + # Delete the automatically-created partition index. + # While the index exists, dtypes for the columns in the index cannot be modified. + indexes_resp = glue.get_partition_indexes( + DatabaseName=database_name, + TableName=table_name + ) + for index in indexes_resp['PartitionIndexDescriptorList']: + index_name = index['IndexName'] + index_keys = index['Keys'] + logger.debug(f'Deleting index {index_name} with keys {index_keys} in {table_name}') + glue.delete_partition_index( + DatabaseName=database_name, + TableName=table_name, + IndexName=index_name + ) + + # Wait for index deletion to complete + index_deleted = False + for i in range(0, 60): + indexes_resp = glue.get_partition_indexes( + DatabaseName=database_name, + TableName=table_name + ) + if len(indexes_resp['PartitionIndexDescriptorList']) == 0: + index_deleted = True + break + logger.debug('Waiting 10 seconds to check index deletion status') + time.sleep(10) + if not index_deleted: + raise RuntimeError(f'Did not delete index in 600 seconds, stopping.') + + # Change the dtype of the 'upgrade' partition column to bigint + tbl_input = {} + for k, v in tbl.items(): + if k in ( + "Name", + "Description", + "Owner", + "Retention", + "StorageDescriptor", + "PartitionKeys", + "TableType", + "Parameters", + ): + tbl_input[k] = v + logger.debug(f"Updating dtype of upgrade column in {table_name} to bigint") + glue.update_table(DatabaseName=database_name, TableInput=tbl_input) + + # Recreate the index + key_names = [k['Name'] for k in index_keys] + logger.debug(f"Creating index with columns: {key_names} in {table_name}") + glue.create_partition_index( + # CatalogId='string', + DatabaseName=database_name, + TableName=table_name, + PartitionIndex={ + 'Keys': key_names, + 'IndexName': 'vizstock_partition_index' + } + ) + + # Wait for index creation to complete + index_created = False + for i in range(0, 60): + indexes_resp = glue.get_partition_indexes( + DatabaseName=database_name, + TableName=table_name + ) + for index in indexes_resp['PartitionIndexDescriptorList']: + index_status = index['IndexStatus'] + if index_status == 'ACTIVE': + index_created = True + if index_created: + break + else: + logger.debug('Waiting 10 seconds to check index creation status') + time.sleep(10) + if not index_created: + raise RuntimeError(f'Did not create index in 600 seconds, stopping.') + + @staticmethod + def column_filter(col): + return not 
( + col.name.endswith(".co2e_kg") + or col.name.find("out.electricity.pv") > -1 + or col.name.find("out.electricity.purchased") > -1 + or col.name.find("out.electricity.net") > -1 + or col.name.find(".net.") > -1 + or col.name.find("applicability.") > -1 + or col.name.find("out.qoi.") > -1 + or col.name.find("out.emissions.") > -1 + or col.name.find("out.params.") > -1 + or col.name.find("out.utility_bills.") > -1 + or col.name.find("calc.") > -1 + or col.name.startswith("in.ejscreen") + or col.name.startswith("in.cejst") + or col.name.startswith("in.cluster_id") + or col.name.startswith("in.size_bin_id") + or col.name.startswith("in.sampling_region_id") + or col.name.startswith("out.district_heating.interior_equipment") + or col.name.startswith("out.district_heating.cooling") + or col.name.endswith("_applicable") + or col.name.startswith("out.electricity.total.apr") + or col.name.startswith("out.electricity.total.aug") + or col.name.startswith("out.electricity.total.dec") + or col.name.startswith("out.electricity.total.feb") + or col.name.startswith("out.electricity.total.jan") + or col.name.startswith("out.electricity.total.jul") + or col.name.startswith("out.electricity.total.jun") + or col.name.startswith("out.electricity.total.mar") + or col.name.startswith("out.electricity.total.may") + or col.name.startswith("out.electricity.total.nov") + or col.name.startswith("out.electricity.total.oct") + or col.name.startswith("out.electricity.total.sep") + or col.name == "in.airtightness..m3_per_m2_h" # Skip BIGINT airtightness column TODO: fix upstream in postproc + ) + + @staticmethod + def create_column_alias(col_name): + + # accept SQLAlchemy Column / ColumnElement or plain str + if not isinstance(col_name, str): + # prefer .name, then .key; fallback to str (last resort) + name = getattr(col_name, "name", None) or getattr(col_name, "key", None) + if not name: + name = str(col_name) + col_name = name + + # 1) name normalizations + normalized = ( + col_name + .replace("gas", "natural_gas") + .replace("fueloil", "fuel_oil") + .replace("districtheating", "district_heating") + .replace("districtcooling", "district_cooling") + ) + + # 2) regex patterns + m1 = re.search( + r"^(electricity|natural_gas|fuel_oil|propane|district_heating|district_cooling|other_fuel)_(\w+)_(kwh|therm|mbtu|kbtu)$", + normalized, + ) + m2 = re.search( + r"^total_site_(electricity|natural_gas|fuel_oil|propane|district_heating|district_cooling|other_fuel)_(kwh|therm|mbtu|kbtu)$", + normalized, + ) + m3 = re.search(r"^(total)_(site_energy)_(kwh|therm|mbtu|kbtu)$", normalized) + m4 = re.search( + r"^total_net_site_(electricity|natural_gas|fuel_oil|propane|district_heating|district_cooling|other_fuel)_(kwh|therm|mbtu|kbtu)$", + normalized, + ) + m5 = re.search( + r"^total_purchased_site_(electricity|natural_gas|fuel_oil|propane|district_heating|district_cooling|other_fuel)_(kwh|therm|mbtu|kbtu)$", + normalized, + ) + + if not (m1 or m2 or m3 or m4 or m5): + # Not an energy column we care about + return 1.0, normalized + + if m1: + fueltype, enduse, fuel_units = m1.groups() + elif m2: + fueltype, fuel_units = m2.groups() + enduse = "total" + elif m3: + enduse, fueltype, fuel_units = m3.groups() # "total","site_energy",units + # If you prefer "site_energy" to be reported as a pseudo-fuel, keep as-is. + # Otherwise map to a specific convention here. 
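+            # Worked examples (illustrative; the first two column names appear elsewhere in this module,
+            # the third is hypothetical):
+            #   'electricity_cooling_kwh'    -> (1.0, 'out.electricity.cooling.energy_consumption')   via m1
+            #   'total_site_electricity_kwh' -> (1.0, 'out.electricity.total.energy_consumption')     via m2
+            #   'gas_heating_therm' normalizes to 'natural_gas_heating_therm' and returns
+            #   (29.307107..., 'out.natural_gas.heating.energy_consumption'), i.e. the therm-to-kWh factor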
+ elif m4: + fueltype, fuel_units = m4.groups() + enduse = "net" + else: # m5 + fueltype, fuel_units = m5.groups() + enduse = "purchased" + + # 3) build alias + col_alias = f"out.{fueltype}.{enduse}.energy_consumption" + + # Created using OpenStudio unit conversion library + energy_unit_conv_to_kwh = { + "mbtu": 293.0710701722222, + "m_btu": 293.0710701722222, + "therm": 29.307107017222222, + "kbtu": 0.2930710701722222, + } + + # 4) conversion factor to kWh + if fuel_units == "kwh": + conv_factor = 1.0 + else: + try: + conv_factor = energy_unit_conv_to_kwh[fuel_units] + except KeyError: + raise ValueError(f"Unhandled energy unit: {fuel_units!r} in column {col_name!r}") + + return conv_factor, col_alias + + @staticmethod + def create_views( + dataset_name: str, database_name: str = "vizstock", workgroup: str = "eulp" + ): + glue = boto3.client("glue", region_name="us-west-2") + + logger.info(f'Creating views for {dataset_name}') + + # Get a list of metadata tables + get_md_tables_kw = { + "DatabaseName": database_name, + "Expression": f"{dataset_name}_md_.*", + } + tbls_resp = glue.get_tables(**get_md_tables_kw) + tbl_list = tbls_resp["TableList"] + while "NextToken" in tbls_resp: + tbls_resp = glue.get_tables(NextToken=tbls_resp["NextToken"], **get_md_tables_kw) + tbl_list.extend(tbls_resp["TableList"]) + md_tbls = [x["Name"] for x in tbl_list if x["TableType"] != "VIRTUAL_VIEW"] + + # Create a view for each metadata table + for metadata_tblname in md_tbls: + # Connect to the metadata table + engine = create_engine( + f"awsathena+rest://:@athena.us-west-2.amazonaws.com:443/{database_name}?work_group={workgroup}" + ) + meta = sa.MetaData(bind=engine) + metadata_tbl = sa.Table(metadata_tblname, meta, autoload=True) + logger.info(f"Loaded metadata table: {metadata_tblname}") + + # Extract the partition columns from the table name + bys = [] + if 'by' in metadata_tblname: + bys = metadata_tblname.replace(f'{dataset_name}_md_', '') + bys = bys.replace(f'agg_', '').replace(f'by_', '').replace(f'_parquet', '').split('_and_') + logger.debug(f"Partition identifiers = {', '.join(bys)}") + + # Select columns for the metadata, aliasing partition columns + cols = [] + for col in metadata_tbl.columns: + cols.append(col) + + continue + + # Remove everything but the input characteristics and output energy columns from the view + cols = list(filter(ComStock.column_filter, cols)) + + # Alias the out.foo columns to remove units + # TODO: add this to timeseries stuff + cols_aliased = [] + for col in cols: + if col.name.startswith("out.") and col.name.find("..") > -1: + unitless_name = col.name.split('..')[0] + cols_aliased.append(col.label(unitless_name)) + # logger.debug(f'Aliasing {col.name} to {unitless_name}') + elif col.name == 'in.sqft..ft2': + unitless_name = 'in.sqft' # Special requirement for SightGlass + cols_aliased.append(col.label(unitless_name)) + # logger.debug(f'Aliasing {col.name} to {unitless_name}') + else: + cols_aliased.append(col) + cols = cols_aliased + + # Check columns + col_type_errs = 0 + for c in cols: + # Column name length for postgres compatibility + if len(c.name) > 63: + col_type_errs += 1 + logger.error(f'column: `{c.name}` must be < 64 chars for SightGlass postgres storage') + + # in.foo columns may not be bigints or booleans because they cannot be serialized to JSON + # when determining the unique set of filter values for SightGlass + if c.name.startswith('in.') and (str(c.type) == 'BIGINT' or (str(c.type) == 'BOOLEAN')): + col_type_errs += 1 + logger.error(f'in.foo column 
{c.name} may not be a BIGINT or BOOLEAN for SightGlass to work') + + # Expected bigint columns in SightGlass + expected_bigints = ['metadata_index', 'upgrade', 'bldg_id'] + if c.name in expected_bigints: + if not str(c.type) == 'BIGINT': + col_type_errs += 1 + logger.error(f'Column {c} must be a BIGINT, but found {c.type}') + + if col_type_errs > 0: + raise RuntimeError(f'{col_type_errs} were found in columns, correct these in metadata before proceeding.') + + # For PUMAs, need to create one view for each census region + if 'puma' in bys: + regions = ("South", "Midwest", "Northeast", "West") + for region in regions: + q = sa.select(cols).where(metadata_tbl.c["in.census_region_name"].in_([region])) + view_name = metadata_tblname.replace('_by_state_and_puma_parquet', '_puma') + view_name = f"{view_name}_{region.lower()}_vu" + db_plus_vu = f'{database_name}_{view_name}' + if len(db_plus_vu) > 63: + raise RuntimeError(f'db + view: `{db_plus_vu}` must be < 64 chars for SightGlass postgres storage') + view = sa.Table(view_name, meta) + create_view = CreateView(view, q, or_replace=True) + logger.info(f"Creating metadata view: {view_name}, partitioned by {bys}") + engine.execute(create_view) + + else: + q = sa.select(cols) + view_name = metadata_tblname.replace('_parquet', '') + view_name = f"{view_name}_vu" + db_plus_vu = f'{database_name}_{view_name}' + #if len(db_plus_vu) > 63: #TODO: add name truncation if we need to use this method for sightglass + # raise RuntimeError(f'db + view: `{db_plus_vu}` must be < 64 chars for SightGlass postgres storage') + view = sa.Table(view_name, meta) + create_view = CreateView(view, q, or_replace=True) + logger.info(f"Creating metadata view: {view_name}, partitioned by {bys}") + engine.execute(create_view) + + # Get a list of timeseries tables + get_ts_tables_kw = { + "DatabaseName": database_name, + "Expression": f"{dataset_name}_timeseries*", + } + tbls_resp = glue.get_tables(**get_ts_tables_kw) + tbl_list = tbls_resp["TableList"] + while "NextToken" in tbls_resp: + tbls_resp = glue.get_tables(NextToken=tbls_resp["NextToken"], **get_ts_tables_kw) + tbl_list.extend(tbls_resp["TableList"]) + ts_tbls = [x["Name"] for x in tbl_list if x["TableType"] != "VIRTUAL_VIEW"] + + # Create a view for each timeseries table, removing the emissions columns + for ts_tblname in ts_tbls: + engine = create_engine( + f"awsathena+rest://:@athena.us-west-2.amazonaws.com:443/{database_name}?work_group={workgroup}" + ) + meta = sa.MetaData(bind=engine) + ts_tbl = sa.Table(ts_tblname, meta, autoload=True) + cols = list(filter(ComStock.column_filter, ts_tbl.columns)) + cols_aliased = [] + for col in cols: + + # rename + conv_factor, col_alias = ComStock.create_column_alias(col) + if (col.name == col_alias) & (conv_factor==1): # and conversion factor is 1 + cols_aliased.append(col) + elif (col.name != col_alias) & (conv_factor==1): #name different, conversion factor 1 + cols_aliased.append(col.label(col_alias)) #TODO: return both alias and new units + else: # name and conversion different + cols_aliased.append((col/conv_factor).label(col_alias)) #TODO: return both alias and new units + + if col.name == 'timestamp': #timestamp + # Convert bigint to timestamp type if necessary + if str(col.type) == 'BIGINT': + # Pandas uses nanosecond resolution integer timestamps. + # Presto expects second resolution values in from_unixtime. + # Must divide values by 1e9 to go from nanoseconds to seconds. 
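+                            # e.g. (illustrative) 1_546_300_800_000_000_000 ns / 1e9 = 1_546_300_800 s,
+                            #      which from_unixtime() renders as 2019-01-01 00:00:00 UTC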
+ cols_aliased.append(sa.func.from_unixtime(col / 1e9).label('timestamp')) #NOTE: syntax for adding unit conversions + else: + cols_aliased.append(col) + + cols = cols_aliased + + q = sa.select(cols) + view_name = f"{ts_tblname}_vu" + + db_plus_vu = f'{database_name}_{view_name}' + if len(db_plus_vu) > 63: + raise RuntimeError(f'db + view: `{db_plus_vu}` must be < 64 chars for SightGlass postgres storage') + view = sa.Table(view_name, meta) + create_view = CreateView(view, q, or_replace=True) + logger.info(f"Creating timeseries view: {view_name}") + engine.execute(create_view) + logger.debug('Columns in view:') + for c in cols: + logger.debug(c) + + # get weighted load profiles + + def determine_state_or_county_timeseries_table(self, location_input, location_name=None): + """ + Determine if location input represents state or county request(s). + + Args: + location_input: Either a single location ID string or tuple/list of location IDs + location_name (str, optional): The location name (e.g., 'Minnesota', 'Boston') + + Returns: + str: 'state' for state-level locations, 'county' for county-level locations + """ + + # Handle single location (string) + if isinstance(location_input, str): + if len(location_input) == 2 and location_input.isalpha(): + # State ID: 2-letter alphabetic code (e.g., 'MN', 'CA', 'TX') + return 'state' + elif location_input.startswith('G') and len(location_input) > 2: + # County ID: starts with 'G' followed by numbers (e.g., 'G123456') + return 'county' + else: + # Handle other potential formats - default to state + logger.warning(f"Warning: Unrecognized location ID format: {location_input}, defaulting to state") + return 'state' + + # Handle multiple locations (tuple or list) + elif isinstance(location_input, (tuple, list)): + # Check all locations and determine if any counties are present + for location_id in location_input: + if self.determine_state_or_county_timeseries_table(location_id) == 'county': + return 'county' # Use county table if any counties present + return 'state' # All are states + + else: + logger.warning(f"Warning: Unexpected location input type: {type(location_input)}, defaulting to state") + return 'state' + + def get_weighted_load_profiles_from_s3(self, df, upgrade_num, location_input, upgrade_name): + """ + This method retrieves weighted timeseries profiles from s3/athena. + Returns dataframe with weighted kWh columns for baseline and upgrade. 
+ + Args: + location_input: Either a single location ID (e.g., 'MN') or tuple/list of location IDs (e.g., ('MA', 'NH')) + """ + + # Normalize location_input to a list for consistent processing + if isinstance(location_input, str): + location_list = [location_input] + elif isinstance(location_input, (tuple, list)): + location_list = list(location_input) + else: + raise ValueError(f"Unexpected location_input type: {type(location_input)}") + + # Determine table type based on the ENTIRE request set, not just this location + # If ANY location in timeseries_locations_to_plot is a county, use county table for ALL + requires_county_table = False + for loc_key in self.timeseries_locations_to_plot.keys(): + if self.determine_state_or_county_timeseries_table(loc_key) == 'county': + requires_county_table = True + break + + # Set up table configuration based on whether counties are needed anywhere + if requires_county_table: + agg = 'county' + metadata_table_suffix = '_md_agg_by_state_and_county_vu' + weight_view_table = f'{self.comstock_run_name}_md_agg_by_state_and_county_vu' + else: + agg = 'state' + metadata_table_suffix = '_md_agg_national_by_state_vu' + weight_view_table = f'{self.comstock_run_name}_md_agg_national_by_state_vu' + + # Initialize Athena client + athena_client = BuildStockQuery(workgroup='eulp', + db_name='enduse', + table_name=self.comstock_run_name, + buildstock_type='comstock', + skip_reports=True, + metadata_table_suffix=metadata_table_suffix, + ) + + # Create upgrade ID to name mapping from existing data + upgrade_name_mapping = self.data.select(self.UPGRADE_ID, self.UPGRADE_NAME).unique().collect().sort(self.UPGRADE_ID).to_dict(as_series=False) + dict_upid_to_upname = dict(zip(upgrade_name_mapping[self.UPGRADE_ID], upgrade_name_mapping[self.UPGRADE_NAME])) + + # Determine geographic types for all locations + state_locations = [] + county_locations = [] + + for location_id in location_list: + location_geo_type = self.determine_state_or_county_timeseries_table(location_id) + if location_geo_type == 'state': + state_locations.append(location_id) + else: + county_locations.append(location_id) + + # Build applicability query for all locations + builder = ComStockQueryBuilder(self.comstock_run_name) + + # Determine column type based on primary geographic type + primary_geo_type = self.determine_state_or_county_timeseries_table(location_input) + + query_params = { + 'upgrade_ids': upgrade_num, + 'columns': [ + self.DATASET, + f'"{self.STATE_NAME}"' if primary_geo_type == 'state' else f'"{self.COUNTY_ID}"', + self.BLDG_ID, + self.UPGRADE_ID, + self.UPGRADE_APPL + ], + 'weight_view_table': weight_view_table + } + + # Add geographic filters for all locations + if state_locations and county_locations: + # Mixed - need to query both separately and combine + # For now, use the primary type's approach + if primary_geo_type == 'state': + query_params['state'] = state_locations if len(state_locations) > 1 else state_locations[0] + else: + query_params['county'] = county_locations if len(county_locations) > 1 else county_locations[0] + elif state_locations: + query_params['state'] = state_locations if len(state_locations) > 1 else state_locations[0] + elif county_locations: + query_params['county'] = county_locations if len(county_locations) > 1 else county_locations[0] + + applicability_query = builder.get_applicability_query(**query_params) + + # Execute query to get applicable buildings + applic_df = athena_client.execute(applicability_query) + applic_bldgs_list = 
list(applic_df[self.BLDG_ID].unique()) + applic_bldgs_list = [int(x) for x in applic_bldgs_list] + + # Create location description string for logging messages + location_desc = f"locations {location_list}" if len(location_list) > 1 else f"{primary_geo_type} {location_list[0]}" + + # Check if any applicable buildings were found + if len(applic_bldgs_list) == 0: + print(f"Warning: No applicable buildings found for {location_desc} and upgrade(s) {upgrade_num}. Returning empty DataFrames.") + return pd.DataFrame(), pd.DataFrame() + + # Build geographic restrictions for all locations + geo_restrictions = [] + if state_locations: + geo_restrictions.append((self.STATE_ABBRV, state_locations)) + if county_locations: + geo_restrictions.append((self.COUNTY_ID, county_locations)) + + # Initialize variables before loop to avoid NameError if loop doesn't execute + df_base_ts_agg_weighted = None + df_up_ts_agg_weighted = None + + # loop through upgrades + for upgrade_id in df[self.UPGRADE_ID].unique(): + + # if there are upgrades, restrict baseline to match upgrade applicability + if (upgrade_id in (0, "0")) and (df[self.UPGRADE_ID].nunique() > 1): + + # Create query builder and generate query + builder = ComStockQueryBuilder(self.comstock_run_name) + + # Build restrictions list including all geographic locations + restrictions = [(self.BLDG_ID, applic_bldgs_list)] + restrictions.extend(geo_restrictions) + + query = builder.get_timeseries_aggregation_query( + upgrade_id=upgrade_id, + enduses=(list(self.END_USES_TIMESERIES_DICT.values())+["total_site_electricity_kwh"]), + restrictions=restrictions, + timestamp_grouping='hour', + weight_view_table=weight_view_table + ) + + location_desc = f"locations {location_list}" if len(location_list) > 1 else f"{primary_geo_type} {location_list[0]}" + logger.info(f"Getting weighted baseline load profile for {location_desc} and upgrade id {upgrade_id} with {len(applic_bldgs_list)} applicable buildings (using {agg} table).") + + # Execute query + df_base_ts_agg_weighted = athena_client.execute(query) + df_base_ts_agg_weighted[self.UPGRADE_NAME] = dict_upid_to_upname[0] + + else: + # baseline load data when no upgrades are present, or upgrade load data + # create query builder and generate query for upgrade data + builder = ComStockQueryBuilder(self.comstock_run_name) + + # Build restrictions list including all geographic locations (same as baseline) + restrictions = [('bldg_id', applic_bldgs_list)] + restrictions.extend(geo_restrictions) + + query = builder.get_timeseries_aggregation_query( + upgrade_id=upgrade_id, + enduses=(list(self.END_USES_TIMESERIES_DICT.values())+["total_site_electricity_kwh"]), + restrictions=restrictions, + timestamp_grouping='hour', + weight_view_table=weight_view_table + ) + + df_up_ts_agg_weighted = athena_client.execute(query) + df_up_ts_agg_weighted[self.UPGRADE_NAME] = dict_upid_to_upname[int(upgrade_num[0])] + + + # Initialize default values in case no data was processed + df_base_ts_agg_weighted = pd.DataFrame() if 'df_base_ts_agg_weighted' not in locals() else df_base_ts_agg_weighted + df_up_ts_agg_weighted = pd.DataFrame() if 'df_up_ts_agg_weighted' not in locals() else df_up_ts_agg_weighted + + return df_base_ts_agg_weighted, df_up_ts_agg_weighted diff --git a/postprocessing/comstockpostproc/comstock_measure_comparison.py b/postprocessing/comstockpostproc/comstock_measure_comparison.py index 0e942a5a1..6073dd427 100644 --- a/postprocessing/comstockpostproc/comstock_measure_comparison.py +++ 
b/postprocessing/comstockpostproc/comstock_measure_comparison.py @@ -16,7 +16,7 @@ class ComStockMeasureComparison(NamingMixin, UnitsMixin, PlottingMixin): - def __init__(self, comstock_object: comstock.ComStock, states, make_comparison_plots, make_timeseries_plots, image_type='jpg', name=None): + def __init__(self, comstock_object: comstock.ComStock, timeseries_locations_to_plot, make_comparison_plots, make_timeseries_plots, image_type='jpg', name=None): # Initialize members assert isinstance(comstock_object.data, pl.LazyFrame) @@ -28,6 +28,9 @@ def __init__(self, comstock_object: comstock.ComStock, states, make_comparison_p self.data = comstock_object.plotting_data.clone() #not really a deep copy, only schema is copied but not data. assert isinstance(self.data, pl.LazyFrame) + # Store reference to ComStock object for use in plotting methods + self.comstock_object = comstock_object + self.color_map = {} self.image_type = image_type self.name = name @@ -39,11 +42,14 @@ def __init__(self, comstock_object: comstock.ComStock, states, make_comparison_p self.dataset_name = comstock_object.dataset_name self.output_dir = os.path.join( current_dir, '..', 'output', self.dataset_name, 'measure_runs') + self.column_for_grouping = self.UPGRADE_NAME self.dict_measure_dir = {} # this can be called to determine output directory self.upgrade_ids_for_comparison = comstock_object.upgrade_ids_for_comparison self.comstock_run_name = comstock_object.comstock_run_name - self.states = states + self.athena_table_name = comstock_object.athena_table_name + self.s3_base_dir = comstock_object.s3_base_dir + self.timeseries_locations_to_plot = timeseries_locations_to_plot self.make_timeseries_plots = make_timeseries_plots self.lazyframe_plotter = LazyFramePlotter() @@ -82,11 +88,10 @@ def __init__(self, comstock_object: comstock.ComStock, states, make_comparison_p # make consumption plots for upgrades if requested by user if make_comparison_plots: logger.info(f'Making plots for upgrade {upgrade}') - self.make_plots(df_upgrade, self.column_for_grouping, states, make_timeseries_plots, color_map, self.dict_measure_dir[upgrade]) + self.make_plots(df_upgrade, self.column_for_grouping, timeseries_locations_to_plot, make_timeseries_plots, color_map, self.dict_measure_dir[upgrade], self.s3_base_dir, self.athena_table_name, self.comstock_run_name) else: logger.info("make_comparison_plots is set to false, so not plots were created. Set make_comparison_plots to True for plots.") - # make plots comparing multiple upgrades together for comp_name, comp_up_ids in self.upgrade_ids_for_comparison.items(): @@ -114,13 +119,13 @@ def __init__(self, comstock_object: comstock.ComStock, states, make_comparison_p # make consumption plots for upgrades if requested by user if make_comparison_plots: - self.make_comparative_plots(df_upgrade, self.column_for_grouping, states, make_timeseries_plots, color_map, comp_output_dir) + self.make_comparative_plots(df_upgrade, self.column_for_grouping, timeseries_locations_to_plot, make_timeseries_plots, color_map, comp_output_dir, self.s3_base_dir, self.athena_table_name, self.comstock_run_name) else: logger.info("make_comparison_plots is set to false, so not plots were created. 
Set make_comparison_plots to True for plots.") end_time = pd.Timestamp.now() logger.info(f"Time taken to make all plots is {end_time - start_time}") - def make_plots(self, lazy_frame: pl.LazyFrame, column_for_grouping, states, make_timeseries_plots, color_map, output_dir): + def make_plots(self, lazy_frame: pl.LazyFrame, column_for_grouping, timeseries_locations_to_plot, make_timeseries_plots, color_map, output_dir, s3_base_dir, athena_table_name, comstock_run_name): time_start = pd.Timestamp.now() BASIC_PARAMS = { @@ -185,19 +190,23 @@ def make_plots(self, lazy_frame: pl.LazyFrame, column_for_grouping, states, make columns=(self.lazyframe_plotter.BASE_COLUMNS + self.lazyframe_plotter.UNMET_HOURS_COLS))(**BASIC_PARAMS) if make_timeseries_plots: - TIMESERIES_PARAMS = {'comstock_run_name': self.comstock_run_name, 'states': states, 'color_map': color_map, - 'output_dir': output_dir} + TIMESERIES_PARAMS = { + 'comstock_run_name': self.comstock_run_name, + 'timeseries_locations_to_plot': timeseries_locations_to_plot, + 'color_map': color_map, + 'output_dir': output_dir, + 'comstock_obj': self.comstock_object} LazyFramePlotter.plot_with_lazy(plot_method=self.plot_measure_timeseries_peak_week_by_state, lazy_frame=lazy_frame.clone(), - columns=(self.lazyframe_plotter.BASE_COLUMNS + [self.UPGRADE_ID, self.BLDG_WEIGHT, self.BLDG_TYPE]))(**TIMESERIES_PARAMS) - LazyFramePlotter.plot_with_lazy(plot_method=self.plot_measure_timeseries_season_average_by_state, lazy_frame=lazy_frame.clone(), - columns=(self.lazyframe_plotter.BASE_COLUMNS + [self.UPGRADE_ID, self.BLDG_WEIGHT, self.BLDG_TYPE]))(**TIMESERIES_PARAMS) + columns=(self.lazyframe_plotter.BASE_COLUMNS + [self.UPGRADE_ID, self.BLDG_TYPE]))(**TIMESERIES_PARAMS) #self.BLDG_WEIGHT, LazyFramePlotter.plot_with_lazy(plot_method=self.plot_measure_timeseries_season_average_by_state, lazy_frame=lazy_frame.clone(), - columns=(self.lazyframe_plotter.BASE_COLUMNS + [self.UPGRADE_ID, self.BLDG_WEIGHT, self.BLDG_TYPE]))(**TIMESERIES_PARAMS) + columns=(self.lazyframe_plotter.BASE_COLUMNS + [self.UPGRADE_ID, self.BLDG_TYPE]))(**TIMESERIES_PARAMS) #self.BLDG_WEIGHT + LazyFramePlotter.plot_with_lazy(plot_method=self.plot_measure_timeseries_annual_average_by_state_and_enduse, lazy_frame=lazy_frame.clone(), + columns=(self.lazyframe_plotter.BASE_COLUMNS + [self.UPGRADE_ID, self.BLDG_TYPE]))(**TIMESERIES_PARAMS) #self.BLDG_WEIGHT time_end = pd.Timestamp.now() logger.info(f"Time taken to make plots is {time_end - time_start}") - def make_comparative_plots(self, lazy_frame: pl.LazyFrame, column_for_grouping, states, make_timeseries_plots, color_map, output_dir): + def make_comparative_plots(self, lazy_frame: pl.LazyFrame, column_for_grouping, timeseries_locations_to_plot, make_timeseries_plots, color_map, output_dir, s3_base_dir, athena_table_name, comstock_run_name): # Make plots comparing the upgrades assert isinstance(lazy_frame, pl.LazyFrame) diff --git a/postprocessing/comstockpostproc/comstock_query_builder.py b/postprocessing/comstockpostproc/comstock_query_builder.py new file mode 100644 index 000000000..33ca48ece --- /dev/null +++ b/postprocessing/comstockpostproc/comstock_query_builder.py @@ -0,0 +1,370 @@ +""" +Query templates and builders for ComStock data extraction from Athena. + +This module provides SQL query templates and builder functions to construct +Athena queries for various ComStock analysis needs. 
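[Editor's note] A minimal usage sketch of this builder (the run name, location, and weight view name are placeholders; execution mirrors the BuildStockQuery pattern used in get_weighted_load_profiles_from_s3()):

from buildstock_query import BuildStockQuery
from comstockpostproc.comstock_query_builder import ComStockQueryBuilder

run_name = 'my_comstock_run'  # placeholder Athena table / run name
weight_vu = f'{run_name}_md_agg_national_by_state_vu'
builder = ComStockQueryBuilder(run_name)

# Which buildings does upgrade 1 apply to in Minnesota?
applicability_sql = builder.get_applicability_query(
    upgrade_ids=[1], state='MN', weight_view_table=weight_vu)

# Hourly, weighted electricity profile for that state and upgrade
timeseries_sql = builder.get_timeseries_aggregation_query(
    upgrade_id=1,
    enduses=['total_site_electricity_kwh'],
    restrictions=[('in.state', ['MN'])],
    timestamp_grouping='hour',
    weight_view_table=weight_vu)

# Queries are plain SQL strings; execute them through a configured BuildStockQuery client
athena_client = BuildStockQuery(workgroup='eulp', db_name='enduse', table_name=run_name,
                                buildstock_type='comstock', skip_reports=True,
                                metadata_table_suffix='_md_agg_national_by_state_vu')
df = athena_client.execute(timeseries_sql)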
+""" + +import logging +from typing import List, Optional, Union + +logger = logging.getLogger(__name__) + + +class ComStockQueryBuilder: + """ + Builder class for creating SQL queries to extract ComStock data from Athena. + + This class provides templated queries that can be customized with parameters + for different analysis needs while maintaining consistent query structure. + """ + + def __init__(self, athena_table_name: str): + """ + Initialize the query builder with the base Athena table name. + + Args: + athena_table_name: Base name of the Athena table (without _timeseries or _baseline suffix) + """ + self.athena_table_name = athena_table_name + self.timeseries_table = f"{athena_table_name}_timeseries" + self.baseline_table = f"{athena_table_name}_baseline" + + #TODO: this method has not yet been validated, and should be prior to use. + def get_monthly_energy_consumption_query(self, + upgrade_ids: Optional[List[Union[int, str]]] = None, + states: Optional[List[str]] = None, + building_types: Optional[List[str]] = None) -> str: + """ + Build query for monthly natural gas and electricity consumption by state and building type. + + Args: + upgrade_ids: List of upgrade IDs to filter on (optional) + states: List of state IDs to filter on (optional) + building_types: List of building types to filter on (optional) + + Returns: + SQL query string + + """ + + # Build WHERE clause conditions + where_conditions = ['"build_existing_model.building_type" IS NOT NULL'] + + if upgrade_ids: + upgrade_list = ', '.join([f'"{uid}"' for uid in upgrade_ids]) + where_conditions.append(f'"upgrade" IN ({upgrade_list})') + + if states: + state_list = ', '.join([f'"{state}"' for state in states]) + where_conditions.append(f'SUBSTRING("build_existing_model.county_id", 2, 2) IN ({state_list})') + + if building_types: + btype_list = ', '.join([f'"{bt}"' for bt in building_types]) + where_conditions.append(f'"build_existing_model.create_bar_from_building_type_ratios_bldg_type_a" IN ({btype_list})') + + where_clause = ' AND '.join(where_conditions) + + query = f""" + SELECT + "upgrade", + "month", + "state_id", + "building_type", + sum("total_site_gas_kbtu") AS "total_site_gas_kbtu", + sum("total_site_electricity_kwh") AS "total_site_electricity_kwh" + FROM + ( + SELECT + EXTRACT(MONTH from "time") as "month", + SUBSTRING("build_existing_model.county_id", 2, 2) AS "state_id", + "build_existing_model.create_bar_from_building_type_ratios_bldg_type_a" as "building_type", + "upgrade", + "total_site_gas_kbtu", + "total_site_electricity_kwh" + FROM + "{self.timeseries_table}" + JOIN "{self.baseline_table}" + ON "{self.timeseries_table}"."building_id" = "{self.baseline_table}"."building_id" + WHERE {where_clause} + ) + GROUP BY + "upgrade", + "month", + "state_id", + "building_type" + """ + + return query.strip() + + def get_timeseries_aggregation_query( + self, + upgrade_id: Union[int, str], + enduses: List[str], + weight_view_table: str, + group_by: Optional[List[str]] = None, + restrictions: List[tuple] = None, + timestamp_grouping: str = 'hour', + building_ids: Optional[List[int]] = None, + include_sample_stats: bool = True, + include_area_normalized_cols: bool = False) -> str: + """ + Build query for timeseries data aggregation similar to buildstock query functionality. + + This matches the pattern from working buildstock queries that join timeseries data + with a weight view table for proper weighting and aggregation. 
+ + Automatically detects if the timeseries table is partitioned by state based on naming conventions: + - Tables with 'ts_by_state' in the name are treated as state-partitioned + - Tables with '_timeseries' in the name (without 'ts_by_state') are treated as non-partitioned + - Other naming patterns assume no partitioning with a warning to the user regarding this assumption. + TODO: Improve this in the future to check for partitioning directly from Athena metadata if possible. + + Args: + upgrade_id: Upgrade ID to filter on + enduses: List of end use columns to select + weight_view_table: Name of the weight view table (required - e.g., 'rtuadv_v11_md_agg_national_by_state_vu' or 'rtuadv_v11_md_agg_national_by_county_vu') + group_by: Additional columns to group by (e.g., ['build_existing_model.building_type']) + restrictions: List of (column, values) tuples for filtering + timestamp_grouping: How to group timestamps ('hour', 'day', 'month') + building_ids: Specific building IDs to filter on (optional) + include_sample_stats: Whether to include sample_count, units_count, rows_per_sample + include_area_normalized_cols: Whether to include weighted area and kwh_weighted columns for AMI comparison + + Returns: + SQL query string + + """ + + # Weight view table is required - cannot auto-assign without knowing state vs county aggregation + if weight_view_table is None: + raise ValueError("weight_view_table parameter is required. Cannot auto-assign without knowing geographic aggregation level (state vs county). " + "Please provide the full weight view table name (e.g., 'your_dataset_md_agg_national_by_state_vu' or 'your_dataset_md_agg_national_by_county_vu')") + + # Auto-detect if timeseries is partitioned by state based on naming conventions + table_name_lower = self.timeseries_table.lower() + if 'ts_by_state' in table_name_lower: + timeseries_partitioned_by_state = True + logger.info(f"Detected state-partitioned timeseries table: {self.timeseries_table}. " + "Query will include state partition filter so timeseries files for a building ID are not double counted.") + elif '_timeseries' in table_name_lower: + timeseries_partitioned_by_state = False + logger.info(f"Detected standard non-partitioned timeseries table: {self.timeseries_table}") + else: + timeseries_partitioned_by_state = False + logger.warning(f"Timeseries table name '{self.timeseries_table}' does not match expected patterns ('ts_by_state' or '_timeseries'). " + "Assuming no state partitioning. If the table is actually partitioned by state, building IDs may be double-counted. 
" + "Please use standard naming conventions: 'dataset_ts_by_state' for partitioned or 'dataset_timeseries' for non-partitioned tables.") + + # Build timestamp grouping with date_trunc and time adjustment + time_group = '1' + if timestamp_grouping == 'hour': + time_select = f"date_trunc('hour', date_add('second', -900, {self.timeseries_table}.time)) AS time" + elif timestamp_grouping == 'day': + time_select = f"date_trunc('day', {self.timeseries_table}.time) AS time" + elif timestamp_grouping == 'month': + time_select = f"date_trunc('month', {self.timeseries_table}.time) AS time" + else: + time_select = f"{self.timeseries_table}.time AS time" + + # Build SELECT clause with sample statistics + select_clauses = [time_select] + + # Add group_by columns to SELECT (excluding 'time' which is already handled) + if group_by: + for col in group_by: + if col != 'time': + select_clauses.append(f'{weight_view_table}."{col}"') + + if include_sample_stats: + select_clauses.extend([ + f"count(distinct({self.timeseries_table}.building_id)) AS sample_count", + f"(count(distinct({self.timeseries_table}.building_id)) * sum({weight_view_table}.weight)) / sum(1) AS units_count", + f"sum(1) / count(distinct({self.timeseries_table}.building_id)) AS rows_per_sample" + ]) + + # Add weighted area if requested (for AMI comparison) + if include_area_normalized_cols: + weighted_area = f'sum({weight_view_table}."in.sqft" * {weight_view_table}.weight) AS weighted_sqft' + select_clauses.append(weighted_area) + + # Build weighted enduse aggregations + for enduse in enduses: + weighted_sum = f"sum({self.timeseries_table}.{enduse} * {weight_view_table}.weight) AS {enduse}" + select_clauses.append(weighted_sum) + + # Add kwh_per_sf columns if requested (for AMI comparison - normalized by weighted area) + # Note: Building area is per building, but appears in multiple timestep rows when joined. + # The sum() counts each building's area once per timestep. Divide by rows_per_sample to correct. 
+ # + # Example: 4 buildings (each 10k sqft), 15-min data aggregated to hourly (4 timesteps): + # - Each building contributes 4 rows (one per 15-min interval) + # - weighted_energy_sum: 100 kWh (energy varies per timestep, summed across all 16 rows) + # - weighted_area_sum: 160k sqft (same 10k per building repeated 4 times: 4 buildings × 10k × 4 timesteps) + # - rows_per_sample: 16 total rows / 4 distinct buildings = 4 timesteps/building + # - Corrected area: 160k / 4 = 40k sqft (removes timestep duplication) + # - Result: 100 kWh / 40k sqft = 0.0025 kWh/sqft + if include_area_normalized_cols and enduse.endswith('_kwh'): + # Build the formula in parts for clarity + weighted_energy_sum = f"sum({self.timeseries_table}.{enduse} * {weight_view_table}.weight)" + weighted_area_sum = f"sum({weight_view_table}.\"in.sqft\" * {weight_view_table}.weight)" + rows_per_sample = f"(sum(1) / count(distinct({self.timeseries_table}.building_id)))" + corrected_area = f"({weighted_area_sum} / {rows_per_sample})" + + kwh_per_sf = f"{weighted_energy_sum} / {corrected_area} AS {enduse.replace('_kwh', '_kwh_per_sf')}" + select_clauses.append(kwh_per_sf) + + select_clause = ',\n '.join(select_clauses) + + # Build WHERE clause - join conditions first, then filters + where_conditions = [ + f'{weight_view_table}."bldg_id" = {self.timeseries_table}.building_id', + f'{weight_view_table}.upgrade = {self.timeseries_table}.upgrade', + f'{weight_view_table}.upgrade = {upgrade_id}' + ] + + # Add state partition filter if timeseries is partitioned by state (enables partition pruning) + # When partitioned by state, each building's timeseries data is duplicated across each state + # it is apportioned to. To avoid double counting, we filter to only the state matching the weight view. + if timeseries_partitioned_by_state: + where_conditions.append(f'{weight_view_table}.state = {self.timeseries_table}.state') + + if building_ids: + bldg_list = ', '.join([str(bid) for bid in building_ids]) + where_conditions.append(f'{weight_view_table}."bldg_id" IN ({bldg_list})') + + # Handle restrictions - these are typically filters on the weight view table + if restrictions: + for column, values in restrictions: + # Determine if this is a numeric column that shouldn't be quoted + numeric_columns = ['bldg_id', 'building_id', 'upgrade', 'upgrade_id'] + is_numeric = column in numeric_columns + + if isinstance(values, (list, tuple)): + if len(values) == 1: + if is_numeric: + where_conditions.append(f'{weight_view_table}."{column}" = {values[0]}') + else: + where_conditions.append(f'{weight_view_table}."{column}" = \'{values[0]}\'') + else: + if is_numeric: + value_list = ', '.join([str(v) for v in values]) + else: + value_list = ', '.join([f"'{v}'" for v in values]) + where_conditions.append(f'{weight_view_table}."{column}" IN ({value_list})') + else: + if is_numeric: + where_conditions.append(f'{weight_view_table}."{column}" = {values}') + else: + where_conditions.append(f'{weight_view_table}."{column}" = \'{values}\'') + + where_clause = ' AND '.join(where_conditions) + + # Build GROUP BY clause - use column positions based on SELECT order + group_by_positions = [time_group] # Time is always position 1 + if group_by: + # Add positions for additional group_by columns (excluding 'time') + position = 2 # Start after time + for col in group_by: + if col != 'time': + group_by_positions.append(str(position)) + position += 1 + + group_by_clause = ', '.join(group_by_positions) + + # Build the query using FROM clause with comma-separated tables + query 
= f"""SELECT {select_clause} + FROM {weight_view_table}, {self.timeseries_table} + WHERE {where_clause} + GROUP BY {group_by_clause} + ORDER BY {group_by_clause}""" + + return query + + def get_applicability_query(self, + upgrade_ids: List[Union[int, str]], + state: Optional[Union[str, List[str]]] = None, + county: Optional[Union[str, List[str]]] = None, + columns: Optional[List[str]] = None, + weight_view_table: Optional[str] = None) -> str: + """ + Build query to get applicable buildings and their characteristics from the weight view. + + Args: + upgrade_ids: List of upgrade IDs to filter on + state: State abbreviation(s) to filter on (optional, can be single string or list) + county: County GISJOIN(s) to filter on (optional, can be single string or list) + columns: Specific columns to select (optional, defaults to common applicability columns) + weight_view_table: Name of the weight view table (optional, uses default naming) + + Returns: + SQL query string + + """ + + # If no weight view table provided, construct name based on geographic level + if weight_view_table is None: + if county is not None: + weight_view_table = f"{self.athena_table_name}_md_agg_national_by_county_vu" + else: + weight_view_table = f"{self.athena_table_name}_md_agg_national_by_state_vu" + + # Default columns for applicability queries + if columns is None: + # Choose geographic column based on which filter is being used + geo_column = '"in.nhgis_county_gisjoin"' if county else '"in.state"' + columns = [ + 'dataset', + geo_column, + 'bldg_id', + 'upgrade', + 'applicability' + ] + + # Build SELECT clause + select_clause = ',\n '.join(columns) + + # Build WHERE clause + where_conditions = [] + + # Filter by upgrade IDs + if len(upgrade_ids) == 1: + where_conditions.append(f'upgrade = {upgrade_ids[0]}') + else: + upgrade_list = ','.join(map(str, upgrade_ids)) + where_conditions.append(f'upgrade IN ({upgrade_list})') + + # Filter by applicability + where_conditions.append('applicability = true') + + # Filter by geographic location (state or county) - support single values or lists + if state: + if isinstance(state, str): + where_conditions.append(f'"in.state" = \'{state}\'') + elif isinstance(state, (list, tuple)): + if len(state) == 1: + where_conditions.append(f'"in.state" = \'{state[0]}\'') + else: + state_list = "', '".join(state) + where_conditions.append(f'"in.state" IN (\'{state_list}\')') + elif county: + if isinstance(county, str): + where_conditions.append(f'"in.nhgis_county_gisjoin" = \'{county}\'') + elif isinstance(county, (list, tuple)): + if len(county) == 1: + where_conditions.append(f'"in.nhgis_county_gisjoin" = \'{county[0]}\'') + else: + county_list = "', '".join(county) + where_conditions.append(f'"in.nhgis_county_gisjoin" IN (\'{county_list}\')') + + where_clause = ' AND '.join(where_conditions) + + query = f""" + SELECT DISTINCT + {select_clause} + FROM {weight_view_table} + WHERE {where_clause} + ORDER BY bldg_id + """ + + return query.strip() \ No newline at end of file diff --git a/postprocessing/comstockpostproc/comstock_to_ami_comparison.py b/postprocessing/comstockpostproc/comstock_to_ami_comparison.py index 87ef1dcc7..35bcfdebe 100644 --- a/postprocessing/comstockpostproc/comstock_to_ami_comparison.py +++ b/postprocessing/comstockpostproc/comstock_to_ami_comparison.py @@ -35,17 +35,17 @@ def __init__(self, comstock_object:ComStock, ami_object:AMI, image_type='jpg', n logger.warning(f'No timeseries data was available for {dataset.dataset_name}, unable to make AMI comparison.') continue if 
isinstance(dataset, ComStock): - df = dataset.ami_timeseries_data + df = dataset.ami_timeseries_data.copy(deep=True) df['run'] = self.comstock_object.dataset_name df['year'] = self.comstock_object.year + # Normalize building type names to match AMI format + df['building_type'] = df['building_type'].map(self.BLDG_TYPE_TO_SNAKE_CASE).fillna(df['building_type'].str.lower()) dfs_to_concat.append(df) - df.iloc[0:0] elif isinstance(dataset, AMI): - df = dataset.ami_timeseries_data + df = dataset.ami_timeseries_data.copy(deep=True) df['run'] = self.ami_object.dataset_name df['enduse'] = 'total' dfs_to_concat.append(df) - df.iloc[0:0] self.color_map[dataset.dataset_name] = dataset.color dataset_names.append(dataset.dataset_name) @@ -54,7 +54,10 @@ def __init__(self, comstock_object:ComStock, ami_object:AMI, image_type='jpg', n self.name = ' vs '.join(dataset_names) # Combine into a single dataframe for convenience - self.ami_timeseries_data = pd.concat(dfs_to_concat, join='outer') + if len(dfs_to_concat) == 0: + self.ami_timeseries_data = pd.DataFrame() + else: + self.ami_timeseries_data = pd.concat(dfs_to_concat, join='outer') # Make directories current_dir = os.path.dirname(os.path.abspath(__file__)) diff --git a/postprocessing/comstockpostproc/plotting_mixin.py b/postprocessing/comstockpostproc/plotting_mixin.py index 766e60cce..10e7a7bff 100644 --- a/postprocessing/comstockpostproc/plotting_mixin.py +++ b/postprocessing/comstockpostproc/plotting_mixin.py @@ -11,7 +11,6 @@ import plotly.express as px import seaborn as sns import plotly.graph_objects as go -from buildstock_query import BuildStockQuery import matplotlib.colors as mcolors from plotly.subplots import make_subplots @@ -3009,82 +3008,6 @@ def plot_load_duration_curve(self, df, region, building_type, color_map, output_ output_path = os.path.abspath(os.path.join(output_dir, filename)) plt.savefig(output_path, bbox_inches='tight') - - # get weighted load profiles - def wgt_by_btype(self, df, run_data, dict_wgts, upgrade_num, state, upgrade_name): - """ - This method weights the timeseries profiles. - Returns dataframe with weighted kWh columns. 
- """ - btype_list = df[self.BLDG_TYPE].unique() - - applic_bldgs_list = list(df.loc[(df[self.UPGRADE_NAME].isin(upgrade_name)) & (df[self.UPGRADE_APPL]==True), self.BLDG_ID]) - applic_bldgs_list = [int(x) for x in applic_bldgs_list] - - dfs_base=[] - dfs_up=[] - for btype in btype_list: - - # get building weights - btype_wgt = dict_wgts[btype] - - # apply weights by building type - def apply_wgts(df): - # Identify columns that contain 'kwh' in their names - kwh_columns = [col for col in df.columns if 'kwh' in col] - - # Apply the weight and add the suffix 'weighted' - weighted_df = df[kwh_columns].apply(lambda x: x * btype_wgt).rename(columns=lambda x: x + '_weighted') - # Concatenate the new weighted columns with the original DataFrame without the unweighted 'kwh' columns - df_wgt = pd.concat([df.drop(columns=kwh_columns), weighted_df], axis=1) - - return df_wgt - - # baseline load data - aggregate electricity total only - df_base_ts_agg = run_data.agg.aggregate_timeseries( - upgrade_id=0, - enduses=(list(self.END_USES_TIMESERIES_DICT.values())+["total_site_electricity_kwh"]), - restrict=[(('build_existing_model.building_type', [self.BLDG_TYPE_TO_SNAKE_CASE[btype]])), - ('state_abbreviation', [f"{state}"]), - (run_data.bs_bldgid_column, applic_bldgs_list), - ], - timestamp_grouping_func='hour', - get_query_only=False - ) - - # add baseline data - df_base_ts_agg_weighted = apply_wgts(df_base_ts_agg) - df_base_ts_agg_weighted[self.UPGRADE_NAME] = 'baseline' - dfs_base.append(df_base_ts_agg_weighted) - - for upgrade in upgrade_num: - # upgrade load data - all enduses - upgrade_ts_agg = run_data.agg.aggregate_timeseries( - upgrade_id=upgrade.astype(str), - enduses=(list(self.END_USES_TIMESERIES_DICT.values())+["total_site_electricity_kwh"]), - restrict=[(('build_existing_model.building_type', [self.BLDG_TYPE_TO_SNAKE_CASE[btype]])), - ('state_abbreviation', [f"{state}"]), - ], - timestamp_grouping_func='hour', - get_query_only=False - ) - - # add upgrade data - df_upgrade_ts_agg_weighted = apply_wgts(upgrade_ts_agg) - df_upgrade_ts_agg_weighted[self.UPGRADE_NAME] = self.dict_upid_to_upname[upgrade] - dfs_up.append(df_upgrade_ts_agg_weighted) - - - # concatinate and combine baseline data - dfs_base_combined = pd.concat(dfs_base, join='outer', ignore_index=True) - dfs_base_combined = dfs_base_combined.groupby(['time', self.UPGRADE_NAME], observed=True, as_index=False)[dfs_base_combined.loc[:, dfs_base_combined.columns.str.contains('_kwh')].columns].sum() - - # concatinate and combine upgrade data - dfs_upgrade_combined = pd.concat(dfs_up, join='outer', ignore_index=True) - dfs_upgrade_combined = dfs_upgrade_combined.groupby(['time', self.UPGRADE_NAME], observed=True, as_index=False)[dfs_upgrade_combined.loc[:, dfs_upgrade_combined.columns.str.contains('_kwh')].columns].sum() - - return dfs_base_combined, dfs_upgrade_combined - # plot order_list = [ 'interior_equipment', @@ -3110,26 +3033,19 @@ def map_to_season(month): else: return 'Winter' - def plot_measure_timeseries_peak_week_by_state(self, df, output_dir, states, color_map, comstock_run_name): #, df, region, building_type, color_map, output_dir - - # run crawler - run_data = BuildStockQuery('eulp', - 'enduse', - self.comstock_run_name, - buildstock_type='comstock', - skip_reports=False) + def plot_measure_timeseries_peak_week_by_state(self, df, output_dir, timeseries_locations_to_plot, color_map, comstock_run_name, comstock_obj=None): #, df, region, building_type, color_map, output_dir # get upgrade ID - df_upgrade = 
df.loc[df[self.UPGRADE_ID]!=0, :] + df_data = df.copy(deep=True) + # coerce data type of upgrade ID - arrives as float, need str of int for querying + df_data[self.UPGRADE_ID] = pd.to_numeric(df_data[self.UPGRADE_ID], errors="coerce").astype("Int64").astype(str) + df_upgrade = df_data.loc[(df_data[self.UPGRADE_ID]!="0"), :] upgrade_num = list(df_upgrade[self.UPGRADE_ID].unique()) upgrade_name = list(df_upgrade[self.UPGRADE_NAME].unique()) - # get weights - dict_wgts = df_upgrade.groupby(self.BLDG_TYPE, observed=True)[self.BLDG_WEIGHT].mean().to_dict() - # apply queries and weighting - for state, state_name in states.items(): - dfs_base_combined, dfs_upgrade_combined = self.wgt_by_btype(df, run_data, dict_wgts, upgrade_num, state, upgrade_name) + for location, location_name in timeseries_locations_to_plot.items(): + dfs_base_combined, dfs_upgrade_combined = comstock_obj.get_weighted_load_profiles_from_s3(df_data, upgrade_num, location, upgrade_name) # merge into single dataframe dfs_merged = pd.concat([dfs_base_combined, dfs_upgrade_combined], ignore_index=True) @@ -3158,13 +3074,13 @@ def map_to_season(month): # make dec 31st last week of year dfs_merged.loc[dfs_merged['Day_of_Year']==365, 'Week_of_Year'] = 55 dfs_merged = dfs_merged.loc[dfs_merged['Year']==2018, :] - max_peak = dfs_merged.loc[:, 'total_site_electricity_kwh_weighted'].max() + max_peak = dfs_merged.loc[:, 'total_site_electricity_kwh'].max() # find peak week by season seasons = ['Spring', 'Summer', 'Fall', 'Winter'] for season in seasons: - peak_week = dfs_merged.loc[dfs_merged['Season']==season, ["total_site_electricity_kwh_weighted", "Week_of_Year"]] - peak_week = peak_week.loc[peak_week["total_site_electricity_kwh_weighted"] == peak_week["total_site_electricity_kwh_weighted"].max(), "Week_of_Year"][0] + peak_week = dfs_merged.loc[dfs_merged['Season']==season, ["total_site_electricity_kwh", "Week_of_Year"]] + peak_week = peak_week.loc[peak_week["total_site_electricity_kwh"] == peak_week["total_site_electricity_kwh"].max(), "Week_of_Year"].iloc[0] # filter to the week @@ -3175,7 +3091,7 @@ def map_to_season(month): # rename columns dfs_merged_pw.columns = dfs_merged_pw.columns.str.replace("electricity_", "") - dfs_merged_pw.columns = dfs_merged_pw.columns.str.replace("_kwh_weighted", "") + dfs_merged_pw.columns = dfs_merged_pw.columns.str.replace("_kwh", "") # convert hourly kWH to 15 minute MW dfs_merged_pw.loc[:, self.order_list] = dfs_merged_pw.loc[:, self.order_list]/1000 @@ -3185,8 +3101,9 @@ def map_to_season(month): # Create traces for area plot traces = [] # add aggregate measure load - dfs_merged_pw_up = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name'] != "baseline"] + dfs_merged_pw_up = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name'] != "Baseline"] dfs_merged_pw_up.columns = dfs_merged_pw_up.columns.str.replace("total_site", "Measure Total") + # if only 1 upgrade, plot end uses and total if len(upgrade_num) == 1: # loop through end uses @@ -3215,19 +3132,19 @@ def map_to_season(month): else: # if more than 1 upgrade, add only aggregate loads for upgrade in upgrade_num: - dfs_merged_pw_up_mult = dfs_merged_pw_up.loc[dfs_merged_pw_up['in.upgrade_name'] == self.dict_upid_to_upname[upgrade]] + dfs_merged_pw_up_mult = dfs_merged_pw_up.loc[dfs_merged_pw_up['in.upgrade_name'] == self.dict_upid_to_upname[int(upgrade)]] upgrade_trace = go.Scatter( x=dfs_merged_pw_up_mult['time'], y=dfs_merged_pw_up_mult['Measure Total'], mode='lines', - line=dict(width=1.8, dash='solid'), 
#color=color_map[self.dict_upid_to_upname[upgrade]] - name=self.dict_upid_to_upname[upgrade], + line=dict(width=1.8, dash='solid'), #color=color_map[self.dict_upid_to_upname[int(upgrade)]] + name=self.dict_upid_to_upname[int(upgrade)], ) traces.append(upgrade_trace) # add baseline load - dfs_merged_pw_base = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name']=="baseline"] + dfs_merged_pw_base = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name']=="Baseline"] dfs_merged_pw_base.columns = dfs_merged_pw_base.columns.str.replace("total_site", "Baseline Total") # Create a trace for the baseline load @@ -3257,7 +3174,7 @@ def map_to_season(month): y=-0.35, # Adjust this value as needed to place the title correctly xref='paper', yref='paper', - text=f"{season} Peak Week, Applicable Buildings - {state_name}", + text=f"{season} Peak Week, Applicable Buildings - {location_name}", showarrow=False, font=dict( size=16 @@ -3271,34 +3188,27 @@ def map_to_season(month): title = f"{season}_peak_week" fig_name = f'{title.replace(" ", "_").lower()}.{self.image_type}' fig_name_html = f'{title.replace(" ", "_").lower()}.html' - fig_sub_dir = os.path.abspath(os.path.join(output_dir, f"timeseries/{state_name}")) + fig_sub_dir = os.path.abspath(os.path.join(output_dir, f"timeseries/{location_name}")) if not os.path.exists(fig_sub_dir): os.makedirs(fig_sub_dir) fig_path = os.path.abspath(os.path.join(fig_sub_dir, fig_name)) fig_path_html = os.path.abspath(os.path.join(fig_sub_dir, fig_name_html)) - fig.write_image(fig_path, scale=10) + fig.write_image(fig_path, scale=6) fig.write_html(fig_path_html) + # save timeseries data + dfs_merged.to_csv(f"{fig_sub_dir}/timeseries_data_{location_name}.csv") - dfs_merged.to_csv(f"{fig_sub_dir}/timeseries_data_{state_name}.csv") - - def plot_measure_timeseries_season_average_by_state(self, df, output_dir, states, color_map, comstock_run_name): - - # run crawler - run_data = BuildStockQuery('eulp', - 'enduse', - self.comstock_run_name, - buildstock_type='comstock', - skip_reports=False) + def plot_measure_timeseries_season_average_by_state(self, df, output_dir, timeseries_locations_to_plot, color_map, comstock_run_name, comstock_obj=None): # get upgrade ID - df_upgrade = df.loc[df[self.UPGRADE_ID]!=0, :] + df_data = df.copy() + # coerce data type of upgrade ID - arrives as float, need str of int for querying + df_data[self.UPGRADE_ID] = pd.to_numeric(df_data[self.UPGRADE_ID], errors="coerce").astype("Int64").astype(str) + df_upgrade = df_data.loc[(df_data[self.UPGRADE_ID]!="0") & (df_data[self.UPGRADE_ID]!=0), :] upgrade_num = list(df_upgrade[self.UPGRADE_ID].unique()) upgrade_name = list(df_upgrade[self.UPGRADE_NAME].unique()) - # get weights - dict_wgts = df_upgrade.groupby(self.BLDG_TYPE, observed=True)[self.BLDG_WEIGHT].mean().to_dict() - standard_colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf'] upgrade_colors = {upgrade: standard_colors[i % len(standard_colors)] for i, upgrade in enumerate(upgrade_num)} @@ -3317,15 +3227,15 @@ def map_to_dow(dow): return 'Weekend' # apply queries and weighting - for state, state_name in states.items(): + for location, location_name in timeseries_locations_to_plot.items(): # check to see if timeseries file exists. # if it does, reload. Else, query data. 
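[Editor's note] A quick illustration of why the upgrade-ID coercion above goes float -> nullable Int64 -> str (hypothetical values; a string like '1.0' would not match the integer-style upgrade IDs used in the Athena queries, while '1' does):

import pandas as pd

upgrade_ids = pd.Series([0.0, 1.0, None])
coerced = pd.to_numeric(upgrade_ids, errors="coerce").astype("Int64").astype(str)
print(list(coerced))  # ['0', '1', '<NA>']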
- fig_sub_dir = os.path.abspath(os.path.join(output_dir, f"timeseries/{state_name}")) - file_path = os.path.join(fig_sub_dir, f"timeseries_data_{state_name}.csv") + fig_sub_dir = os.path.abspath(os.path.join(output_dir, f"timeseries/{location_name}")) + file_path = os.path.join(fig_sub_dir, f"timeseries_data_{location_name}.csv") dfs_merged=None if not os.path.exists(file_path): - dfs_base_combined, dfs_upgrade_combined = self.wgt_by_btype(df, run_data, dict_wgts, upgrade_num, state, upgrade_name) + dfs_base_combined, dfs_upgrade_combined = comstock_obj.get_weighted_load_profiles_from_s3(df_data, upgrade_num, location, upgrade_name) # merge into single dataframe dfs_merged = pd.concat([dfs_base_combined, dfs_upgrade_combined], ignore_index=True) @@ -3353,7 +3263,7 @@ def map_to_dow(dow): dfs_merged['Day_Type'] = dfs_merged['Day_of_Week'].apply(map_to_dow) dfs_merged_gb = dfs_merged.groupby(['in.upgrade_name', 'Season', 'Day_Type', 'Hour_of_Day'], observed=True)[dfs_merged.loc[:, dfs_merged.columns.str.contains('_kwh')].columns].mean().reset_index() - max_peak = dfs_merged_gb.loc[:, 'total_site_electricity_kwh_weighted'].max() + max_peak = dfs_merged_gb.loc[:, 'total_site_electricity_kwh'].max() # find peak week by season seasons = ['Summer', 'Shoulder', 'Winter'] @@ -3379,15 +3289,15 @@ def map_to_dow(dow): # rename columns dfs_merged_pw.columns = dfs_merged_pw.columns.str.replace("electricity_", "") - dfs_merged_pw.columns = dfs_merged_pw.columns.str.replace("_kwh_weighted", "") + dfs_merged_pw.columns = dfs_merged_pw.columns.str.replace("_kwh", "") # convert hourly kWH to 15 minute MW dfs_merged_pw.loc[:, self.order_list] = dfs_merged_pw.loc[:, self.order_list]/1000 dfs_merged_pw.loc[:, "total_site"] = dfs_merged_pw.loc[:, "total_site"]/1000 dfs_merged_pw = dfs_merged_pw.sort_values('Hour_of_Day') - dfs_merged_pw_up = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name'] != 'baseline', :] + dfs_merged_pw_up = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name'] != 'Baseline', :] dfs_merged_pw_up.columns = dfs_merged_pw_up.columns.str.replace("total_site", "Measure Total") - dfs_merged_pw_up = dfs_merged_pw_up.loc[dfs_merged_pw['in.upgrade_name'] != 'baseline', :] + dfs_merged_pw_up = dfs_merged_pw_up.loc[dfs_merged_pw['in.upgrade_name'] != 'Baseline', :] if len(upgrade_num) == 1: # Create traces for area plot @@ -3430,21 +3340,21 @@ def map_to_dow(dow): showlegend = upgrade not in legend_entries legend_entries.add(upgrade) - dfs_merged_pw_up_mult = dfs_merged_pw_up.loc[dfs_merged_pw_up['in.upgrade_name'] == self.dict_upid_to_upname[upgrade], :] + dfs_merged_pw_up_mult = dfs_merged_pw_up.loc[dfs_merged_pw_up['in.upgrade_name'] == self.dict_upid_to_upname[int(upgrade)], :] upgrade_trace = go.Scatter( x=dfs_merged_pw_up_mult['Hour_of_Day'], y=dfs_merged_pw_up_mult['Measure Total'], mode='lines', - line=dict(color=upgrade_colors[upgrade], width=1.8, dash='solid'), #color=color_map[self.dict_upid_to_upname[upgrade]] - name=self.dict_upid_to_upname[upgrade], - legendgroup=self.dict_upid_to_upname[upgrade], + line=dict(color=upgrade_colors[upgrade], width=1.8, dash='solid'), #color=color_map[self.dict_upid_to_upname[int(upgrade)]] + name=self.dict_upid_to_upname[int(upgrade)], + legendgroup=self.dict_upid_to_upname[int(upgrade)], showlegend=showlegend ) fig.add_trace(upgrade_trace, row=row, col=col) # add baseline load - dfs_merged_pw_base = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name'] == "baseline"] + dfs_merged_pw_base = dfs_merged_pw.loc[dfs_merged_pw['in.upgrade_name'] == "Baseline"] 
dfs_merged_pw_base.columns = dfs_merged_pw_base.columns.str.replace("total_site", "Baseline Total") showlegend = 'Baseline Total' not in legend_entries @@ -3467,7 +3377,7 @@ def map_to_dow(dow): # Update layout fig.update_layout( - title=f"Seasonal Average, Applicable Buildings - {state_name}", + title=f"Seasonal Average, Applicable Buildings - {location_name}", title_x=0.04, # Align title to the left title_y=0.97, # Move title to the bottom title_xanchor='left', @@ -3488,28 +3398,25 @@ def map_to_dow(dow): title = "seasonal_average_subplot" fig_name = f'{title.replace(" ", "_").lower()}.{self.image_type}' fig_name_html = f'{title.replace(" ", "_").lower()}.html' - fig_sub_dir = os.path.abspath(os.path.join(output_dir, f"timeseries/{state_name}")) + fig_sub_dir = os.path.abspath(os.path.join(output_dir, f"timeseries/{location_name}")) if not os.path.exists(fig_sub_dir): os.makedirs(fig_sub_dir) fig_path = os.path.abspath(os.path.join(fig_sub_dir, fig_name)) fig_path_html = os.path.abspath(os.path.join(fig_sub_dir, fig_name_html)) - fig.write_image(fig_path, scale=10) + fig.write_image(fig_path, scale=6) fig.write_html(fig_path_html) - def plot_measure_timeseries_annual_average_by_state_and_enduse(self, df, output_dir, states, color_map, comstock_run_name): - - # run crawler - run_data = BuildStockQuery('eulp', 'enduse', self.comstock_run_name, buildstock_type='comstock', skip_reports=False) + def plot_measure_timeseries_annual_average_by_state_and_enduse(self, df, output_dir, timeseries_locations_to_plot, color_map, comstock_run_name, comstock_obj=None): # get upgrade ID - df_upgrade = df.loc[df[self.UPGRADE_ID] != 0, :] + df_data = df.copy() + # coerce data type of upgrade ID - arrives as float, need str of int for querying + df_data[self.UPGRADE_ID] = pd.to_numeric(df_data[self.UPGRADE_ID], errors="coerce").astype("Int64").astype(str) + df_upgrade = df_data.loc[(df_data[self.UPGRADE_ID]!="0"), :] upgrade_num = list(df_upgrade[self.UPGRADE_ID].unique()) upgrade_name = list(df_upgrade[self.UPGRADE_NAME].unique()) - # get weights - dict_wgts = df_upgrade.groupby(self.BLDG_TYPE, observed=True)[self.BLDG_WEIGHT].mean().to_dict() - standard_colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf'] upgrade_colors = {upgrade: standard_colors[i % len(standard_colors)] for i, upgrade in enumerate(upgrade_num)} @@ -3528,7 +3435,7 @@ def map_to_dow(dow): return 'Weekend' # apply queries and weighting - for state, state_name in states.items(): + for state, state_name in timeseries_locations_to_plot.items(): # check to see if timeseries data already exists # check to see if timeseries file exists. 
@@ -3538,7 +3445,10 @@ def map_to_dow(dow): dfs_merged=None if not os.path.exists(file_path): - dfs_base_combined, dfs_upgrade_combined = self.wgt_by_btype(df, run_data, dict_wgts, upgrade_num, state, upgrade_name) + + ######### This is where we get the weighted data + dfs_base_combined, dfs_upgrade_combined = comstock_obj.get_weighted_load_profiles_from_s3(df_data, upgrade_num, state, upgrade_name) + ######### # merge into single dataframe dfs_merged = pd.concat([dfs_base_combined, dfs_upgrade_combined], ignore_index=True) @@ -3565,11 +3475,10 @@ def map_to_dow(dow): dfs_merged['Season'] = dfs_merged['Month'].apply(map_to_season) dfs_merged_gb = dfs_merged.groupby(['in.upgrade_name', 'Season', 'Hour_of_Day'], observed=True)[dfs_merged.loc[:, dfs_merged.columns.str.contains('_kwh')].columns].mean().reset_index() - max_peak = dfs_merged_gb.loc[:, 'total_site_electricity_kwh_weighted'].max() # rename columns, convert units dfs_merged_gb.columns = dfs_merged_gb.columns.str.replace("electricity_", "") - dfs_merged_gb.columns = dfs_merged_gb.columns.str.replace("_kwh_weighted", "") + dfs_merged_gb.columns = dfs_merged_gb.columns.str.replace("_kwh", "") # find peak week by season seasons = ['Summer', 'Shoulder', 'Winter'] @@ -3617,7 +3526,7 @@ def map_to_dow(dow): showlegend = True # add upgrade - dfs_merged_gb_season_up = dfs_merged_gb_season.loc[dfs_merged_gb_season['in.upgrade_name'] != 'baseline', :] + dfs_merged_gb_season_up = dfs_merged_gb_season.loc[dfs_merged_gb_season['in.upgrade_name'] != 'Baseline', :] # if only 1 upgrade, plot end uses and total if len(upgrade_num) == 1: trace = go.Scatter( @@ -3632,21 +3541,21 @@ def map_to_dow(dow): else: # if more than 1 upgrade, add only aggregate loads for upgrade in upgrade_num: - dfs_merged_pw_up_mult = dfs_merged_gb_season_up.loc[dfs_merged_gb_season_up['in.upgrade_name'] == self.dict_upid_to_upname[upgrade]] + dfs_merged_pw_up_mult = dfs_merged_gb_season_up.loc[dfs_merged_gb_season_up['in.upgrade_name'] == self.dict_upid_to_upname[int(upgrade)]] upgrade_trace = go.Scatter( x=dfs_merged_pw_up_mult['Hour_of_Day'], y=dfs_merged_pw_up_mult[enduse]/1000, mode='lines', - line=dict(color=upgrade_colors[upgrade], width=1.8, dash='solid'), #color=color_map[self.dict_upid_to_upname[upgrade]] - name=self.dict_upid_to_upname[upgrade], - legendgroup=self.dict_upid_to_upname[upgrade], + line=dict(color=upgrade_colors[upgrade], width=1.8, dash='solid'), #color=color_map[self.dict_upid_to_upname[int(upgrade)]] + name=self.dict_upid_to_upname[int(upgrade)], + legendgroup=self.dict_upid_to_upname[int(upgrade)], showlegend=showlegend ) fig.add_trace(upgrade_trace, row=row, col=col) # add baseline load - dfs_merged_gb_season_base = dfs_merged_gb_season.loc[dfs_merged_gb_season['in.upgrade_name'] == "baseline"] + dfs_merged_gb_season_base = dfs_merged_gb_season.loc[dfs_merged_gb_season['in.upgrade_name'] == "Baseline"] baseline_trace = go.Scatter( x=dfs_merged_gb_season_base['Hour_of_Day'], y=dfs_merged_gb_season_base[enduse]/1000, @@ -3699,5 +3608,5 @@ def map_to_dow(dow): fig_path = os.path.abspath(os.path.join(fig_sub_dir, fig_name)) fig_path_html = os.path.abspath(os.path.join(fig_sub_dir, fig_name_html)) - fig.write_image(fig_path, scale=10) + fig.write_image(fig_path, scale=6) fig.write_html(fig_path_html) diff --git a/postprocessing/comstockpostproc/s3_utilities_mixin.py b/postprocessing/comstockpostproc/s3_utilities_mixin.py index cd7e3c6ea..7306aed76 100644 --- a/postprocessing/comstockpostproc/s3_utilities_mixin.py +++ 
b/postprocessing/comstockpostproc/s3_utilities_mixin.py @@ -219,7 +219,7 @@ def setup_fsspec_filesystem(self, output_dir, aws_profile_name): output_dir = os.path.abspath(os.path.join(current_dir, '..', 'output', self.dataset_name)) # PyAthena >2.18.0 implements an s3 filesystem that replaces s3fs but does not implement file.open() # Make fsspec use the s3fs s3 filesystem implementation for writing files to S3 - register_implementation("s3", s3fs.S3FileSystem, clobber=True) + #register_implementation("s3", s3fs.S3FileSystem, clobber=True) out_fs, out_fs_path = url_to_fs(output_dir, profile=aws_profile_name) output_dir = { 'fs': out_fs, diff --git a/postprocessing/setup.py b/postprocessing/setup.py index 3f3d4310f..aab6fd4b9 100644 --- a/postprocessing/setup.py +++ b/postprocessing/setup.py @@ -42,7 +42,7 @@ 'Programming Language :: Python :: 3.10', ], keywords='comstock postprocessing', - python_requires="==3.10.12", + python_requires="==3.12.12", install_requires=[ 'boto3', 'botocore', @@ -59,7 +59,7 @@ 'scipy', 'seaborn>=0.12.0', 'xlrd', - 'buildstock_query @ git+https://github.com/NREL/buildstock-query@8f65e034' + 'buildstock_query @ git+https://github.com/NREL/buildstock-query@0479759' ], extras_require={ 'dev': [