Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install pylint
pip install -r requirements.txt
- name: Analysing the code with pylint
run: |
pylint $(git ls-files '*.py')
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requests
72 changes: 46 additions & 26 deletions theta_capture.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,75 @@
import requests
"""Script for controlling RICOH THETA camera."""

import time
import datetime

import requests

# pylint: disable=duplicate-code

# カメラIP
THETA_IP = "192.168.1.1"
EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute"
STATUS_URL = f"http://{THETA_IP}/osc/commands/status"

HEADERS = {"Content-Type": "application/json;charset=utf-8"}

# 撮影設定リスト (ISO, ShutterSpeed, ColorTemperature)
# 撮影設定リスト (ISO, shutter_speed, ColorTemperature)
settings_list = [
{"iso": 100, "shutterSpeed": 0.00004, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.00008, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.0003125, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.000625, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.0025, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.005, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.02, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.04, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.16666666, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.33333333, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.625, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 3.2, "whiteBalance": 5200},
{"iso": 100, "shutter_speed": 0.00004, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.00008, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.0003125, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.000625, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.0025, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.005, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.02, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.04, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.16666666, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.33333333, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.625, "white_balance": 5200},
{"iso": 100, "shutter_speed": 3.2, "white_balance": 5200},
]

def set_options(iso, shutterSpeed, whiteBalance):

def set_options(iso, shutter_speed, white_balance):
"""オプションを設定"""
options_command = {
"name": "camera.setOptions",
"parameters": {
"options": {
"iso": iso,
"shutterSpeed": shutterSpeed,
"shutterSpeed": shutter_speed,
"whiteBalance": "_colorTemperature",
"colorTemperature": whiteBalance
"colorTemperature": white_balance,
}
}
},
}
resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS)
resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS, timeout=10)
resp.raise_for_status()
return resp.json()


def take_picture():
"""Theta Web APIを用いて撮影"""
take_command = {"name": "camera.takePicture"}
resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS)
resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS, timeout=10)
resp.raise_for_status()
return resp.json()


def wait_for_completion(command_id):
"""撮影完了まで処理を停止"""
while True:
status_resp = requests.post(STATUS_URL, json={"id": command_id}, headers=HEADERS)
status_resp = requests.post(
STATUS_URL, json={"id": command_id}, headers=HEADERS, timeout=10
)
status_resp.raise_for_status()
status = status_resp.json()
if status.get("state") == "done":
return status.get("results")
time.sleep(0.5)


def capture_12():
"""設定リストに従って12枚連続撮影"""
for idx, s in enumerate(settings_list, start=1):
Expand All @@ -72,6 +86,7 @@ def capture_12():
else:
print("即時撮影結果:", result)


def schedule_shoots():
"""午前6時~午後7時まで1時間おきに3回ずつ撮影"""
while True:
Expand All @@ -89,15 +104,20 @@ def schedule_shoots():
print(f"\n=== {hour}時の撮影完了 ===")

# 次の「正時」まで待つ
next_hour = (now + datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
next_hour = (now + datetime.timedelta(hours=1)).replace(
minute=0, second=0, microsecond=0
)
wait_sec = (next_hour - datetime.datetime.now()).total_seconds()
if wait_sec < 0:
next_hour = (now + datetime.timedelta(hours=2)).replace(minute=0, second=0, microsecond=0)
next_hour = (now + datetime.timedelta(hours=2)).replace(
minute=0, second=0, microsecond=0
)
wait_sec = (next_hour - datetime.datetime.now()).total_seconds()
print(f"{wait_sec/60:.1f} 分後の {next_hour} に再開します…")
for sleepCount in range(0,61):
print(f"残り{wait_sec * (60 - sleepCount)/60}秒")
time.sleep(wait_sec/60)
for sleep_count in range(0, 61):
print(f"残り{wait_sec * (60 - sleep_count)/60}秒")
time.sleep(wait_sec / 60)


if __name__ == "__main__":
schedule_shoots()
26 changes: 11 additions & 15 deletions theta_init.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
"""Script for controlling RICOH THETA camera."""

import requests

url = "http://192.168.1.1/osc/commands/execute"
headers = {"Content-Type": "application/json;charset=utf-8"}
# pylint: disable=duplicate-code

URL = "http://192.168.1.1/osc/commands/execute"
HEADERS = {"Content-Type": "application/json;charset=utf-8"}

payload = {
"name": "camera.setOptions",
"parameters": {
"options": {
"captureMode":"image"
}
}
"parameters": {"options": {"captureMode": "image"}},
}

resp = requests.post(url, json=payload, headers=headers)
resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10)
print(resp.json())

payload = {
"name": "camera.setOptions",
"parameters": {
"options": {
"exposureProgram":1
}
}
"parameters": {"options": {"exposureProgram": 1}},
}

resp = requests.post(url, json=payload, headers=headers)
print(resp.json())
resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10)
print(resp.json())
19 changes: 9 additions & 10 deletions theta_status.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
"""Script for controlling RICOH THETA camera."""

import requests

url = "http://192.168.1.1/osc/commands/execute"
headers = {"Content-Type": "application/json;charset=utf-8"}
# pylint: disable=duplicate-code

URL = "http://192.168.1.1/osc/commands/execute"
HEADERS = {"Content-Type": "application/json;charset=utf-8"}

# 確認したいオプションを parameters に必ず入れる
payload = {
"name": "camera.getOptions",
"parameters": {
"optionNames": [
"iso",
"shutterSpeed",
"aperture",
"_colorTemperature"
]
}
"optionNames": ["iso", "shutterSpeed", "aperture", "_colorTemperature"]
},
}

resp = requests.post(url, json=payload, headers=headers)
resp = requests.post(URL, json=payload, headers=HEADERS, timeout=10)
print(resp.json())
64 changes: 38 additions & 26 deletions theta_test_capture.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,82 @@
import requests
"""Script for controlling RICOH THETA camera."""

import time

import requests

# pylint: disable=duplicate-code

# カメラIP
THETA_IP = "192.168.1.1"
EXECUTE_URL = f"http://{THETA_IP}/osc/commands/execute"
STATUS_URL = f"http://{THETA_IP}/osc/commands/status"

HEADERS = {
"Content-Type": "application/json;charset=utf-8"
}
HEADERS = {"Content-Type": "application/json;charset=utf-8"}

# 撮影設定リスト (ISO, ShutterSpeed, f, ColorTemperature)
# 撮影設定リスト (ISO, shutter_speed, f, ColorTemperature)
settings_list = [
{"iso": 100, "shutterSpeed": 0.00004, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.00008, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.0003125, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.000625, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.0025, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.005, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.02, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.04, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.16666666, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.33333333, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 0.625, "whiteBalance": 5200},
{"iso": 100, "shutterSpeed": 3.2, "whiteBalance": 5200},
{"iso": 100, "shutter_speed": 0.00004, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.00008, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.0003125, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.000625, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.0025, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.005, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.02, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.04, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.16666666, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.33333333, "white_balance": 5200},
{"iso": 100, "shutter_speed": 0.625, "white_balance": 5200},
{"iso": 100, "shutter_speed": 3.2, "white_balance": 5200},
]

def set_options(iso, shutterSpeed, whiteBalance):

def set_options(iso, shutter_speed, white_balance):
"""オプションを設定"""
options_command = {
"name": "camera.setOptions",
"parameters": {
"options": {
"iso": iso,
"shutterSpeed": shutterSpeed,
"_colorTemperature": whiteBalance
"shutterSpeed": shutter_speed,
"_colorTemperature": white_balance,
}
}
},
}
resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS)
resp = requests.post(EXECUTE_URL, json=options_command, headers=HEADERS, timeout=10)
resp.raise_for_status()
return resp.json()


def take_picture():
"""Take a picture using the Theta API"""
take_command = {"name": "camera.takePicture"}
resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS)
resp = requests.post(EXECUTE_URL, json=take_command, headers=HEADERS, timeout=10)
resp.raise_for_status()
return resp.json()


def wait_for_completion(command_id):
"""Wait a few seconds to complete"""
while True:
status_resp = requests.post(STATUS_URL, json={"id": command_id}, headers=HEADERS)
status_resp = requests.post(
STATUS_URL, json={"id": command_id}, headers=HEADERS, timeout=10
)
status_resp.raise_for_status()
status = status_resp.json()
if status.get("state") == "done":
return status.get("results")
time.sleep(0.5)


# 連続撮影
for idx, s in enumerate(settings_list, start=1):
print(f"\n=== 撮影 {idx}/12 ===")
set_options(**s)
print("設定完了:", s)

result = take_picture()
print("撮影コマンド送信:", result)

if "id" in result:
pic_result = wait_for_completion(result["id"])
print("撮影完了:", pic_result)
Expand Down