Newer
Older
zlac8015d_ros / scripts / motor_driver_node.py
@mori mori on 6 Jul 2022 13 KB first commit
#!/usr/bin/env python3

import rospy
import ZLAC8015D
import tf2_ros
import numpy as np
import time
import yaml
from decimal import Decimal, ROUND_HALF_UP
from std_msgs.msg import Float32MultiArray
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from tf.transformations import quaternion_from_euler
from geometry_msgs.msg import TransformStamped

class MotorDriverNode:
	def __init__(self):
		print("\033[32m" + "Start motor driver node!!" + "\033[0m")
		print("#########################")
		#-----Initialize publisher-----
		self.wheels_rpm_pub = rospy.Publisher("/wheels_rpm", Float32MultiArray, queue_size=10)
		self.wheels_rpm_msg = Float32MultiArray()
		self.odom_pub = rospy.Publisher("/odom", Odometry, queue_size=10)
		self.odom_msg = Odometry()

		#-----Initialize subscriber-----
		rospy.Subscriber("/zlac8015d/twist/cmd_vel", Twist, self.zlac8015d_twist_cmd_callback)
		rospy.Subscriber("/zlac8015d/vel/cmd_vel", Float32MultiArray, self.zlac8015d_vel_cmd_callback)
		rospy.Subscriber("/zlac8015d/vel/cmd_rpm", Float32MultiArray, self.zlac8015d_rpm_cmd_callback)
		rospy.Subscriber("/zlac8015d/pos/cmd_deg", Float32MultiArray, self.zlac8015d_deg_cmd_callback)
		rospy.Subscriber("/zlac8015d/pos/cmd_dist", Float32MultiArray, self.zlac8015d_dist_cmd_callback)
		
		#-----Import parameters from yaml file-----
		self.load_file = "/home/ubuntu/catkin_ws/src/zlac8015d_ros/params/motor_driver_params.yaml"
		with open(self.load_file) as file:
			self.obj = yaml.safe_load(file)
		
		#-----Initialize ZLAC8015D-----
		self.zlc = ZLAC8015D.Controller(self.obj["port"])
		self.zlc.travel_in_one_rev = float(self.obj["travel_in_one_rev"])
		self.zlc.cpr = int(self.obj["cpr"])
		self.zlc.R_Wheel = float(self.obj["R_Wheel"])
		self.control_mode = int(self.obj["control_mode"])
		if self.control_mode == 3:
			self.zlac8015d_speed_mode_init()
		elif self.control_mode == 1:
			self.zlac8015d_position_mode_init()
		
		#-----Initialize variable-----
		self.last_subscribed_time = 0.0
		self.callback_timeout = float(self.obj["callback_timeout"])
		self.deadband_rpm = int(self.obj["deadband_rpm"])
		self.linear_vel_cmd = 0.0
		self.angular_vel_cmd = 0.0
		self.got_twist_cmd = False
		
		self.left_vel_cmd = 0.0
		self.right_vel_cmd = 0.0
		self.got_vel_cmd = False

		self.left_rpm_cmd = 0.0
		self.right_rpm_cmd = 0.0
		self.got_vel_rpm_cmd = False

		self.left_pos_deg_cmd  = 0.0
		self.right_pos_deg_cmd  = 0.0
		self.got_pos_deg_cmd = False

		self.left_pos_dist_cmd = 0.0
		self.right_pos_dist_cmd = 0.0
		self.got_pos_dist_cmd = False
		
		self.theta = 0.0
		self.x = 0.0
		self.y = 0.0
		self.period = 0.05
		self.l_meter = 0.0
		self.r_meter = 0.0
		self.l_meter_init, self.r_meter_init = self.zlc.get_wheels_travelled()
		self.prev_l_meter = 0.0
		self.prev_r_meter = 0.0

		self.br = tf2_ros.TransformBroadcaster()
		self.t = TransformStamped()
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_twist_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/twist/cmd_vel".
	"""
	def zlac8015d_twist_cmd_callback(self, msg):
		self.linear_vel_cmd = msg.linear.x
		self.angular_vel_cmd = msg.angular.z
		self.got_twist_cmd = True
		self.last_subscribed_time = rospy.Time.now().nsecs
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_vel_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/vel/cmd_vel".
	"""
	def zlac8015d_vel_cmd_callback(self, msg):
		self.left_vel_cmd = msg.data[0]
		self.right_vel_cmd = -msg.data[1]
		self.got_vel_cmd = True
		self.last_subscribed_time = time.perf_counter()

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_rpm_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/vel/cmd_rpm".
	"""
	def zlac8015d_rpm_cmd_callback(self, msg):
		self.left_rpm_cmd = msg.data[0]
		self.right_rpm_cmd = -msg.data[1]
		self.got_vel_rpm_cmd = True
		self.last_subscribed_time = time.perf_counter()

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_deg_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/pos/cmd_deg".
	"""
	def zlac8015d_deg_cmd_callback(self, msg):
		self.left_pos_deg_cmd = msg.data[0]
		self.right_pos_deg_cmd = -msg.data[1]
		self.got_pos_deg_cmd = True

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_dist_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/pos/cmd_dist".
	"""
	def zlac8015d_dist_cmd_callback(self, msg):
		self.left_pos_dist_cmd = msg.data[0]
		self.right_pos_dist_cmd = msg.data[1]
		self.got_pos_dist_cmd = True

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_speed_mode_init function
	++++++++++++++++++++++++++++++++++++++++++
	Initialization of speed rpm control mode.
	"""
	def zlac8015d_speed_mode_init(self):
		self.zlc.disable_motor()
		self.zlc.set_accel_time(int(self.obj["set_accel_time_left"]), int(self.obj["set_accel_time_right"]))
		self.zlc.set_decel_time(int(self.obj["set_decel_time_left"]), int(self.obj["set_decel_time_right"]))
		self.zlc.set_mode(3)
		self.zlc.enable_motor()

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_position_mode_init function
	++++++++++++++++++++++++++++++++++++++++++
	Initialization of relative position control mode.
	"""
	def zlac8015d_position_mode_init(self):
		self.zlc.disable_motor()
		self.zlc.set_accel_time(int(self.obj["set_accel_time_left"]), int(self.obj["set_accel_time_right"]))
		self.zlc.set_decel_time(int(self.obj["set_decel_time_left"]), int(self.obj["set_decel_time_right"]))
		self.zlc.set_mode(1)
		self.zlc.set_position_async_control()
		self.zlc.set_maxRPM_pos(int(self.obj["max_left_rpm"]), int(self.obj["max_right_rpm"]))
		self.zlc.enable_motor()
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		twist_to_rpm function
	++++++++++++++++++++++++++++++++++++++++++
	Convert from twist to rpm.
	"""
	def twist_to_rpm(self, linear_vel, angular_vel):
		left_vel = linear_vel - float(self.obj["wheels_base_width"]) / 2 * angular_vel
		right_vel = linear_vel + float(self.obj["wheels_base_width"]) / 2 * angular_vel
		left_rpm, right_rpm = self.vel_to_rpm(left_vel, -right_vel)
		return left_rpm, right_rpm
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		vel_to_rpm function
	++++++++++++++++++++++++++++++++++++++++++
	Convert from speed to rpm.
	"""
	def vel_to_rpm(self, left_vel, right_vel):
		left_rpm = 60 * left_vel / (2 * np.pi * self.zlc.R_Wheel)
		right_rpm = 60 * right_vel / (2 * np.pi * self.zlc.R_Wheel)
		return left_rpm, right_rpm

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_set_rpm_with_limit function
	++++++++++++++++++++++++++++++++++++++++++
	Set rpm with limit.
	"""
	def zlac8015d_set_rpm_with_limit(self, left_rpm, right_rpm):
		if (-int(self.obj["max_left_rpm"]) < left_rpm < int(self.obj["max_left_rpm"])) and (-int(self.obj["max_right_rpm"]) < right_rpm < int(self.obj["max_right_rpm"])):
			if (-self.deadband_rpm < left_rpm < self.deadband_rpm):
				left_rpm = 0
			if (-self.deadband_rpm < right_rpm < self.deadband_rpm):
				right_rpm = 0

			self.zlc.set_rpm(int(left_rpm), int(right_rpm))
		else:
			rospy.logerr("Set RPM exceeds the limit.")

	"""
	++++++++++++++++++++++++++++++++++++++++++
		calculate_odometry function
	++++++++++++++++++++++++++++++++++++++++++
	Odometry computation.
	"""
	def calculate_odometry(self):
		self.l_meter, self.r_meter = self.zlc.get_wheels_travelled()
		self.l_meter = self.l_meter - self.l_meter_init
		self.r_meter = (-1 * self.r_meter) - (-1 * self.r_meter_init)
		if self.control_mode == 3:
			vl, vr = self.zlc.get_linear_velocities()
		elif self.control_mode == 1:
			vl = (self.l_meter - self.prev_l_meter) / self.period
			vr = (self.r_meter - self.prev_r_meter) / self.period
		
		round_vl = float(Decimal(str(vl)).quantize(Decimal(self.obj["decimil_coefficient"]), rounding = ROUND_HALF_UP))
		round_vr = float(Decimal(str(vr)).quantize(Decimal('0.001'), rounding = ROUND_HALF_UP))
        self.obj["TF_header_flame"]
	
		#-----Rotatiing-----
		if ((round_vl * round_vr) < 0.0) and (abs(round_vl) == abs(round_vr)):
			V = 0.0
			Wz = 2.0 * vr / float(self.obj["wheels_base_width"])
			self.theta = self.theta + Wz * self.period
			path = "Rotatiing"
		
		#-----Curving-----
		elif (round_vl - round_vr) != 0:
			V = (vl + vr) / 2.0
			Wz = (vr - vl) / float(self.obj["wheels_base_width"])
			R_ICC = (float(self.obj["wheels_base_width"]) / 2.0) * ((vr + vl) / (vr - vl))
			self.x = self.x - R_ICC * np.sin(self.theta) + R_ICC * np.sin(self.theta + Wz * self.period)
			self.y = self.y + R_ICC * np.cos(self.theta) - R_ICC * np.cos(self.theta + Wz * self.period)
			self.theta = self.theta + Wz * self.period
			path = "Curving"
			
		#-----Going straight-----
		else:
			V = (vl + vr)/2.0
			Wz = 0.0
			self.x = self.x + V * np.cos(self.theta) * self.period
			self.y = self.y + V * np.sin(self.theta) * self.period
			path = "Going straight"

		#-----Construct tf-----
		q = quaternion_from_euler(0, 0, self.theta)
		self.t.header.stamp = rospy.Time.now()
		self.t.header.frame_id = self.obj["TF_header_flame"]
		self.t.child_frame_id = self.obj["TF_child_flame"]
		self.t.transform.translation.x = self.x
		self.t.transform.translation.y = self.y
		self.t.transform.translation.z = 0.0
		self.t.transform.rotation.x = q[0]
		self.t.transform.rotation.y = q[1]
		self.t.transform.rotation.z = q[2]
		self.t.transform.rotation.w = q[3]
		self.br.sendTransform(self.t)

		#-----Construct odom message-----
		self.odom_msg.header.stamp = rospy.Time.now()
		self.odom_msg.header.frame_id = self.obj["odom_header_flame"]
		self.odom_msg.child_frame_id = self.obj["odom_child_flame"]
		self.odom_msg.pose.pose.position.x = self.x
		self.odom_msg.pose.pose.position.y = self.y
		self.odom_msg.pose.pose.position.z = 0.0
		self.odom_msg.pose.pose.orientation.x = q[0]
		self.odom_msg.pose.pose.orientation.y = q[1]
		self.odom_msg.pose.pose.orientation.z = q[2]
		self.odom_msg.pose.pose.orientation.w = q[3]
		self.odom_msg.pose.covariance[0] = 0.0001
		self.odom_msg.pose.covariance[7] = 0.0001
		self.odom_msg.pose.covariance[14] = 0.000001
		self.odom_msg.pose.covariance[21] = 0.000001
		self.odom_msg.pose.covariance[28] = 0.000001
		self.odom_msg.pose.covariance[35] = 0.0001
		self.odom_msg.twist.twist.linear.x = V
		self.odom_msg.twist.twist.linear.y = 0.0
		self.odom_msg.twist.twist.angular.z = Wz
		self.odom_pub.publish(self.odom_msg)
		
		return vl, vr

	"""
	++++++++++++++++++++++++++++++++++++++++++
		control_loop function
	++++++++++++++++++++++++++++++++++++++++++
	Control loop.
	"""
	def control_loop(self):
		rate = rospy.Rate(20) # 20Hz
		while True:
			if rospy.is_shutdown():
				self.zlc.disable_motor()
				break
				
			start_time = time.perf_counter()

			#-----Speed rpm control mode-----
			if self.control_mode == 3:
				if self.got_twist_cmd:
					self.left_rpm_cmd, self.right_rpm_cmd = self.twist_to_rpm(self.linear_vel_cmd, self.angular_vel_cmd)
					self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
					self.got_twist_cmd = False
				elif self.got_vel_cmd:
					self.left_rpm_cmd, self.right_rpm_cmd = self.vel_to_rpm(self.left_vel_cmd, self.right_vel_cmd)
					self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
					self.got_vel_cmd = False
				elif self.got_vel_rpm_cmd:
					self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
					self.got_vel_rpm_cmd = False
				elif (time.perf_counter() - self.last_subscribed_time > self.callback_timeout):
					self.left_rpm_cmd = 0.0
					self.right_rpm_cmd = 0.0
					self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
				
				fb_L_rpm, fb_R_rpm = self.zlc.get_rpm()
				self.wheels_rpm_msg.data = [fb_L_rpm, fb_R_rpm]
				self.wheels_rpm_pub.publish(self.wheels_rpm_msg)
			
			#-----relative position control mode-----
			elif self.control_mode == 1:
				if self.got_pos_deg_cmd:
					self.zlc.set_relative_angle(self.left_pos_deg_cmd ,self.right_pos_deg_cmd)
					self.zlc.move_left_wheel()
					self.zlc.move_right_wheel()
					self.got_pos_deg_cmd = False
				elif self.got_pos_dist_cmd:
					left_cmd_deg = (self.left_pos_dist_cmd * 360.0) / self.zlc.travel_in_one_rev
					right_cmd_deg = (-self.right_pos_dist_cmd * 360.0) / self.zlc.travel_in_one_rev
					self.zlc.set_relative_angle(left_cmd_deg, right_cmd_deg)
					self.zlc.move_left_wheel()
					self.zlc.move_right_wheel()
					self.got_pos_dist_cmd = False

			#-----Odometry computation-----
			vl, vr = self.calculate_odometry()
	
			self.period = time.perf_counter() - start_time
			self.prev_l_meter = self.l_meter
			self.prev_r_meter = self.r_meter
			
			#-----Logging to screen-----
			if self.control_mode == 1:
				print("\033[32m" + "\rcontrol_mode: {:d} | l_meter: {:.2f} | r_meter: {:.2f} | VL: {:.3f} | VR: {:.3f}".format(\
				self.control_mode, self.l_meter, self.r_meter, vl, vr) + "\033[0m", end = "")
			else:
				print("\033[32m" + "\rcontrol_mode: {:d} | left_rpm: {:.2f} | right_rpm: {:.2f} | l_meter: {:.2f} | r_meter: {:.2f} | VL: {:.3f} | VR: {:.3f}".format(\
				self.control_mode, fb_L_rpm, -fb_R_rpm, self.l_meter, self.r_meter, vl, vr) + "\033[0m", end = "")
			
			rate.sleep()

"""
++++++++++++++++++++++++++++++++++++++++++
	main function
++++++++++++++++++++++++++++++++++++++++++
Main.
"""
def main():
	rospy.init_node("motor_driver_node")
	MDN = MotorDriverNode()
	MDN.control_loop()
	rospy.spin()

if __name__ == "__main__":
	main()