From e6aa3ed0ceec5239ff135cc980943285664e1802 Mon Sep 17 00:00:00 2001
From: sahiljhawar
Date: Tue, 11 Nov 2025 11:08:13 +0100
Subject: [PATCH 1/4] refactor reprocess_files

partially fixes #34
---
 swvo/io/omni/omni_high_res.py | 30 ++++++++++++++++++------------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/swvo/io/omni/omni_high_res.py b/swvo/io/omni/omni_high_res.py
index 357a87a..5d1476b 100644
--- a/swvo/io/omni/omni_high_res.py
+++ b/swvo/io/omni/omni_high_res.py
@@ -94,16 +94,15 @@ def download_and_process(
         temporary_dir = Path("./temp_omni_high_res_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time, cadence_min)
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time, cadence_min)
 
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            if file_path.exists() and not reprocess_files:
+                continue
 
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
+
+            try:
                 data = self._get_data_from_omni(
                     start=time_interval[0],
                     end=time_interval[1],
@@ -113,10 +112,16 @@
                 logging.debug("Processing file ...")
                 processed_df = self._process_single_year(data)
-                processed_df.to_csv(file_path, index=True, header=True)
-
-        finally:
-            rmtree(temporary_dir, ignore_errors=True)
+                processed_df.to_csv(tmp_path, index=True, header=True)
+                tmp_path.replace(file_path)
+
+            except Exception as e:
+                logging.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
+
+        rmtree(temporary_dir, ignore_errors=True)
 
     def read(
         self,
         start_time: datetime,
@@ -361,6 +366,7 @@ def _get_data_from_omni(self, start: datetime, end: datetime, cadence: int = 1)
             raise ValueError(msg)
         logging.debug(f"Fetching data from {self.URL} with payload: {payload}")
         response = requests.post(self.URL, data=payload)
+        response.raise_for_status()
        data = response.text.splitlines()
 
         if data and "Error" in data[0]:
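Note on the pattern patch 1 introduces: the processed CSV is staged in a
sibling ".tmp" file and then renamed over the final path. Path.replace has
os.replace semantics, so the rename is atomic when source and target sit on
the same filesystem, and a crash or failed download can no longer leave a
truncated file where a good one used to be. The cleanup of the scratch
directory now happens once, after the loop, so later intervals can still use
it. A minimal standalone sketch of the idea (the helper name, DataFrame and
target path are illustrative, not taken from the repository):

    from pathlib import Path

    import pandas as pd

    def write_csv_atomically(df: pd.DataFrame, target: Path) -> None:
        # Stage next to the target so the final rename stays on one filesystem.
        tmp = target.with_suffix(target.suffix + ".tmp")
        try:
            df.to_csv(tmp, index=True, header=True)
            tmp.replace(target)  # atomic swap; readers never see a partial file
        except Exception:
            tmp.unlink(missing_ok=True)  # do not leave a stale .tmp behind
            raise

Staging next to the target rather than in the temp directory matters: a
rename across filesystems would silently degrade to a copy and lose the
atomicity the pattern relies on.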
From 019536d46c693a092e19b49a28262d10964465f4 Mon Sep 17 00:00:00 2001
From: sahiljhawar
Date: Wed, 12 Nov 2025 16:13:18 +0100
Subject: [PATCH 2/4] refactor reprocess_files in DSTWDC

#34
---
 swvo/io/dst/wdc.py | 45 +++++++++++++++++++++++++++++++++------------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/swvo/io/dst/wdc.py b/swvo/io/dst/wdc.py
index a07bb0b..584c36e 100644
--- a/swvo/io/dst/wdc.py
+++ b/swvo/io/dst/wdc.py
@@ -17,8 +17,11 @@
 
 import numpy as np
 import pandas as pd
+import requests
 import wget
 
+logger = logging.getLogger(__name__)
+
 logging.captureWarnings(True)
 
 
@@ -56,7 +59,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
 
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
-        logging.info(f"WDC Dst data directory: {self.data_dir}")
+        logger.info(f"WDC Dst data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process WDC Dst data files.
@@ -80,34 +83,40 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         temporary_dir = Path("./temp_wdc")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
-
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                filename = "index.html"
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
-                URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m"))
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            filename = "index.html"
+            if file_path.exists() and not reprocess_files:
+                continue
 
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
 
-                logging.debug(f"Downloading file {URL + filename} ...")
+            URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m"))
 
-                wget.download(URL + filename, str(temporary_dir))
+            try:
+                logger.debug(f"Downloading file {URL} ...")
+                response = requests.get(URL)
+                response.raise_for_status()
+                data = response.text.splitlines()
+                with open(temporary_dir / filename, "w") as file:
+                    file.write("\n".join(data))
 
-                logging.debug("Processing file ...")
+                logger.debug("Processing file ...")
                 processed_df = self._process_single_file(
                     temporary_dir / filename,
                     year=time_interval.year,
                     month=time_interval.month,
                 )
-                processed_df.to_csv(file_path, index=True, header=True)
+                processed_df.to_csv(tmp_path, index=True, header=True)
+                tmp_path.replace(file_path)
+            except Exception as e:
+                logger.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
 
-        finally:
-            rmtree(temporary_dir, ignore_errors=True)
+        rmtree(temporary_dir, ignore_errors=True)
 
     def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
         """Get list of file paths and their corresponding time intervals.
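The guard at the top of the rewritten loop replaces the old
unlink-then-rewrite branch entirely: an existing monthly file is kept unless
reprocess_files is set, and the atomic rename makes an explicit unlink
redundant. A hypothetical usage sketch (the data directory and dates are
made up; the import path mirrors the file path in the diff):

    from datetime import datetime, timezone
    from pathlib import Path

    from swvo.io.dst.wdc import DSTWDC

    wdc = DSTWDC(data_dir=Path("./data/dst_wdc"))
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    end = datetime(2024, 3, 31, tzinfo=timezone.utc)

    wdc.download_and_process(start, end)                        # fetch and process
    wdc.download_and_process(start, end)                        # no-op: files exist
    wdc.download_and_process(start, end, reprocess_files=True)  # force a rebuild

This makes the method idempotent from the caller's point of view: repeated
calls over the same range only pay for intervals whose output is missing.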
From a7a82c83ee998978b0d0dbb04303636a46cdbaf3 Mon Sep 17 00:00:00 2001
From: sahiljhawar
Date: Wed, 12 Nov 2025 16:48:19 +0100
Subject: [PATCH 3/4] refactor reprocess_files in Niemegk

---
 swvo/io/dst/wdc.py    |  1 -
 swvo/io/kp/niemegk.py | 58 ++++++++++++++++++++++++++++++++++------------------------
 2 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/swvo/io/dst/wdc.py b/swvo/io/dst/wdc.py
index 584c36e..196b540 100644
--- a/swvo/io/dst/wdc.py
+++ b/swvo/io/dst/wdc.py
@@ -18,7 +18,6 @@
 import numpy as np
 import pandas as pd
 import requests
-import wget
 
 logger = logging.getLogger(__name__)
 
diff --git a/swvo/io/kp/niemegk.py b/swvo/io/kp/niemegk.py
index bc11425..276e0a0 100755
--- a/swvo/io/kp/niemegk.py
+++ b/swvo/io/kp/niemegk.py
@@ -16,7 +16,9 @@
 
 import numpy as np
 import pandas as pd
-import wget
+import requests
+
+logger = logging.getLogger(__name__)
 
 logging.captureWarnings(True)
 
@@ -58,7 +60,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
 
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
-        logging.info(f"Kp Niemegk data directory: {self.data_dir}")
+        logger.info(f"Kp Niemegk data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process Niemegk Kp data file.
@@ -78,32 +80,28 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         Raise `FileNotFoundError` if the file is not downloaded successfully.
         """
         if start_time < datetime.now(timezone.utc) - timedelta(days=30):
-            logging.info("We can only download and process a Kp Niemegk file from the last 30 days!")
+            logger.info("We can only download and process a Kp Niemegk file from the last 30 days!")
             return
 
         temporary_dir = Path("./temp_kp_niemegk_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            logging.debug(f"Downloading file {self.URL + self.NAME} ...")
-
-            wget.download(self.URL + self.NAME, str(temporary_dir))
-
-            # check if download was successfull
-            if os.stat(str(temporary_dir / self.NAME)).st_size == 0:
-                raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!")
+        logger.debug(f"Downloading file {self.URL + self.NAME} ...")
 
-            logging.debug("Processing file ...")
-            processed_df = self._process_single_file(temporary_dir)
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            if file_path.exists() and not reprocess_files:
+                continue
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
+            try:
+                self._download(temporary_dir)
 
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+                # check if download was successful
+                if os.stat(str(temporary_dir / self.NAME)).st_size == 0:
+                    raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!")
 
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+                logger.debug("Processing file ...")
+                processed_df = self._process_single_file(temporary_dir)
 
                 data_single_file = processed_df[
                     (processed_df.index >= time_interval[0]) & (processed_df.index <= time_interval[1])
@@ -112,12 +110,24 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
                 if len(data_single_file.index) == 0:
                     continue
 
-                data_single_file.to_csv(file_path, index=True, header=False)
+                data_single_file.to_csv(tmp_path, index=True, header=False)
+                tmp_path.replace(file_path)
+
+                logger.debug(f"Saving processed file {file_path}")
+            except Exception as e:
+                logger.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
 
-            logging.debug(f"Saving processed file {file_path}")
+        rmtree(temporary_dir)
 
-        finally:
-            rmtree(temporary_dir)
+    def _download(self, temporary_dir):
+        response = requests.get(self.URL + self.NAME)
+        response.raise_for_status()
+
+        with open(temporary_dir / self.NAME, "w") as f:
+            f.write(response.text)
 
     def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
         """Read Niemegk Kp data for the specified time range.
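Patch 3 is where wget gets swapped for requests behind a small _download
helper. One way to harden such a helper further, sketched under the
assumption that a timeout and streamed writes are acceptable (neither is
part of the patch, and the function name is illustrative):

    from pathlib import Path

    import requests

    def download_to(url: str, destination: Path, timeout: float = 30.0) -> Path:
        # Stream the body so large files never sit fully in memory, and
        # fail fast on HTTP errors or a connection that stops responding.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(destination, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return destination

A timeout is worth considering here because requests blocks indefinitely by
default, which matters for a helper that runs inside unattended pipelines.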
From d2c35615994ca758f85d050dd897118e74f7e735 Mon Sep 17 00:00:00 2001
From: sahiljhawar
Date: Wed, 12 Nov 2025 17:33:34 +0100
Subject: [PATCH 4/4] refactor reprocess_files in OMNILowRes

---
 swvo/io/dst/wdc.py           | 12 ++++++------
 swvo/io/kp/niemegk.py        | 16 ++++++++--------
 swvo/io/omni/omni_low_res.py | 50 ++++++++++++++++++++++++++++++++++------------------
 3 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/swvo/io/dst/wdc.py b/swvo/io/dst/wdc.py
index 196b540..98bdc9a 100644
--- a/swvo/io/dst/wdc.py
+++ b/swvo/io/dst/wdc.py
@@ -19,10 +19,10 @@
 import pandas as pd
 import requests
 
-logger = logging.getLogger(__name__)
-
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class DSTWDC:
     """This is a class for the WDC Dst data.
@@ -58,7 +58,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
 
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
-        logger.info(f"WDC Dst data directory: {self.data_dir}")
+        _logger.info(f"WDC Dst data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process WDC Dst data files.
@@ -95,13 +95,13 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
             URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m"))
 
             try:
-                logger.debug(f"Downloading file {URL} ...")
+                _logger.debug(f"Downloading file {URL} ...")
                 response = requests.get(URL)
                 response.raise_for_status()
                 data = response.text.splitlines()
                 with open(temporary_dir / filename, "w") as file:
                     file.write("\n".join(data))
 
-                logger.debug("Processing file ...")
+                _logger.debug("Processing file ...")
                 processed_df = self._process_single_file(
                     temporary_dir / filename,
@@ -111,7 +111,7 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
                 processed_df.to_csv(tmp_path, index=True, header=True)
                 tmp_path.replace(file_path)
             except Exception as e:
-                logger.error(f"Failed to process {file_path}: {e}")
+                _logger.error(f"Failed to process {file_path}: {e}")
                 if tmp_path.exists():
                     tmp_path.unlink()
                 continue
diff --git a/swvo/io/kp/niemegk.py b/swvo/io/kp/niemegk.py
index 276e0a0..e3c3aee 100755
--- a/swvo/io/kp/niemegk.py
+++ b/swvo/io/kp/niemegk.py
@@ -18,10 +18,10 @@
 import pandas as pd
 import requests
 
-logger = logging.getLogger(__name__)
-
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class KpNiemegk:
     """A class to handle Niemegk Kp data.
@@ -60,7 +60,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
 
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
-        logger.info(f"Kp Niemegk data directory: {self.data_dir}")
+        _logger.info(f"Kp Niemegk data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process Niemegk Kp data file.
@@ -80,13 +80,13 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         Raise `FileNotFoundError` if the file is not downloaded successfully.
""" if start_time < datetime.now(timezone.utc) - timedelta(days=30): - logger.info("We can only download and process a Kp Niemegk file from the last 30 days!") + _logger.info("We can only download and process a Kp Niemegk file from the last 30 days!") return temporary_dir = Path("./temp_kp_niemegk_wget") temporary_dir.mkdir(exist_ok=True, parents=True) - logger.debug(f"Downloading file {self.URL + self.NAME} ...") + _logger.debug(f"Downloading file {self.URL + self.NAME} ...") file_paths, time_intervals = self._get_processed_file_list(start_time, end_time) for file_path, time_interval in zip(file_paths, time_intervals): @@ -100,7 +100,7 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce if os.stat(str(temporary_dir / self.NAME)).st_size == 0: raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!") - logger.debug("Processing file ...") + _logger.debug("Processing file ...") processed_df = self._process_single_file(temporary_dir) data_single_file = processed_df[ @@ -113,9 +113,9 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce data_single_file.to_csv(tmp_path, index=True, header=False) tmp_path.replace(file_path) - logger.debug(f"Saving processed file {file_path}") + _logger.debug(f"Saving processed file {file_path}") except Exception as e: - logger.error(f"Failed to process {file_path}: {e}") + _logger.error(f"Failed to process {file_path}: {e}") if tmp_path.exists(): tmp_path.unlink() pass diff --git a/swvo/io/omni/omni_low_res.py b/swvo/io/omni/omni_low_res.py index 9ac4b3f..ab2bec1 100755 --- a/swvo/io/omni/omni_low_res.py +++ b/swvo/io/omni/omni_low_res.py @@ -16,10 +16,12 @@ import numpy as np import pandas as pd -import wget +import requests logging.captureWarnings(True) +_logger = logging.getLogger(__name__) + class OMNILowRes: """This is a class for the OMNI Low Resolution data. @@ -113,7 +115,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None: self.data_dir: Path = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) - logging.info(f"OMNI Low Res data directory: {self.data_dir}") + _logger.info(f"OMNI Low Res data directory: {self.data_dir}") def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None: """Download and process OMNI Low Resolution data files. 
@@ -135,28 +137,40 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         temporary_dir = Path("./temp_omni_low_res_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
-            for file_path, time_interval in zip(file_paths, time_intervals):
-                filename = "omni2_" + str(time_interval[0].year) + ".dat"
+        for file_path, time_interval in zip(file_paths, time_intervals):
+            if file_path.exists() and not reprocess_files:
+                continue
+
+            tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
 
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+            try:
+                filename = "omni2_" + str(time_interval[0].year) + ".dat"
 
-                logging.debug(f"Downloading file {self.URL + filename} ...")
+                _logger.debug(f"Downloading file {self.URL + filename} ...")
 
-                wget.download(self.URL + filename, str(temporary_dir))
+                self._download(temporary_dir, filename)
 
-                logging.debug("Processing file ...")
+                _logger.debug("Processing file ...")
                 processed_df = self._process_single_file(temporary_dir / filename)
-                processed_df.to_csv(file_path, index=True, header=True)
+                processed_df.to_csv(tmp_path, index=True, header=True)
+                tmp_path.replace(file_path)
+
+            except Exception as e:
+                _logger.error(f"Failed to process {file_path}: {e}")
+                if tmp_path.exists():
+                    tmp_path.unlink()
+                continue
 
-        finally:
-            rmtree(temporary_dir, ignore_errors=True)
+        rmtree(temporary_dir, ignore_errors=True)
+
+    def _download(self, temporary_dir: Path, filename: str):
+        response = requests.get(self.URL + filename)
+        response.raise_for_status()
 
+        with open(temporary_dir / filename, "wb") as f:
+            f.write(response.content)
 
     def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
         """Get list of file paths and their corresponding time intervals.
@@ -264,7 +278,7 @@ def read(self, start_time: datetime, end_time: datetime, download: bool = False)
         end_time = end_time.replace(tzinfo=timezone.utc)
 
         if start_time < datetime(START_YEAR, 1, 1).replace(tzinfo=timezone.utc):
-            logging.warning(
+            _logger.warning(
                 "Start date chosen falls behind the existing data. Moving start date to first"
                 " available mission files..."
             )
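Taken together, the four patches converge on one per-file loop shape: skip
existing output unless a rebuild is forced, stage into a ".tmp" file,
publish with an atomic rename, log and clean up on failure without aborting
the remaining intervals, and tear down the scratch directory once after the
loop. A condensed sketch of that shape (the function and its fetch callback
are illustrative, not taken from the repository):

    import logging
    from pathlib import Path
    from shutil import rmtree

    _logger = logging.getLogger(__name__)

    def process_intervals(file_paths, time_intervals, fetch_and_process, reprocess_files=False):
        temporary_dir = Path("./temp_download")
        temporary_dir.mkdir(exist_ok=True, parents=True)
        try:
            for file_path, time_interval in zip(file_paths, time_intervals):
                if file_path.exists() and not reprocess_files:
                    continue  # keep existing output unless a rebuild is forced
                tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
                try:
                    df = fetch_and_process(time_interval, temporary_dir)
                    df.to_csv(tmp_path, index=True, header=True)
                    tmp_path.replace(file_path)  # atomic publish
                except Exception:
                    _logger.exception("Failed to process %s", file_path)
                    tmp_path.unlink(missing_ok=True)
                    continue  # one bad interval must not abort the rest
        finally:
            # Remove the scratch directory once, after all intervals,
            # so later iterations can still write into it.
            rmtree(temporary_dir, ignore_errors=True)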