• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python reflection.get_callable_name函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中taskflow.utils.reflection.get_callable_name函数的典型用法代码示例。如果您正苦于以下问题:Python get_callable_name函数的具体用法?Python get_callable_name怎么用?Python get_callable_name使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_callable_name函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_inner_class

 def test_inner_class(self):
     obj = self.InnerCallableClass()
     name = reflection.get_callable_name(obj)
     expected_name = '.'.join((__name__,
                               'GetCallableNameTestExtended',
                               'InnerCallableClass'))
     self.assertEqual(expected_name, name)
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:7,代码来源:test_utils.py


示例2: _trigger

 def _trigger(self, event, *args, **kwargs):
     """Execute all handlers for the given event type."""
     for (handler, event_data) in self._events_listeners.get(event, []):
         try:
             handler(self, event_data, *args, **kwargs)
         except Exception:
             LOG.exception("Failed calling `%s` on event '%s'",
                           reflection.get_callable_name(handler), event)
开发者ID:toabctl,项目名称:taskflow,代码行数:8,代码来源:task.py


示例3: __init__

 def __init__(self, name=None, provides=None, requires=None,
              auto_extract=True, rebind=None):
     """Initialize task instance"""
     if name is None:
         name = reflection.get_callable_name(self)
     super(Task, self).__init__(name,
                                provides=provides)
     self.requires = _build_arg_mapping(self.name, requires, rebind,
                                        self.execute, auto_extract)
开发者ID:pombredanne,项目名称:taskflow,代码行数:9,代码来源:task.py


示例4: _with_connection

 def _with_connection(self, functor, *args, **kwargs):
     # NOTE(harlowja): Activate the given function with a backend
     # connection, if a backend is provided in the first place, otherwise
     # don't call the function.
     if self._backend is None:
         LOG.debug("No backend provided, not calling functor '%s'",
                   reflection.get_callable_name(functor))
         return
     with contextlib.closing(self._backend.get_connection()) as conn:
         functor(conn, *args, **kwargs)
开发者ID:zhujzhuo,项目名称:trove-1.0.10.4,代码行数:10,代码来源:storage.py


示例5: test_static_method

 def test_static_method(self):
     name = reflection.get_callable_name(Class.static_method)
     if six.PY3:
         self.assertEqual(name,
                          '.'.join((__name__, 'Class', 'static_method')))
     else:
         # NOTE(imelnikov): static method are just functions, class name
         # is not recorded anywhere in them.
         self.assertEqual(name,
                          '.'.join((__name__, 'static_method')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:10,代码来源:test_utils.py


示例6: test_update_progress_handler_failure

    def test_update_progress_handler_failure(self, mocked_exception):
        def progress_callback(*args, **kwargs):
            raise Exception("Woot!")

        task = ProgressTask()
        with task.autobind("update_progress", progress_callback):
            task.execute([0.5])
        mocked_exception.assert_called_once_with(
            mock.ANY, reflection.get_callable_name(progress_callback), "update_progress"
        )
开发者ID:zhujzhuo,项目名称:trove-1.0.10.4,代码行数:10,代码来源:test_task.py


示例7: _trigger

 def _trigger(self, event, *args, **kwargs):
     """Execute all handlers for the given event type."""
     if event in self._events_listeners:
         for handler in self._events_listeners[event]:
             event_data = self._events_listeners[event][handler]
             try:
                 handler(self, event_data, *args, **kwargs)
             except Exception:
                 LOG.exception("Failed calling `%s` on event '%s'",
                               reflection.get_callable_name(handler), event)
开发者ID:SEJeff,项目名称:taskflow,代码行数:10,代码来源:task.py


示例8: _make_logger

 def _make_logger(self, level=logging.DEBUG):
     log = logging.getLogger(
         reflection.get_callable_name(self._get_test_method()))
     log.propagate = False
     for handler in reversed(log.handlers):
         log.removeHandler(handler)
     handler = test.CapturingLoggingHandler(level=level)
     log.addHandler(handler)
     log.setLevel(level)
     self.addCleanup(handler.reset)
     self.addCleanup(log.removeHandler, handler)
     return (log, handler)
开发者ID:TheSriram,项目名称:taskflow,代码行数:12,代码来源:test_listeners.py


示例9: test_inner_callable_function

    def test_inner_callable_function(self):
        def a():

            def b():
                pass

            return b

        name = reflection.get_callable_name(a())
        expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
                                  'test_inner_callable_function', '<locals>',
                                  'a', '<locals>', 'b'))
        self.assertEqual(expected_name, name)
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:13,代码来源:test_utils.py


示例10: _fetch_validate_factory

def _fetch_validate_factory(flow_factory):
    if isinstance(flow_factory, six.string_types):
        factory_fun = _fetch_factory(flow_factory)
        factory_name = flow_factory
    else:
        factory_fun = flow_factory
        factory_name = reflection.get_callable_name(flow_factory)
        try:
            reimported = _fetch_factory(factory_name)
            assert reimported == factory_fun
        except (ImportError, AssertionError):
            raise ValueError("Flow factory %r is not reimportable by name %s" % (factory_fun, factory_name))
    return (factory_name, factory_fun)
开发者ID:yanheven,项目名称:OpenStackInAction,代码行数:13,代码来源:helpers.py


示例11: __init__

 def __init__(self, execute, name=None, provides=None,
              requires=None, auto_extract=True, rebind=None, revert=None,
              version=None):
     assert six.callable(execute), ("Function to use for executing must be"
                                    " callable")
     if revert:
         assert six.callable(revert), ("Function to use for reverting must"
                                       " be callable")
     if name is None:
         name = reflection.get_callable_name(execute)
     super(FunctorTask, self).__init__(name, provides=provides)
     self._execute = execute
     self._revert = revert
     if version is not None:
         self.version = version
     self._build_arg_mapping(execute, requires, rebind, auto_extract)
开发者ID:toabctl,项目名称:taskflow,代码行数:16,代码来源:task.py


示例12: __init__

 def __init__(self, volume_id):
     # Note here that the volume name is composed of the name of the class
     # along with the volume id that is being created, since a name of a
     # task uniquely identifies that task in storage it is important that
     # the name be relevant and identifiable if the task is recreated for
     # subsequent resumption (if applicable).
     #
     # UUIDs are *not* used as they can not be tied back to a previous tasks
     # state on resumption (since they are unique and will vary for each
     # task that is created). A name based off the volume id that is to be
     # created is more easily tied back to the original task so that the
     # volume create can be resumed/revert, and is much easier to use for
     # audit and tracking purposes.
     base_name = reflection.get_callable_name(self)
     super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
                                                         volume_id))
     self._volume_id = volume_id
开发者ID:celttechie,项目名称:taskflow,代码行数:17,代码来源:create_parallel_volume.py


示例13: load_from_factory

def load_from_factory(
    flow_factory, factory_args=None, factory_kwargs=None, store=None, book=None, engine_conf=None, backend=None
):
    """Load flow from factory function into engine

    Gets flow factory function (or name of it) and creates flow with
    it. Then, flow is loaded into engine with load(), and factory
    function fully qualified name is saved to flow metadata so that
    it can be later resumed with resume.

    :param flow_factory: function or string: function that creates the flow
    :param factory_args: list or tuple of factory positional arguments
    :param factory_kwargs: dict of factory keyword arguments
    :param store: dict -- data to put to storage to satisfy flow requirements
    :param book: LogBook to create flow detail in
    :param engine_conf: engine type and configuration configuration
    :param backend: storage backend to use or configuration
    :returns: engine
    """

    if isinstance(flow_factory, six.string_types):
        factory_fun = importutils.import_class(flow_factory)
        factory_name = flow_factory
    else:
        factory_fun = flow_factory
        factory_name = reflection.get_callable_name(flow_factory)
        try:
            reimported = importutils.import_class(factory_name)
            assert reimported == factory_fun
        except (ImportError, AssertionError):
            raise ValueError("Flow factory %r is not reimportable by name %s" % (factory_fun, factory_name))

    args = factory_args or []
    kwargs = factory_kwargs or {}
    flow = factory_fun(*args, **kwargs)
    factory_data = dict(name=factory_name, args=args, kwargs=kwargs)

    if isinstance(backend, dict):
        backend = p_backends.fetch(backend)
    flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend, meta={"factory": factory_data})
    return load(flow=flow, flow_detail=flow_detail, store=store, book=book, engine_conf=engine_conf, backend=backend)
开发者ID:ntt-sic,项目名称:taskflow,代码行数:41,代码来源:helpers.py


示例14: test_callable_class_call

 def test_callable_class_call(self):
     name = reflection.get_callable_name(CallableClass().__call__)
     self.assertEqual(name, '.'.join((__name__, 'CallableClass',
                                      '__call__')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:4,代码来源:test_utils.py


示例15: test_constructor

 def test_constructor(self):
     name = reflection.get_callable_name(Class)
     self.assertEqual(name, '.'.join((__name__, 'Class')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:3,代码来源:test_utils.py


示例16: test_class_method

 def test_class_method(self):
     name = reflection.get_callable_name(Class.class_method)
     self.assertEqual(name, '.'.join((__name__, 'Class', 'class_method')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:3,代码来源:test_utils.py


示例17: test_instance_method

 def test_instance_method(self):
     name = reflection.get_callable_name(Class().method)
     self.assertEqual(name, '.'.join((__name__, 'Class', 'method')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:3,代码来源:test_utils.py


示例18: test_static_method

 def test_static_method(self):
     # NOTE(imelnikov): static method are just functions, class name
     # is not recorded anywhere in them.
     name = reflection.get_callable_name(Class.static_method)
     self.assertEqual(name, ".".join((__name__, "static_method")))
开发者ID:zhujzhuo,项目名称:trove-1.0.10.4,代码行数:5,代码来源:test_utils.py


示例19: test_callable_class

 def test_callable_class(self):
     name = reflection.get_callable_name(CallableClass())
     self.assertEquals(name, '.'.join((__name__, 'CallableClass')))
开发者ID:jessicalucci,项目名称:TaskManagement,代码行数:3,代码来源:test_utils.py


示例20: __init__

 def __init__(self, timeout, functors):
     self._timeout = timeout
     self._functors = []
     for f in functors:
         self._functors.append((f, reflection.get_callable_name(f)))
开发者ID:devananda,项目名称:taskflow,代码行数:5,代码来源:executor.py



注:本文中的taskflow.utils.reflection.get_callable_name函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python reflection.get_class_name函数代码示例发布时间:2022-05-27
下一篇:
Python reflection.get_callable_args函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap