• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test.getter函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中wishbone.utils.test.getter函数的典型用法代码示例。如果您正苦于以下问题:Python getter函数的具体用法?Python getter怎么用?Python getter使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了getter函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_module_count_timeout_pass

def test_module_count_timeout_pass():

    # Standard situation.  Event passes through after it appeared 10 times.

    conditions = {
        "data": {
            "value": "one",
            "occurrence": 3,
            "window": 2,
            "action": "pass"
        }
    }

    actor_config = ActorConfig('funnel', 100, 1, {}, "", disable_exception_handling=True)
    count = Count(actor_config, conditions)
    count.pool.queue.inbox.disableFallThrough()
    count.pool.queue.outbox.disableFallThrough()
    count.pool.queue.dropped.disableFallThrough()
    count.start()

    count.pool.queue.inbox.put(Event("one"))
    assert getter(count.pool.queue.dropped).get() == "one"

    count.pool.queue.inbox.put(Event("one"))
    assert getter(count.pool.queue.dropped).get() == "one"

    sleep(2)

    count.pool.queue.inbox.put(Event("one"))
    assert getter(count.pool.queue.dropped).get() == "one"

    count.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:32,代码来源:test_module_count.py


示例2: test_module_fanout

def test_module_fanout():

    actor_config = ActorConfig('fanout', 100, 1, {}, "", disable_exception_handling=True)
    fanout = Fanout(actor_config)
    fanout.pool.queue.inbox.disableFallThrough()

    fanout.pool.createQueue("one")
    fanout.pool.queue.one.disableFallThrough()

    fanout.pool.createQueue("two")
    fanout.pool.queue.two.disableFallThrough()

    fanout.start()

    e = Event(["hello"])

    fanout.pool.queue.inbox.put(e)
    one = getter(fanout.pool.queue.one)
    two = getter(fanout.pool.queue.two)

    fanout.stop()

    assert one.get() == ["hello"]
    assert two.get() == ["hello"]
    assert id(one.get()) != id(two.get())
开发者ID:smetj,项目名称:wishbone,代码行数:25,代码来源:test_module_fanout.py


示例3: test_module_funnel

def test_module_funnel():

    actor_config = ActorConfig('funnel', 100, 1, {}, "")
    funnel = Funnel(actor_config)
    funnel.pool.queue.outbox.disableFallThrough()

    funnel.pool.createQueue("one")
    funnel.pool.queue.one.disableFallThrough()

    funnel.pool.createQueue("two")
    funnel.pool.queue.two.disableFallThrough()

    funnel.start()

    event_one = Event("one")

    event_two = Event("two")

    funnel.pool.queue.one.put(event_one)
    funnel.pool.queue.two.put(event_two)

    assert getter(funnel.pool.queue.outbox).get() in ["one", "two"]
    assert getter(funnel.pool.queue.outbox).get() in ["one", "two"]

    funnel.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:25,代码来源:test_module_funnel.py


示例4: test_module_count_default_dropped

def test_module_count_default_dropped():

    # Standard situation.  Events get dropped after it appeared 10 times.

    conditions = {
        "data": {
            "value": "one",
            "occurrence": 10,
            "window": 60,
            "action": "drop"
        }
    }

    actor_config = ActorConfig('funnel', 100, 1, {}, "", disable_exception_handling=True)
    count = Count(actor_config, conditions)
    count.pool.queue.inbox.disableFallThrough()
    count.pool.queue.outbox.disableFallThrough()
    count.pool.queue.dropped.disableFallThrough()
    count.start()

    for _ in range(0, 9):
        count.pool.queue.inbox.put(
            Event("one")
        )
        result = getter(count.pool.queue.outbox).get()
        assert result == "one"

    count.pool.queue.inbox.put(
        Event("one")
    )
    assert getter(count.pool.queue.dropped).get() == "one"
    count.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:32,代码来源:test_module_count.py


示例5: test_module_roundrobin

def test_module_roundrobin():

    actor_config = ActorConfig('roundrobin', 100, 1, {}, "")
    roundrobin = RoundRobin(actor_config)

    roundrobin.pool.queue.inbox.disableFallThrough()

    roundrobin.pool.createQueue("one")
    roundrobin.pool.queue.one.disableFallThrough()

    roundrobin.pool.createQueue("two")
    roundrobin.pool.queue.two.disableFallThrough()

    roundrobin.start()

    event_one = Event("one")

    event_two = Event("two")

    roundrobin.pool.queue.inbox.put(event_one)
    roundrobin.pool.queue.inbox.put(event_two)

    assert getter(roundrobin.pool.queue.one).get() in ["one", "two"]
    assert getter(roundrobin.pool.queue.two).get() in ["one", "two"]

    roundrobin.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:26,代码来源:test_module_roundrobin.py


示例6: test_module_unpack

def test_module_unpack():

    actor_config = ActorConfig('unpack', 100, 1, {}, "")
    unpack = Unpack(actor_config)

    unpack.pool.queue.inbox.disableFallThrough()
    unpack.pool.queue.outbox.disableFallThrough()
    unpack.start()

    bulk = Event(bulk=True)

    for _ in range(0, 10):
        bulk.appendBulk(Event())

    unpack.pool.queue.inbox.put(bulk)

    for _ in range(0, 10):
        assert getter(unpack.pool.queue.outbox)

    try:
        getter(unpack.pool.queue.outbox)
    except Exception:
        assert True
    else:
        assert False
开发者ID:smetj,项目名称:wishbone,代码行数:25,代码来源:test_module_unpack.py


示例7: test_module_jsonvalidate

def test_module_jsonvalidate():

    actor_config = ActorConfig('jsonvalidate', 100, 1, {}, "")

    with open("/tmp/jsonvalidate.jsonschema", "w") as j:
        j.write('{"type": "object", "properties": {"one": { "type": "integer"}}}')

    jsonvalidate = JSONValidate(actor_config, "/tmp/jsonvalidate.jsonschema")

    jsonvalidate.pool.queue.inbox.disableFallThrough()
    jsonvalidate.pool.queue.outbox.disableFallThrough()
    jsonvalidate.pool.queue.failed.disableFallThrough()
    jsonvalidate.start()

    valid = Event({"one": 1})
    invalid = Event({"one": "one"})

    jsonvalidate.pool.queue.inbox.put(valid)
    valid_event = getter(jsonvalidate.pool.queue.outbox)

    jsonvalidate.pool.queue.inbox.put(invalid)
    invalid_event = getter(jsonvalidate.pool.queue.failed)

    os.remove("/tmp/jsonvalidate.jsonschema")

    assert valid_event.get() == {"one": 1}
    assert invalid_event.get() == {"one": "one"}
开发者ID:smetj,项目名称:wishbone-flow-jsonvalidate,代码行数:27,代码来源:test_module_jsonvalidate.py


示例8: test_module_switch_event

def test_module_switch_event():

    actor_config = ActorConfig('switch', 100, 1, {}, "", disable_exception_handling=True)

    switch = Switch(actor_config, outgoing="one")
    switch.pool.queue.inbox.disableFallThrough()
    switch.pool.queue.outbox.disableFallThrough()

    switch.pool.createQueue("one")
    switch.pool.queue.one.disableFallThrough()

    switch.pool.createQueue("two")
    switch.pool.queue.two.disableFallThrough()

    switch.start()

    event_one = Event("one")
    switch.pool.queue.inbox.put(event_one)
    assert getter(switch.pool.queue.one).get() == "one"

    event_two = Event("two")
    switch_event = Event("two")

    switch.pool.queue.switch.put(switch_event)
    switch.pool.queue.inbox.put(event_two)

    assert getter(switch.pool.queue.two).get() == "two"

    switch.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:29,代码来源:test_module_switch.py


示例9: test_module_http_dynamic_method

def test_module_http_dynamic_method():

    from wishbone.lookup import Lookup
    class GetMethod(Lookup):
        def __init__(self):
            self.a = cycle(["POST", "PUT"])

        def lookup(self):
            return next(self.a)

    webserver = WebServer()
    webserver.start()

    actor_config = ActorConfig('httpoutclient', 100, 1, {"method": GetMethod().lookup}, "")
    http = HTTPOutClient(actor_config, url="http://localhost:8088/", accept="monkeyballs", method="~~method()")

    http.pool.queue.inbox.disableFallThrough()
    http.start()

    http.pool.queue.inbox.put(Event('{"one": 1}'))
    sleep(1)
    assert getter(webserver.q)["REQUEST_METHOD"] == "POST"

    http.pool.queue.inbox.put(Event('{"one": 1}'))
    sleep(1)
    assert getter(webserver.q)["REQUEST_METHOD"] == "PUT"
    http.stop()
    webserver.stop()
开发者ID:smetj,项目名称:wishbone-output-http,代码行数:28,代码来源:test_wishbone_output_http.py


示例10: test_file_load

def test_file_load():

    import os
    import yaml
    import shutil

    os.mkdir('/tmp/wishbone_tests')
    actor_config = ActorConfig('match', 100, 1, {}, "")
    match = Match(actor_config, location="/tmp/wishbone_tests")
    match.pool.createQueue("file")
    match.pool.queue.file.disableFallThrough()
    match.pool.queue.inbox.disableFallThrough()

    #Test 1
    rule_1 = {
        "condition": [
            {"file": "re:.*?one.*"}
        ],
        "queue": [
            {"file": {}}
        ]
    }

    with open('/tmp/wishbone_tests/one.yaml', 'w') as one:
        one.write(yaml.dump(rule_1, default_flow_style=False))
    match.start()
    sleep(1)

    e = Event("test")
    e.set({"file": "zero one two"})
    match.pool.queue.inbox.put(e)

    assert getter(match.pool.queue.file).get()["file"] == "zero one two"

    # Test 2
    rule_2 = {
        "condition": [
            {"file": "re:.*?two.*"}
        ],
        "queue": [
            {"file": {}}
        ]
    }
    with open('/tmp/wishbone_tests/two.yaml', 'w') as one:
        one.write(yaml.dump(rule_2, default_flow_style=False))
    sleep(1)

    e = Event("test")
    e.set({"file": "one two three"})
    match.pool.queue.inbox.put(e)
    assert getter(match.pool.queue.file).get()["file"] == "one two three"
    shutil.rmtree('/tmp/wishbone_tests')
开发者ID:smetj,项目名称:wishbone-flow-match,代码行数:52,代码来源:test_module_match.py


示例11: test_module_httpinclientTimeout

def test_module_httpinclientTimeout():

    actor_config = ActorConfig('httpinclient', 100, 1, {}, "")
    http = HTTPInClient(actor_config, url="http://www.github.com", interval=1, allow_redirects=True, timeout=0.001)

    http.pool.queue.outbox.disableFallThrough()
    http.start()

    sleep(3)
    try:
        getter(http.pool.queue.failed)
    except Exception:
        assert True
    else:
        assert False
开发者ID:smetj,项目名称:wishbone-input-httpclient,代码行数:15,代码来源:test_module_httpinclient.py


示例12: test_module_queueselect_novalidqueue

def test_module_queueselect_novalidqueue():

    actor_config = ActorConfig('queueselect', 100, 1, {}, "", disable_exception_handling=True)

    templates = [
        {
            "name": "rule_1",
            "queue": "{{ 'no_such_queue_1' if data.one == 1 else 'no_such_queue_2' }}",
        },
        {
            "name": "rule_2",
            "queue": "{{ 'no_such_queue_1' if data.one == 1 else 'no_such_queue_2' }}",
        }
    ]

    queueselect = QueueSelect(actor_config, templates=templates)
    queueselect.pool.queue.inbox.disableFallThrough()
    queueselect.pool.queue.outbox.disableFallThrough()
    queueselect.pool.queue.nomatch.disableFallThrough()

    queueselect.pool.createQueue("queue_1")
    queueselect.pool.queue.queue_1.disableFallThrough()

    queueselect.pool.createQueue("queue_2")
    queueselect.pool.queue.queue_2.disableFallThrough()

    queueselect.start()

    queueselect.pool.queue.inbox.put(Event({"one": 1, "two": 2}))

    assert getter(queueselect.pool.queue.nomatch).get() == {"one": 1, "two": 2}

    queueselect.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:33,代码来源:test_module_queueselect.py


示例13: test_module_queueselect_regex

def test_module_queueselect_regex():

    actor_config = ActorConfig('queueselect', 100, 1, {}, "", disable_exception_handling=True)

    template = {
        "name": "name of the rule",
        "queue": "{{ 'queue_1' if regex('\d', data.one) else 'queue_2' }}",
        "payload": {
            "queue_1": {
                "detail_1": "some value",
                "detail_2": "some other value",
            },
            "queue_2": {
                "detail_1": "some value",
                "detail_2": "some other value",
            }
        }
    }

    queueselect = QueueSelect(actor_config, templates=[template])
    queueselect.pool.queue.inbox.disableFallThrough()
    queueselect.pool.queue.outbox.disableFallThrough()
    queueselect.pool.queue.nomatch.disableFallThrough()

    queueselect.start()

    queueselect.pool.queue.inbox.put(Event({"one": 1, "two": "two"}))
    assert getter(queueselect.pool.queue.nomatch).get() == {"one": 1, "two": "two"}

    queueselect.stop()
开发者ID:smetj,项目名称:wishbone,代码行数:30,代码来源:test_module_queueselect.py


示例14: test_module_jq_disk_rule_reload

def test_module_jq_disk_rule_reload():

    condition = {
        "expression": '.greeting | test("hello")',
        "queue": "outbox"
    }

    dumpFile(condition)

    actor_config = ActorConfig('jq', 100, 1, {}, "")
    jq = JQ(actor_config, location="./test_rules")

    jq.pool.queue.inbox.disableFallThrough()
    jq.pool.queue.no_match.disableFallThrough()
    jq.start()

    condition2 = {
        "expression": '.greeting | test( "hi")',
        "queue": "outbox"
    }
    dumpFile(condition2)

    e = Event({"greeting": "hi"})

    jq.pool.queue.inbox.put(e)
    one = getter(jq.pool.queue.no_match)
    assert one.get() == {"greeting": "hi"}
开发者ID:smetj,项目名称:wishbone-flow-jq,代码行数:27,代码来源:test_module_jq.py


示例15: test_module_inotify_default

def test_module_inotify_default():

    # Standard situation.  Monitors the changes to a file.

    actor_config = ActorConfig('inotify', 100, 1, {}, "")

    filename = "/tmp/%s" % str(uuid4())
    open(filename, 'a').close()

    inotify = WBInotify(actor_config, initial_listing=True, paths={filename: []})
    inotify.pool.queue.outbox.disableFallThrough()
    inotify.start()
    sleep(1)
    os.unlink(filename)
    sleep(1)
    e = getter(inotify.pool.queue.outbox)
    assert e.get() == {"path": filename, "inotify_type": "WISHBONE_INIT"}
    assert getter(inotify.pool.queue.outbox).get()["inotify_type"] == "IN_ATTRIB"
    assert getter(inotify.pool.queue.outbox).get()["inotify_type"] == "IN_DELETE_SELF"
开发者ID:smetj,项目名称:wishbone,代码行数:19,代码来源:test_module_inotify.py


示例16: test_module_generator_dict

def test_module_generator_dict():

    actor_config = ActorConfig('generator', 100, 1, {}, "")
    test_event = Generator(actor_config, payload={"one": 1})

    test_event.pool.queue.outbox.disableFallThrough()
    test_event.start()

    event = getter(test_event.pool.queue.outbox)
    assert event.get()["one"] == 1
开发者ID:smetj,项目名称:wishbone,代码行数:10,代码来源:test_module_generator.py



注:本文中的wishbone.utils.test.getter函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python wit.Wit类代码示例发布时间:2022-05-26
下一篇:
Python event.Event类代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap