• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python _compat.unicode函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中redis._compat.unicode函数的典型用法代码示例。如果您正苦于以下问题：Python unicode函数的具体用法？Python unicode怎么用？Python unicode使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了unicode函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: annotate_exception

 def annotate_exception(self, exception, number, command):
     """
     Prefix the first argument of ``exception`` with pipeline context.

     The parts of ``command`` are converted to text and joined with single
     spaces. ``exception.args[0]`` is then replaced by a message that names
     the command position (``number``), the rendered command and the
     original first argument; any remaining args are kept unchanged.
     """
     rendered_command = unicode(' ').join(imap(unicode, command))
     original_error = unicode(exception.args[0])
     template = unicode('Command # {0} ({1}) of pipeline caused error: {2}')
     annotated = template.format(number, rendered_command, original_error)
     exception.args = (annotated,) + exception.args[1:]
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:7,代码来源:pipeline.py


示例2: test_empty_startup_nodes

def test_empty_startup_nodes(s):
    """
    Creating a client with an empty ``startup_nodes`` list must raise
    RedisClusterException with a "No startup nodes provided" message.
    """
    with pytest.raises(RedisClusterException) as excinfo:
        _get_client(init_slot_cache=False, startup_nodes=[])

    # Render the error once; it doubles as the assertion failure message.
    message = unicode(excinfo.value)
    assert message.startswith("No startup nodes provided"), message
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:8,代码来源:test_cluster_obj.py


示例3: test_execute_command_errors

def test_execute_command_errors(r):
    """
    ``execute_command`` must raise RedisClusterException both when it is
    called without any command and when it is called with a command but
    no key.
    """
    # No command at all.
    with pytest.raises(RedisClusterException) as excinfo:
        r.execute_command()
    no_command_message = unicode(excinfo.value)
    assert no_command_message.startswith("Unable to determine command to use")

    # A command, but no key to dispatch on.
    with pytest.raises(RedisClusterException) as excinfo:
        r.execute_command("GET")
    missing_key_message = unicode(excinfo.value)
    assert missing_key_message.startswith("No way to dispatch this command to Redis Cluster. Missing key.")
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:14,代码来源:test_cluster_obj.py


示例4: test_blocked_arguments

    def test_blocked_arguments(self, r):
        """
        Some ``pipeline`` arguments are rejected in cluster mode.

        Passing ``transaction=True`` or ``shard_hint=True`` to
        ``r.pipeline`` must raise RedisClusterException with a message
        that starts with the corresponding "is deprecated in cluster mode"
        text.
        """
        with pytest.raises(RedisClusterException) as ex:
            r.pipeline(transaction=True)

        # Show the real exception text on failure; the previous message
        # operand was the constant ``True``, which carried no information.
        message = unicode(ex.value)
        assert message.startswith("transaction is deprecated in cluster mode"), message

        with pytest.raises(RedisClusterException) as ex:
            r.pipeline(shard_hint=True)

        message = unicode(ex.value)
        assert message.startswith("shard_hint is deprecated in cluster mode"), message
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:14,代码来源:test_pipeline.py


示例5: test_exec_error_in_no_transaction_pipeline_unicode_command

    def test_exec_error_in_no_transaction_pipeline_unicode_command(self, r):
        """
        A pipeline error must be annotated with the failing command even
        when the key contains non-ASCII characters.

        The key is set to ``1``, then ``LLEN`` and ``EXPIRE`` are queued in
        a non-transactional pipeline. ``execute()`` is expected to raise
        ResponseError whose text starts with the annotated ``LLEN`` command,
        and the key must still hold ``b('1')`` afterwards.
        """
        key = unichr(3456) + u('abcd') + unichr(3421)
        r[key] = 1
        with r.pipeline(transaction=False) as pipe:
            pipe.llen(key)
            pipe.expire(key, 100)

            with pytest.raises(ResponseError) as excinfo:
                pipe.execute()

            prefix_template = unicode('Command # 1 (LLEN {0}) of pipeline caused error: ')
            expected_prefix = prefix_template.format(key)
            actual_message = unicode(excinfo.value)
            assert actual_message.startswith(expected_prefix)

        assert r[key] == b('1')
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:14,代码来源:test_pipeline.py


示例6: test_rename

    def test_rename(self, r):
        """
        Exercise ``rename`` for the success path and two error paths.

        Renaming an existing key moves its value; renaming a key onto itself
        and renaming a missing key must both raise ResponseError with the
        expected message prefix.
        """
        # Success: 'a' is moved to 'b'.
        r['a'] = '1'
        assert r.rename('a', 'b')
        assert r.get('a') is None
        assert r['b'] == b('1')

        # Source and destination are the same key.
        with pytest.raises(ResponseError) as excinfo:
            r.rename("foo", "foo")
        same_key_message = unicode(excinfo.value)
        assert same_key_message.startswith("source and destination objects are the same")

        # Source key does not exist.
        assert r.get("foo") is None
        with pytest.raises(ResponseError) as excinfo:
            r.rename("foo", "bar")
        missing_key_message = unicode(excinfo.value)
        assert missing_key_message.startswith("no such key")
开发者ID:baranbartu,项目名称:redis-py-cluster,代码行数:14,代码来源:test_commands.py


示例7: test_password_procted_nodes

def test_password_procted_nodes():
    """
    Check client creation against nodes on port 7000 and port 7100 with and
    without a password.

    Connecting to the port-7100 nodes without a password, or to the port-7000
    nodes with one, must raise RedisClusterException; the matching
    combinations must succeed.
    """
    open_nodes = [{"host": "127.0.0.1", "port": "7000"}]
    protected_nodes = [{"host": "127.0.0.1", "port": "7100"}]

    # Port-7100 nodes: fails without the password, works with it.
    with pytest.raises(RedisClusterException) as excinfo:
        _get_client(startup_nodes=protected_nodes)
    failure_text = unicode(excinfo.value)
    assert failure_text.startswith("ERROR sending 'cluster slots' command to redis server:")
    _get_client(startup_nodes=protected_nodes, password="password_is_protected")

    # Port-7000 nodes: fails with a password, works without one.
    with pytest.raises(RedisClusterException) as excinfo:
        _get_client(startup_nodes=open_nodes, password="password_is_protected")
    failure_text = unicode(excinfo.value)
    assert failure_text.startswith("ERROR sending 'cluster slots' command to redis server:")
    _get_client(startup_nodes=open_nodes)
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:15,代码来源:test_cluster_obj.py


示例8: test_exec_error_in_no_transaction_pipeline_unicode_command

    def test_exec_error_in_no_transaction_pipeline_unicode_command(self):
        """
        A pipeline error must be annotated with the failing command even
        when the key contains non-ASCII characters.

        The key is set to ``1`` on ``self.rc``, then ``LLEN`` and ``EXPIRE``
        are queued in a non-transactional pipeline. ``execute()`` is expected
        to raise ``redis.ResponseError`` whose text starts with the annotated
        ``LLEN`` command, and the key must still hold ``b('1')`` afterwards.
        """
        key = unichr(3456) + u('abcd') + unichr(3421)
        self.rc[key] = 1
        with self.rc.pipeline(transaction=False) as pipe:
            pipe.llen(key)
            pipe.expire(key, 100)

            with pytest.raises(redis.ResponseError) as ex:
                pipe.execute()
            # Call form of print: valid on Python 3 and prints the same
            # single value on Python 2 (the statement form is a SyntaxError
            # on Python 3).
            print(ex.value)
            expected = unicode('Command # 1 (LLEN %s) of pipeline caused '
                               'error: ') % key
            assert unicode(ex.value).startswith(expected)

        assert self.rc[key] == b('1')
开发者ID:kevinsu,项目名称:redis-py-multi-node,代码行数:15,代码来源:test_pipeline.py


示例9: test_get_connection_by_slot

    def test_get_connection_by_slot(self):
        """
        Check which pool method ``get_connection_by_key`` relies on.

        With ``get_master_connection_by_slot`` patched, a ``ZSCAN`` lookup
        must return the patched connection; with
        ``get_random_master_slave_connection_by_slot`` patched, a ``GET``
        lookup must return the patched connection. Calling it with neither
        key nor command must raise RedisClusterException.
        """
        pool = self.get_pool(connection_kwargs={})

        # Shared replacement for the patched pool methods; with autospec the
        # pool instance is passed as the first positional argument.
        def fake_connection(_pool, *args, **kwargs):
            return DummyConnection(port=1337)

        # Patch the call that is made inside the method to allow control of the returned connection object
        with patch.object(ClusterReadOnlyConnectionPool, 'get_master_connection_by_slot', autospec=True) as pool_mock:
            pool_mock.side_effect = fake_connection

            # Try a master only command
            connection = pool.get_connection_by_key("foo", 'ZSCAN')
            assert connection.port == 1337

        with patch.object(ClusterReadOnlyConnectionPool, 'get_random_master_slave_connection_by_slot', autospec=True) as pool_mock:
            pool_mock.side_effect = fake_connection

            # try a random node command
            connection = pool.get_connection_by_key('foo', 'GET')
            assert connection.port == 1337

        with pytest.raises(RedisClusterException) as ex:
            pool.get_connection_by_key(None, None)
        # Show the real exception text on failure instead of the constant True.
        message = unicode(ex.value)
        assert message.startswith("No way to dispatch this command to Redis Cluster."), message
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:27,代码来源:test_cluster_connection_pool.py


示例10: test_init_slots_cache_not_all_slots

    def test_init_slots_cache_not_all_slots(self, s):
        """
        Test that if not all slots are covered it should raise an exception.

        ``s.get_redis_link_from_node`` is replaced by a wrapper whose links
        answer every command with a slot table that skips slot 5460, after
        which ``s.initialize_slots_cache()`` must raise
        RedisClusterException.
        """
        # Save the old function so we can wrap it
        old_get_link_from_node = s.get_redis_link_from_node

        # Create wrapper function so we can inject custom 'CLUSTER SLOTS' command result
        def new_get_link_from_node(node):
            link = old_get_link_from_node(node)

            # Missing slot 5460
            bad_slots_resp = [[0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
                              [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
                              [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]]]

            # Every command executed on this link now returns the bad table
            link.execute_command = lambda *args: bad_slots_resp

            return link

        s.get_redis_link_from_node = new_get_link_from_node

        with pytest.raises(RedisClusterException) as ex:
            s.initialize_slots_cache()

        # Show the real exception text on failure instead of the constant True.
        message = unicode(ex.value)
        assert message.startswith("All slots are not covered after querry all startup_nodes."), message
开发者ID:yekeqiang,项目名称:redis-py-cluster,代码行数:27,代码来源:test_cluster_obj.py


示例11: test_blocked_transaction

def test_blocked_transaction(r):
    """
    Calling ``r.transaction`` must raise RedisClusterException with a
    "not implemented" message.
    """
    with pytest.raises(RedisClusterException) as excinfo:
        r.transaction(None)

    # Render the error once; it doubles as the assertion failure message.
    message = unicode(excinfo.value)
    assert message.startswith("method StrictRedisCluster.transaction() is not implemented"), message
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:7,代码来源:test_cluster_obj.py


示例12: test_init_slots_cache_not_all_slots

def test_init_slots_cache_not_all_slots(s):
    """
    Node initialization must raise RedisClusterException when the reported
    slot table does not cover every slot.

    ``get_redis_link`` on the node manager is replaced so that every link
    answers with a slot table that skips slot 5460.
    """
    # Slot table with a hole: slot 5460 is not covered by any range.
    incomplete_slots = [
        [0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
        [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
        [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]],
    ]

    def fake_get_redis_link(host, port, decode_responses=False):
        # The arguments are ignored: the link always targets 127.0.0.1:7000.
        fake_link = StrictRedis(host="127.0.0.1", port=7000, decode_responses=True)
        # Any command executed on the link returns the incomplete table.
        fake_link.execute_command = lambda *args: incomplete_slots
        return fake_link

    s.connection_pool.nodes.get_redis_link = fake_get_redis_link

    with pytest.raises(RedisClusterException) as excinfo:
        s.connection_pool.nodes.initialize()

    message = unicode(excinfo.value)
    assert message.startswith("All slots are not covered after query all startup_nodes.")
开发者ID:huangfupeng,项目名称:redis-py-cluster,代码行数:26,代码来源:test_node_manager.py


示例13: test_clusterdown_wrapper

def test_clusterdown_wrapper():
    """
    A function wrapped by ``clusterdown_wrapper`` that always raises
    ClusterDownException must end up raising ClusterDownException with the
    "Unable to rebuild the cluster" message.
    """
    def bad_func():
        raise ClusterDownException()

    # Explicit application, equivalent to decorating ``bad_func``.
    bad_func = clusterdown_wrapper(bad_func)

    with pytest.raises(ClusterDownException) as excinfo:
        bad_func()
    message = unicode(excinfo.value)
    assert message.startswith("CLUSTERDOWN error. Unable to rebuild the cluster")
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:8,代码来源:test_utils.py


示例14: test_first_key

def test_first_key():
    """
    ``first_key`` returns the single value of a one-entry result, raises
    RedisClusterException for a multi-entry result and AssertionError for
    ``None``.
    """
    single_result = {"foo": 1}
    assert first_key("foobar", single_result) == 1

    multiple_results = {"foo": 1, "bar": 2}
    with pytest.raises(RedisClusterException) as excinfo:
        first_key("foobar", multiple_results)
    message = unicode(excinfo.value)
    assert message.startswith("More then 1 result from command: foobar")

    with pytest.raises(AssertionError):
        first_key("foobar", None)
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:9,代码来源:test_utils.py


示例15: test_raise_regular_exception

def test_raise_regular_exception(r):
    """
    A plain ``Exception`` handed to ``handle_cluster_command_exception``
    must be raised again with its original message.
    """
    plain_error = Exception("foobar")
    with pytest.raises(Exception) as excinfo:
        r.handle_cluster_command_exception(plain_error)
    message = unicode(excinfo.value)
    assert message.startswith("foobar")
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:9,代码来源:test_cluster_obj.py


示例16: test_blocked_strict_redis_args

    def test_blocked_strict_redis_args(self):
        """
        Some arguments should explicitly be blocked because they will not
        work in a cluster setup.

        A RedisCluster built from startup nodes alone must use the default
        socket timeout, and passing ``db``, ``host`` or ``port`` to
        ``_get_client`` must raise RedisClusterException naming the offending
        keyword.
        """
        params = {'startup_nodes': [{'host': '127.0.0.1', 'port': 7000}]}
        c = RedisCluster(**params)
        assert c.opt["socket_timeout"] == RedisCluster.RedisClusterDefaultTimeout

        # In each check below the real exception text is the failure message;
        # the previous message operand was the constant ``True``.
        with pytest.raises(RedisClusterException) as ex:
            _get_client(db=1)
        message = unicode(ex.value)
        assert message.startswith("(error) [Remove 'db' from kwargs]"), message

        with pytest.raises(RedisClusterException) as ex:
            _get_client(host="foo.bar")
        message = unicode(ex.value)
        assert message.startswith("(error) [Remove 'host' from kwargs]"), message

        with pytest.raises(RedisClusterException) as ex:
            _get_client(port=1337)
        message = unicode(ex.value)
        assert message.startswith("(error) [Remove 'port' from kwargs]"), message
开发者ID:yekeqiang,项目名称:redis-py-cluster,代码行数:19,代码来源:test_cluster_obj.py


示例17: test_get_connection_blocked

    def test_get_connection_blocked(self):
        """
        ``get_connection`` must reject a non-pubsub command such as ``GET``
        by raising RedisClusterException.
        """
        connection_pool = self.get_pool()

        with pytest.raises(RedisClusterException) as excinfo:
            connection_pool.get_connection("GET")
        message = unicode(excinfo.value)
        assert message.startswith("Only 'pubsub' commands can be used by get_connection()")
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:10,代码来源:test_cluster_connection_pool.py


示例18: test_keyslot

def test_keyslot():
    """
    ``NodeManager.keyslot`` must return the expected slot for known keys and
    the same slot for equivalent keys given as int, float, text or bytes.
    """
    n = NodeManager([{}])

    # Keys with a known, fixed slot number.
    known_slots = [
        ("foo", 12182),
        ("{foo}bar", 12182),
        ("{foo}", 12182),
        (1337, 4314),
    ]
    for key, slot in known_slots:
        assert n.keyslot(key) == slot

    # Pairs of keys that must land in the same slot.
    equivalent_keys = [
        (125, b"125"),
        (125, "\x31\x32\x35"),
        ("大奖", b"\xe5\xa4\xa7\xe5\xa5\x96"),
        (u"大奖", b"\xe5\xa4\xa7\xe5\xa5\x96"),
        (1337.1234, "1337.1234"),
        (1337, "1337"),
        (b"abc", "abc"),
        ("abc", unicode("abc")),
        (unicode("abc"), b"abc"),
    ]
    for left, right in equivalent_keys:
        assert n.keyslot(left) == n.keyslot(right)
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:20,代码来源:test_node_manager.py


示例19: test_get_connection_by_key

    def test_get_connection_by_key(self, r):
        """
        This test assumes that when hashing key 'foo' will be sent to server
        with port 7002.

        Looking up the connection for key ``"foo"`` must give a connection
        whose pool is configured for port 7002, and looking up ``None`` must
        raise RedisClusterException.
        """
        connection = r.get_connection_by_key("foo")
        assert connection.connection_pool.connection_kwargs["port"] == 7002

        with pytest.raises(RedisClusterException) as ex:
            r.get_connection_by_key(None)

        # Show the real exception text on failure instead of the constant True.
        message = unicode(ex.value)
        assert message.startswith("No way to dispatch this command to Redis Cluster."), message
开发者ID:yekeqiang,项目名称:redis-py-cluster,代码行数:11,代码来源:test_cluster_obj.py


示例20: test_blocked_strict_redis_args

def test_blocked_strict_redis_args():
    """
    A StrictRedisCluster built from startup nodes alone must use the default
    socket timeout, and passing ``db`` to ``_get_client`` must raise
    RedisClusterException.
    """
    init_kwargs = {"startup_nodes": [{"host": "127.0.0.1", "port": 7000}]}
    cluster = StrictRedisCluster(**init_kwargs)
    configured_timeout = cluster.connection_pool.connection_kwargs["socket_timeout"]
    assert configured_timeout == ClusterConnectionPool.RedisClusterDefaultTimeout

    with pytest.raises(RedisClusterException) as excinfo:
        _get_client(db=1)
    message = unicode(excinfo.value)
    assert message.startswith("Argument 'db' is not possible to use in cluster mode")
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:11,代码来源:test_cluster_obj.py



注:本文中的redis._compat.unicode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python control.addonBanner函数代码示例发布时间:2022-05-27
下一篇:
Python _compat.unichr函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap