53 changes: 34 additions & 19 deletions swvo/io/dst/wdc.py
@@ -17,10 +17,12 @@
 
 import numpy as np
 import pandas as pd
-import wget
+import requests
 
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class DSTWDC:
     """This is a class for the WDC Dst data.
@@ -56,7 +58,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
 
-        logging.info(f"WDC Dst data directory: {self.data_dir}")
+        _logger.info(f"WDC Dst data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process WDC Dst data files.
@@ -80,35 +82,48 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         temporary_dir = Path("./temp_wdc")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
-
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                filename = "index.html"
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
-                URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m"))
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            filename = "index.html"
+            if file_path.exists() and not reprocess_files:
+                continue
 
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
 
-                logging.debug(f"Downloading file {URL + filename} ...")
+            URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m"))
 
-                wget.download(URL + filename, str(temporary_dir))
+            if file_path.exists():
+                if reprocess_files:
+                    file_path.unlink()
+                else:
+                    continue
 
-                logging.debug("Processing file ...")
+            try:
+                _logger.debug(f"Downloading file {URL} ...")
+                response = requests.get(URL)
+                response.raise_for_status()
+                data = response.text.splitlines()
+                with open(temporary_dir / filename, "w") as file:
+                    file.write("\n".join(data))
+                _logger.debug("Processing file ...")
 
                 processed_df = self._process_single_file(
                     temporary_dir / filename,
                     year=time_interval.year,
                     month=time_interval.month,
                 )
-                processed_df.to_csv(file_path, index=True, header=True)
+                processed_df.to_csv(tmp_path, index=True, header=True)
+                tmp_path.replace(file_path)
+            except Exception as e:
+                _logger.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
 
-        finally:
-            rmtree(temporary_dir, ignore_errors=True)
+            finally:
+                rmtree(temporary_dir, ignore_errors=True)
 
     def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
         """Get list of file paths and their corresponding time intervals.
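
Note on the new write path above: each processed CSV is staged to a sibling .tmp file and promoted with Path.replace(), which maps to os.replace() and is atomic when source and target live on the same filesystem, so an interrupted run never leaves a truncated file under the final name. A minimal sketch of the pattern in isolation; save_frame is a hypothetical stand-in for the real processing:

    from pathlib import Path

    import pandas as pd

    def save_frame(df: pd.DataFrame, file_path: Path) -> None:
        # Stage next to the target so replace() never crosses filesystems.
        tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
        try:
            df.to_csv(tmp_path, index=True, header=True)
            tmp_path.replace(file_path)  # atomic promotion to the final name
        except Exception:
            tmp_path.unlink(missing_ok=True)  # discard the partial staging file
            raise

One thing worth double-checking in this hunk: the new finally now sits inside the per-file loop, so rmtree(temporary_dir, ...) deletes the temporary directory after the first iteration even though it is only created once before the loop; later iterations would then fail to write the downloaded page. Recreating the directory at the top of each iteration, or cleaning up once after the loop, would avoid that.
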
60 changes: 36 additions & 24 deletions swvo/io/kp/niemegk.py
@@ -16,10 +16,12 @@
 
 import numpy as np
 import pandas as pd
-import wget
+import requests
 
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class KpNiemegk:
     """A class to handle Niemegk Kp data.
@@ -58,7 +60,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
 
-        logging.info(f"Kp Niemegk data directory: {self.data_dir}")
+        _logger.info(f"Kp Niemegk data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process Niemegk Kp data file.
@@ -78,32 +80,28 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         Raise `FileNotFoundError` if the file is not downloaded successfully.
         """
         if start_time < datetime.now(timezone.utc) - timedelta(days=30):
-            logging.info("We can only download and process a Kp Niemegk file from the last 30 days!")
+            _logger.info("We can only download and process a Kp Niemegk file from the last 30 days!")
             return
 
         temporary_dir = Path("./temp_kp_niemegk_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            logging.debug(f"Downloading file {self.URL + self.NAME} ...")
-
-            wget.download(self.URL + self.NAME, str(temporary_dir))
-
-            # check if download was successful
-            if os.stat(str(temporary_dir / self.NAME)).st_size == 0:
-                raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!")
+        _logger.debug(f"Downloading file {self.URL + self.NAME} ...")
 
-            logging.debug("Processing file ...")
-            processed_df = self._process_single_file(temporary_dir)
-
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
-
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            if file_path.exists() and not reprocess_files:
+                continue
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
+            try:
+                self._download(temporary_dir)
+
+                # check if download was successful
+                if os.stat(str(temporary_dir / self.NAME)).st_size == 0:
+                    raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!")
+
+                _logger.debug("Processing file ...")
+                processed_df = self._process_single_file(temporary_dir)
 
                 data_single_file = processed_df[
                     (processed_df.index >= time_interval[0]) & (processed_df.index <= time_interval[1])
@@ -112,12 +110,26 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
                 if len(data_single_file.index) == 0:
                     continue
 
-                data_single_file.to_csv(file_path, index=True, header=False)
+                data_single_file.to_csv(tmp_path, index=True, header=False)
+                tmp_path.replace(file_path)
 
-                logging.debug(f"Saving processed file {file_path}")
+                _logger.debug(f"Saving processed file {file_path}")
+            except Exception as e:
+                _logger.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
 
-        finally:
-            rmtree(temporary_dir)
+            finally:
+                rmtree(temporary_dir)
+
+    def _download(self, temporary_dir):
+        response = requests.get(self.URL + self.NAME)
+        response.raise_for_status()
+
+        with open(temporary_dir / self.NAME, "w") as f:
+            f.write(response.text)
 
     def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
         """Read Niemegk Kp data for the specified time range.
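
Note on the wget-to-requests swap above: the draft originally called requests.get(self.URL + self.NAME, str(temporary_dir)), carrying over the output-directory argument from wget.download's signature; requests would have treated that string as the params argument, so the hunk above drops it and writes the body out explicitly. Separately, requests.get has no default timeout and can block indefinitely on a stalled server. A minimal sketch of a download helper with an explicit timeout; the helper name and the 30-second value are assumptions, not part of this PR:

    from pathlib import Path

    import requests

    def download_file(url: str, dest_dir: Path, timeout: float = 30.0) -> Path:
        # raise_for_status() turns HTTP 4xx/5xx into exceptions instead of
        # silently saving an error page to disk.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        target = dest_dir / url.rsplit("/", 1)[-1]
        target.write_text(response.text)

        # Mirror the existing empty-file check: an empty body means the
        # download effectively failed.
        if target.stat().st_size == 0:
            raise FileNotFoundError(f"Error while downloading file: {url}!")
        return target
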
51 changes: 33 additions & 18 deletions swvo/io/omni/omni_low_res.py
@@ -16,10 +16,12 @@
 
 import numpy as np
 import pandas as pd
-import wget
+import requests
 
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class OMNILowRes:
     """This is a class for the OMNI Low Resolution data.
@@ -113,7 +115,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
 
-        logging.info(f"OMNI Low Res data directory: {self.data_dir}")
+        _logger.info(f"OMNI Low Res data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process OMNI Low Resolution data files.
@@ -135,29 +137,42 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         temporary_dir = Path("./temp_omni_low_res_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                filename = "omni2_" + str(time_interval[0].year) + ".dat"
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            if file_path.exists() and not reprocess_files:
+                continue
 
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
 
-                logging.debug(f"Downloading file {self.URL + filename} ...")
+            if file_path.exists():
+                if reprocess_files:
+                    file_path.unlink()
+                else:
+                    continue
+            try:
+                filename = "omni2_" + str(time_interval[0].year) + ".dat"
 
-                wget.download(self.URL + filename, str(temporary_dir))
+                _logger.debug(f"Downloading file {self.URL + filename} ...")
 
-                logging.debug("Processing file ...")
+                self._download(temporary_dir, filename)
+
+                _logger.debug("Processing file ...")
 
                 processed_df = self._process_single_file(temporary_dir / filename)
-                processed_df.to_csv(file_path, index=True, header=True)
+                processed_df.to_csv(tmp_path, index=True, header=True)
+                tmp_path.replace(file_path)
 
-        finally:
-            rmtree(temporary_dir, ignore_errors=True)
+            except Exception as e:
+                _logger.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
+            finally:
+                rmtree(temporary_dir, ignore_errors=True)
+
+    def _download(self, temporary_dir: Path, filename: str):
+        response = requests.get(self.URL + filename)
+        response.raise_for_status()
+
+        with open(temporary_dir / filename, "wb") as f:
+            f.write(response.content)
 
     def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
         """Get list of file paths and their corresponding time intervals.
@@ -264,7 +279,7 @@ def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
         end_time = end_time.replace(tzinfo=timezone.utc)
 
         if start_time < datetime(START_YEAR, 1, 1).replace(tzinfo=timezone.utc):
-            logging.warning(
+            _logger.warning(
                 "Start date chosen falls behind the existing data. Moving start date to first"
                 " available mission files..."
             )
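
A last cross-file observation: the Niemegk helper writes the payload as decoded text (open mode "w" with response.text), while the OMNI helper writes raw bytes (mode "wb" with response.content). Bytes are the safer default for the .dat files since they bypass newline and encoding translation; text mode is reasonable for the line-oriented Kp page. For reference, a hedged usage sketch of the reworked API; the data_dir path is illustrative only:

    from datetime import datetime, timezone

    from swvo.io.omni.omni_low_res import OMNILowRes

    omni = OMNILowRes(data_dir="./data/omni_low_res")
    omni.download_and_process(
        start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end_time=datetime(2024, 6, 1, tzinfo=timezone.utc),
        reprocess_files=False,  # keep yearly files that are already processed
    )
    df = omni.read(
        datetime(2024, 1, 1, tzinfo=timezone.utc),
        datetime(2024, 6, 1, tzinfo=timezone.utc),
    )
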