Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Calibrate_Latest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
import sys
import exoboot
import time
import csv
import util
import constants
import config_util
import numpy as np

def calibrate_encoder_to_ankle_conversion(c,exo: exoboot.Exo):
'''This routine can be used to manually calibrate the relationship
between ankle and motor angles. Move through the full RoM!!!'''
exo.update_gains(Kp=constants.DEFAULT_KP, Ki=constants.DEFAULT_KI,
Kd=constants.DEFAULT_KD, ff=constants.DEFAULT_FF)
# exo.command_current(exo.motor_sign*1000)
print('begin!')
temp_ankle_angle_array = []
for _ in range(1000):
exo.command_current(exo.motor_sign*1000)
time.sleep(0.02)
exo.read_data(c)
temp_ankle_angle_array.append(exo.data.ankle_angle)
print("ankle_angle",exo.data.ankle_angle)
exo.write_data(c,False)
print('Done! File saved.')
print("Mean_ankle_angle",np.mean(temp_ankle_angle_array))


if __name__ == '__main__':
config = config_util.load_config_from_args()
exo_list = exoboot.connect_to_exos(file_ID='calibration2_2ndJune',config=config)
if len(exo_list) > 1:
raise ValueError("Just turn on one exo for calibration")
exo = exo_list[0]
exo.standing_calibration(config)
calibrate_encoder_to_ankle_conversion(config,exo=exo)
exo.close()
86 changes: 86 additions & 0 deletions ControllerCommunication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import threading
from typing import Type

from concurrent import futures
import grpc
import Message_pb2
import Message_pb2_grpc

import config_util
import exoboot
import time

import asyncio

class ControllerCommunication(threading.Thread):
def __init__(self,config: Type[config_util.ConfigurableConstants], exo_list,
new_params_event=Type[threading.Event], name = 'communication-thread'):
super().__init__(name=name)
self.daemon=True
self.config=config
self.new_params_event = new_params_event
self.exo_list = exo_list
self.stride_counter = 0
self.temp_ankle_angle = []
self.temp_ankle_angular_velocity = []
#self.temp_counter = 0
self.action_received = False
self.start()

#def storing_the_stride_values(self):


def sending_data(self, ankle_a_l, ankle_v_l, ankle_a_r, ankle_v_r):
for exo in self.exo_list:
#print(exo.data.did_heel_strike, self.temp_ankle_angle, self.temp_ankle_angular_velocity)
"""if (exo.data.did_heel_strike == True):
print("Here.............", self.stride_counter)
self.stride_counter += 1
#if(self.stride_counter == 1 and self.stride_counter == 2):
self.temp_ankle_angle.append(exo.data.ankle_angle)
self.temp_ankle_angular_velocity.append(exo.data.ankle_velocity)
if(self.stride_counter == 2):
print("Sending values")
#print(exo.data.ankle_angle)"""
#print("Sending values")
with grpc.insecure_channel(
self.config.CONTROLLER_ALGORITHM_COMMUNICATION, options=(('grpc.enable_http_proxy',0), )) as channel:
try:
print("Sending.............")
stub = Message_pb2_grpc.ControllerAlgorithmStub(channel)
response = stub.ControllerMessage(
Message_pb2.ControllerPing(ankle_angle_LEFT = ankle_a_l, ankle_angular_velocity_LEFT = ankle_v_l, ankle_angle_RIGHT = ankle_a_r, ankle_angular_velocity_RIGHT = ankle_v_r))
print('Response received')
#Message_pb2.ControllerPing(ankle_angle = exo.data.ankle_angle, ankle_angular_velocity = exo.data.ankle_velocity))
except grpc.RpcError as e:
print("ERROR!!!!: ", e, "Check if the Computer IP is correct or if the computer side server is running")

self.action_received = False
self.stride_counter = 0
self.temp_ankle_angle = []
self.temp_ankle_angular_velocity = []

class ActionState(Message_pb2_grpc.ActionStateServicer):
def __init__(self, ControllerCommunication):
self.ControllerCommunication = ControllerCommunication

def ActionMessage(self,request,context):
print("Action received")#, request.action_torque_profile)
self.ControllerCommunication.config.torque_profile = request.action_torque_profile
time.sleep(10) #Waiting for the action to be implemented on the exo
self.ControllerCommunication.action_received = True
self.ControllerCommunication.config.action_received = True
self.ControllerCommunication.new_params_event.set()
return Message_pb2.Null()

def server_to_receive_actions_for_the_controller(self):
print(".... Starting the server on the controller.....")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Message_pb2_grpc.add_ActionStateServicer_to_server(self.ActionState(self),server)
server.add_insecure_port(self.config.ALGORITHM_CONTROLLER_COMMUNICATION)
server.start()
server.wait_for_termination()

def run(self):
#while True:
self.server_to_receive_actions_for_the_controller()
83 changes: 83 additions & 0 deletions Message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
syntax = "proto3";

message Null {};

/* This service is between
GUI: Running on the Surface Tablet {Client}
and
Algorithm: Running on the computer {Server}
DESCRIPTION:
The GUI will send the user input {i.e int32 LIKE(1), DISLIKE(-1) or SKIP(0)} and will receive a
NULL message indicating new control parameters are generated -> new torque values
are given to the motor by the RL Algorithm
*/
service GuiAlgorithm{
rpc UserInput (GuiInput) returns (Null) {}

}
enum EnumPreference{
SKIP = 0;
LIKE = 1;
DISLIKE = -1;
}

message GuiInput {
EnumPreference enumpreference = 1;
}

/* This service is between
Algorithm: Running on the computer {Client}
and
GUI: Running on the Surface Tablet {Server}
DESCRIPTION:
The Algorithm will send commands to the GUI after "X" number of strides (X is a hyperparameter which has to be tuned)
to ask user's fro preference.
THe reason for this is because we want to do continous updates based on numeric rewards like every "Y" strides and only ask for
user preference after certain number of steps. Getting user preference is costly, so we want to continuauly learn based on numeric reward functions and
use user preference for torque profile tuning on a high level
*/

service AskingforPreference{
rpc PreferenceUpdateStep (PreferenceFlag) returns (Null) {}
}

message PreferenceFlag{
bool PFlag = 1;
}

/*
message PreferenceValue{
EnumPreference enumpreference = 1;
}
*/
/* This service is between
Lowlevel controlller: Running on the Rpi {Client}
and
Algorithm: Running on the computer {Server}
DESCRIPTION:

*/
service ControllerAlgorithm{
rpc ControllerMessage (ControllerPing) returns (Null) {}
}

message ControllerPing{
repeated float ankle_angle_LEFT = 1;
repeated float ankle_angular_velocity_LEFT = 2;
repeated float ankle_angle_RIGHT = 3;
repeated float ankle_angular_velocity_RIGHT = 4;
}

/* This service is between
Algorithm: Running on the computer {Client}
Lowlevel controlller: Running on the Rpi {Server}
DESCRIPTION:

*/
service ActionState{
rpc ActionMessage (SendingAction) returns (Null) {}
}

message SendingAction{
repeated float action_torque_profile = 1;
}
43 changes: 43 additions & 0 deletions Message_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions Message_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from google.protobuf.internal import containers as _containers
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Iterable as _Iterable, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor
DISLIKE: EnumPreference
LIKE: EnumPreference
SKIP: EnumPreference

class ControllerPing(_message.Message):
__slots__ = ["ankle_angle_LEFT", "ankle_angle_RIGHT", "ankle_angular_velocity_LEFT", "ankle_angular_velocity_RIGHT"]
ANKLE_ANGLE_LEFT_FIELD_NUMBER: _ClassVar[int]
ANKLE_ANGLE_RIGHT_FIELD_NUMBER: _ClassVar[int]
ANKLE_ANGULAR_VELOCITY_LEFT_FIELD_NUMBER: _ClassVar[int]
ANKLE_ANGULAR_VELOCITY_RIGHT_FIELD_NUMBER: _ClassVar[int]
ankle_angle_LEFT: _containers.RepeatedScalarFieldContainer[float]
ankle_angle_RIGHT: _containers.RepeatedScalarFieldContainer[float]
ankle_angular_velocity_LEFT: _containers.RepeatedScalarFieldContainer[float]
ankle_angular_velocity_RIGHT: _containers.RepeatedScalarFieldContainer[float]
def __init__(self, ankle_angle_LEFT: _Optional[_Iterable[float]] = ..., ankle_angular_velocity_LEFT: _Optional[_Iterable[float]] = ..., ankle_angle_RIGHT: _Optional[_Iterable[float]] = ..., ankle_angular_velocity_RIGHT: _Optional[_Iterable[float]] = ...) -> None: ...

class GuiInput(_message.Message):
__slots__ = ["enumpreference"]
ENUMPREFERENCE_FIELD_NUMBER: _ClassVar[int]
enumpreference: EnumPreference
def __init__(self, enumpreference: _Optional[_Union[EnumPreference, str]] = ...) -> None: ...

class Null(_message.Message):
__slots__ = []
def __init__(self) -> None: ...

class PreferenceFlag(_message.Message):
__slots__ = ["PFlag"]
PFLAG_FIELD_NUMBER: _ClassVar[int]
PFlag: bool
def __init__(self, PFlag: bool = ...) -> None: ...

class SendingAction(_message.Message):
__slots__ = ["action_torque_profile"]
ACTION_TORQUE_PROFILE_FIELD_NUMBER: _ClassVar[int]
action_torque_profile: _containers.RepeatedScalarFieldContainer[float]
def __init__(self, action_torque_profile: _Optional[_Iterable[float]] = ...) -> None: ...

class EnumPreference(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
Loading