本文整理汇总了Python中redis._compat.unicode函数的典型用法代码示例。如果您正苦于以下问题：Python unicode函数的具体用法？Python unicode怎么用？Python unicode使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了unicode函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: annotate_exception
def annotate_exception(self, exception, number, command):
    """
    Prefix the message of `exception` with the pipeline command that caused it.

    The first element of `exception.args` is replaced by a message of the
    form ``Command # <number> (<command>) of pipeline caused error:
    <original message>``. Any remaining args are kept unchanged.

    :param exception: exception instance to annotate (its `args` is reassigned)
    :param number: value inserted after ``Command #`` in the message
    :param command: iterable of command parts, each converted with `unicode`
        and joined with single spaces
    """
    cmd = unicode(' ').join(imap(unicode, command))
    # An exception raised without arguments has an empty `args` tuple; fall
    # back to an empty original message instead of failing with IndexError.
    if exception.args:
        original = unicode(exception.args[0])
    else:
        original = unicode('')
    msg = unicode('Command # {0} ({1}) of pipeline caused error: {2}').format(
        number, cmd, original)
    exception.args = (msg,) + exception.args[1:]
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:7,代码来源:pipeline.py
示例2: test_empty_startup_nodes
def test_empty_startup_nodes(s):
    """
    Creating a client with an empty startup_nodes list must raise
    RedisClusterException.
    """
    with pytest.raises(RedisClusterException) as exc_info:
        _get_client(init_slot_cache=False, startup_nodes=[])

    # The error text doubles as the assertion message.
    message = unicode(exc_info.value)
    assert message.startswith("No startup nodes provided"), message
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:8,代码来源:test_cluster_obj.py
示例3: test_execute_command_errors
def test_execute_command_errors(r):
    """
    execute_command() must raise RedisClusterException both when it is
    called without any command and when a command is given without a key.
    """
    # No command at all
    with pytest.raises(RedisClusterException) as no_command:
        r.execute_command()
    error_text = unicode(no_command.value)
    assert error_text.startswith("Unable to determine command to use")

    # A command but no key
    with pytest.raises(RedisClusterException) as no_key:
        r.execute_command("GET")
    error_text = unicode(no_key.value)
    assert error_text.startswith("No way to dispatch this command to Redis Cluster. Missing key.")
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:14,代码来源:test_cluster_obj.py
示例4: test_blocked_arguments
def test_blocked_arguments(self, r):
    """
    pipeline() must raise RedisClusterException when called with
    transaction=True or with shard_hint=True.
    """
    with pytest.raises(RedisClusterException) as ex:
        r.pipeline(transaction=True)
    # Report the actual error text on failure instead of a bare `True`.
    assert unicode(ex.value).startswith("transaction is deprecated in cluster mode"), unicode(ex.value)

    with pytest.raises(RedisClusterException) as ex:
        r.pipeline(shard_hint=True)
    assert unicode(ex.value).startswith("shard_hint is deprecated in cluster mode"), unicode(ex.value)
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:14,代码来源:test_pipeline.py
示例5: test_exec_error_in_no_transaction_pipeline_unicode_command
def test_exec_error_in_no_transaction_pipeline_unicode_command(self, r):
    """
    The ResponseError raised by a non-transactional pipeline must start
    with a message naming the failing LLEN command and its key, where the
    key contains non-ASCII characters.
    """
    key = unichr(3456) + u('abcd') + unichr(3421)
    r[key] = 1

    with r.pipeline(transaction=False) as pipe:
        # LLEN is queued against a key that was just assigned the value 1.
        pipe.llen(key)
        pipe.expire(key, 100)

        with pytest.raises(ResponseError) as exc_info:
            pipe.execute()

        template = unicode('Command # 1 (LLEN {0}) of pipeline caused error: ')
        expected = template.format(key)
        actual = unicode(exc_info.value)
        assert actual.startswith(expected)

    # The stored value is still readable after the failed pipeline.
    assert r[key] == b('1')
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:14,代码来源:test_pipeline.py
示例6: test_rename
def test_rename(self, r):
    """
    rename() moves a value to the new key, and raises ResponseError for
    the two failing calls exercised below.
    """
    # Successful rename: 'a' disappears and 'b' holds the value.
    r['a'] = '1'
    assert r.rename('a', 'b')
    assert r.get('a') is None
    assert r['b'] == b('1')

    # Renaming "foo" onto itself
    with pytest.raises(ResponseError) as same_key:
        r.rename("foo", "foo")
    same_key_text = unicode(same_key.value)
    assert same_key_text.startswith("source and destination objects are the same")

    # Renaming "foo" while r.get("foo") returns None
    assert r.get("foo") is None
    with pytest.raises(ResponseError) as missing_key:
        r.rename("foo", "bar")
    missing_key_text = unicode(missing_key.value)
    assert missing_key_text.startswith("no such key")
开发者ID:baranbartu,项目名称:redis-py-cluster,代码行数:14,代码来源:test_commands.py
示例7: test_password_procted_nodes
def test_password_procted_nodes():
    """
    Check client creation against two node lists, with and without a
    password: the mismatching combinations must raise
    RedisClusterException, the matching ones must not.
    """
    expected_prefix = "ERROR sending 'cluster slots' command to redis server:"
    secret = "password_is_protected"
    plain_nodes = [{"host": "127.0.0.1", "port": "7000"}]
    # NOTE(review): port 7100 is presumably the password protected node;
    # confirm against the test cluster setup.
    protected_nodes = [{"host": "127.0.0.1", "port": "7100"}]

    # Protected node list: fails without the password, works with it.
    with pytest.raises(RedisClusterException) as exc_info:
        _get_client(startup_nodes=protected_nodes)
    assert unicode(exc_info.value).startswith(expected_prefix)
    _get_client(startup_nodes=protected_nodes, password=secret)

    # Plain node list: fails with the password, works without it.
    with pytest.raises(RedisClusterException) as exc_info:
        _get_client(startup_nodes=plain_nodes, password=secret)
    assert unicode(exc_info.value).startswith(expected_prefix)
    _get_client(startup_nodes=plain_nodes)
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:15,代码来源:test_cluster_obj.py
示例8: test_exec_error_in_no_transaction_pipeline_unicode_command
def test_exec_error_in_no_transaction_pipeline_unicode_command(self):
    """
    The ResponseError raised by a non-transactional pipeline must start
    with a message naming the failing LLEN command and its key, where the
    key contains non-ASCII characters.
    """
    key = unichr(3456) + u('abcd') + unichr(3421)
    self.rc[key] = 1
    with self.rc.pipeline(transaction=False) as pipe:
        pipe.llen(key)
        pipe.expire(key, 100)

        with pytest.raises(redis.ResponseError) as ex:
            pipe.execute()

        # Function-call form parses on both Python 2 and Python 3; the
        # original print statement is a syntax error on Python 3.
        print(ex.value)

        expected = unicode('Command # 1 (LLEN %s) of pipeline caused '
                           'error: ') % key
        assert unicode(ex.value).startswith(expected)

    assert self.rc[key] == b('1')
开发者ID:kevinsu,项目名称:redis-py-multi-node,代码行数:15,代码来源:test_pipeline.py
示例9: test_get_connection_by_slot
def test_get_connection_by_slot(self):
    """
    get_connection_by_key() must return the connection produced by the
    patched pool method for a 'ZSCAN' and for a 'GET' command, and must
    raise RedisClusterException when called with (None, None).
    """
    pool = self.get_pool(connection_kwargs={})

    # Patch the call that is made inside the method to allow control of the returned connection object
    with patch.object(ClusterReadOnlyConnectionPool, 'get_master_connection_by_slot', autospec=True) as pool_mock:
        def side_effect(self, *args, **kwargs):
            return DummyConnection(port=1337)
        pool_mock.side_effect = side_effect

        # Try a master only command
        connection = pool.get_connection_by_key("foo", 'ZSCAN')
        assert connection.port == 1337

    with patch.object(ClusterReadOnlyConnectionPool, 'get_random_master_slave_connection_by_slot', autospec=True) as pool_mock:
        def side_effect(self, *args, **kwargs):
            return DummyConnection(port=1337)
        pool_mock.side_effect = side_effect

        # Try a random node command
        connection = pool.get_connection_by_key('foo', 'GET')
        assert connection.port == 1337

    with pytest.raises(RedisClusterException) as ex:
        pool.get_connection_by_key(None, None)
    # Report the actual error text on failure instead of a bare `True`.
    assert unicode(ex.value).startswith("No way to dispatch this command to Redis Cluster."), unicode(ex.value)
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:27,代码来源:test_cluster_connection_pool.py
示例10: test_init_slots_cache_not_all_slots
def test_init_slots_cache_not_all_slots(self, s):
    """
    Test that if not all slots are covered it should raise an exception
    """
    # Save the old function so we can wrap it
    old_get_link_from_node = s.get_redis_link_from_node

    # Create wrapper function so we can inject custom 'CLUSTER SLOTS' command result
    def new_get_link_from_node(node):
        link = old_get_link_from_node(node)

        # Missing slot 5460: the first range ends at 5459, the next starts at 5461
        bad_slots_resp = [[0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
                          [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
                          [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]]]

        # Every command executed on this link returns the incomplete slot table
        link.execute_command = lambda *args: bad_slots_resp

        return link

    s.get_redis_link_from_node = new_get_link_from_node

    with pytest.raises(RedisClusterException) as ex:
        s.initialize_slots_cache()

    # Report the actual error text on failure instead of a bare `True`.
    assert unicode(ex.value).startswith("All slots are not covered after querry all startup_nodes."), unicode(ex.value)
开发者ID:yekeqiang,项目名称:redis-py-cluster,代码行数:27,代码来源:test_cluster_obj.py
示例11: test_blocked_transaction
def test_blocked_transaction(r):
    """
    Calling transaction() must raise RedisClusterException with a
    "not implemented" message.
    """
    with pytest.raises(RedisClusterException) as exc_info:
        r.transaction(None)

    # The error text doubles as the assertion message.
    message = unicode(exc_info.value)
    assert message.startswith("method StrictRedisCluster.transaction() is not implemented"), message
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:7,代码来源:test_cluster_obj.py
示例12: test_init_slots_cache_not_all_slots
def test_init_slots_cache_not_all_slots(s):
    """
    Initializing the node manager must raise RedisClusterException when
    the reported slot ranges leave a slot uncovered.
    """
    # Slot 5460 is absent: the first range ends at 5459, the next starts at 5461.
    incomplete_slots = [
        [0, 5459, [b'127.0.0.1', 7000], [b'127.0.0.1', 7003]],
        [5461, 10922, [b'127.0.0.1', 7001], [b'127.0.0.1', 7004]],
        [10923, 16383, [b'127.0.0.1', 7002], [b'127.0.0.1', 7005]],
    ]

    def get_redis_link_wrapper(host, port, decode_responses=False):
        # Replacement link factory: the arguments are ignored and every
        # command executed on the returned client yields the table above.
        client = StrictRedis(host="127.0.0.1", port=7000, decode_responses=True)

        def fake_execute_command(*args):
            return incomplete_slots

        client.execute_command = fake_execute_command
        return client

    s.connection_pool.nodes.get_redis_link = get_redis_link_wrapper

    with pytest.raises(RedisClusterException) as exc_info:
        s.connection_pool.nodes.initialize()

    message = unicode(exc_info.value)
    assert message.startswith("All slots are not covered after query all startup_nodes.")
开发者ID:huangfupeng,项目名称:redis-py-cluster,代码行数:26,代码来源:test_node_manager.py
示例13: test_clusterdown_wrapper
def test_clusterdown_wrapper():
    """
    A function decorated with clusterdown_wrapper that always raises
    ClusterDownException must end in a ClusterDownException carrying the
    "unable to rebuild" message.
    """
    @clusterdown_wrapper
    def always_down():
        raise ClusterDownException()

    with pytest.raises(ClusterDownException) as exc_info:
        always_down()

    message = unicode(exc_info.value)
    assert message.startswith("CLUSTERDOWN error. Unable to rebuild the cluster")
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:8,代码来源:test_utils.py
示例14: test_first_key
def test_first_key():
    """
    first_key() returns the value of a single-entry result, raises
    RedisClusterException for a two-entry result and AssertionError for
    a None result.
    """
    command = "foobar"

    # Exactly one entry: its value is returned.
    assert first_key(command, {"foo": 1}) == 1

    # Two entries: rejected with a message naming the command.
    with pytest.raises(RedisClusterException) as exc_info:
        first_key(command, {"foo": 1, "bar": 2})
    message = unicode(exc_info.value)
    assert message.startswith("More then 1 result from command: foobar")

    # None instead of a dict
    with pytest.raises(AssertionError):
        first_key(command, None)
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:9,代码来源:test_utils.py
示例15: test_raise_regular_exception
def test_raise_regular_exception(r):
    """
    A plain Exception passed to handle_cluster_command_exception() must
    be raised again with its message intact.
    """
    original = Exception("foobar")

    with pytest.raises(Exception) as exc_info:
        r.handle_cluster_command_exception(original)

    message = unicode(exc_info.value)
    assert message.startswith("foobar")
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:9,代码来源:test_cluster_obj.py
示例16: test_blocked_strict_redis_args
def test_blocked_strict_redis_args(self):
    """
    Some arguments should explicitly be blocked because they will not work in a cluster setup
    """
    params = {'startup_nodes': [{'host': '127.0.0.1', 'port': 7000}]}
    c = RedisCluster(**params)
    assert c.opt["socket_timeout"] == RedisCluster.RedisClusterDefaultTimeout

    # Each blocked keyword argument paired with the error prefix it must produce
    blocked = (
        ({'db': 1}, "(error) [Remove 'db' from kwargs]"),
        ({'host': "foo.bar"}, "(error) [Remove 'host' from kwargs]"),
        ({'port': 1337}, "(error) [Remove 'port' from kwargs]"),
    )
    for kwargs, expected in blocked:
        with pytest.raises(RedisClusterException) as ex:
            _get_client(**kwargs)
        # Report the actual error text on failure instead of a bare `True`.
        assert unicode(ex.value).startswith(expected), unicode(ex.value)
开发者ID:yekeqiang,项目名称:redis-py-cluster,代码行数:19,代码来源:test_cluster_obj.py
示例17: test_get_connection_blocked
def test_get_connection_blocked(self):
    """
    get_connection() called with "GET" must raise RedisClusterException
    stating that only 'pubsub' commands are allowed.
    """
    connection_pool = self.get_pool()

    with pytest.raises(RedisClusterException) as exc_info:
        connection_pool.get_connection("GET")

    message = unicode(exc_info.value)
    assert message.startswith("Only 'pubsub' commands can be used by get_connection()")
开发者ID:OBJ-feye,项目名称:redis-py-cluster,代码行数:10,代码来源:test_cluster_connection_pool.py
示例18: test_keyslot
def test_keyslot():
    """
    Check keyslot() against fixed expected slot numbers and against pairs
    of keys that must map to the same slot.
    """
    n = NodeManager([{}])

    # Keys with a fixed expected slot
    for key in ("foo", "{foo}bar", "{foo}"):
        assert n.keyslot(key) == 12182
    assert n.keyslot(1337) == 4314

    # Pairs of keys in different representations that must share a slot
    same_slot_pairs = [
        (125, b"125"),
        (125, "\x31\x32\x35"),
        ("大奖", b"\xe5\xa4\xa7\xe5\xa5\x96"),
        (u"大奖", b"\xe5\xa4\xa7\xe5\xa5\x96"),
        (1337.1234, "1337.1234"),
        (1337, "1337"),
        (b"abc", "abc"),
        ("abc", unicode("abc")),
        (unicode("abc"), b"abc"),
    ]
    for left, right in same_slot_pairs:
        assert n.keyslot(left) == n.keyslot(right)
开发者ID:Grokzen,项目名称:redis-py-cluster,代码行数:20,代码来源:test_node_manager.py
示例19: test_get_connection_by_key
def test_get_connection_by_key(self, r):
    """
    This test assumes that when hashing key 'foo' will be sent to server with port 7002
    """
    connection = r.get_connection_by_key("foo")
    assert connection.connection_pool.connection_kwargs["port"] == 7002

    with pytest.raises(RedisClusterException) as ex:
        r.get_connection_by_key(None)

    # Report the actual error text on failure instead of a bare `True`.
    assert unicode(ex.value).startswith("No way to dispatch this command to Redis Cluster."), unicode(ex.value)
开发者ID:yekeqiang,项目名称:redis-py-cluster,代码行数:11,代码来源:test_cluster_obj.py
示例20: test_blocked_strict_redis_args
def test_blocked_strict_redis_args():
    """
    A cluster client gets the pool's default socket timeout, and passing
    `db` when creating a client must raise RedisClusterException.
    """
    cluster = StrictRedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": 7000}])
    pool_kwargs = cluster.connection_pool.connection_kwargs
    assert pool_kwargs["socket_timeout"] == ClusterConnectionPool.RedisClusterDefaultTimeout

    with pytest.raises(RedisClusterException) as exc_info:
        _get_client(db=1)

    message = unicode(exc_info.value)
    assert message.startswith("Argument 'db' is not possible to use in cluster mode")
开发者ID:svrana,项目名称:redis-py-cluster,代码行数:11,代码来源:test_cluster_obj.py
注：本文中的redis._compat.unicode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论