11,755 changes: 11,755 additions & 0 deletions doc/07-morpc-census-demo.html

Large diffs are not rendered by default.

6,357 changes: 4,276 additions & 2,081 deletions doc/07-morpc-census-demo.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions morpc/__init__.py
@@ -5,6 +5,7 @@

from .morpc import *
from .logs import *
from .geocode import *
import morpc.frictionless
import morpc.census
import morpc.plot
3 changes: 2 additions & 1 deletion morpc/census/__init__.py
@@ -1,2 +1,3 @@
from .census import *
from .geos import *
from .geos import *
from .api import *
66 changes: 39 additions & 27 deletions morpc/census/api.py
@@ -393,10 +393,11 @@ def get(url, params, varBatchSize=20):
"""

import json # We need json to make a deep copy of the params dict
from morpc.req import get_json_safely
from morpc.req import get_json_safely, get_text_safely
from morpc.census.api import get_group_variables
import pandas as pd
import re
from io import StringIO

if len(re.findall(r'group\((.+)\)', params['get'])) == 0:
# We need to reserve one variable in each batch for GEO_ID. If the user requests more than 49 variables per
@@ -452,29 +453,26 @@ def get(url, params, varBatchSize=20):
if(requestCount == 1):
censusData = df.set_index("GEO_ID").copy()
else:
censusData = censusData.join(df.set_index("GEO_ID"))
censusData = censusData.join(df.set_index("GEO_ID")).reset_index()

requestCount += 1
else:
logger.info('Found group parameter. Ignoring variable limits.')

year = url.replace('https://api.census.gov/data/', '').replace('?','')[0:4]
survey_table = url.replace('https://api.census.gov/data/', '').replace('?','')[5:]
else:
group = re.findall(r'group\((.+)\)', params['get'])[0]

variables = get_group_variables(survey_table, year, group)
logger.info(f'Found group {group} parameter. Ignoring variable limits.')

vars = ['GEO_ID', 'NAME']
for x in variables:
vars.append(x)
params_string = "&".join([f"{k}={v}" for k, v in params.items()])

records = get_json_safely(url, params=params)
columns = records.pop(0)
text = get_text_safely(f"{url}{params_string}")

censusData = pd.DataFrame.from_records(records, columns=columns)
censusData = censusData.filter(items=vars, axis='columns')
try:
censusData = pd.read_csv(StringIO(text.replace("[",'').replace("]",'')), sep=",", quotechar='"')

return censusData.reset_index()
except Exception as e:
logger.error(f"Error creating Dataframe from records. {e}")
raise RuntimeError

return censusData
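
For clarity, a minimal sketch of what the new group branch does with the raw response: the Census API returns a JSON array of arrays, so stripping the brackets leaves quoted, comma-separated rows that pandas can read directly. The payload below is illustrative, not real API output.

from io import StringIO
import pandas as pd

# Illustrative response in the Census API's shape: an array of arrays,
# one row per line, with the column names in the first row.
text = '[["GEO_ID","NAME","B01001_001E"],\n["0500000US39049","Franklin County, Ohio","1323807"]]'

# Stripping the brackets leaves CSV-like rows. The comma left behind after the
# header row adds one empty "Unnamed" column, which read_csv tolerates.
csv_like = text.replace("[", "").replace("]", "")
df = pd.read_csv(StringIO(csv_like), sep=",", quotechar='"')
df = df.loc[:, ~df.columns.str.startswith("Unnamed")]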

class CensusAPI:
_CensusAPI_logger = logging.getLogger(__name__).getChild(__qualname__)
@@ -536,16 +534,6 @@ def __init__(self, survey_table, year, group, scope, scale=None, variables=None)

self.validate()

logger.info(f"Building Request URL and Parameters.")
self.REQUEST = get_api_request(self.SURVEY, self.YEAR, self.GROUP, self.SCOPE, self.VARIABLES, self.SCALE)

try:
logger.info(f"Getting data from {self.REQUEST['url']} with parameters {self.REQUEST['params']}.")
self.DATA = get(self.REQUEST['url'], self.REQUEST['params'])
except Exception as e:
self.logger.error(f"Error retrieving data: {e}")
raise RuntimeError("Failed to retrieve data from Census API.")

self.VARS = get_group_variables(self.SURVEY, self.YEAR, self.GROUP)
if self.VARIABLES is not None:
temp = {}
@@ -556,6 +544,19 @@ def __init__(self, survey_table, year, group, scope, scale=None, variables=None)
temp[VAR] = self.VARS[VAR]
self.VARS = temp

logger.info(f"Building Request URL and Parameters.")
self.REQUEST = get_api_request(self.SURVEY, self.YEAR, self.GROUP, self.SCOPE, self.VARIABLES, self.SCALE)

try:
logger.info(f"Getting data from {self.REQUEST['url']} with parameters {self.REQUEST['params']}.")
self.DATA = get(self.REQUEST['url'], self.REQUEST['params'])
logger.debug(f"Request converted to DataFrame:")
logger.debug(f"\n\n{self.DATA.head(5).to_markdown()}")

except Exception as e:
self.logger.error(f"Error retrieving data: {e}")
raise RuntimeError("Failed to retrieve data from Census API.")

self.LONG = self.melt()
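
For context, a hypothetical construction of the class after this reordering; the argument values are illustrative and not taken from the PR, and the scope key comes from morpc/census/geos.py.

# Hypothetical usage sketch; values are illustrative.
from morpc.census.api import CensusAPI

acs = CensusAPI(
    survey_table="acs/acs5",  # survey/table path segment of the API URL
    year="2023",
    group="B01001",           # sex by age
    scope="columbus-msa",     # a scope key defined in morpc/census/geos.py
)
wide = acs.DATA  # wide-format DataFrame fetched in __init__
long = acs.LONG  # long-format result of melt()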

def melt(self):
@@ -574,13 +575,24 @@ def melt(self):

logger.info(f"Melting data into long format.")


long = self.DATA.melt(id_vars=['GEO_ID', 'NAME'], var_name='variable', value_name='value')
logger.debug(f"\n\n{long.head(5).to_markdown()}")

long = long.loc[~long['value'].isna()]
long['variable_type'] = [re.findall(r"[0-9]+([A-Z]+)", x)[0] for x in long['variable']]
long = long.loc[long['variable'].str.endswith(('E', 'M'))]
logger.debug(f"Removing unneeded variable types, variables remaining: {[x for x in long['variable'].unique()]}")

long['variable_type'] = [re.findall(r"[0-9]+([A-Z]{1,2})", x)[0] for x in long['variable']]

logger.debug(f"included variable types: {[x for x in long['variable_type']]}")

long = long.loc[~long['variable_type'].str.endswith('A')]

long['variable_type'] = [VARIABLE_TYPES[x] for x in long['variable_type']]

long['variable_label'] = [re.split("!!", self.VARS[variable]['label'],maxsplit=1)[1] for variable in long['variable']]

long['variable'] = [re.findall(r"([A-Z0-9_]+[0-9]+)[A-Z]+", x)[0] for x in long['variable']]

long['reference_period'] = self.YEAR
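
The tightened suffix pattern matters because ACS variable names carry one- or two-letter type codes; a quick sketch of what the new expressions extract (variable names are illustrative):

import re

# Illustrative ACS variable names: E = estimate, M = margin of error,
# EA/MA = annotation codes that melt() filters out.
for name in ["B01001_001E", "B01001_001M", "B01001_001EA", "B01001_001MA"]:
    suffix = re.findall(r"[0-9]+([A-Z]{1,2})", name)[0]   # type code
    stem = re.findall(r"([A-Z0-9_]+[0-9]+)[A-Z]+", name)[0]  # bare variable name
    print(name, "->", stem, suffix)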
5 changes: 4 additions & 1 deletion morpc/census/geos.py
@@ -50,7 +50,10 @@
"region-corpo": {
"in": "state:39",
"for": f"county:{','.join([morpc.CONST_COUNTY_NAME_TO_ID[x][2:6] for x in morpc.CONST_REGIONS['CORPO Region']])}"
}
},
"columbus-msa": {
"for": f"metropolitan statistical area/micropolitan statistical area:{morpc.CONST_COLUMBUS_MSA_ID}"
}
}

for x in STATE_SCOPES:
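For reference, a scope entry is just a dict of Census geography predicates that later becomes query parameters. A hedged sketch follows: the urlencode step is illustrative, and 18140 is assumed here to be the value behind morpc.CONST_COLUMBUS_MSA_ID (the Columbus, OH CBSA code).

from urllib.parse import urlencode

# Hypothetical illustration of how a scope entry maps onto API query params.
# 18140 is assumed; the real code reads morpc.CONST_COLUMBUS_MSA_ID.
columbus_msa = {
    "for": "metropolitan statistical area/micropolitan statistical area:18140",
}
params = {"get": "group(B01001)", **columbus_msa}
print(urlencode(params))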
12 changes: 6 additions & 6 deletions morpc/frictionless/frictionless.py
@@ -84,11 +84,7 @@ def name_to_desc_map(schema):
return {schema.fields[i].name:schema.fields[i].description for i in range(len(schema.fields))}


<<<<<<< HEAD
def cast_field_types(df, schema, forceInteger=False, forceInt64=False, nullBoolValue=False, handleMissingFields="error", verbose=False):
=======
def cast_field_types(df, schema, forceInteger=False, forceInt64=False, nullBoolValue=False, handleMissingFields="error", handleMissingValues=True, verbose=True):
>>>>>>> 9ca98ecf8915c3511b11ce73cad936925701444f
def cast_field_types(df, schema, forceInteger=False, forceInt64=False, nullBoolValue=False, handleMissingFields="error", handleMissingValues=True, verbose=False):
"""
Given a dataframe and the Frictionless Schema object (see load_schema), recast each of the fields in the
dataframe to the data type specified in the schema.
@@ -189,6 +185,8 @@ def cast_field_types(df, schema, forceInteger=False, forceInt64=False, nullBoolV
outDF[fieldName] = outDF[fieldName].astype("float")
elif(fieldType == "date" or fieldType == "datetime"):
outDF[fieldName] = pd.to_datetime(outDF[fieldName])
elif(fieldType == "year"):
outDF[fieldName] = [pd.to_datetime(x, format='%Y').year for x in outDF[fieldName]]
elif(fieldType == "geojson"):
try:
logger.info(f"Fieldname {fieldName} as geojson. Attempting to convert to geometry.")
@@ -256,7 +254,9 @@ def cast_field_types(df, schema, forceInteger=False, forceInt64=False, nullBoolV
else:
logger.error("Field {} is a type that is not currently supported for casting to boolean. Convert it to boolean, numeric, or string types first.".format(fieldName))
raise RuntimeError

elif(fieldType == 'any'):
logger.info(f"Field {fieldName} as type 'any' in schema. This may be due to the schema being produced automatically frictionless.Schema.describe(). Converting to string. ")
outDF[fieldName] = outDF[fieldName].astype('string')
else:
outDF[fieldName] = outDF[fieldName].astype(fieldType)

58 changes: 58 additions & 0 deletions morpc/geocode.py
@@ -0,0 +1,58 @@
import logging
logger = logging.getLogger(__name__)

def geocode(addresses: list, endpoint=None):
    """
    Geocode a list of addresses.

    Parameters:
    -----------
    addresses : list
        A list of addresses to pass to geopy.

    endpoint : str
        Optional. Domain of a Nominatim endpoint. When running Nominatim in a
        local Docker container, set this to "localhost:8080".

    Returns:
    --------
    pandas.DataFrame
        A DataFrame with columns 'address', 'location', 'lat', and 'lon'.
    """

    import pandas as pd
    from geopy.geocoders import Nominatim
    from geopy.extra.rate_limiter import RateLimiter
    from tqdm import tqdm

    tqdm.pandas()

    df = pd.DataFrame({'address': addresses})  # needs column 'address'

    if endpoint is None:
        delay = 1
        logger.info("Fetching from default public nominatim instance.")
        geolocator = Nominatim(user_agent="morpc-py", timeout=10)

        # Wrap with RateLimiter: min 1 sec between calls as per Nominatim policy
        geocode = RateLimiter(geolocator.geocode, min_delay_seconds=delay)

        df["location"] = df["address"].progress_apply(geocode)
        df["lat"] = df["location"].apply(lambda loc: loc.latitude if loc else None)
        df["lon"] = df["location"].apply(lambda loc: loc.longitude if loc else None)
    else:
        # No rate limiting needed against a local instance.
        delay = 0
        geolocator = Nominatim(domain=endpoint, scheme='http', user_agent="local-nominatim")

        geocode = geolocator.geocode

        df["location"] = df["address"].progress_apply(geocode)
        df["lat"] = df["location"].apply(lambda loc: loc.latitude if loc else None)
        df["lon"] = df["location"].apply(lambda loc: loc.longitude if loc else None)

    return df
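
A hypothetical call, for illustration: the address below is made up, and omitting endpoint falls back to the public Nominatim instance, which is rate-limited to roughly one request per second.

# Hypothetical usage; the address is illustrative.
from morpc.geocode import geocode

result = geocode(
    ["123 Main St, Columbus, OH 43215"],
    endpoint="localhost:8080",  # a local Nominatim container; omit for the public API
)
print(result[["address", "lat", "lon"]])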

