diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 723c664..600eef8 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.13"] + python-version: ["3.9"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/.gitignore b/.gitignore index b7faf40..b140be7 100644 --- a/.gitignore +++ b/.gitignore @@ -205,3 +205,4 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ +.vscode/settings.json diff --git a/theta_capture.py b/theta_capture.py index c2f2305..0a42548 100755 --- a/theta_capture.py +++ b/theta_capture.py @@ -3,16 +3,10 @@ import time import datetime -import requests +import theta_wireless as tw +from theta_wireless import Options +import theta_init -# pylint: disable=duplicate-code - -# カメラIP -THETA_IP = "192.168.1.1" -EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute" -STATUS_URL = f"http://{THETA_IP}/osc/commands/status" - -HEADERS = {"Content-Type": "application/json;charset=utf-8"} # 撮影設定リスト (ISO, shutter_speed, ColorTemperature) settings_list = [ @@ -33,38 +27,28 @@ def set_options(iso, shutter_speed, white_balance): """オプションを設定""" - options_command = { - "name": "camera.setOptions", - "parameters": { - "options": { - "iso": iso, - "shutterSpeed": shutter_speed, - "whiteBalance": "_colorTemperature", - "colorTemperature": white_balance, - } - }, - } - resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS, timeout=10) - resp.raise_for_status() - return resp.json() - - -def take_picture(): - """Theta Web APIを用いて撮影""" - take_command = {"name": "camera.takePicture"} - resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS, timeout=10) - resp.raise_for_status() - return resp.json() + tw.set_options( + { + Options.ISO:iso, + Options.SHUTTER_SPEED: shutter_speed, + Options.WHITE_BALANCE: Options.COLOR_TEMPERATURE, + Options.COLOR_TEMPERATURE: white_balance + } + ) + resp = tw.get_options( + { + Options.ISO, + Options.SHUTTER_SPEED, + Options.COLOR_TEMPERATURE + } + ) + return resp def wait_for_completion(command_id): """撮影完了まで処理を停止""" while True: - status_resp = requests.post( - STATUS_URL, json={"id": command_id}, headers=HEADERS, timeout=10 - ) - status_resp.raise_for_status() - status = status_resp.json() + status = tw.check_status(command_id=command_id) if status.get("state") == "done": return status.get("results") time.sleep(0.5) @@ -77,7 +61,7 @@ def capture_12(): set_options(**s) print("設定完了:", s) - result = take_picture() + result = tw.take_picture() print("撮影コマンド送信:", result) if "id" in result: @@ -119,5 +103,11 @@ def schedule_shoots(): time.sleep(wait_sec / 60) -if __name__ == "__main__": +def main(): + """メイン関数""" + theta_init.init() schedule_shoots() + + +if __name__ == "__main__": + main() diff --git a/theta_init.py b/theta_init.py index 28fd9e6..f1b913f 100755 --- a/theta_init.py +++ b/theta_init.py @@ -1,24 +1,34 @@ -"""Script for controlling RICOH THETA camera.""" +"""Thetaカメラを初期化します""" -import requests +import theta_wireless +from theta_wireless import Options -# pylint: disable=duplicate-code +def init(): + """Thetaの初期化処理を行います。""" + try: + theta_wireless.set_options( + { + Options.CAPTURE_MODE:"image", + Options.EXPOSURE_PROGRAM:1, + Options.SLEEP_DELAY:65535, + Options.SHUTTER_VOLUME:0 + } + ) + response = theta_wireless.get_options( + { + Options.CAPTURE_MODE, + Options.EXPOSURE_PROGRAM, + Options.SLEEP_DELAY, + Options.SHUTTER_VOLUME + } + ) + print(response) -URL = "http://192.168.1.1/osc/commands/execute" -HEADERS = {"Content-Type": "application/json;charset=utf-8"} + except theta_wireless.CameraInternalError as e: + print(f"Caught OSC API error: {e.code} -> {e.message}") -payload = { - "name": "camera.setOptions", - "parameters": {"options": {"captureMode": "image"}}, -} + except theta_wireless.CameraTimeout as e: + print(f"Timeoutしました: {e}") -resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10) -print(resp.json()) - -payload = { - "name": "camera.setOptions", - "parameters": {"options": {"exposureProgram": 1}}, -} - -resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10) -print(resp.json()) +if __name__ == "__main__": + init() diff --git a/theta_status.py b/theta_status.py deleted file mode 100755 index 08a1060..0000000 --- a/theta_status.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Script for controlling RICOH THETA camera.""" - -import requests - -# pylint: disable=duplicate-code - -URL = "http://192.168.1.1/osc/commands/execute" -HEADERS = {"Content-Type": "application/json;charset=utf-8"} - -# 確認したいオプションを parameters に必ず入れる -payload = { - "name": "camera.getOptions", - "parameters": { - "optionNames": ["iso", "shutterSpeed", "aperture", "_colorTemperature"] - }, -} - -resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10) -print(resp.json()) diff --git a/theta_test_capture.py b/theta_test_capture.py index d264817..5a2a9bf 100755 --- a/theta_test_capture.py +++ b/theta_test_capture.py @@ -2,16 +2,11 @@ import time -import requests +import theta_wireless as tw +from theta_wireless import Options # pylint: disable=duplicate-code -# カメラIP -THETA_IP = "192.168.1.1" -EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute" -STATUS_URL = f"http://{THETA_IP}/osc/commands/status" - -HEADERS = {"Content-Type": "application/json;charset=utf-8"} # 撮影設定リスト (ISO, shutter_speed, f, ColorTemperature) settings_list = [ @@ -32,37 +27,28 @@ def set_options(iso, shutter_speed, white_balance): """オプションを設定""" - options_command = { - "name": "camera.setOptions", - "parameters": { - "options": { - "iso": iso, - "shutterSpeed": shutter_speed, - "_colorTemperature": white_balance, - } - }, - } - resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS, timeout=10) - resp.raise_for_status() - return resp.json() - - -def take_picture(): - """Take a picture using the Theta API""" - take_command = {"name": "camera.takePicture"} - resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS, timeout=10) - resp.raise_for_status() - return resp.json() + tw.set_options( + { + Options.ISO:iso, + Options.SHUTTER_SPEED: shutter_speed, + Options.WHITE_BALANCE: Options.COLOR_TEMPERATURE, + Options.COLOR_TEMPERATURE: white_balance + } + ) + resp = tw.get_options( + { + Options.ISO, + Options.SHUTTER_SPEED, + Options.COLOR_TEMPERATURE + } + ) + return resp def wait_for_completion(command_id): """Wait a few seconds to complete""" while True: - status_resp = requests.post( - STATUS_URL, json={"id": command_id}, headers=HEADERS, timeout=10 - ) - status_resp.raise_for_status() - status = status_resp.json() + status = tw.check_status(command_id=command_id) if status.get("state") == "done": return status.get("results") time.sleep(0.5) @@ -74,7 +60,7 @@ def wait_for_completion(command_id): set_options(**s) print("設定完了:", s) - result = take_picture() + result = tw.take_picture() print("撮影コマンド送信:", result) if "id" in result: diff --git a/theta_wireless.py b/theta_wireless.py new file mode 100644 index 0000000..d9f33be --- /dev/null +++ b/theta_wireless.py @@ -0,0 +1,174 @@ +"""Theta Web APIを利用した関数""" + +from enum import Enum +from typing import Optional +import requests + + +class _ThetaConstans(str, Enum): + THETA_IP = "192.168.1.1" + EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute" + STATUS_URL = f"http://{THETA_IP}/osc/commands/status" + HEADERS = {"Content-Type": "application/json;charset=utf-8"} + + +class _Name(str, Enum): + SET_OPTIONS = "camera.setOptions" + GET_OPTIONS = "camera.getOptions" + TAKE_PICTURE = "camera.takePicture" + + +class _Parameter(str, Enum): + OPTIONS = "options" + OPTION_NAMES = "optionNames" + +class Options(str, Enum): + """Thetaに指定できるOptionsのうち輝度画像合成に必要なもの""" + CAPTURE_MODE = "captureMode" + EXPOSURE_PROGRAM = "exposureProgram" + SLEEP_DELAY = "sleepDelay" + ISO = "iso" + SHUTTER_SPEED = "shutterSpeed" + WHITE_BALANCE = "whiteBalance" + COLOR_TEMPERATURE = "_colorTemperature" #Theta API only + SHUTTER_VOLUME = "_shutterVolume" #Theta API only + +class CameraError(Exception): + """全てのカメラ関連エラーの基底クラス""" + +class CameraTimeout(CameraError): + """カメラ操作のタイムアウトした場合の例外""" + +class CameraInternalError(CameraError): + """Open Spherical Camera APIがエラーを返した場合の例外""" + + def __init__(self, code:str, message:str, response_json:dict[str,any]=None): + """ + Args: + code: APIが出力するエラーコード(e.g., "invalidParameterValue") + message: エラーメッセージ + response_json: レスポンスの情報が入ったJSONデータ(Optional) + """ + self.code = code + self.message = message + self.response_json = response_json + super().__init__(f"OSC API Error [{code}]: {message}") + + +def _create_payload(name: str, parameters: Optional[dict[str, any]] = None) -> dict[str, any]: + if parameters is None: + return {"name":name} + return {"name": name, "parameters": parameters} + +def _send_request( + url:_ThetaConstans, + payload:dict[str, any], + timeout:float + ) -> dict[str, any]: + try: + response = requests.post( + url=url, + json=payload, + headers=_ThetaConstans.HEADERS, + timeout=timeout, + ) + response.raise_for_status() + data = response.json() + if "error" in data: + err = data["error"] + raise CameraInternalError(err.get("code"), err.get("message"), data) + return response.json() + except requests.exceptions.Timeout as e: + raise CameraTimeout("タイムアウトしました") from e + +def set_options(parameters:dict[str, any], timeout:float = 10.0) -> dict[str, any]: + """カメラにオプションを設定します。 + + Args: + parameters (dict[str, any]): Theta Web API v2.1に掲載のオプション名と設定値 + timeout (float, optional): POSTリクエストのタイムアウト時間. Defaults to 10.0. + + Raises: + CameraInternalError: Theta内部でエラーが発生 + CameraTimeout: リクエストのタイムアウト + + Returns: + dict[str, any]: Thetaからのレスポンス + """ + payload = _create_payload( + _Name.SET_OPTIONS, + {_Parameter.OPTIONS: parameters}, + ) + response_json = _send_request( + _ThetaConstans.EXECUTE_URL, + payload=payload, + timeout=timeout + ) + return response_json + +def get_options(option_names:set[str], timeout:float = 10.0) -> dict[str, any]: + """カメラの状態を確認します。 + + Args: + option_names (set[str]): Theta Web API v2.1に掲載の確認したいオプション名 + timeout (float, optional): POSTリクエストのタイムアウト時間. Defaults to 10.0. + + Raises: + CameraInternalError: Theta内部でエラーが発生 + CameraTimeout: リクエストのタイムアウト + + Returns: + dict[str, any]: Thetaからのレスポンス + """ + payload = _create_payload( + _Name.GET_OPTIONS, + {_Parameter.OPTION_NAMES:option_names} + ) + response_json = _send_request( + _ThetaConstans.EXECUTE_URL, + payload=payload, + timeout=timeout + ) + return response_json + +def take_picture(timeout:float=10.0) -> dict[str, any]: + """カメラで撮影します。 + + Args: + timeout (float, optional): POSTリクエストのタイムアウト時間. Defaults to 10.0. + + Raises: + CameraInternalError: Theta内部でエラーが発生 + CameraTimeout: リクエストのタイムアウト + + Returns: + dict[str, any]: Thetaからのレスポンス + """ + payload = _create_payload(_Name.TAKE_PICTURE) + response_json = _send_request( + _ThetaConstans.EXECUTE_URL, + payload=payload, + timeout=timeout + ) + return response_json + +def check_status(command_id:str, timeout:float=10.0) -> dict[str, any]: + """コマンドのステータスをチェックします。 + + Args: + command_id (str): コマンドのID + timeout (float, optional): POSTリクエストのタイムアウト時間. Defaults to 10.0. + + Raises: + CameraInternalError: Theta内部でエラーが発生 + CameraTimeout: リクエストのタイムアウト + + Returns: + dict[str, any]: Thetaからのレスポンス + """ + response_json = _send_request( + _ThetaConstans.STATUS_URL, + payload={"id":command_id}, + timeout=timeout + ) + return response_json