diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 3ca7b11..723c664 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -18,6 +18,7 @@ jobs: run: | python -m pip install --upgrade pip pip install pylint + pip install -r requirements.txt - name: Analysing the code with pylint run: | pylint $(git ls-files '*.py') diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..663bd1f --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +requests \ No newline at end of file diff --git a/theta_capture.py b/theta_capture.py index baef8a1..c2f2305 100755 --- a/theta_capture.py +++ b/theta_capture.py @@ -1,7 +1,12 @@ -import requests +"""Script for controlling RICOH THETA camera.""" + import time import datetime +import requests + +# pylint: disable=duplicate-code + # カメラIP THETA_IP = "192.168.1.1" EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute" @@ -9,53 +14,62 @@ HEADERS = {"Content-Type": "application/json;charset=utf-8"} -# 撮影設定リスト (ISO, ShutterSpeed, ColorTemperature) +# 撮影設定リスト (ISO, shutter_speed, ColorTemperature) settings_list = [ - {"iso": 100, "shutterSpeed": 0.00004, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.00008, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.0003125, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.000625, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.0025, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.005, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.02, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.04, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.16666666, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.33333333, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.625, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 3.2, "whiteBalance": 5200}, + {"iso": 100, "shutter_speed": 0.00004, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.00008, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.0003125, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.000625, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.0025, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.005, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.02, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.04, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.16666666, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.33333333, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.625, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 3.2, "white_balance": 5200}, ] -def set_options(iso, shutterSpeed, whiteBalance): + +def set_options(iso, shutter_speed, white_balance): + """オプションを設定""" options_command = { "name": "camera.setOptions", "parameters": { "options": { "iso": iso, - "shutterSpeed": shutterSpeed, + "shutterSpeed": shutter_speed, "whiteBalance": "_colorTemperature", - "colorTemperature": whiteBalance + "colorTemperature": white_balance, } - } + }, } - resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS) + resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS, timeout=10) resp.raise_for_status() return resp.json() + def take_picture(): + """Theta Web APIを用いて撮影""" take_command = {"name": "camera.takePicture"} - resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS) + resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS, timeout=10) resp.raise_for_status() return resp.json() + def wait_for_completion(command_id): + """撮影完了まで処理を停止""" while True: - status_resp = requests.post(STATUS_URL, json={"id": command_id}, headers=HEADERS) + status_resp = requests.post( + STATUS_URL, json={"id": command_id}, headers=HEADERS, timeout=10 + ) status_resp.raise_for_status() status = status_resp.json() if status.get("state") == "done": return status.get("results") time.sleep(0.5) + def capture_12(): """設定リストに従って12枚連続撮影""" for idx, s in enumerate(settings_list, start=1): @@ -72,6 +86,7 @@ def capture_12(): else: print("即時撮影結果:", result) + def schedule_shoots(): """午前6時~午後7時まで1時間おきに3回ずつ撮影""" while True: @@ -89,15 +104,20 @@ def schedule_shoots(): print(f"\n=== {hour}時の撮影完了 ===") # 次の「正時」まで待つ - next_hour = (now + datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) + next_hour = (now + datetime.timedelta(hours=1)).replace( + minute=0, second=0, microsecond=0 + ) wait_sec = (next_hour - datetime.datetime.now()).total_seconds() if wait_sec < 0: - next_hour = (now + datetime.timedelta(hours=2)).replace(minute=0, second=0, microsecond=0) + next_hour = (now + datetime.timedelta(hours=2)).replace( + minute=0, second=0, microsecond=0 + ) wait_sec = (next_hour - datetime.datetime.now()).total_seconds() print(f"{wait_sec/60:.1f} 分後の {next_hour} に再開します…") - for sleepCount in range(0,61): - print(f"残り{wait_sec * (60 - sleepCount)/60}秒") - time.sleep(wait_sec/60) + for sleep_count in range(0, 61): + print(f"残り{wait_sec * (60 - sleep_count)/60}秒") + time.sleep(wait_sec / 60) + if __name__ == "__main__": schedule_shoots() diff --git a/theta_init.py b/theta_init.py index 2405c89..28fd9e6 100755 --- a/theta_init.py +++ b/theta_init.py @@ -1,28 +1,24 @@ +"""Script for controlling RICOH THETA camera.""" + import requests -url = "http://192.168.1.1/osc/commands/execute" -headers = {"Content-Type": "application/json;charset=utf-8"} +# pylint: disable=duplicate-code + +URL = "http://192.168.1.1/osc/commands/execute" +HEADERS = {"Content-Type": "application/json;charset=utf-8"} payload = { "name": "camera.setOptions", - "parameters": { - "options": { - "captureMode":"image" - } - } + "parameters": {"options": {"captureMode": "image"}}, } -resp = requests.post(url, json=payload, headers=headers) +resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10) print(resp.json()) payload = { "name": "camera.setOptions", - "parameters": { - "options": { - "exposureProgram":1 - } - } + "parameters": {"options": {"exposureProgram": 1}}, } -resp = requests.post(url, json=payload, headers=headers) -print(resp.json()) \ No newline at end of file +resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10) +print(resp.json()) diff --git a/theta_status.py b/theta_status.py index 1998a68..08a1060 100755 --- a/theta_status.py +++ b/theta_status.py @@ -1,20 +1,19 @@ +"""Script for controlling RICOH THETA camera.""" + import requests -url = "http://192.168.1.1/osc/commands/execute" -headers = {"Content-Type": "application/json;charset=utf-8"} +# pylint: disable=duplicate-code + +URL = "http://192.168.1.1/osc/commands/execute" +HEADERS = {"Content-Type": "application/json;charset=utf-8"} # 確認したいオプションを parameters に必ず入れる payload = { "name": "camera.getOptions", "parameters": { - "optionNames": [ - "iso", - "shutterSpeed", - "aperture", - "_colorTemperature" - ] - } + "optionNames": ["iso", "shutterSpeed", "aperture", "_colorTemperature"] + }, } -resp = requests.post(url, json=payload, headers=headers) +resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10) print(resp.json()) diff --git a/theta_test_capture.py b/theta_test_capture.py index fe0982e..d264817 100755 --- a/theta_test_capture.py +++ b/theta_test_capture.py @@ -1,70 +1,82 @@ -import requests +"""Script for controlling RICOH THETA camera.""" + import time +import requests + +# pylint: disable=duplicate-code + # カメラIP THETA_IP = "192.168.1.1" EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute" STATUS_URL = f"http://{THETA_IP}/osc/commands/status" -HEADERS = { - "Content-Type": "application/json;charset=utf-8" -} +HEADERS = {"Content-Type": "application/json;charset=utf-8"} -# 撮影設定リスト (ISO, ShutterSpeed, f, ColorTemperature) +# 撮影設定リスト (ISO, shutter_speed, f, ColorTemperature) settings_list = [ - {"iso": 100, "shutterSpeed": 0.00004, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.00008, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.0003125, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.000625, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.0025, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.005, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.02, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.04, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.16666666, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.33333333, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 0.625, "whiteBalance": 5200}, - {"iso": 100, "shutterSpeed": 3.2, "whiteBalance": 5200}, + {"iso": 100, "shutter_speed": 0.00004, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.00008, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.0003125, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.000625, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.0025, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.005, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.02, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.04, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.16666666, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.33333333, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 0.625, "white_balance": 5200}, + {"iso": 100, "shutter_speed": 3.2, "white_balance": 5200}, ] -def set_options(iso, shutterSpeed, whiteBalance): + +def set_options(iso, shutter_speed, white_balance): + """オプションを設定""" options_command = { "name": "camera.setOptions", "parameters": { "options": { "iso": iso, - "shutterSpeed": shutterSpeed, - "_colorTemperature": whiteBalance + "shutterSpeed": shutter_speed, + "_colorTemperature": white_balance, } - } + }, } - resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS) + resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS, timeout=10) resp.raise_for_status() return resp.json() + def take_picture(): + """Take a picture using the Theta API""" take_command = {"name": "camera.takePicture"} - resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS) + resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS, timeout=10) resp.raise_for_status() return resp.json() + def wait_for_completion(command_id): + """Wait a few seconds to complete""" while True: - status_resp = requests.post(STATUS_URL, json={"id": command_id}, headers=HEADERS) + status_resp = requests.post( + STATUS_URL, json={"id": command_id}, headers=HEADERS, timeout=10 + ) status_resp.raise_for_status() status = status_resp.json() if status.get("state") == "done": return status.get("results") time.sleep(0.5) + # 連続撮影 for idx, s in enumerate(settings_list, start=1): print(f"\n=== 撮影 {idx}/12 ===") set_options(**s) print("設定完了:", s) - + result = take_picture() print("撮影コマンド送信:", result) - + if "id" in result: pic_result = wait_for_completion(result["id"]) print("撮影完了:", pic_result)