#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################################ # Copyright 2018 ROBOTIS CO., LTD. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ # Author: Ryu Woon Jung (Leon), Gilbert, Ashe Kim import rospy import math import tf from enum import Enum from std_msgs.msg import UInt8 from geometry_msgs.msg import Twist, PoseStamped, PoseWithCovarianceStamped from nav_msgs.msg import Odometry from move_base_msgs.msg import MoveBaseActionResult from turtlebot3_autorace_msgs.msg import MovingParam class DetectTunnel(): def __init__(self): # subscribes state self.sub_tunnel_order = rospy.Subscriber('/detect/tunnel_order', UInt8, self.cbTunnelOrder, queue_size=1) self.sub_moving_completed = rospy.Subscriber('/control/moving/complete', UInt8, self.cbMovingComplete, queue_size = 1) self.sub_arrival_status = rospy.Subscriber("/move_base/result", MoveBaseActionResult, self.cbGetNavigationResult, queue_size=1) self.sub_odom = rospy.Subscriber('/odom', Odometry, self.cbOdom, queue_size=1) # publishes state self.pub_tunnel_return = rospy.Publisher('/detect/tunnel_stamped', UInt8, queue_size=1) self.pub_moving = rospy.Publisher('/control/moving/state', MovingParam, queue_size= 1) self.pub_goal_pose_stamped = rospy.Publisher("/move_base_simple/goal", PoseStamped, queue_size=1) self.StepOfTunnel = Enum('StepOfTunnel', 'searching_tunnel_sign go_in_to_tunnel navigation go_out_from_tunnel exit') self.is_navigation_finished = False self.is_tunnel_finished = False self.is_moving_complete = False self.last_current_theta = 0.0 def cbGetNavigationResult(self, msg_nav_result): if msg_nav_result.status.status == 3: rospy.loginfo("Reached") self.is_navigation_finished = True def cbMovingComplete(self, data): self.is_moving_complete = True def cbTunnelOrder(self, order): pub_tunnel_return = UInt8() if order.data == self.StepOfTunnel.searching_tunnel_sign.value: rospy.loginfo("Now lane_following") pub_tunnel_return.data = self.StepOfTunnel.searching_tunnel_sign.value elif order.data == self.StepOfTunnel.go_in_to_tunnel.value: rospy.loginfo("Now go_in_to_tunnel") rospy.sleep(1) rospy.loginfo("go straight") msg_moving = MovingParam() msg_moving.moving_type= 4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.2 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go left") msg_moving = MovingParam() msg_moving.moving_type=2 msg_moving.moving_value_angular=45 msg_moving.moving_value_linear=0.0 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go straight") msg_moving = MovingParam() msg_moving.moving_type= 4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.15 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) rospy.loginfo("go_in_to_tunnel finished") pub_tunnel_return.data = self.StepOfTunnel.go_in_to_tunnel.value elif order.data == self.StepOfTunnel.navigation.value: rospy.loginfo("Now navigation") initialPose = PoseWithCovarianceStamped() initialPose.header.frame_id = "map" initialPose.header.stamp = rospy.Time.now() initialPose.pose.pose = self.odom_msg.pose.pose self.fnPubGoalPose() while True: if self.is_navigation_finished == True: break else: pass pub_tunnel_return.data = self.StepOfTunnel.navigation.value rospy.loginfo("navigation!") elif order.data == self.StepOfTunnel.go_out_from_tunnel.value: rospy.loginfo("Now go_out_from_tunnel") rospy.loginfo("go straight") msg_moving = MovingParam() msg_moving.moving_type= 4 msg_moving.moving_value_angular=0.0 msg_moving.moving_value_linear=0.45 self.pub_moving.publish(msg_moving) while True: if self.is_moving_complete == True: break self.is_moving_complete = False rospy.sleep(1) pub_tunnel_return.data = self.StepOfTunnel.go_out_from_tunnel.value elif order.data == self.StepOfTunnel.exit.value: rospy.loginfo("Now exit") pub_tunnel_return.data = self.StepOfTunnel.exit.value self.pub_tunnel_return.publish(pub_tunnel_return) def cbOdom(self, odom_msg): quaternion = (odom_msg.pose.pose.orientation.x, odom_msg.pose.pose.orientation.y, odom_msg.pose.pose.orientation.z, odom_msg.pose.pose.orientation.w) self.current_theta = self.euler_from_quaternion(quaternion) self.odom_msg = odom_msg if (self.current_theta - self.last_current_theta) < -math.pi: self.current_theta = 2. * math.pi + self.current_theta self.last_current_theta = math.pi elif (self.current_theta - self.last_current_theta) > math.pi: self.current_theta = -2. * math.pi + self.current_theta self.last_current_theta = -math.pi else: self.last_current_theta = self.current_theta self.current_pos_x = odom_msg.pose.pose.position.x self.current_pos_y = odom_msg.pose.pose.position.y def euler_from_quaternion(self, quaternion): theta = tf.transformations.euler_from_quaternion(quaternion)[2] return theta def fnPubGoalPose(self): goalPoseStamped = PoseStamped() goalPoseStamped.header.frame_id = "map" goalPoseStamped.header.stamp = rospy.Time.now() goalPoseStamped.pose.position.x = -0.3 goalPoseStamped.pose.position.y = -1.78 goalPoseStamped.pose.position.z = 0.0 goalPoseStamped.pose.orientation.x = 0.0 goalPoseStamped.pose.orientation.y = 0.0 goalPoseStamped.pose.orientation.z = 0.0 goalPoseStamped.pose.orientation.w = 1.0 self.pub_goal_pose_stamped.publish(goalPoseStamped) def cbTunnelFinished(self, tunnel_finished_msg): self.is_tunnel_finished = True def main(self): rospy.spin() if __name__ == '__main__': rospy.init_node('detect_tunnel') node = DetectTunnel() node.main()