• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python rclpy.init函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中rclpy.init函数的典型用法代码示例。如果您正苦于以下问题:Python init函数的具体用法?Python init怎么用?Python init使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了init函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

def main(args=None):
    global logger
    rclpy.init(args=args)

    node = rclpy.create_node('minimal_action_server')
    logger = node.get_logger()

    # Use a ReentrantCallbackGroup to enable processing multiple goals concurrently
    # Default goal callback accepts all goals
    # Default cancel callback rejects cancel requests
    action_server = ActionServer(
        node,
        Fibonacci,
        'fibonacci',
        execute_callback=execute_callback,
        cancel_callback=cancel_callback,
        callback_group=ReentrantCallbackGroup())

    # Use a MultiThreadedExecutor to enable processing goals concurrently
    executor = MultiThreadedExecutor()

    rclpy.spin(node, executor=executor)

    action_server.destroy()
    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:26,代码来源:server_not_composable.py


示例2: talker

def talker(message_name, rmw_implementation, number_of_cycles):
    import rclpy
    from rclpy.qos import qos_profile_default
    from rclpy.impl.rmw_implementation_tools import select_rmw_implementation
    from message_fixtures import get_test_msg

    message_pkg = 'test_communication'
    module = importlib.import_module(message_pkg + '.msg')
    msg_mod = getattr(module, message_name)

    select_rmw_implementation(rmw_implementation)

    rclpy.init([])

    node = rclpy.create_node('talker')

    chatter_pub = node.create_publisher(
        msg_mod, 'test_message_' + message_name, qos_profile_default)

    cycle_count = 0
    print('talker: beginning loop')
    msgs = get_test_msg(message_name)
    while rclpy.ok() and cycle_count < number_of_cycles:
        cycle_count += 1
        msg_count = 0
        for msg in msgs:
            chatter_pub.publish(msg)
            msg_count += 1
            print('publishing message #%d' % msg_count)
            time.sleep(0.1)
        time.sleep(1)
    rclpy.shutdown()
开发者ID:rohbotics,项目名称:system_tests,代码行数:32,代码来源:publisher_py.py


示例3: main

def main(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--reliable', dest='reliable', action='store_true',
        help='set qos profile to reliable')
    parser.set_defaults(reliable=False)
    parser.add_argument(
        '-n', '--number_of_cycles', type=int, default=20,
        help='max number of spin_once iterations')
    args = parser.parse_args(argv)
    rclpy.init()

    if args.reliable:
        custom_qos_profile = qos_profile_default
        print('Reliable listener')
    else:
        custom_qos_profile = qos_profile_sensor_data
        print('Best effort listener')

    node = rclpy.create_node('listener_qos')

    sub = node.create_subscription(String, 'chatter_qos', chatter_callback, custom_qos_profile)

    assert sub  # prevent unused warning

    cycle_count = 0
    while rclpy.ok() and cycle_count < args.number_of_cycles:
        rclpy.spin_once(node)
        cycle_count += 1
开发者ID:rohbotics,项目名称:demos,代码行数:29,代码来源:listener_qos_py.py


示例4: main

def main(args=None):
    rclpy.init(args=args)

    node = rclpy.create_node('minimal_client_async')

    cli = node.create_client(AddTwoInts, 'add_two_ints')

    req = AddTwoInts.Request()
    req.a = 41
    req.b = 1
    while not cli.wait_for_service(timeout_sec=1.0):
        node.get_logger().info('service not available, waiting again...')

    future = cli.call_async(req)
    while rclpy.ok():
        rclpy.spin_once(node)
        if future.done():
            if future.result() is not None:
                node.get_logger().info(
                    'Result of add_two_ints: for %d + %d = %d' %
                    (req.a, req.b, future.result().sum))
            else:
                node.get_logger().info('Service call failed %r' % (future.exception(),))
            break

    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:27,代码来源:client_async.py


示例5: main

def main(args=None):
    rclpy.init(args=args)

    node = rclpy.create_node('minimal_publisher')
    publisher = node.create_publisher(String, 'topic', 10)

    msg = String()
    i = 0

    def timer_callback():
        nonlocal i
        msg.data = 'Hello World: %d' % i
        i += 1
        node.get_logger().info('Publishing: "%s"' % msg.data)
        publisher.publish(msg)

    timer_period = 0.5  # seconds
    timer = node.create_timer(timer_period, timer_callback)

    rclpy.spin(node)

    # Destroy the timer attached to the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    node.destroy_timer(timer)
    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:27,代码来源:publisher_local_function.py


示例6: replier

def replier(service_name, number_of_cycles, namespace):
    from test_msgs.service_fixtures import get_test_srv
    import rclpy

    service_pkg = 'test_msgs'
    module = importlib.import_module(service_pkg + '.srv')
    srv_mod = getattr(module, service_name)

    rclpy.init(args=[])

    node = rclpy.create_node('replier', namespace=namespace)

    srv_fixtures = get_test_srv(service_name)

    chatter_callback = functools.partial(
        replier_callback, srv_fixtures=srv_fixtures)

    node.create_service(
        srv_mod, 'test/service/' + service_name, chatter_callback)

    spin_count = 1
    print('replier: beginning loop')
    while rclpy.ok() and spin_count < number_of_cycles:
        rclpy.spin_once(node, timeout_sec=2)
        spin_count += 1
        print('spin_count: ' + str(spin_count))
    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:system_tests,代码行数:28,代码来源:replier_py.py


示例7: main

def main(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--reliable', dest='reliable', action='store_true',
        help='set qos profile to reliable')
    parser.set_defaults(reliable=False)
    parser.add_argument(
        '-n', '--number_of_cycles', type=int, default=20,
        help='number of sending attempts')
    args = parser.parse_args(argv)
    rclpy.init()

    if args.reliable:
        custom_qos_profile = qos_profile_default
        print('Reliable publisher')
    else:
        custom_qos_profile = qos_profile_sensor_data
        print('Best effort publisher')

    node = rclpy.create_node('talker_qos')

    chatter_pub = node.create_publisher(String, 'chatter_qos', custom_qos_profile)

    msg = String()

    cycle_count = 0
    while rclpy.ok() and cycle_count < args.number_of_cycles:
        msg.data = 'Hello World: {0}'.format(cycle_count)
        print('Publishing: "{0}"'.format(msg.data))
        chatter_pub.publish(msg)
        cycle_count += 1
        sleep(1)
开发者ID:rohbotics,项目名称:demos,代码行数:32,代码来源:talker_qos_py.py


示例8: talker

def talker(message_name, number_of_cycles, namespace):
    from test_msgs.message_fixtures import get_test_msg
    import rclpy

    message_pkg = 'test_msgs'
    module = importlib.import_module(message_pkg + '.msg')
    msg_mod = getattr(module, message_name)

    rclpy.init(args=[])

    node = rclpy.create_node('talker', namespace=namespace)

    chatter_pub = node.create_publisher(
        msg_mod, 'test/message/' + message_name)

    cycle_count = 0
    print('talker: beginning loop')
    msgs = get_test_msg(message_name)
    while rclpy.ok() and cycle_count < number_of_cycles:
        msg_count = 0
        for msg in msgs:
            chatter_pub.publish(msg)
            msg_count += 1
            print('publishing message #%d' % msg_count)
            time.sleep(0.01)
        cycle_count += 1
        time.sleep(0.1)
    node.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:system_tests,代码行数:29,代码来源:publisher_py.py


示例9: func_destroy_timers

def func_destroy_timers():
    import rclpy
    rclpy.init()

    node = rclpy.create_node('test_node3')

    timer1 = node.create_timer(0.1, None)
    timer2 = node.create_timer(1, None)
    timer2  # noqa

    assert 2 == len(node.timers)

    assert node.destroy_timer(timer1) is True

    assert 1 == len(node.timers)
    try:
        assert node.destroy_node() is True
    except SystemError:
        ret = False
    else:
        assert 0 == len(node.timers)
        assert node.handle is None
        ret = True
    finally:
        rclpy.shutdown()
        return ret
开发者ID:ros2,项目名称:rclpy,代码行数:26,代码来源:test_destruction.py


示例10: func_destroy_node_twice

def func_destroy_node_twice():
    import rclpy
    rclpy.init()
    node = rclpy.create_node('test_node2')
    assert node.destroy_node() is True
    assert node.destroy_node() is True
    return True
开发者ID:ros2,项目名称:rclpy,代码行数:7,代码来源:test_destruction.py


示例11: main

def main(args=None):
    rclpy.init(args=args)
    node = rclpy.create_node('minimal_client')
    cli = node.create_client(AddTwoInts, 'add_two_ints')

    req = AddTwoInts.Request()
    req.a = 41
    req.b = 1
    # TODO(mikaelarguedas) remove this once wait for service implemented
    # wait for connection to be established
    # (no wait for service in Python yet)
    time.sleep(1)

    cli.call(req)
    # when calling wait for future
    # spin should not be called in the main loop
    cli.wait_for_future()
    # TODO(mikaelarguedas) This is not the final API, and this does not scale
    # for multiple pending requests. This will change once an executor model is implemented
    # In the future the response will not be stored in cli.response
    print(
        'Result of add_two_ints: for %d + %d = %d' %
        (req.a, req.b, cli.response.sum))

    node.destroy_node()
    rclpy.shutdown()
开发者ID:esteve,项目名称:examples,代码行数:26,代码来源:client.py


示例12: _rostopic_echo_test

def _rostopic_echo_test(topic):
    rclpy.init(None)
    node = rclpy.create_node('rostopic')
    # TODO FIXME no path resolving for topics yet implemented thereby the initial "/" is
    # omited.
    sub = node.create_subscription(String, topic[1:], chatter_callback, qos_profile_default)
    while rclpy.ok():
       rclpy.spin_once(node)
开发者ID:erlerobot,项目名称:rclpy,代码行数:8,代码来源:rostopic.py


示例13: func_init_shutdown

def func_init_shutdown():
    import rclpy
    try:
        rclpy.init()
        rclpy.shutdown()
        return True
    except RuntimeError:
        return False
开发者ID:ros2,项目名称:rclpy,代码行数:8,代码来源:test_init_shutdown.py


示例14: main

def main(args=None):
    rclpy.init(args=args)

    action_client = MinimalActionClient()

    action_client.send_goal()

    rclpy.spin(action_client)
开发者ID:ros2,项目名称:examples,代码行数:8,代码来源:client.py


示例15: main

def main(args=None):
    rclpy.init(args=args)

    minimal_service = MinimalService()

    rclpy.spin(minimal_service)

    rclpy.shutdown()
开发者ID:esteve,项目名称:examples,代码行数:8,代码来源:service_member_function.py


示例16: main

def main(args=None):
    rclpy.init(args=args)
    try:
        talker = ThrottledTalker()
        rclpy.spin(talker)
    finally:
        talker.destroy_node()
        rclpy.shutdown()
开发者ID:se7oluti0n,项目名称:examples,代码行数:8,代码来源:custom_callback_group.py


示例17: main

def main(argv=sys.argv):
    rclpy.init(args=argv)

    node = rclpy.create_node(
        'initial_params_node', namespace='/', allow_undeclared_parameters=True)
    rclpy.spin(node)

    rclpy.shutdown()
    return 0
开发者ID:ros2,项目名称:system_tests,代码行数:9,代码来源:initial_params.py


示例18: func_double_shutdown

def func_double_shutdown():
    import rclpy
    rclpy.init()
    rclpy.shutdown()
    try:
        rclpy.shutdown()
    except RuntimeError:
        return True

    return False
开发者ID:ros2,项目名称:rclpy,代码行数:10,代码来源:test_init_shutdown.py


示例19: main

def main(args=None):
    rclpy.init(args=args)

    action_server = MinimalActionServer()

    # We use a MultiThreadedExecutor to handle incoming goal requests concurrently
    executor = MultiThreadedExecutor()
    rclpy.spin(action_server, executor=executor)

    action_server.destroy()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:11,代码来源:server_single_goal.py


示例20: main

def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_publisher.destroy_node()
    rclpy.shutdown()
开发者ID:ros2,项目名称:examples,代码行数:12,代码来源:publisher_member_function.py



注:本文中的rclpy.init函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python rclpy.shutdown函数代码示例发布时间:2022-05-26
下一篇:
Python rc.post_event函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap