Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pygeoapi/api/environmental_data_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,14 @@ def get_collection_edr_query(api: API, request: APIRequest,
HTTPStatus.INTERNAL_SERVER_ERROR, headers, request.format,
'NoApplicableCode', msg)

headers['Content-Type'] = formatter.mimetype

if formatter.attachment:
filename = f'{dataset}.{formatter.extension}'
if p.filename is None:
filename = f'{dataset}.{formatter.extension}'
else:
filename = f'{p.filename}'

cd = f'attachment; filename="{filename}"'
headers['Content-Disposition'] = cd

Expand Down
141 changes: 125 additions & 16 deletions pygeoapi/formatter/csv_.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io
import logging

from shapely.geometry import shape as geojson_to_geom

from pygeoapi.formatter.base import BaseFormatter, FormatterSerializationError

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,12 +62,30 @@ def write(self, options: dict = {}, data: dict = None) -> str:
Generate data in CSV format

:param options: CSV formatting options
:param data: dict of GeoJSON data
:param data: dict of data

:returns: string representation of format
"""
type = data.get('type') or ''
LOGGER.debug(f'Formatting CSV from data type: {type}')

if 'Feature' in type or 'features' in data:
return self._write_from_geojson(options, data)
elif 'Coverage' in type or 'coverages' in data:
return self._write_from_covjson(options, data)

def _write_from_geojson(
self, options: dict = {}, data: dict = None, is_point=False
) -> str:
"""
Generate GeoJSON data in CSV format

is_point = False
:param options: CSV formatting options
:param data: dict of GeoJSON data
:param is_point: whether the features are point geometries

:returns: string representation of format
"""
try:
fields = list(data['features'][0]['properties'].keys())
except IndexError:
Expand All @@ -75,32 +95,121 @@ def write(self, options: dict = {}, data: dict = None) -> str:
if self.geom:
LOGGER.debug('Including point geometry')
if data['features'][0]['geometry']['type'] == 'Point':
fields.insert(0, 'x')
fields.insert(1, 'y')
LOGGER.debug('point geometry detected, adding x,y columns')
fields.extend(['x', 'y'])
is_point = True
else:
# TODO: implement wkt geometry serialization
LOGGER.debug('not a point geometry, skipping')
LOGGER.debug('not a point geometry, adding wkt column')
fields.append('wkt')

LOGGER.debug(f'CSV fields: {fields}')
output = io.StringIO()
writer = csv.DictWriter(output, fields, extrasaction='ignore')
writer.writeheader()

try:
output = io.StringIO()
writer = csv.DictWriter(output, fields)
writer.writeheader()
for feature in data['features']:
self._add_feature(writer, feature, is_point)

return output.getvalue().encode('utf-8')

def _add_feature(
    self, writer: csv.DictWriter, feature: dict, is_point: bool
) -> None:
    """
    Add feature data to CSV writer

    The feature's properties are copied before any geometry column is
    added, so the GeoJSON dict passed in by the caller is not modified.

    :param writer: CSV DictWriter
    :param feature: dict of GeoJSON feature
    :param is_point: whether the geometry is written as x,y columns
                     (otherwise it is written as a wkt column)

    :raises FormatterSerializationError: if the row cannot be serialized
    """
    # copy: adding x/y/wkt must not leak into the source feature;
    # GeoJSON allows "properties" to be null
    row = dict(feature['properties'] or {})
    try:
        if self.geom:
            if is_point:
                coords = feature['geometry']['coordinates']
                # a position may carry a third (elevation) member, only
                # x,y are exported; fewer than two still raises ValueError
                row['x'], row['y'] = coords[:2]
            else:
                row['wkt'] = geojson_to_geom(feature['geometry']).wkt

        LOGGER.debug(f'Writing feature to row: {row}')
        writer.writerow(row)
    except ValueError as err:
        LOGGER.error(err)
        # keep the original cause attached for debugging
        raise FormatterSerializationError(
            'Error writing CSV output') from err

def _write_from_covjson(
    self, options: dict = {}, data: dict = None
) -> str:
    """
    Generate CovJSON data in CSV format

    One row is written per parameter and time step of every point
    coverage; coverages whose domain type does not contain "point"
    produce no rows.

    :param options: CSV formatting options (not read by this method)
    :param data: dict of CovJSON data (a Coverage, or a collection
                 holding a `coverages` list)

    :returns: CSV document, encoded as UTF-8
    """
    LOGGER.debug('Processing CovJSON data for CSV output')
    units = {}
    for name, parameter in data['parameters'].items():
        # a parameter's unit is optional and may carry no symbol
        symbol = (parameter.get('unit') or {}).get('symbol')
        if isinstance(symbol, dict):
            # object form of a symbol: the text lives under "value"
            symbol = symbol.get('value')

        units[name] = symbol

    fields = ['parameter', 'datetime', 'value', 'unit', 'x', 'y']
    LOGGER.debug(f'CSV fields: {fields}')
    output = io.StringIO()
    writer = csv.DictWriter(output, fields)
    writer.writeheader()

    # .get(): write() also dispatches here on a bare 'coverages' key
    if data.get('type') == 'Coverage':
        is_point = 'point' in data['domain']['domainType'].lower()
        self._add_coverage(writer, units, data, is_point)
    else:
        # plain loop: rows are written purely for their side effect
        for coverage in data['coverages']:
            if 'point' in coverage['domain']['domainType'].lower():
                self._add_coverage(writer, units, coverage, True)

    return output.getvalue().encode('utf-8')

@staticmethod
def _add_coverage(
writer: csv.DictWriter, units: dict, data: dict, is_point: bool = False
) -> None:
"""
Add coverage data to CSV writer

:param writer: CSV DictWriter
:param units: dict of parameter units
:param data: dict of CovJSON coverage data
:param is_point: whether the coverage is a point coverage
"""

if is_point is False:
LOGGER.warning('Non-point coverages not supported for CSV output')
return

axes = data['domain']['axes']
time_range = range(len(axes['t']['values']))

try:
[
writer.writerow({
'parameter': parameter,
'datetime': axes['t']['values'][time_value],
'value': data['ranges'][parameter]['values'][time_value],
'unit': units[parameter],
'x': axes['x']['values'][-1],
'y': axes['y']['values'][-1]
})
for parameter in data['ranges']
for time_value in time_range
]
except ValueError as err:
LOGGER.error(err)
raise FormatterSerializationError('Error writing CSV output')

def __repr__(self):
return f'<CSVFormatter> {self.name}'
172 changes: 168 additions & 4 deletions tests/formatter/test_csv__formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
#
# =================================================================

import csv
import io
from csv import DictReader
from io import StringIO
import json

import pytest

from pygeoapi.formatter.base import FormatterSerializationError
from pygeoapi.formatter.csv_ import CSVFormatter

from ..util import get_test_file_path


@pytest.fixture()
def fixture():
Expand All @@ -58,12 +63,47 @@ def fixture():
return data


@pytest.fixture
def data():
    """Parsed content of the data/items.geojson test file"""
    geojson_path = get_test_file_path('data/items.geojson')
    with open(geojson_path, 'r', encoding='utf-8') as geojson_file:
        items = json.load(geojson_file)
    return items


@pytest.fixture(scope='function')
def csv_reader_geom_enabled(data):
    """DictReader over the CSV written with geometry output enabled"""
    csv_bytes = CSVFormatter({'geom': True}).write(data=data)
    csv_text = csv_bytes.decode('utf-8')
    return DictReader(StringIO(csv_text))


@pytest.fixture
def invalid_geometry_data():
    """Feature list whose only Point has a single coordinate member"""
    # one member instead of an x,y pair
    broken_point = {
        'type': 'Point',
        'coordinates': [-130.44472222222223]
    }
    feature = {
        'id': 1,
        'type': 'Feature',
        'properties': {
            'id': 1,
            'title': 'Invalid Point Feature'
        },
        'geometry': broken_point
    }
    return {'features': [feature]}


def test_csv__formatter(fixture):
f = CSVFormatter({'geom': True})
f_csv = f.write(data=fixture)

buffer = io.StringIO(f_csv.decode('utf-8'))
reader = csv.DictReader(buffer)
buffer = StringIO(f_csv.decode('utf-8'))
reader = DictReader(buffer)

header = list(reader.fieldnames)

Expand All @@ -80,3 +120,127 @@ def test_csv__formatter(fixture):
assert data['id'] == '1972'
assert data['foo'] == 'bar'
assert data['title'] == ''


def test_write_with_geometry_enabled(csv_reader_geom_enabled):
    """Check column and row counts of CSV output with geometry enabled"""
    records = [record for record in csv_reader_geom_enabled]

    # Header is read from the reader after the rows were consumed
    column_names = list(csv_reader_geom_enabled.fieldnames)
    assert len(column_names) == 4

    # One CSV row is expected per feature in the fixture
    assert len(records) == 9


def test_write_without_geometry(data):
    """Test CSV output with geometry disabled"""
    formatter = CSVFormatter({'geom': False})
    output = formatter.write(data=data)
    csv_reader = DictReader(StringIO(output.decode('utf-8')))

    rows = list(csv_reader)

    # Verify headers don't include geometry
    headers = csv_reader.fieldnames
    assert 'geometry' not in headers
    # the formatter's geometry column for non-point data is 'wkt'
    assert 'wkt' not in headers

    # Verify data
    first_row = rows[0]
    assert first_row['uri'] == \
        'http://localhost:5000/collections/objects/items/1'
    assert first_row['name'] == 'LineString'


def test_write_empty_features():
    """An empty feature list is expected to serialize to an empty string"""
    no_features = {'features': []}
    result = CSVFormatter({'geom': True}).write(data=no_features)
    assert result == ''


@pytest.mark.parametrize(
    'row_index,expected_wkt',
    [
        (2, 'POINT (-85 33)'),
        (3, 'MULTILINESTRING ((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))'),  # noqa
        (4, 'POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))'),
        (5, 'POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))'),  # noqa
        (6, 'MULTIPOLYGON (((30 20, 45 40, 10 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5)))')  # noqa
    ]
)
def test_wkt(csv_reader_geom_enabled, row_index, expected_wkt):
    """Check the wkt column of selected rows against the expected WKT"""
    records = list(csv_reader_geom_enabled)

    # Compare the serialized geometry of the row under test
    actual_wkt = records[row_index]['wkt']
    assert actual_wkt == expected_wkt


def test_invalid_geometry_data(invalid_geometry_data):
    """A Point with a single coordinate must fail serialization"""
    csv_formatter = CSVFormatter({'geom': True})
    with pytest.raises(FormatterSerializationError):
        csv_formatter.write(data=invalid_geometry_data)


@pytest.fixture
def point_coverage_data():
    """CovJSON PointSeries coverage with one parameter and six time steps"""
    domain = {
        'type': 'Domain',
        'domainType': 'PointSeries',
        'axes': {
            'x': {'values': [-10.1]},
            'y': {'values': [-40.2]},
            't': {'values': [
                '2013-01-01', '2013-01-02', '2013-01-03',
                '2013-01-04', '2013-01-05', '2013-01-06']}
        }
    }
    salinity_parameter = {
        'type': 'Parameter',
        'description': {'en': 'The measured salinity'},
        'unit': {'symbol': 'psu'},
        'observedProperty': {
            'id': 'http://vocab.nerc.ac.uk/standard_name/sea_water_salinity/',  # noqa
            'label': {'en': 'Sea Water Salinity'}
        }
    }
    salinity_range = {
        'axisNames': ['t'],
        'shape': [6],
        'values': [
            43.9599, 43.9599, 43.9640, 43.9640, 43.9679, 43.987
        ]
    }
    return {
        'type': 'Coverage',
        'domain': domain,
        'parameters': {'PSAL': salinity_parameter},
        'ranges': {'PSAL': salinity_range}
    }


def test_point_coverage_csv(point_coverage_data):
    """Check row count and first row of CSV built from a point coverage"""
    csv_bytes = CSVFormatter({'geom': True}).write(data=point_coverage_data)
    records = list(DictReader(StringIO(csv_bytes.decode('utf-8'))))

    # One row per time step of the single parameter
    assert len(records) == 6

    # Every column of the first row, compared as CSV text
    expected_first = {
        'parameter': 'PSAL',
        'datetime': '2013-01-01',
        'value': '43.9599',
        'unit': 'psu',
        'x': '-10.1',
        'y': '-40.2'
    }
    for column, expected_value in expected_first.items():
        assert records[0][column] == expected_value