diff --git a/swvo/io/dst/wdc.py b/swvo/io/dst/wdc.py
index a07bb0b..98bdc9a 100644
--- a/swvo/io/dst/wdc.py
+++ b/swvo/io/dst/wdc.py
@@ -17,10 +17,12 @@
 import numpy as np
 import pandas as pd
-import wget
+import requests
 
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class DSTWDC:
     """This is a class for the WDC Dst data.
@@ -56,7 +58,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
 
-        logging.info(f"WDC Dst data directory: {self.data_dir}")
+        _logger.info(f"WDC Dst data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process WDC Dst data files.
@@ -80,35 +82,48 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         temporary_dir = Path("./temp_wdc")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
+        try:
             for file_path, time_interval in zip(file_paths, time_intervals):
                 filename = "index.html"
+                if file_path.exists() and not reprocess_files:
+                    continue
 
+                tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
                 URL = self.URL.replace("YYYYMM", time_interval.strftime("%Y%m"))
 
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
-
-                logging.debug(f"Downloading file {URL + filename} ...")
-
-                wget.download(URL + filename, str(temporary_dir))
-
-                logging.debug("Processing file ...")
-
-                processed_df = self._process_single_file(
-                    temporary_dir / filename,
-                    year=time_interval.year,
-                    month=time_interval.month,
-                )
-
-                processed_df.to_csv(file_path, index=True, header=True)
+                try:
+                    _logger.debug(f"Downloading file {URL} ...")
+                    response = requests.get(URL)
+                    response.raise_for_status()
+                    data = response.text.splitlines()
+                    with open(temporary_dir / filename, "w") as file:
+                        file.write("\n".join(data))
+
+                    _logger.debug("Processing file ...")
+                    processed_df = self._process_single_file(
+                        temporary_dir / filename,
+                        year=time_interval.year,
+                        month=time_interval.month,
+                    )
+
+                    processed_df.to_csv(tmp_path, index=True, header=True)
+                    tmp_path.replace(file_path)
+                except Exception as e:
+                    _logger.error(f"Failed to process {file_path}: {e}")
+                    if tmp_path.exists():
+                        tmp_path.unlink()
+                    continue
         finally:
             rmtree(temporary_dir, ignore_errors=True)
 
     def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
         """Get list of file paths and their corresponding time intervals.
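Note on the write-then-rename pattern the wdc.py hunk introduces (write the CSV to a `.tmp` sibling, then `Path.replace` it over the target): the rename makes the publish step effectively atomic, so a crash mid-write can no longer leave a truncated CSV behind. Below is a minimal standalone sketch of that pattern, assuming Python 3.8+ (for `unlink(missing_ok=True)`) and a pandas DataFrame; the helper name `atomic_write_csv` is illustrative, not part of the patch.

    from pathlib import Path

    import pandas as pd


    def atomic_write_csv(df: pd.DataFrame, file_path: Path) -> None:
        # Write to a sibling ".tmp" file first, then rename over the target.
        # Path.replace() wraps os.replace(), which is atomic on POSIX when
        # source and target live on the same filesystem, so readers never
        # observe a half-written CSV and a failed run leaves the old file intact.
        tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
        try:
            df.to_csv(tmp_path, index=True, header=True)
            tmp_path.replace(file_path)
        except Exception:
            tmp_path.unlink(missing_ok=True)  # drop the partial .tmp file
            raise

A side effect of this pattern is that the old `file_path.unlink()` before reprocessing becomes unnecessary: the rename already overwrites the stale file. The same pattern repeats in niemegk.py and omni_low_res.py below.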
diff --git a/swvo/io/kp/niemegk.py b/swvo/io/kp/niemegk.py
index bc11425..e3c3aee 100755
--- a/swvo/io/kp/niemegk.py
+++ b/swvo/io/kp/niemegk.py
@@ -16,10 +16,12 @@
 import numpy as np
 import pandas as pd
-import wget
+import requests
 
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class KpNiemegk:
     """A class to handle Niemegk Kp data.
@@ -58,7 +60,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
 
-        logging.info(f"Kp Niemegk data directory: {self.data_dir}")
+        _logger.info(f"Kp Niemegk data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process Niemegk Kp data file.
@@ -78,32 +80,28 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         Raise `FileNotFoundError` if the file is not downloaded successfully.
         """
         if start_time < datetime.now(timezone.utc) - timedelta(days=30):
-            logging.info("We can only download and process a Kp Niemegk file from the last 30 days!")
+            _logger.info("We can only download and process a Kp Niemegk file from the last 30 days!")
             return
 
         temporary_dir = Path("./temp_kp_niemegk_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            logging.debug(f"Downloading file {self.URL + self.NAME} ...")
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
-            wget.download(self.URL + self.NAME, str(temporary_dir))
-
-            # check if download was successfull
+        try:
+            self._download(temporary_dir)
+
+            # check if the download was successful
             if os.stat(str(temporary_dir / self.NAME)).st_size == 0:
                 raise FileNotFoundError(f"Error while downloading file: {self.URL + self.NAME}!")
 
-            logging.debug("Processing file ...")
+            _logger.debug("Processing file ...")
             processed_df = self._process_single_file(temporary_dir)
 
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
-
             for file_path, time_interval in zip(file_paths, time_intervals):
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+                if file_path.exists() and not reprocess_files:
+                    continue
+
+                tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
 
                 data_single_file = processed_df[
                     (processed_df.index >= time_interval[0]) & (processed_df.index <= time_interval[1])
@@ -112,12 +110,26 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
                 if len(data_single_file.index) == 0:
                     continue
 
-                data_single_file.to_csv(file_path, index=True, header=False)
+                try:
+                    data_single_file.to_csv(tmp_path, index=True, header=False)
+                    tmp_path.replace(file_path)
 
-                logging.debug(f"Saving processed file {file_path}")
+                    _logger.debug(f"Saving processed file {file_path}")
+                except Exception as e:
+                    _logger.error(f"Failed to save {file_path}: {e}")
+                    if tmp_path.exists():
+                        tmp_path.unlink()
+                    continue
 
         finally:
             rmtree(temporary_dir)
 
+    def _download(self, temporary_dir: Path) -> None:
+        _logger.debug(f"Downloading file {self.URL + self.NAME} ...")
+
+        # requests.get takes the URL only; the destination is written explicitly
+        response = requests.get(self.URL + self.NAME)
+        response.raise_for_status()
+
+        with open(temporary_dir / self.NAME, "w") as f:
+            f.write(response.text)
+
     def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
         """Read Niemegk Kp data for the specified time range.
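The niemegk.py hunk replaces `wget.download(url, out_dir)` with a `_download` helper built on requests. One pitfall the helper has to avoid: `requests.get` takes query parameters, not an output directory, as its second positional argument, so the destination must be written by hand. A minimal sketch of that shape, where the `download_text` name and the explicit `timeout` (requests waits indefinitely without one) are illustrative additions rather than part of the patch:

    from pathlib import Path

    import requests


    def download_text(url: str, dest: Path, timeout: float = 30.0) -> None:
        # requests.get(url, X) would send X as query parameters; the download
        # location is handled by writing the response body to dest ourselves.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # fail on HTTP 4xx/5xx instead of saving an error page
        dest.write_text(response.text)

`raise_for_status()` complements the existing zero-byte-file check: together they stop an HTML error page or an empty response from being processed as Kp data.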
diff --git a/swvo/io/omni/omni_low_res.py b/swvo/io/omni/omni_low_res.py
index 9ac4b3f..ab2bec1 100755
--- a/swvo/io/omni/omni_low_res.py
+++ b/swvo/io/omni/omni_low_res.py
@@ -16,10 +16,12 @@
 import numpy as np
 import pandas as pd
-import wget
+import requests
 
 logging.captureWarnings(True)
 
+_logger = logging.getLogger(__name__)
+
 
 class OMNILowRes:
     """This is a class for the OMNI Low Resolution data.
@@ -113,7 +115,7 @@ def __init__(self, data_dir: Optional[Path] = None) -> None:
         self.data_dir: Path = Path(data_dir)
         self.data_dir.mkdir(parents=True, exist_ok=True)
 
-        logging.info(f"OMNI Low Res data directory: {self.data_dir}")
+        _logger.info(f"OMNI Low Res data directory: {self.data_dir}")
 
     def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
         """Download and process OMNI Low Resolution data files.
@@ -135,29 +137,42 @@ def download_and_process(self, start_time: datetime, end_time: datetime, reproce
         temporary_dir = Path("./temp_omni_low_res_wget")
         temporary_dir.mkdir(exist_ok=True, parents=True)
 
-        try:
-            file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
+        file_paths, time_intervals = self._get_processed_file_list(start_time, end_time)
 
+        try:
             for file_path, time_interval in zip(file_paths, time_intervals):
-                filename = "omni2_" + str(time_interval[0].year) + ".dat"
-
-                if file_path.exists():
-                    if reprocess_files:
-                        file_path.unlink()
-                    else:
-                        continue
+                if file_path.exists() and not reprocess_files:
+                    continue
 
-                logging.debug(f"Downloading file {self.URL + filename} ...")
+                tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")
 
-                wget.download(self.URL + filename, str(temporary_dir))
+                try:
+                    filename = "omni2_" + str(time_interval[0].year) + ".dat"
 
-                logging.debug("Processing file ...")
+                    _logger.debug(f"Downloading file {self.URL + filename} ...")
+                    self._download(temporary_dir, filename)
 
-                processed_df = self._process_single_file(temporary_dir / filename)
+                    _logger.debug("Processing file ...")
+                    processed_df = self._process_single_file(temporary_dir / filename)
 
-                processed_df.to_csv(file_path, index=True, header=True)
+                    processed_df.to_csv(tmp_path, index=True, header=True)
+                    tmp_path.replace(file_path)
+                except Exception as e:
+                    _logger.error(f"Failed to process {file_path}: {e}")
+                    if tmp_path.exists():
+                        tmp_path.unlink()
+                    continue
         finally:
             rmtree(temporary_dir, ignore_errors=True)
 
+    def _download(self, temporary_dir: Path, filename: str) -> None:
+        response = requests.get(self.URL + filename)
+        response.raise_for_status()
+
+        with open(temporary_dir / filename, "wb") as f:
+            f.write(response.content)
+
     def _get_processed_file_list(self, start_time: datetime, end_time: datetime) -> Tuple[List, List]:
         """Get list of file paths and their corresponding time intervals.
@@ -264,7 +279,7 @@ def read(self, start_time: datetime, end_time: datetime, download: bool = False)
             end_time = end_time.replace(tzinfo=timezone.utc)
 
         if start_time < datetime(START_YEAR, 1, 1).replace(tzinfo=timezone.utc):
-            logging.warning(
+            _logger.warning(
                 "Start date chosen falls behind the existing data. Moving start date to first"
                 " available mission files..."
             )
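All three files also swap root-logger calls (`logging.info(...)`) for a module-level `_logger = logging.getLogger(__name__)`. A short sketch of why that matters for library code, assuming only the standard library `logging` module:

    import logging

    logging.captureWarnings(True)  # route warnings.warn() output through logging

    # getLogger(__name__) yields a hierarchical logger name such as
    # "swvo.io.omni.omni_low_res", so applications can tune each data
    # source independently instead of configuring the root logger:
    _logger = logging.getLogger(__name__)

    # e.g., in the calling application, quiet only the OMNI downloader:
    logging.getLogger("swvo.io.omni.omni_low_res").setLevel(logging.WARNING)

With the root-logger convenience calls, the first `logging.info(...)` from library code would implicitly configure the root logger via `basicConfig`; per-module loggers leave handler and level configuration entirely to the application.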