#!/usr/bin/env python # -*- coding: utf-8 -*- ################################################################################ # Copyright 2018 ROBOTIS CO., LTD. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ # Authors: Leon Jung, [AuTURBO] Kihoon Kim (https://github.com/auturbo), Gilbert, Ashe Kim import rospy, roslaunch import subprocess import os import sys from enum import Enum from std_msgs.msg import UInt8, Float64 class CoreNodeController(): def __init__(self): self.ros_package_path = os.path.dirname(os.path.realpath(__file__)) self.ros_package_path = self.ros_package_path.replace('turtlebot3_autorace_core/nodes', '') # subscribes : status returned self.sub_tunnel_stamped = rospy.Subscriber('/detect/tunnel_stamped', UInt8, self.cbTunnelStamped, queue_size=1) self.sub_mode_control = rospy.Subscriber('/core/decided_mode', UInt8, self.cbReceiveMode, queue_size=1) # publishes orders self.pub_tunnel_order = rospy.Publisher('/detect/tunnel_order', UInt8, queue_size=1) self.pub_mode_return = rospy.Publisher('/core/returned_mode', UInt8, queue_size=1) self.CurrentMode = Enum('CurrentMode', 'idle lane_following tunnel') self.current_mode = self.CurrentMode.idle.value self.StepOfTunnel = Enum('StepOfTunnel', 'searching_tunnel_sign go_in_to_tunnel navigation go_out_from_tunnel exit') self.current_step_tunnel = self.StepOfTunnel.searching_tunnel_sign.value self.Launcher = Enum('Launcher', 'launch_camera_ex_calib launch_detect_sign launch_detect_lane launch_detect_tunnel launch_control_lane launch_control_tunnel launch_control_moving') self.uuid = roslaunch.rlutil.get_or_generate_uuid(None, False) self.launch_camera_launched = False self.launch_detect_sign_launched = False self.launch_detect_lane_launched = False self.launch_detect_tunnel_launched = False self.launch_control_lane_launched = False self.launch_control_tunnel_launched = False self.launch_control_moving_launched = False self.is_triggered = False loop_rate = rospy.Rate(10) # 10hz while not rospy.is_shutdown(): if self.is_triggered == True: self.fnControlNode() loop_rate.sleep() def cbReceiveMode(self, mode_msg): rospy.loginfo("starts the progress with %d", mode_msg.data) self.current_mode = mode_msg.data self.is_triggered = True # Which step is in Tunnel def cbTunnelStamped(self, tunnel_msg): rospy.loginfo("Tunnel Step changed from %d", self.current_step_tunnel) self.current_step_tunnel = tunnel_msg.data rospy.loginfo("into %d", self.current_step_tunnel) if self.current_step_tunnel == self.StepOfTunnel.searching_tunnel_sign.value: self.current_mode = self.CurrentMode.tunnel.value msg_mode_return = UInt8 msg_mode_return.data = self.current_mode self.pub_mode_return.publish(msg_mode_return) self.is_triggered = True def fnControlNode(self): # lane_following if self.current_mode == self.CurrentMode.lane_following.value: rospy.loginfo("New trigger for lane_following") self.fnLaunch(self.Launcher.launch_camera_ex_calib.value, True) self.fnLaunch(self.Launcher.launch_detect_lane.value, True) self.fnLaunch(self.Launcher.launch_detect_sign.value, True) self.fnLaunch(self.Launcher.launch_detect_tunnel.value, False) self.fnLaunch(self.Launcher.launch_control_lane.value, True) self.fnLaunch(self.Launcher.launch_control_tunnel.value, False) self.fnLaunch(self.Launcher.launch_control_moving.value, False) # tunnel elif self.current_mode == self.CurrentMode.tunnel.value: rospy.loginfo("New trigger for tunnel") msg_pub_tunnel_order = UInt8() if self.current_step_tunnel == self.StepOfTunnel.searching_tunnel_sign.value: rospy.loginfo("Current step : searching_tunnel_sign") rospy.loginfo("Go to next step : go_in_to_tunnel") msg_pub_tunnel_order.data = self.StepOfTunnel.go_in_to_tunnel.value self.fnLaunch(self.Launcher.launch_camera_ex_calib.value, False) self.fnLaunch(self.Launcher.launch_detect_lane.value, False) self.fnLaunch(self.Launcher.launch_detect_tunnel.value, True) self.fnLaunch(self.Launcher.launch_detect_sign.value, False) self.fnLaunch(self.Launcher.launch_control_lane.value, False) self.fnLaunch(self.Launcher.launch_control_tunnel.value, False) self.fnLaunch(self.Launcher.launch_control_moving.value, True) elif self.current_step_tunnel == self.StepOfTunnel.go_in_to_tunnel.value: rospy.loginfo("Current step : go_in_to_tunnel") rospy.loginfo("Go to next step : navigation") msg_pub_tunnel_order.data = self.StepOfTunnel.navigation.value self.fnLaunch(self.Launcher.launch_camera_ex_calib.value, False) self.fnLaunch(self.Launcher.launch_detect_lane.value, False) self.fnLaunch(self.Launcher.launch_detect_tunnel.value, True) self.fnLaunch(self.Launcher.launch_detect_sign.value, False) self.fnLaunch(self.Launcher.launch_control_lane.value, False) self.fnLaunch(self.Launcher.launch_control_tunnel.value, True) self.fnLaunch(self.Launcher.launch_control_moving.value, False) elif self.current_step_tunnel == self.StepOfTunnel.navigation.value: rospy.loginfo("Current step : navigation") rospy.loginfo("Go to next step : go_out_from_tunnel") msg_pub_tunnel_order.data = self.StepOfTunnel.go_out_from_tunnel.value self.fnLaunch(self.Launcher.launch_camera_ex_calib.value, True) self.fnLaunch(self.Launcher.launch_detect_sign.value, False) self.fnLaunch(self.Launcher.launch_detect_lane.value, False) self.fnLaunch(self.Launcher.launch_detect_tunnel.value, True) self.fnLaunch(self.Launcher.launch_control_lane.value, False) self.fnLaunch(self.Launcher.launch_control_tunnel.value, False) self.fnLaunch(self.Launcher.launch_control_moving.value, True) elif self.current_step_tunnel == self.StepOfTunnel.go_out_from_tunnel.value: rospy.loginfo("Current step : go_out_from_tunnel") rospy.loginfo("Go to next step : exit") msg_pub_tunnel_order.data = self.StepOfTunnel.exit.value self.fnLaunch(self.Launcher.launch_camera_ex_calib.value, True) self.fnLaunch(self.Launcher.launch_detect_tunnel.value, False) self.fnLaunch(self.Launcher.launch_detect_lane.value, True) self.fnLaunch(self.Launcher.launch_detect_sign.value, True) self.fnLaunch(self.Launcher.launch_control_lane.value, True) self.fnLaunch(self.Launcher.launch_control_tunnel.value, False) self.fnLaunch(self.Launcher.launch_control_moving.value, False) elif self.current_step_tunnel == self.StepOfTunnel.exit.value: rospy.loginfo("Current step : exit") rospy.loginfo("Go to next step : searching_tunnel_sign") msg_pub_tunnel_order.data = self.StepOfTunnel.searching_tunneROSLaunchParentvalue self.fnLaunch(self.Launcher.launch_camera_ex_calib.value, True) self.fnLaunch(self.Launcher.launch_detect_tunnel.value, False) self.fnLaunch(self.Launcher.launch_detect_lane.value, True) self.fnLaunch(self.Launcher.launch_detect_sign.value, True) self.fnLaunch(self.Launcher.launch_control_lane.value, True) self.fnLaunch(self.Launcher.launch_control_tunnel.value, False) self.fnLaunch(self.Launcher.launch_control_moving.value, False) rospy.sleep(2) self.pub_tunnel_order.publish(msg_pub_tunnel_order) self.is_triggered = False def fnLaunch(self, launch_num, is_start): if launch_num == self.Launcher.launch_camera_ex_calib.value: if is_start == True: if self.launch_camera_launched == False: self.launch_camera = roslaunch.scriptapi.ROSLaunch() self.launch_camera = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_camera/launch/extrinsic_camera_calibration.launch"]) self.launch_camera_launched = True self.launch_camera.start() else: pass else: if self.launch_camera_launched == True: self.launch_camera_launched = False self.launch_camera.shutdown() else: pass elif launch_num == self.Launcher.launch_detect_sign.value: if is_start == True: if self.launch_detect_sign_launched == False: self.launch_detect_sign = roslaunch.scriptapi.ROSLaunch() self.launch_detect_sign = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_detect/launch/detect_sign_tunnel.launch"]) self.launch_detect_sign_launched = True self.launch_detect_sign.start() else: pass else: if self.launch_detect_sign_launched == True: self.launch_detect_sign_launched = False self.launch_detect_sign.shutdown() else: pass elif launch_num == self.Launcher.launch_detect_lane.value: if is_start == True: if self.launch_detect_lane_launched == False: self.launch_detect_lane = roslaunch.scriptapi.ROSLaunch() self.launch_detect_lane = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_detect/launch/detect_lane.launch"]) self.launch_detect_lane_launched = True self.launch_detect_lane.start() else: pass else: if self.launch_detect_lane_launched == True: self.launch_detect_lane_launched = False self.launch_detect_lane.shutdown() else: pass elif launch_num == self.Launcher.launch_detect_tunnel.value: if is_start == True: if self.launch_detect_tunnel_launched == False: self.launch_detect_tunnel = roslaunch.scriptapi.ROSLaunch() self.launch_detect_tunnel = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_detect/launch/detect_tunnel.launch"]) self.launch_detect_tunnel_launched = True self.launch_detect_tunnel.start() else: pass else: if self.launch_detect_tunnel_launched == True: self.launch_detect_tunnel_launched = False self.launch_detect_tunnel.shutdown() else: pass elif launch_num == self.Launcher.launch_control_lane.value: if is_start == True: if self.launch_control_lane_launched == False: self.launch_control_lane = roslaunch.scriptapi.ROSLaunch() self.launch_control_lane = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_driving/launch/turtlebot3_autorace_control_lane.launch"]) self.launch_control_lane_launched = True self.launch_control_lane.start() else: pass else: if self.launch_control_lane_launched == True: self.launch_control_lane_launched = False self.launch_control_lane.shutdown() else: pass elif launch_num == self.Launcher.launch_control_tunnel.value: if is_start == True: if self.launch_control_tunnel_launched == False: self.launch_control_tunnel = roslaunch.scriptapi.ROSLaunch() self.launch_control_tunnel = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_driving/launch/turtlebot3_autorace_control_tunnel.launch"]) self.launch_control_tunnel_launched = True self.launch_control_tunnel.start() else: pass else: if self.launch_control_tunnel_launched == True: self.launch_control_tunnel_launched = False self.launch_control_tunnel.shutdown() else: pass elif launch_num == self.Launcher.launch_control_moving.value: if is_start == True: if self.launch_control_moving_launched == False: self.launch_control_moving = roslaunch.scriptapi.ROSLaunch() self.launch_control_moving = roslaunch.parent.ROSLaunchParent(self.uuid, [self.ros_package_path + "turtlebot3_autorace_driving/launch/turtlebot3_autorace_control_moving.launch"]) self.launch_control_moving_launched = True self.launch_control_moving.start() else: pass else: if self.launch_control_moving_launched == True: self.launch_control_moving_launched = False self.launch_control_moving.shutdown() pass def main(self): rospy.spin() if __name__ == '__main__': rospy.init_node('core_node_controller') node = CoreNodeController() node.main()