本文整理汇总了 Python 中 rclpy.init 函数的典型用法代码示例。如果您正苦于以下问题:rclpy.init 函数的具体用法是什么?rclpy.init 怎么用?有哪些 rclpy.init 的使用例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
下文一共展示了 rclpy.init 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: main
def main(args=None):
    """Run the 'minimal_action_server' node that serves the 'fibonacci' action.

    The node's logger is published through the module-level ``logger`` global.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    global logger

    rclpy.init(args=args)
    server_node = rclpy.create_node('minimal_action_server')
    logger = server_node.get_logger()

    # The reentrant callback group allows several goals to be processed
    # at the same time. No goal callback is passed, so the default one is
    # used; cancel requests are routed to the module's cancel_callback.
    fibonacci_server = ActionServer(
        server_node,
        Fibonacci,
        'fibonacci',
        execute_callback=execute_callback,
        cancel_callback=cancel_callback,
        callback_group=ReentrantCallbackGroup(),
    )

    # Spin on a multi-threaded executor so goals can run concurrently.
    threaded_executor = MultiThreadedExecutor()
    rclpy.spin(server_node, executor=threaded_executor)

    # Tear down in reverse order of creation.
    fibonacci_server.destroy()
    server_node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:26,代码来源:server_not_composable.py
示例2: talker
def talker(message_name, rmw_implementation, number_of_cycles):
    """Publish the test fixtures of one message type for a number of cycles.

    :param message_name: name of the message class looked up in
        ``test_communication.msg``; also used as the topic suffix.
    :param rmw_implementation: value handed to ``select_rmw_implementation``.
    :param number_of_cycles: maximum number of passes over the fixtures.
    """
    import rclpy
    from rclpy.qos import qos_profile_default
    from rclpy.impl.rmw_implementation_tools import select_rmw_implementation
    from message_fixtures import get_test_msg

    package_name = 'test_communication'
    message_type = getattr(
        importlib.import_module(package_name + '.msg'), message_name)

    select_rmw_implementation(rmw_implementation)
    rclpy.init([])

    node = rclpy.create_node('talker')
    publisher = node.create_publisher(
        message_type, 'test_message_' + message_name, qos_profile_default)

    print('talker: beginning loop')
    fixtures = get_test_msg(message_name)

    cycles_done = 0
    while rclpy.ok() and cycles_done < number_of_cycles:
        cycles_done += 1
        # Message numbering restarts at 1 on every cycle.
        for sequence_number, fixture in enumerate(fixtures, start=1):
            publisher.publish(fixture)
            print('publishing message #%d' % sequence_number)
            time.sleep(0.1)
        time.sleep(1)

    rclpy.shutdown()
开发者ID:rohbotics,项目名称:system_tests,代码行数:32,代码来源:publisher_py.py
示例3: main
def main(argv=sys.argv[1:]):
    """Listen on 'chatter_qos' for a bounded number of spin iterations.

    :param argv: command-line arguments; ``--reliable`` selects
        ``qos_profile_default`` instead of ``qos_profile_sensor_data`` and
        ``-n/--number_of_cycles`` caps the number of ``spin_once`` calls.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        '--reliable',
        dest='reliable',
        action='store_true',
        help='set qos profile to reliable')
    cli.set_defaults(reliable=False)
    cli.add_argument(
        '-n',
        '--number_of_cycles',
        type=int,
        default=20,
        help='max number of spin_once iterations')
    options = cli.parse_args(argv)

    rclpy.init()

    # Pick the QoS profile together with the banner announcing it.
    if not options.reliable:
        profile, banner = qos_profile_sensor_data, 'Best effort listener'
    else:
        profile, banner = qos_profile_default, 'Reliable listener'
    print(banner)

    node = rclpy.create_node('listener_qos')
    subscription = node.create_subscription(
        String, 'chatter_qos', chatter_callback, profile)
    assert subscription  # prevent unused warning

    # Count down instead of up; the number of spins is unchanged.
    spins_left = options.number_of_cycles
    while rclpy.ok() and spins_left > 0:
        rclpy.spin_once(node)
        spins_left -= 1
开发者ID:rohbotics,项目名称:demos,代码行数:29,代码来源:listener_qos_py.py
示例4: main
def main(args=None):
    """Call the 'add_two_ints' service asynchronously and log the outcome.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    node = rclpy.create_node('minimal_client_async')
    client = node.create_client(AddTwoInts, 'add_two_ints')

    request = AddTwoInts.Request()
    request.a = 41
    request.b = 1

    # Block, one second at a time, until the service is reachable.
    while not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().info('service not available, waiting again...')

    pending = client.call_async(request)

    while rclpy.ok():
        rclpy.spin_once(node)
        if not pending.done():
            continue

        # The future completed: report success or failure exactly once.
        response = pending.result()
        if response is None:
            node.get_logger().info(
                'Service call failed %r' % (pending.exception(),))
        else:
            node.get_logger().info(
                'Result of add_two_ints: for %d + %d = %d' %
                (request.a, request.b, response.sum))
        break

    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:27,代码来源:client_async.py
示例5: main
def main(args=None):
    """Publish a counting 'Hello World' string on 'topic' every 0.5 seconds.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)

    node = rclpy.create_node('minimal_publisher')
    publisher = node.create_publisher(String, 'topic', 10)

    message = String()
    published = 0

    def timer_callback():
        # The same message object is reused; only its payload changes.
        nonlocal published
        message.data = 'Hello World: %d' % published
        published += 1
        node.get_logger().info('Publishing: "%s"' % message.data)
        publisher.publish(message)

    period_sec = 0.5
    timer = node.create_timer(period_sec, timer_callback)

    rclpy.spin(node)

    # Explicit timer destruction is optional: it would otherwise happen
    # when the garbage collector destroys the node object.
    node.destroy_timer(timer)
    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:27,代码来源:publisher_local_function.py
示例6: replier
def replier(service_name, number_of_cycles, namespace):
    """Serve one ``test_msgs`` service type for a bounded number of spins.

    :param service_name: name of the service class looked up in
        ``test_msgs.srv``; also used as the service-name suffix.
    :param number_of_cycles: bound on the spin counter, which starts at 1.
    :param namespace: namespace the 'replier' node is created in.
    """
    from test_msgs.service_fixtures import get_test_srv
    import rclpy

    package_name = 'test_msgs'
    service_type = getattr(
        importlib.import_module(package_name + '.srv'), service_name)

    rclpy.init(args=[])
    node = rclpy.create_node('replier', namespace=namespace)

    # Bind the fixtures for this service type into the shared callback.
    fixtures = get_test_srv(service_name)
    handler = functools.partial(replier_callback, srv_fixtures=fixtures)
    node.create_service(
        service_type, 'test/service/' + service_name, handler)

    spins = 1
    print('replier: beginning loop')
    while rclpy.ok() and spins < number_of_cycles:
        rclpy.spin_once(node, timeout_sec=2)
        spins += 1
        print('spin_count: ' + str(spins))

    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:system_tests,代码行数:28,代码来源:replier_py.py
示例7: main
def main(argv=sys.argv[1:]):
    """Publish 'Hello World' strings on 'chatter_qos' once per second.

    :param argv: command-line arguments; ``--reliable`` selects
        ``qos_profile_default`` instead of ``qos_profile_sensor_data`` and
        ``-n/--number_of_cycles`` caps the number of messages sent.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    cli.add_argument(
        '--reliable',
        dest='reliable',
        action='store_true',
        help='set qos profile to reliable')
    cli.set_defaults(reliable=False)
    cli.add_argument(
        '-n',
        '--number_of_cycles',
        type=int,
        default=20,
        help='number of sending attempts')
    options = cli.parse_args(argv)

    rclpy.init()

    # Pick the QoS profile together with the banner announcing it.
    if not options.reliable:
        profile, banner = qos_profile_sensor_data, 'Best effort publisher'
    else:
        profile, banner = qos_profile_default, 'Reliable publisher'
    print(banner)

    node = rclpy.create_node('talker_qos')
    publisher = node.create_publisher(String, 'chatter_qos', profile)

    message = String()
    sent = 0
    while rclpy.ok() and sent < options.number_of_cycles:
        # The payload carries the zero-based index of the message.
        message.data = 'Hello World: {0}'.format(sent)
        print('Publishing: "{0}"'.format(message.data))
        publisher.publish(message)
        sent += 1
        sleep(1)
开发者ID:rohbotics,项目名称:demos,代码行数:32,代码来源:talker_qos_py.py
示例8: talker
def talker(message_name, number_of_cycles, namespace):
    """Publish the ``test_msgs`` fixtures of one message type repeatedly.

    :param message_name: name of the message class looked up in
        ``test_msgs.msg``; also used as the topic suffix.
    :param number_of_cycles: maximum number of passes over the fixtures.
    :param namespace: namespace the 'talker' node is created in.
    """
    from test_msgs.message_fixtures import get_test_msg
    import rclpy

    package_name = 'test_msgs'
    message_type = getattr(
        importlib.import_module(package_name + '.msg'), message_name)

    rclpy.init(args=[])
    node = rclpy.create_node('talker', namespace=namespace)
    publisher = node.create_publisher(
        message_type, 'test/message/' + message_name)

    print('talker: beginning loop')
    fixtures = get_test_msg(message_name)

    cycles_done = 0
    while rclpy.ok() and cycles_done < number_of_cycles:
        # Message numbering restarts at 1 on every cycle.
        for sequence_number, fixture in enumerate(fixtures, start=1):
            publisher.publish(fixture)
            print('publishing message #%d' % sequence_number)
            time.sleep(0.01)
        cycles_done += 1
        time.sleep(0.1)

    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:system_tests,代码行数:29,代码来源:publisher_py.py
示例9: func_destroy_timers
def func_destroy_timers():
    """Check timer bookkeeping when a timer and then its node are destroyed.

    :return: ``True`` when the node is destroyed cleanly, ``False`` when
        ``destroy_node`` raises ``SystemError``.
    """
    import rclpy

    rclpy.init()
    node = rclpy.create_node('test_node3')

    short_timer = node.create_timer(0.1, None)
    # The second timer is only held in a local; the node destruction
    # below is what is expected to clean it up.
    long_timer = node.create_timer(1, None)  # noqa: F841

    assert len(node.timers) == 2
    assert node.destroy_timer(short_timer) is True
    assert len(node.timers) == 1

    try:
        assert node.destroy_node() is True
    except SystemError:
        return False
    else:
        # Destroying the node must also drop the remaining timer.
        assert len(node.timers) == 0
        assert node.handle is None
        return True
    finally:
        # Runs before either return above takes effect.
        rclpy.shutdown()
开发者ID:ros2,项目名称:rclpy,代码行数:26,代码来源:test_destruction.py
示例10: func_destroy_node_twice
def func_destroy_node_twice():
    """Check that destroying the same node twice reports success both times.

    rclpy is shut down again on every exit path, including a failed
    assertion, so the initialised context is not left behind for whatever
    runs next in the same process.

    :return: ``True`` when both ``destroy_node`` calls return ``True``.
    :raises AssertionError: when either ``destroy_node`` call does not
        return ``True``.
    """
    import rclpy

    rclpy.init()
    try:
        node = rclpy.create_node('test_node2')
        assert node.destroy_node() is True
        # The second call on the already-destroyed node must also succeed.
        assert node.destroy_node() is True
    finally:
        rclpy.shutdown()
    return True
开发者ID:ros2,项目名称:rclpy,代码行数:7,代码来源:test_destruction.py
示例11: main
def main(args=None):
    """Call the 'add_two_ints' service synchronously and print the sum.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    node = rclpy.create_node('minimal_client')
    client = node.create_client(AddTwoInts, 'add_two_ints')

    request = AddTwoInts.Request()
    request.a = 41
    request.b = 1

    # TODO(mikaelarguedas) remove this once wait for service implemented
    # There is no wait-for-service in Python yet, so give the connection
    # a moment to be established.
    time.sleep(1)

    client.call(request)
    # While waiting on the future, spin must not be called in the main loop.
    client.wait_for_future()

    # TODO(mikaelarguedas) This is not the final API, and this does not scale
    # for multiple pending requests. This will change once an executor model
    # is implemented; in the future the response will not be stored in
    # the client's ``response`` attribute.
    summary = 'Result of add_two_ints: for %d + %d = %d' % (
        request.a, request.b, client.response.sum)
    print(summary)

    node.destroy_node()
    rclpy.shutdown()
开发者ID:esteve,项目名称:examples,代码行数:26,代码来源:client.py
示例12: _rostopic_echo_test
def _rostopic_echo_test(topic):
    """Subscribe to *topic* and spin for as long as ``rclpy.ok()`` holds.

    Incoming ``String`` messages are passed to the module-level
    ``chatter_callback``.

    :param topic: topic name, with or without a leading "/".
    """
    rclpy.init(None)
    node = rclpy.create_node('rostopic')
    # TODO FIXME no path resolving for topics is implemented yet, so the
    # initial "/" is omitted. Strip it only when it is actually present;
    # slicing unconditionally would eat the first character of a name that
    # was given without a leading slash.
    relative_topic = topic[1:] if topic.startswith('/') else topic
    # The subscription handle is bound to a local but not used afterwards.
    sub = node.create_subscription(  # noqa: F841
        String, relative_topic, chatter_callback, qos_profile_default)
    while rclpy.ok():
        rclpy.spin_once(node)
开发者ID:erlerobot,项目名称:rclpy,代码行数:8,代码来源:rostopic.py
示例13: func_init_shutdown
def func_init_shutdown():
    """Check that rclpy can be initialised and shut down once.

    :return: ``True`` when both calls succeed, ``False`` when either one
        raises ``RuntimeError``.
    """
    import rclpy

    try:
        rclpy.init()
        rclpy.shutdown()
    except RuntimeError:
        return False
    else:
        return True
开发者ID:ros2,项目名称:rclpy,代码行数:8,代码来源:test_init_shutdown.py
示例14: main
def main(args=None):
    """Create a ``MinimalActionClient``, send its goal and spin on it.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)

    client_node = MinimalActionClient()
    client_node.send_goal()

    rclpy.spin(client_node)
开发者ID:ros2,项目名称:examples,代码行数:8,代码来源:client.py
示例15: main
def main(args=None):
    """Spin a ``MinimalService`` node, then shut rclpy down.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)

    service_node = MinimalService()
    rclpy.spin(service_node)

    rclpy.shutdown()
开发者ID:esteve,项目名称:examples,代码行数:8,代码来源:service_member_function.py
示例16: main
def main(args=None):
    """Spin a ``ThrottledTalker`` node and always clean up afterwards.

    The node is destroyed only if it was actually constructed, so a
    failure inside ``ThrottledTalker()`` surfaces as the original
    exception rather than a ``NameError`` from the cleanup code, and
    ``rclpy.shutdown()`` still runs.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)
    talker = None
    try:
        talker = ThrottledTalker()
        rclpy.spin(talker)
    finally:
        # ``talker`` is still None when the constructor itself raised.
        if talker is not None:
            talker.destroy_node()
        rclpy.shutdown()
开发者ID:se7oluti0n,项目名称:examples,代码行数:8,代码来源:custom_callback_group.py
示例17: main
def main(argv=sys.argv):
    """Spin the 'initial_params_node' node in the root namespace.

    :param argv: command-line arguments forwarded to ``rclpy.init``.
    :return: ``0`` once spinning ends and rclpy has been shut down.
    """
    rclpy.init(args=argv)

    node_options = {
        'namespace': '/',
        'allow_undeclared_parameters': True,
    }
    params_node = rclpy.create_node('initial_params_node', **node_options)

    rclpy.spin(params_node)
    rclpy.shutdown()
    return 0
开发者ID:ros2,项目名称:system_tests,代码行数:9,代码来源:initial_params.py
示例18: func_double_shutdown
def func_double_shutdown():
    """Check that a second ``rclpy.shutdown()`` raises ``RuntimeError``.

    :return: ``True`` when the second shutdown raises ``RuntimeError``,
        ``False`` when it completes without raising.
    """
    import rclpy

    rclpy.init()
    rclpy.shutdown()

    try:
        # Already shut down above; this call is the one under test.
        rclpy.shutdown()
    except RuntimeError:
        return True
    else:
        return False
开发者ID:ros2,项目名称:rclpy,代码行数:10,代码来源:test_init_shutdown.py
示例19: main
def main(args=None):
    """Spin a ``MinimalActionServer`` on a multi-threaded executor.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)

    server_node = MinimalActionServer()

    # A MultiThreadedExecutor handles incoming goal requests concurrently.
    threaded_executor = MultiThreadedExecutor()
    rclpy.spin(server_node, executor=threaded_executor)

    server_node.destroy()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:11,代码来源:server_single_goal.py
示例20: main
def main(args=None):
    """Spin a ``MinimalPublisher`` node, then destroy it and shut down.

    :param args: command-line arguments forwarded to ``rclpy.init``.
    """
    rclpy.init(args=args)

    publisher_node = MinimalPublisher()
    rclpy.spin(publisher_node)

    # Explicit destruction is optional: it would otherwise happen when the
    # garbage collector destroys the node object.
    publisher_node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:12,代码来源:publisher_member_function.py
注:本文中的 rclpy.init 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论