Newer
Older
orange2022 / src / zlac8015d_ros / scripts / motor_driver_node.py
@koki koki on 20 Sep 2022 18 KB update
#!/usr/bin/env python3
"""
Copyright (C) 2022  rasheeddo
Copyright (C) 2022  Alpaca-zip

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import rospy
import ZLAC8015D
import tf
import tf2_ros
import numpy as np
import time
from decimal import Decimal, ROUND_HALF_UP
from std_msgs.msg import Float32MultiArray
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from tf.transformations import quaternion_from_euler
from geometry_msgs.msg import TransformStamped

class MotorDriverNode:
	def __init__(self):
		print("\033[32m" + "Start motor driver node!!" + "\033[0m")
		print("#########################")
		#-----Initialize publisher-----
		self.odom_pub = rospy.Publisher("/odom", Odometry, queue_size=10)
		self.odom_msg = Odometry()

		#-----Initialize subscriber-----
		rospy.Subscriber(rospy.get_param("/motor_driver_node/twist_cmd_vel_topic"), Twist, self.zlac8015d_twist_cmd_callback)
		rospy.Subscriber(rospy.get_param("/motor_driver_node/cmd_vel_topic"), Float32MultiArray, self.zlac8015d_vel_cmd_callback)
		rospy.Subscriber(rospy.get_param("/motor_driver_node/cmd_rpm_topic"), Float32MultiArray, self.zlac8015d_rpm_cmd_callback)
		rospy.Subscriber(rospy.get_param("/motor_driver_node/cmd_deg_topic"), Float32MultiArray, self.zlac8015d_deg_cmd_callback)
		rospy.Subscriber(rospy.get_param("/motor_driver_node/cmd_dist_topic"), Float32MultiArray, self.zlac8015d_dist_cmd_callback)
		#-For E-stop-
		rospy.Subscriber("/estop", Bool, self.zlac8015d_estop_callback)
		
		#-----Initialize ZLAC8015D-----
		self.zlc = ZLAC8015D.Controller(rospy.get_param("/motor_driver_node/port"))
		self.zlc.cpr = int(rospy.get_param("/motor_driver_node/cpr"))
		self.zlc.left_wheel_radius = float(rospy.get_param("/motor_driver_node/left_wheel_radius"))
		self.zlc.right_wheel_radius = float(rospy.get_param("/motor_driver_node/right_wheel_radius"))
		self.zlc.left_circumference = self.zlc.left_wheel_radius * 2 * np.pi
		self.zlc.right_circumference = self.zlc.right_wheel_radius * 2 * np.pi
		self.zlc.computation_left_wheel_radius = float(rospy.get_param("/motor_driver_node/computation_left_wheel_radius"))
		self.zlc.computation_right_wheel_radius = float(rospy.get_param("/motor_driver_node/computation_right_wheel_radius"))
		
		
		self.control_mode = int(rospy.get_param("/motor_driver_node/control_mode"))
		if (self.control_mode == 3):
			self.zlac8015d_speed_mode_init()
		elif (self.control_mode == 1):
			self.zlac8015d_position_mode_init()
		
		#-----Initialize variable-----
		self.last_subscribed_time = 0.0
		self.callback_timeout = float(rospy.get_param("/motor_driver_node/callback_timeout"))
		self.deadband_rpm = int(rospy.get_param("/motor_driver_node/deadband_rpm"))
		self.linear_vel_cmd = 0.0
		self.angular_vel_cmd = 0.0
		self.got_twist_cmd = False
		
		self.left_vel_cmd = 0.0
		self.right_vel_cmd = 0.0
		self.got_vel_cmd = False

		self.left_rpm_cmd = 0.0
		self.right_rpm_cmd = 0.0
		self.got_vel_rpm_cmd = False

		self.left_pos_deg_cmd  = 0.0
		self.right_pos_deg_cmd  = 0.0
		self.got_pos_deg_cmd = False

		self.left_pos_dist_cmd = 0.0
		self.right_pos_dist_cmd = 0.0
		self.got_pos_dist_cmd = False
		
		self.theta = 0.0
		self.x = 0.0
		self.y = 0.0
		self.Wr = 0.0
		self.Wl = 0.0
		self.period = 0.05
		self.l_meter = 0.0
		self.r_meter = 0.0
		self.l_meter_init, self.r_meter_init = self.zlc.get_wheels_travelled()
		self.prev_l_meter = 0.0
		self.prev_r_meter = 0.0
		self.wheels_base_width = float(rospy.get_param("/motor_driver_node/wheels_base_width"))

		self.br = tf2_ros.TransformBroadcaster()
		self.t = TransformStamped()
		self.state_vector = np.zeros((3, 1))
		
		#-For E-stop-
		self.estop = False
		
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_estop_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/estop".
	"""
	def zlac8015d_estop_callback(self, msg):
		self.estop = msg.data
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_twist_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/twist/cmd_vel".
	"""
	def zlac8015d_twist_cmd_callback(self, msg):
		self.linear_vel_cmd = msg.linear.x
		self.angular_vel_cmd = msg.angular.z
		self.got_twist_cmd = True
		self.last_subscribed_time = time.perf_counter()
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_vel_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/vel/cmd_vel".
	"""
	def zlac8015d_vel_cmd_callback(self, msg):
		self.left_vel_cmd = msg.data[0]
		self.right_vel_cmd = -msg.data[1]
		self.got_vel_cmd = True
		self.last_subscribed_time = time.perf_counter()

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_rpm_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/vel/cmd_rpm".
	"""
	def zlac8015d_rpm_cmd_callback(self, msg):
		self.left_rpm_cmd = msg.data[0]
		self.right_rpm_cmd = -msg.data[1]
		self.got_vel_rpm_cmd = True
		self.last_subscribed_time = time.perf_counter()

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_deg_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/pos/cmd_deg".
	"""
	def zlac8015d_deg_cmd_callback(self, msg):
		self.left_pos_deg_cmd = msg.data[0]
		self.right_pos_deg_cmd = -msg.data[1]
		self.got_pos_deg_cmd = True

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_dist_cmd_callback function
	++++++++++++++++++++++++++++++++++++++++++
	Callback function to subscribe for "/zlac8015d/pos/cmd_dist".
	"""
	def zlac8015d_dist_cmd_callback(self, msg):
		self.left_pos_dist_cmd = msg.data[0]
		self.right_pos_dist_cmd = msg.data[1]
		self.got_pos_dist_cmd = True

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_speed_mode_init function
	++++++++++++++++++++++++++++++++++++++++++
	Initialization of speed rpm control mode.
	"""
	def zlac8015d_speed_mode_init(self):
		self.zlc.disable_motor()
		self.zlc.set_accel_time(int(rospy.get_param("/motor_driver_node/set_accel_time_left")), int(rospy.get_param("/motor_driver_node/set_accel_time_right")))
		self.zlc.set_decel_time(int(rospy.get_param("/motor_driver_node/set_decel_time_left")), int(rospy.get_param("/motor_driver_node/set_accel_time_right")))
		self.zlc.set_mode(3)
		self.zlc.enable_motor()

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_position_mode_init function
	++++++++++++++++++++++++++++++++++++++++++
	Initialization of relative position control mode.
	"""
	def zlac8015d_position_mode_init(self):
		self.zlc.disable_motor()
		self.zlc.set_accel_time(int(rospy.get_param("/motor_driver_node/set_accel_time_left")), int(rospy.get_param("/motor_driver_node/set_accel_time_right")))
		self.zlc.set_decel_time(int(rospy.get_param("/motor_driver_node/set_decel_time_left")), int(rospy.get_param("/motor_driver_node/set_decel_time_right")))
		self.zlc.set_mode(1)
		self.zlc.set_position_async_control()
		self.zlc.set_maxRPM_pos(int(rospy.get_param("/motor_driver_node/max_left_rpm")), int(rospy.get_param("/motor_driver_node/max_right_rpm")))
		self.zlc.enable_motor()
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		twist_to_rpm function
	++++++++++++++++++++++++++++++++++++++++++
	Convert from twist to rpm.
	"""
	def twist_to_rpm(self, linear_vel, angular_vel):
		left_vel = linear_vel - self.wheels_base_width / 2 * angular_vel
		right_vel = linear_vel + self.wheels_base_width / 2 * angular_vel
		left_rpm, right_rpm = self.vel_to_rpm(left_vel, -right_vel)
		return left_rpm, right_rpm
	
	"""
	++++++++++++++++++++++++++++++++++++++++++
		vel_to_rpm function
	++++++++++++++++++++++++++++++++++++++++++
	Convert from speed to rpm.
	"""
	def vel_to_rpm(self, left_vel, right_vel):
		left_rpm = 60 * left_vel / (2 * np.pi * self.zlc.left_wheel_radius)
		right_rpm = 60 * right_vel / (2 * np.pi * self.zlc.right_wheel_radius)
		return left_rpm, right_rpm

	"""
	++++++++++++++++++++++++++++++++++++++++++
		zlac8015d_set_rpm_with_limit function
	++++++++++++++++++++++++++++++++++++++++++
	Set rpm with limit.
	"""
	def zlac8015d_set_rpm_with_limit(self, left_rpm, right_rpm):
        	left_rpm_lim = int(rospy.get_param("/motor_driver_node/max_left_rpm"))
        	right_rpm_lim = int(rospy.get_param("/motor_driver_node/max_right_rpm"))
        	if ((-left_rpm_lim < left_rpm < left_rpm_lim) and (-right_rpm_lim < right_rpm < right_rpm_lim)):
        		if (-self.deadband_rpm < left_rpm < self.deadband_rpm):
        			left_rpm = 0
        		if (-self.deadband_rpm < right_rpm < self.deadband_rpm):
        			right_rpm = 0
        			
        		self.zlc.set_rpm(int(left_rpm), int(right_rpm))
        	else:
        		rospy.logerr("Set RPM exceeds the limit.")

	"""
	++++++++++++++++++++++++++++++++++++++++++
		calculate_odometry function
	++++++++++++++++++++++++++++++++++++++++++
	Odometry computation.
	"""
	def calculate_odometry(self):
		"""
		l_rpm, r_rpm = self.zlc.get_rpm()
		self.Wl = self.zlc.rpm_to_radPerSec(l_rpm)
		self.Wr = self.zlc.rpm_to_radPerSec(-r_rpm)
		"""
		self.l_meter, self.r_meter = self.zlc.get_wheels_travelled()
		self.l_meter = self.l_meter - self.l_meter_init
		self.r_meter = (-1 * self.r_meter) - (-1 * self.r_meter_init)
		vl = (self.l_meter - self.prev_l_meter) / self.period
		vr = (self.r_meter - self.prev_r_meter) / self.period
		self.Wl = vl / self.zlc.computation_left_wheel_radius
		self.Wr = vr / self.zlc.computation_right_wheel_radius
		
		matrix = np.array([[self.zlc.computation_right_wheel_radius / 2, self.zlc.computation_left_wheel_radius / 2], [self.zlc.computation_right_wheel_radius / self.wheels_base_width, -self.zlc.computation_left_wheel_radius / self.wheels_base_width]])
		vector = np.array([[self.Wr], [self.Wl]])
		input_vector = np.dot(matrix, vector)
		V = input_vector[0, 0]
		Wz = input_vector[1, 0]
		
		out_matrix = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
		dirc_matrix = np.array([[np.cos(self.state_vector[2, 0]) * self.period, 0], [np.sin(self.state_vector[2, 0]) * self.period, 0], [0, self.period]])
		out_vector = np.dot(out_matrix, self.state_vector) + np.dot(dirc_matrix, input_vector)
		self.x = out_vector[0, 0]
		self.y = out_vector[1, 0]
		self.theta = out_vector[2, 0]
		
		#-----Construct tf-----
		if(rospy.get_param("/motor_driver_node/publish_TF")):
			self.t.header.stamp = rospy.Time.now()
			self.t.header.frame_id = rospy.get_param("/motor_driver_node/TF_header_frame")
			self.t.child_frame_id = rospy.get_param("/motor_driver_node/TF_child_frame")
			self.t.transform.translation.x = self.x
			self.t.transform.translation.y = self.y
			self.t.transform.translation.z = 0.0
			rotation = tf.transformations.quaternion_from_euler(0, 0, self.theta)
			self.t.transform.rotation.x = rotation[0]
			self.t.transform.rotation.y = rotation[1]
			self.t.transform.rotation.z = rotation[2]
			self.t.transform.rotation.w = rotation[3]
			self.br.sendTransform(self.t)
			
		#-----Construct odom message-----
		if(rospy.get_param("/motor_driver_node/publish_odom")):
			self.odom_msg.header.stamp = rospy.Time.now()
			self.odom_msg.header.frame_id = rospy.get_param("/motor_driver_node/odom_header_frame")
			self.odom_msg.child_frame_id = rospy.get_param("/motor_driver_node/odom_child_frame")
			self.odom_msg.pose.pose.position.x = self.x
			self.odom_msg.pose.pose.position.y = self.y
			self.odom_msg.pose.pose.position.z = 0.0
			orientation = tf.transformations.quaternion_from_euler(0, 0, self.theta)
			self.odom_msg.pose.pose.orientation.x = orientation[0]
			self.odom_msg.pose.pose.orientation.y = orientation[1]
			self.odom_msg.pose.pose.orientation.z = orientation[2]
			self.odom_msg.pose.pose.orientation.w = orientation[3]
			self.odom_msg.pose.covariance[0] = 0.0001
			self.odom_msg.pose.covariance[7] = 0.0001
			self.odom_msg.pose.covariance[14] = 0.000001
			self.odom_msg.pose.covariance[21] = 0.000001
			self.odom_msg.pose.covariance[28] = 0.000001
			self.odom_msg.pose.covariance[35] = 0.0001
			self.odom_msg.twist.twist.linear.x = V
			self.odom_msg.twist.twist.linear.y = 0.0
			self.odom_msg.twist.twist.angular.z = Wz
			self.odom_pub.publish(self.odom_msg)
		
		self.state_vector[0, 0] = self.x
		self.state_vector[1, 0] = self.y
		self.state_vector[2, 0] = self.theta
	
	"""
	def calculate_odometry(self):
		self.l_meter, self.r_meter = self.zlc.get_wheels_travelled()
		self.l_meter = self.l_meter - self.l_meter_init
		self.r_meter = (-1 * self.r_meter) - (-1 * self.r_meter_init)
		if (self.control_mode == 3):
			vl, vr = self.zlc.get_linear_velocities()
		elif (self.control_mode == 1):
			vl = (self.l_meter - self.prev_l_meter) / self.period
			vr = (self.r_meter - self.prev_r_meter) / self.period
		
		round_vl = float(Decimal(str(vl)).quantize(Decimal(str(rospy.get_param("/motor_driver_node/decimil_coefficient"))), rounding = ROUND_HALF_UP))
		round_vr = float(Decimal(str(vr)).quantize(Decimal(str(rospy.get_param("/motor_driver_node/decimil_coefficient"))), rounding = ROUND_HALF_UP))
	
		#-----Rotatiing-----
		if ((round_vl * round_vr) < 0.0) and (abs(round_vl) == abs(round_vr)):
			V = 0.0
			Wz = 2.0 * vr / self.wheels_base_width
			self.theta = self.theta + Wz * self.period
			path = "Rotatiing"
		
		#-----Curving-----
		elif ((round_vl - round_vr) != 0):
			V = (vl + vr) / 2.0
			Wz = (vr - vl) / self.wheels_base_width
			R_ICC = (self.wheels_base_width / 2.0) * ((vr + vl) / (vr - vl))
			self.x = self.x - R_ICC * np.sin(self.theta) + R_ICC * np.sin(self.theta + Wz * self.period)
			self.y = self.y + R_ICC * np.cos(self.theta) - R_ICC * np.cos(self.theta + Wz * self.period)
			self.theta = self.theta + Wz * self.period
			path = "Curving"
			
		#-----Going straight-----
		else:
			V = (vl + vr)/2.0
			Wz = 0.0
			self.x = self.x + V * np.cos(self.theta) * self.period
			self.y = self.y + V * np.sin(self.theta) * self.period
			path = "Going straight"

		#-----Construct tf-----
		q = quaternion_from_euler(0, 0, self.theta)
		if(rospy.get_param("/motor_driver_node/publish_TF")):
			self.t.header.stamp = rospy.Time.now()
			self.t.header.frame_id = rospy.get_param("/motor_driver_node/TF_header_frame")
			self.t.child_frame_id = rospy.get_param("/motor_driver_node/TF_child_frame")
			self.t.transform.translation.x = self.x
			self.t.transform.translation.y = self.y
			self.t.transform.translation.z = 0.0
			self.t.transform.rotation.x = q[0]
			self.t.transform.rotation.y = q[1]
			self.t.transform.rotation.z = q[2]
			self.t.transform.rotation.w = q[3]
			self.br.sendTransform(self.t)

		#-----Construct odom message-----
		if(rospy.get_param("/motor_driver_node/publish_odom")):
			self.odom_msg.header.stamp = rospy.Time.now()
			self.odom_msg.header.frame_id = rospy.get_param("/motor_driver_node/odom_header_frame")
			self.odom_msg.child_frame_id = rospy.get_param("/motor_driver_node/odom_child_frame")
			self.odom_msg.pose.pose.position.x = self.x
			self.odom_msg.pose.pose.position.y = self.y
			self.odom_msg.pose.pose.position.z = 0.0
			self.odom_msg.pose.pose.orientation.x = q[0]
			self.odom_msg.pose.pose.orientation.y = q[1]
			self.odom_msg.pose.pose.orientation.z = q[2]
			self.odom_msg.pose.pose.orientation.w = q[3]
			self.odom_msg.pose.covariance[0] = 0.0001
			self.odom_msg.pose.covariance[7] = 0.0001
			self.odom_msg.pose.covariance[14] = 0.000001
			self.odom_msg.pose.covariance[21] = 0.000001
			self.odom_msg.pose.covariance[28] = 0.000001
			self.odom_msg.pose.covariance[35] = 0.0001
			self.odom_msg.twist.twist.linear.x = V
			self.odom_msg.twist.twist.linear.y = 0.0
			self.odom_msg.twist.twist.angular.z = Wz
			self.odom_pub.publish(self.odom_msg)
		
		return vl, vr
		"""

	"""
	++++++++++++++++++++++++++++++++++++++++++
		control_loop function
	++++++++++++++++++++++++++++++++++++++++++
	Control loop.
	"""
	def control_loop(self):
		rate = rospy.Rate(20) # 20Hz
		reset_flag = False
		while True:
			if (rospy.is_shutdown()):
				self.zlc.disable_motor()
				break
				
			start_time = time.perf_counter()
			
			if (self.estop == True):
				self.zlc.estop()
				reset_flag = True
			else:
				if(reset_flag == True):
					self.zlc.clear_alarm()
					self.zlc.disable_motor()
					self.zlc.enable_motor()
					reset_flag = False
				#-----Speed rpm control mode-----
				if (self.control_mode == 3):
					if (self.got_twist_cmd):
						self.left_rpm_cmd, self.right_rpm_cmd = self.twist_to_rpm(self.linear_vel_cmd, self.angular_vel_cmd)
						self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
						self.got_twist_cmd = False
					elif (self.got_vel_cmd):
						self.left_rpm_cmd, self.right_rpm_cmd = self.vel_to_rpm(self.left_vel_cmd, self.right_vel_cmd)
						self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
						self.got_vel_cmd = False
					elif (self.got_vel_rpm_cmd):
						self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
						self.got_vel_rpm_cmd = False
					elif (time.perf_counter() - self.last_subscribed_time > self.callback_timeout):
						self.left_rpm_cmd = 0.0
						self.right_rpm_cmd = 0.0
						self.zlac8015d_set_rpm_with_limit(self.left_rpm_cmd, self.right_rpm_cmd)
				
				#-----relative position control mode-----
				elif (self.control_mode == 1):
					if (self.got_pos_deg_cmd):
						self.zlc.set_relative_angle(self.left_pos_deg_cmd ,self.right_pos_deg_cmd)
						self.zlc.move_left_wheel()
						self.zlc.move_right_wheel()
						self.got_pos_deg_cmd = False
					elif (self.got_pos_dist_cmd):
						left_cmd_deg = (self.left_pos_dist_cmd * 360.0) / self.zlc.left_circumference
						right_cmd_deg = (-self.right_pos_dist_cmd * 360.0) / self.zlc.right_circumference
						self.zlc.set_relative_angle(left_cmd_deg, right_cmd_deg)
						self.zlc.move_left_wheel()
						self.zlc.move_right_wheel()
						self.got_pos_dist_cmd = False

			#-----Odometry computation-----
			self.calculate_odometry()
			self.period = time.perf_counter() - start_time
			self.prev_l_meter = self.l_meter
			self.prev_r_meter = self.r_meter
			
			#-----Logging to screen-----
			if (rospy.get_param("/motor_driver_node/debug")):
				print("\033[32m" + "x: {:f} | y: {:f} | yaw: {:f} | Wr: {:f} | Wl: {:f}".format(self.x, self.y, np.rad2deg(self.theta), self.Wr, self.Wl) + "\033[0m")
			else:
				#print("\033[32m" + "Debugging features disable" + "\033[0m")
				pass
			
			rate.sleep()

"""
++++++++++++++++++++++++++++++++++++++++++
	main function
++++++++++++++++++++++++++++++++++++++++++
Main.
"""
def main():
	rospy.init_node("motor_driver_node")
	MDN = MotorDriverNode()
	MDN.control_loop()
	rospy.spin()

if __name__ == "__main__":
	main()