Source code for ugokukun.ugoku_kun

"""Control multiple cannon cameras and turntable."""

import os
import time
import json
import logging
from typing import Any, Tuple

import pandas as pd
from pykeigan import utils

from .cannon_wrapper import CannonWrapper
from .keigan_wrapper import KeiganWrapper
from .ugoku_helpers import UgokuHelpers as helpers


[docs]class UgokuKun: """Control multiple cannon cameras and turntable. Attributes ---------- wait_time : float Wait time between HTTP request attempts. max_attempts : int Maximum number of HTTP requests attempts. req_timeout : Tuple[float, float] Timeout of the HTTP request. device_json_path : str Path of the json file containing the device id and addresses. devices : Dict[str, Dict[str, str]] Dictionary containing the devices. task_csv_path : str Path of the task csv file. csv_df : pd.DataFrame The csv file as a pandas DataFrame. log_path : str Path of the log txt file. logger : logging.Logger Logger object. keigan_motors : Dict[str, str] Dictionary containing the keigan motors. keigan_instances : Dict[str, KeiganWrapper] Dictionary containing the KeiganWrapper instances. cannon_cameras : Dict[str, str] Dictionary containing the cannon cameras. cannon_instances : Dict[str, CannonWrapper] Dictionary containing the CannonWrapper instances. """ def __init__( self, task_csv_path: str, device_json_path: str, log_path: str = "log.txt", wait_time: float = 0.5, max_attempts: int = 20, req_timeout: Tuple[float, float] = (3.0, 7.5), ): """Readin json and csv and connect to devices. Parameters ---------- task_csv_path : str Path of the task csv file. device_json_path : str Path of the json file containing the device id and addresses. log_path : str, optional Path of the log txt file., by default "log.txt" wait_time : float, optional Wait time between HTTP request attempts., by default 0.5 max_attempts : int, optional Maximum number of HTTP requests attempts., by default 20 req_timeout : Tuple[float, float], optional Timeout of the HTTP request., by default (3.0, 7.5) Raises ------ ValueError No cameras found in the device json file. """ self.wait_time = wait_time self.max_attempts = max_attempts self.req_timeout = req_timeout if self.wait_time * self.max_attempts < 2.0: print("NOTE: Camera might not respond fast enough with current parameters.") print(" - Current wait time: " + str(self.wait_time)) print(" - Current max attempts: " + str(self.max_attempts)) self.device_json_path = device_json_path self.devices = helpers.import_json(device_json_path) # csv self.task_csv_path = task_csv_path self.csv_df = None self.load_task_csv(task_csv_path) # log self.log_path = log_path if os.path.dirname(log_path) and not os.path.exists(os.path.dirname(log_path)): os.makedirs(os.path.dirname(log_path)) self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) _console_handler = logging.StreamHandler() _console_handler.setLevel(logging.DEBUG) self.logger.addHandler(_console_handler) _file_handler = logging.FileHandler(self.log_path) _file_handler.setLevel(logging.DEBUG) _file_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ) self.logger.addHandler(_file_handler) # turn table self.keigan_motors = self.devices["keigan"] if not self.keigan_motors: self.logger.info("No Keigan devices found in the device json file.") self.keigan_instances = {} for motor, serial_port in self.keigan_motors.items(): self.keigan_instances[motor] = KeiganWrapper( port=serial_port, log_path=self.log_path ) # camera self.cannon_cameras = self.devices["cannon"] if not self.cannon_cameras: raise ValueError("No devices found in the device json file.") self.cannon_instances = {} for camera, camera_address in self.cannon_cameras.items(): self.cannon_instances[camera] = CannonWrapper( wait_time=self.wait_time, max_attempts=self.max_attempts, req_timeout=self.req_timeout, ip_port=camera_address, log_path=self.log_path, )
[docs] def load_task_csv(self, task_csv_path: str) -> pd.DataFrame: """Load a new csv file and overwrite the current self.csv_df. Parameters ---------- task_csv_path : str Path of the csv file. Returns ------- pd.DataFrame The csv file as a pandas DataFrame. Raises ------ ValueError Missing header or duplicate task_id. """ self.task_csv_path = task_csv_path self.csv_df = pd.read_csv(task_csv_path, header=0, index_col=None) # check if csv file has the correct headers csv_headers = list(self.csv_df.columns) must_have_headers = [ "task_id", "wait_time", "target", "action", "param", "payload", ] for header in must_have_headers: if header not in csv_headers: raise ValueError(f"Missing header: {header}") # check if task_id includes duplicates task_id_list = self.csv_df["task_id"].to_list() if len(task_id_list) != len(set(task_id_list)): raise ValueError("task_id must be unique.") return self.csv_df
[docs] def execute_row(self, row_index: int) -> None: """Execure a single row in the csv file. Parameters ---------- row_index : int Index of the row to be executed. Raises ------ ValueError Invalid target. """ if self.csv_df.at[row_index, "target"] == "all": self.execute_all(row_index) # camera elif self.csv_df.at[row_index, "target"] in self.cannon_cameras.keys(): self.execute_cannon(self.csv_df.at[row_index, "target"], row_index) # turn table elif self.csv_df.at[row_index, "target"] in self.keigan_motors.keys(): self.execute_keigan(self.csv_df.at[row_index, "target"], row_index) else: raise ValueError(f"Invalid target: {self.csv_df.at[row_index, 'target']}")
[docs] def execute_cannon(self, camera: str, row_index: int) -> Any: """Execute a single row in the csv file where target==camera_id. Parameters ---------- camera : str camera_id in the device json file. row_index : int Index of the row of the csv to be executed. Returns ------- Any The return of the executed action. Most likely requests.Response object. Raises ------ ValueError Invalid action. Notes ----- if self.csv_df.at[row_index, "action"] is: - "get": self.cannon_instances[camera].http_get() - "post": self.cannon_instances[camera].http_post() - "put": self.cannon_instances[camera].http_put() - "delete": self.cannon_instances[camera].http_delete() - "shutter": self.cannon_instances[camera].shutter() - "aperture": self.cannon_instances[camera].set_aperture() - "exposure": self.cannon_instances[camera].set_exposure() - "iso": self.cannon_instances[camera].set_iso() - "color_temperature": self.cannon_instances[camera].set_color_temp() - "white_balance": self.cannon_instances[camera].set_white_balance() - "shutter_speed": self.cannon_instances[camera].set_shutter_speed() - "sync_time": self.cannon_instances[camera].sync_time() - "dump": self.cannon_instances[camera].dump_attributes() """ # HTTP GET if self.csv_df.at[row_index, "action"] == "get": api_str = self.cannon_instances[camera].get_full_uri( self.csv_df.at[row_index, "param"] ) return self.cannon_instances[camera].http_get(api_str) # HTTP POST if self.csv_df.at[row_index, "action"] == "post": api_str = self.cannon_instances[camera].get_full_uri( self.csv_df.at[row_index, "param"] ) return self.cannon_instances[camera].http_post( api_str, json.loads(self.csv_df.at[row_index, "payload"]), ) # HTTP PUT if self.csv_df.at[row_index, "action"] == "put": api_str = self.cannon_instances[camera].get_full_uri( self.csv_df.at[row_index, "param"] ) return self.cannon_instances[camera].http_put( api_str, json.loads(self.csv_df.at[row_index, "payload"]), ) # HTTP DELETE if self.csv_df.at[row_index, "action"] == "delete": api_str = self.cannon_instances[camera].get_full_uri( self.csv_df.at[row_index, "param"] ) return self.cannon_instances[camera].http_delete(api_str) # Shutter if self.csv_df.at[row_index, "action"] == "shutter": if pd.isna(self.csv_df.at[row_index, "param"]): return self.cannon_instances[camera].shutter() do_autofocus = helpers.strtobool(self.csv_df.at[row_index, "param"]) return self.cannon_instances[camera].shutter(do_autofocus) # Aperture if self.csv_df.at[row_index, "action"] == "aperture": return self.cannon_instances[camera].set_aperture( aperture=self.csv_df.at[row_index, "param"] ) # Exposure if self.csv_df.at[row_index, "action"] == "exposure": return self.cannon_instances[camera].set_exposure( exposure=self.csv_df.at[row_index, "param"] ) # ISO if self.csv_df.at[row_index, "action"] == "iso": return self.cannon_instances[camera].set_iso( iso=self.csv_df.at[row_index, "param"] ) # Color Temperature if self.csv_df.at[row_index, "action"] == "color_temperature": return self.cannon_instances[camera].set_color_temp( color_temp=int(self.csv_df.at[row_index, "param"]) ) # White Balance if self.csv_df.at[row_index, "action"] == "white_balance": return self.cannon_instances[camera].set_white_balance( white_balance=self.csv_df.at[row_index, "param"] ) # Shutter Speed if self.csv_df.at[row_index, "action"] == "shutter_speed": return self.cannon_instances[camera].set_shutter_speed( shutter_speed=self.csv_df.at[row_index, "param"] ) if self.csv_df.at[row_index, "action"] == "sync_time": return self.cannon_instances[camera].sync_time() if self.csv_df.at[row_index, "action"] == "dump": return self.cannon_instances[camera].dump_attributes( self.csv_df.at[row_index, "param"] ) raise ValueError(f"Invalid action: {self.csv_df.at[row_index, 'action']}")
[docs] def execute_keigan(self, motor_id: str, row_index: int): """Execute a single row in the csv file where target==keigan_id. Parameters ---------- motor_id : str motor_id in the device json file. row_index : int Index of the row of the csv to be executed. Returns ------- Any The return of the executed action. Most likely None. Raises ------ ValueError Invalid action. """ if self.csv_df.at[row_index, "action"] == "cw": return self.keigan_instances[motor_id].turn_relative( clock_wise=True, degrees=int(self.csv_df.at[row_index, "param"]) ) if self.csv_df.at[row_index, "action"] == "ccw": return self.keigan_instances[motor_id].turn_relative( clock_wise=False, degrees=int(self.csv_df.at[row_index, "param"]) ) if self.csv_df.at[row_index, "action"] == "speed": return self.keigan_instances[motor_id].motor.set_speed( utils.rpm2rad_per_sec(int(self.csv_df.at[row_index, "param"])) ) raise ValueError(f"Invalid action: {self.csv_df.at[row_index, 'action']}")
[docs] def execute_all(self, row: int): """Actions for target=="all". Parameters ---------- row : int Index of the row of the csv to be executed. Only action=="sleep" is implemeted. Raises ------ ValueError Invalid action. """ if self.csv_df.at[row, "action"] == "sleep": return raise ValueError(f"Invalid action: {self.csv_df.at[row, 'action']}")
[docs] def test_run(self) -> None: """Run all commannds in the csv file except WAIT commands. This method is useful to check if the commands are valid. Run this method before running the `run` method and fix if any errors are found. """ for index, row in self.csv_df.iterrows(): self.logger.info( "Executing task id: %s , action: %s", row["task_id"], row["action"] ) self.execute_row(index)
[docs] def run(self) -> None: """Run all commands in the csv file.""" for index, row in self.csv_df.iterrows(): time.sleep(row["wait_time"]) self.logger.info( "Executing task id: %s , action: %s", row["task_id"], row["action"] ) self.execute_row(index)