Source code for ugokukun.keigan_wrapper

"""Wrapper for the pykeigan library, the official library for Keigan motor."""

import os
import time
import subprocess
import logging

import serial
import serial.tools.list_ports
from pykeigan import usbcontroller, utils
import timeout_decorator


[docs]class KeiganWrapper: """Class to supplement the pykeigan library, the official library for Keigan motor. Attributes ---------- port : str Serial port to connect to. baudrate : int Baudrate for serial communication. motor : pykeigan.usbcontroller.USBController Motor object. motor.serial is the serial.Serial object. speed : int Motor speed in rpm. log_path : str Path to log file. logger : logging.Logger Logger object. Notes ----- Anything not implemented in this class should be accessed directly from motor object. pykeigan is poorly documented so I advise you to just go read the source code. https://github.com/keigan-motor/pykeigan_motor/tree/master """ def __init__( self, port: str = "/dev/ttyUSB0", speed: int = 30, baudrate: int = 115200, log_path: str = "log.txt", ): """Connect to Keigan motor. Parameters ---------- port : str, optional Serial port to connect to., by default "/dev/ttyUSB0" speed : int, optional Motor speed in rpm., by default 30 baudrate : int, optional Baudrate., by default 115200 log_path : str, optional Path to log file. If file doesn't exist, it will be created., by default "log.txt" Raises ------ ValueError Port not available. """ # check if probided port is available serial_ports = serial.tools.list_ports.comports() available_ports = [port.device for port in serial_ports] if port not in available_ports: raise ValueError(f"Port {port} not available.") # set serial port attributes self.port = port self.baudrate = baudrate self.speed = speed # logging self.log_path = log_path if os.path.dirname(log_path) and not os.path.exists(os.path.dirname(log_path)): os.makedirs(os.path.dirname(log_path)) self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) _console_handler = logging.StreamHandler() _console_handler.setLevel(logging.DEBUG) self.logger.addHandler(_console_handler) _file_handler = logging.FileHandler(self.log_path) _file_handler.setLevel(logging.DEBUG) _file_handler.setFormatter( logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ) self.logger.addHandler(_file_handler) # close serial port if it's already open # still needs work if os.name == "nt": # windows not implemented yet serial.Serial(self.port, self.baudrate).close() else: # This will kill this process if ran the second time on the same port. command = f"fuser -k {self.port}" print( f"NOTICE: attempting to kill any process using the serial port: {self.port}" ) try: shell_return = subprocess.run( command, shell=True, capture_output=True, check=True, text=True ) print(" - shell return: ", shell_return) except subprocess.CalledProcessError as e: print(" - shell error: ", e.stderr) print(" - Connecting to motor...") # connect to motor self.motor = None self.connect_motor() # turn off LED so it doesn"t affect the camera self.motor.set_led(0, 0, 0, 0) # (LED state:0==OFF, Red:0, Green:0, Blue:0) # motor doesn't turn off automatically so it might be already turning self.motor.disable_action() # set speed self.motor.set_speed(utils.rpm2rad_per_sec(self.speed)) # # initialize current motor position to 0 # self.motor.preset_position(0) # set notify position arrival settings # self.motor.set_notify_pos_arrival_settings(True, utils.deg2rad(1), 1) # set acceleration curve self.motor.set_curve_type(0) # # message connected # # there is no way to check if the connection was successful # print("\n#####################################################################") self.logger.info("Initialized Keigan motor at: ") self.logger.info(" - %s", self.port) # print("#####################################################################\n")
[docs] @timeout_decorator.timeout(5, timeout_exception=serial.SerialTimeoutException) def connect_motor(self) -> None: """pykeigan.usbcontorller.USBcontroller() with timeout.""" # disconnect if already connected if self.motor is not None: if self.motor.is_connected(): self.motor.disconnect() self.motor = usbcontroller.USBController(port=self.port, baud=self.baudrate) print( "Probably connected to Keigan motor (not sure until it's used).\n", " If this is not the first connection to the motor,", "it can get stuck in an endless loop in:\n", " - pykeigan.usbcontroller.USBController.reconnect()\n", " If the connection works fine, you can just ignore it.", )
[docs] def turn_relative(self, clock_wise: False, degrees: int) -> None: """Turn motor based on relative position. Parameters ---------- clock_wise : bool, optional Direction to turn the motor. Clockwise if True, counter-clockwise if False., by default False degrees : int Degrees to rotate. Will accept negative values, if that's what you want. Raises ------ ValueError If the motor is not connected. """ self.motor.enable_action() if clock_wise: degrees = -degrees self.motor.move_by_dist_wait(utils.deg2rad(degrees)) # wait for the motor to finish turning # second = degrees to turn / (6 * rpm) wait_time = abs(degrees / (self.speed * 6)) time.sleep(wait_time) direction = "clockwise" if clock_wise else "counter-clockwise" self.logger.info( "Motor turned %d degrees %s at %d rpm.", degrees, direction, self.speed )