本文整理汇总了Python中sentry.utils.redis.clusters.get函数的典型用法代码示例。如果您正苦于以下问题：Python get函数的具体用法？Python get怎么用？Python get使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get函数的14个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_is_rate_limited_script
def test_is_rate_limited_script():
    """Check the ``is_rate_limited`` script against two quota keys.

    The same request is issued three times; the first key must start
    rejecting from the second call on while the second key never does.
    """
    now = int(time.time())

    cluster = clusters.get('default')
    first_host = six.next(iter(cluster.hosts))
    client = cluster.get_local_client(first_host)

    quota_keys = ('foo', 'bar')
    # One (value, timestamp) pair per key -- presumably (limit, expiry);
    # the TTL assertions below are consistent with that reading.
    quota_args = (1, now + 60, 2, now + 120)

    def rejections():
        # Run the script once and normalise each result to a plain bool.
        return [bool(flag) for flag in is_rate_limited(client, quota_keys, quota_args)]

    # The item should not be rate limited by either key.
    assert rejections() == [False, False]

    # The item should be rate limited by the first key (1).
    assert rejections() == [True, False]

    # The item should still be rate limited by the first key (1), but *not*
    # rate limited by the second key (2) even though this is the third time
    # we've checked the quotas. This ensures items that are rejected by a lower
    # quota don't affect unrelated items that share a parent quota.
    assert rejections() == [True, False]

    assert client.get('foo') == '1'
    assert 59 <= client.ttl('foo') <= 60

    assert client.get('bar') == '1'
    assert 119 <= client.ttl('bar') <= 120
开发者ID:alshopov,项目名称:sentry,代码行数:26,代码来源:tests.py
示例2: pytest_runtest_teardown
def pytest_runtest_teardown(item):
    """Pytest hook: reset shared state once a test has finished.

    Flushes the TSDB, flushes the 'default' Redis cluster and calls
    celery's ``discard_all``. ``item`` is not used.
    """
    # Imports stay local and interleaved with their use, as in the original.
    from sentry.app import tsdb
    tsdb.flush()

    from sentry.utils.redis import clusters
    default_cluster = clusters.get('default')
    with default_cluster.all() as redis_client:
        redis_client.flushdb()

    from celery.task.control import discard_all
    discard_all()
开发者ID:Cobbyzhang,项目名称:sentry,代码行数:11,代码来源:pytest.py
示例3: pytest_runtest_teardown
def pytest_runtest_teardown(item):
    """Pytest hook: reset shared state once a test has finished.

    Flushes the TSDB backend, flushes the 'default' Redis cluster and calls
    celery's ``discard_all``. ``item`` is not used.
    """
    from sentry import tsdb
    # TODO(dcramer): this only works if this is the correct tsdb backend
    tsdb.backend.flush()

    from sentry.utils.redis import clusters
    default_cluster = clusters.get('default')
    with default_cluster.all() as redis_client:
        redis_client.flushdb()

    from celery.task.control import discard_all
    discard_all()
开发者ID:sashahilton00,项目名称:sentry,代码行数:12,代码来源:sentry.py
示例4: test_ensure_timeline_scheduled_script
def test_ensure_timeline_scheduled_script(self):
    """Walk the ``ensure_timeline_scheduled`` script through its cases.

    Covers the first insertion into the ready set, a repeated insertion,
    rescheduling a waiting timeline by an increment, the maximum cap, and
    a missing ``last-processed`` value.
    """
    cluster = clusters.get('default')
    client = cluster.get_local_client(six.next(iter(cluster.hosts)))

    timeline = 'timeline'
    timestamp = 100.0
    keys = ('waiting', 'ready', 'last-processed')

    # Zero-argument probes handed to the assertChanges/assertDoesNotChange helpers.
    waiting_count = functools.partial(client.zcard, 'waiting')
    ready_count = functools.partial(client.zcard, 'ready')
    waiting_score = functools.partial(client.zscore, 'waiting', timeline)
    ready_score = functools.partial(client.zscore, 'ready', timeline)

    def schedule(when, increment, maximum):
        # Single call site for the script under test.
        return ensure_timeline_scheduled(client, keys, (timeline, when, increment, maximum))

    # The first addition should cause the timeline to be added to the ready set.
    with self.assertChanges(ready_count, before=0, after=1), \
            self.assertChanges(ready_score, before=None, after=timestamp):
        assert schedule(timestamp, 1, 10) == 1

    # Adding it again with a timestamp in the future should not change the schedule time.
    with self.assertDoesNotChange(waiting_count), \
            self.assertDoesNotChange(ready_count), \
            self.assertDoesNotChange(ready_score):
        assert schedule(timestamp + 50, 1, 10) is None

    # Move the timeline from the ready set to the waiting set.
    client.zrem('ready', timeline)
    client.zadd('waiting', timestamp, timeline)
    client.set('last-processed', timestamp)

    increment = 1
    with self.assertDoesNotChange(waiting_count), \
            self.assertChanges(waiting_score, before=timestamp, after=timestamp + increment):
        assert schedule(timestamp, increment, 10) is None

    # Make sure the schedule respects the maximum value.
    with self.assertDoesNotChange(waiting_count), \
            self.assertChanges(waiting_score, before=timestamp + 1, after=timestamp):
        assert schedule(timestamp, increment, 0) is None

    # Test to ensure a missing last processed timestamp can be handled
    # correctly (chooses minimum of schedule value and record timestamp.)
    client.zadd('waiting', timestamp, timeline)
    client.delete('last-processed')

    with self.assertDoesNotChange(waiting_count), \
            self.assertDoesNotChange(waiting_score):
        assert schedule(timestamp + 100, increment, 10) is None

    with self.assertDoesNotChange(waiting_count), \
            self.assertChanges(waiting_score, before=timestamp, after=timestamp - 100):
        assert schedule(timestamp - 100, increment, 10) is None
开发者ID:ForkRepo,项目名称:sentry,代码行数:52,代码来源:test_redis.py
示例5: test_is_rate_limited_script
def test_is_rate_limited_script():
    """Check the ``is_rate_limited`` script, including refund keys.

    The first half repeats one request three times against two quotas; the
    second half checks that a pre-set refund key lifts a rejection.
    """
    now = int(time.time())

    cluster = clusters.get('default')
    first_host = six.next(iter(cluster.hosts))
    client = cluster.get_local_client(first_host)

    def rejections(keys, args):
        # Run the script once and normalise each result to a plain bool.
        return [bool(flag) for flag in is_rate_limited(client, keys, args)]

    # Each quota key is followed by its "r:"-prefixed companion (the
    # "refund/negative" keys checked further down).
    quota_keys = ('foo', 'r:foo', 'bar', 'r:bar')
    quota_args = (1, now + 60, 2, now + 120)

    # The item should not be rate limited by either key.
    assert rejections(quota_keys, quota_args) == [False, False]

    # The item should be rate limited by the first key (1).
    assert rejections(quota_keys, quota_args) == [True, False]

    # The item should still be rate limited by the first key (1), but *not*
    # rate limited by the second key (2) even though this is the third time
    # we've checked the quotas. This ensures items that are rejected by a lower
    # quota don't affect unrelated items that share a parent quota.
    assert rejections(quota_keys, quota_args) == [True, False]

    assert client.get('foo') == '1'
    assert 59 <= client.ttl('foo') <= 60

    assert client.get('bar') == '1'
    assert 119 <= client.ttl('bar') <= 120

    # make sure "refund/negative" keys haven't been incremented
    assert client.get('r:foo') is None
    assert client.get('r:bar') is None

    # Test that refunded quotas work
    client.set('apple', 5)
    single_quota = (1, now + 60)

    # increment
    is_rate_limited(client, ('orange', 'baz'), single_quota)

    # test that it's rate limited without refund
    assert rejections(('orange', 'baz'), single_quota) == [True]

    # test that refund key is used
    assert rejections(('orange', 'apple'), single_quota) == [False]
开发者ID:binlee1990,项目名称:sentry,代码行数:48,代码来源:tests.py
示例6: pytest_runtest_teardown
def pytest_runtest_teardown(item):
    """Pytest hook: reset shared state once a test has finished.

    Flushes the TSDB, clears the newsletter backend when it exposes
    ``clear``, flushes the 'default' Redis cluster and calls celery's
    ``discard_all``. ``item`` is not used.
    """
    from sentry import tsdb
    # TODO(dcramer): this only works if this is the correct tsdb backend
    tsdb.flush()

    # XXX(dcramer): only works with DummyNewsletter
    from sentry import newsletter
    newsletter_backend = newsletter.backend
    if hasattr(newsletter_backend, 'clear'):
        newsletter_backend.clear()

    from sentry.utils.redis import clusters
    default_cluster = clusters.get('default')
    with default_cluster.all() as redis_client:
        redis_client.flushdb()

    from celery.task.control import discard_all
    discard_all()
开发者ID:hosmelq,项目名称:sentry,代码行数:17,代码来源:sentry.py
示例7: pytest_runtest_teardown
def pytest_runtest_teardown(item):
    """Pytest hook: reset shared state once a test has finished.

    Flushes the TSDB, clears the newsletter backend when it exposes
    ``clear``, flushes the 'default' Redis cluster, calls celery's
    ``discard_all`` and clears the local caches of the option model
    managers. ``item`` is not used.
    """
    from sentry import tsdb
    # TODO(dcramer): this only works if this is the correct tsdb backend
    tsdb.flush()

    # XXX(dcramer): only works with DummyNewsletter
    from sentry import newsletter
    newsletter_backend = newsletter.backend
    if hasattr(newsletter_backend, 'clear'):
        newsletter_backend.clear()

    from sentry.utils.redis import clusters
    default_cluster = clusters.get('default')
    with default_cluster.all() as redis_client:
        redis_client.flushdb()

    from celery.task.control import discard_all
    discard_all()

    from sentry.models import OrganizationOption, ProjectOption, UserOption
    option_models = (OrganizationOption, ProjectOption, UserOption)
    for option_model in option_models:
        option_model.objects.clear_local_cache()
开发者ID:binlee1990,项目名称:sentry,代码行数:21,代码来源:sentry.py
示例8: test_truncate_timeline_script
def test_truncate_timeline_script(self):
    """Check that ``truncate_timeline`` trims a 10-record timeline to 5.

    The five earliest records must lose both their sorted-set entry and
    their record key; the five latest must keep both.
    """
    cluster = clusters.get('default')
    client = cluster.get_local_client(six.next(iter(cluster.hosts)))

    timeline = 'timeline'

    # Preload some fake records (the contents don't matter.)
    records = list(itertools.islice(self.records, 10))
    for entry in records:
        client.zadd(timeline, entry.timestamp, entry.key)
        client.set(make_record_key(timeline, entry.key), 'data')

    def timeline_size():
        return client.zcard(timeline)

    with self.assertChanges(timeline_size, before=10, after=5):
        truncate_timeline(client, (timeline,), (5, timeline))

    dropped, kept = records[:5], records[-5:]

    # Ensure the early records don't exist.
    for entry in dropped:
        assert not client.zscore(timeline, entry.key)
        assert not client.exists(make_record_key(timeline, entry.key))

    # Ensure the later records do exist.
    for entry in kept:
        assert client.zscore(timeline, entry.key) == float(entry.timestamp)
        assert client.exists(make_record_key(timeline, entry.key))
开发者ID:ForkRepo,项目名称:sentry,代码行数:24,代码来源:test_redis.py
示例9: pytest_configure
def pytest_configure(config):
# HACK: Only needed for testing!
os.environ.setdefault('_SENTRY_SKIP_CONFIGURATION', '1')
os.environ.setdefault('RECAPTCHA_TESTING', 'True')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sentry.conf.server')
if not settings.configured:
# only configure the db if its not already done
test_db = os.environ.get('DB', 'postgres')
if test_db == 'mysql':
settings.DATABASES['default'].update({
'ENGINE': 'django.db.backends.mysql',
'NAME': 'sentry',
'USER': 'root',
'HOST': '127.0.0.1',
})
# mysql requires running full migration all the time
settings.SOUTH_TESTS_MIGRATE = True
elif test_db == 'postgres':
settings.DATABASES['default'].update({
'ENGINE': 'sentry.db.postgres',
'USER': 'postgres',
'NAME': 'sentry',
})
# postgres requires running full migration all the time
# since it has to install stored functions which come from
# an actual migration.
settings.SOUTH_TESTS_MIGRATE = True
elif test_db == 'sqlite':
settings.DATABASES['default'].update({
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
})
settings.SOUTH_TESTS_MIGRATE = os.environ.get('SENTRY_SOUTH_TESTS_MIGRATE', '1') == '1'
else:
raise RuntimeError('oops, wrong database: %r' % test_db)
settings.TEMPLATE_DEBUG = True
# Disable static compiling in tests
settings.STATIC_BUNDLES = {}
# override a few things with our test specifics
settings.INSTALLED_APPS = tuple(settings.INSTALLED_APPS) + (
'tests',
)
# Need a predictable key for tests that involve checking signatures
settings.SENTRY_PUBLIC = False
if not settings.SENTRY_CACHE:
settings.SENTRY_CACHE = 'sentry.cache.django.DjangoCache'
settings.SENTRY_CACHE_OPTIONS = {}
# This speeds up the tests considerably, pbkdf2 is by design, slow.
settings.PASSWORD_HASHERS = [
'django.contrib.auth.hashers.MD5PasswordHasher',
]
# Replace real sudo middleware with our mock sudo middleware
# to assert that the user is always in sudo mode
middleware = list(settings.MIDDLEWARE_CLASSES)
sudo = middleware.index('sentry.middleware.sudo.SudoMiddleware')
middleware[sudo] = 'sentry.testutils.middleware.SudoMiddleware'
settings.MIDDLEWARE_CLASSES = tuple(middleware)
# enable draft features
settings.SENTRY_OPTIONS['mail.enable-replies'] = True
settings.SENTRY_ALLOW_ORIGIN = '*'
settings.SENTRY_TSDB = 'sentry.tsdb.inmemory.InMemoryTSDB'
settings.SENTRY_TSDB_OPTIONS = {}
settings.RECAPTCHA_PUBLIC_KEY = 'a' * 40
settings.RECAPTCHA_PRIVATE_KEY = 'b' * 40
settings.BROKER_BACKEND = 'memory'
settings.BROKER_URL = None
settings.CELERY_ALWAYS_EAGER = False
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
settings.DEBUG_VIEWS = True
settings.DISABLE_RAVEN = True
settings.CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
}
}
if not hasattr(settings, 'SENTRY_OPTIONS'):
settings.SENTRY_OPTIONS = {}
settings.SENTRY_OPTIONS.update({
'redis.clusters': {
'default': {
'hosts': {
0: {
#.........这里部分代码省略.........
开发者ID:Cobbyzhang,项目名称:sentry,代码行数:101,代码来源:pytest.py
示例10: _client
def _client(self):
    """Return the 'default' cluster's local client for ``self.redis_key``."""
    default_cluster = clusters.get('default')
    return default_cluster.get_local_client_for_key(self.redis_key)
开发者ID:Kayle009,项目名称:sentry,代码行数:2,代码来源:session_store.py
示例11: cluster
def cluster(self):
    """Return the Redis cluster registered under the name 'default'."""
    default_cluster = clusters.get('default')
    return default_cluster
开发者ID:yaoqi,项目名称:sentry,代码行数:2,代码来源:test_redis.py
示例12: manager
from sentry.utils.dates import to_datetime, to_timestamp
from sentry.models import Project, Group, Event
from django.utils import timezone
from django.conf import settings
# Upper bound on how many mention ids get_recent_mentions keeps.
MAX_RECENT = 15
# Look-back window in hours: 24 * 30 = 720 hours (30 days).
RECENT_HOURS = 24 * 30
# The Redis cluster manager (``clusters``) was added in Sentry 8.2 (GH-2714)
# and replaces ``make_rb_cluster`` (which will be removed in a future version.)
try:
    from sentry.utils.redis import clusters
    cluster = clusters.get('default')
except ImportError:
    # ``clusters`` could not be imported: build the cluster from the hosts
    # listed in settings.SENTRY_REDIS_OPTIONS instead.
    from sentry.utils.redis import make_rb_cluster
    cluster = make_rb_cluster(settings.SENTRY_REDIS_OPTIONS['hosts'])
def get_key(tenant):
    """Return the Redis key under which ``tenant``'s mentions are stored.

    The key embeds ``tenant.id`` between a fixed prefix and suffix.
    """
    tenant_id = tenant.id
    return 'sentry-hipchat-ac:%s:mentions' % tenant_id
def get_recent_mentions(tenant):
    """Look up the ids of ``tenant``'s most recent mentions.

    Reads the tenant's sorted set (see ``get_key``) for members scored
    within the last ``RECENT_HOURS`` hours and keeps at most the last
    ``MAX_RECENT`` of them.
    """
    client = cluster.get_routing_client()
    key = get_key(tenant)
    # RECENT_HOURS is in hours but time.time() is in seconds, so convert
    # hours -> seconds (60 * 60). The previous ``* 60`` shrank the window
    # from 30 days to 12 hours.
    # NOTE(review): assumes the sorted-set scores are Unix timestamps in
    # seconds -- confirm against the code that writes them.
    cutoff = time.time() - (RECENT_HOURS * 60 * 60)
    ids = list(client.zrangebyscore(key, cutoff, '+inf'))[-MAX_RECENT:]
    # NOTE(review): the visible excerpt ends here without using ``ids``;
    # whatever follows in the full function is not shown.
开发者ID:getsentry,项目名称:sentry-hipchat-ac,代码行数:30,代码来源:mentions.py
示例13: pytest_configure
def pytest_configure(config):
# HACK: Only needed for testing!
os.environ.setdefault("_SENTRY_SKIP_CONFIGURATION", "1")
os.environ.setdefault("RECAPTCHA_TESTING", "True")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")
settings.SOUTH_TESTS_MIGRATE = os.environ.get("SENTRY_SOUTH_TESTS_MIGRATE", "1") == "1"
if not settings.configured:
# only configure the db if its not already done
test_db = os.environ.get("DB", "postgres")
if test_db == "mysql":
settings.DATABASES["default"].update(
{"ENGINE": "django.db.backends.mysql", "NAME": "sentry", "USER": "root"}
)
# mysql requires running full migration all the time
settings.SOUTH_TESTS_MIGRATE = True
elif test_db == "postgres":
settings.DATABASES["default"].update({"ENGINE": "sentry.db.postgres", "USER": "postgres", "NAME": "sentry"})
# postgres requires running full migration all the time
# since it has to install stored functions which come from
# an actual migration.
settings.SOUTH_TESTS_MIGRATE = True
elif test_db == "sqlite":
settings.DATABASES["default"].update({"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"})
settings.TEMPLATE_DEBUG = True
# Disable static compiling in tests
settings.STATIC_BUNDLES = {}
# override a few things with our test specifics
settings.INSTALLED_APPS = tuple(settings.INSTALLED_APPS) + ("tests",)
# Need a predictable key for tests that involve checking signatures
settings.SENTRY_PUBLIC = False
if not settings.SENTRY_CACHE:
settings.SENTRY_CACHE = "sentry.cache.django.DjangoCache"
settings.SENTRY_CACHE_OPTIONS = {}
# This speeds up the tests considerably, pbkdf2 is by design, slow.
settings.PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
# Replace real sudo middleware with our mock sudo middleware
# to assert that the user is always in sudo mode
middleware = list(settings.MIDDLEWARE_CLASSES)
sudo = middleware.index("sentry.middleware.sudo.SudoMiddleware")
middleware[sudo] = "sentry.testutils.middleware.SudoMiddleware"
settings.MIDDLEWARE_CLASSES = tuple(middleware)
# enable draft features
settings.SENTRY_OPTIONS["mail.enable-replies"] = True
settings.SENTRY_ALLOW_ORIGIN = "*"
settings.SENTRY_TSDB = "sentry.tsdb.inmemory.InMemoryTSDB"
settings.SENTRY_TSDB_OPTIONS = {}
settings.RECAPTCHA_PUBLIC_KEY = "a" * 40
settings.RECAPTCHA_PRIVATE_KEY = "b" * 40
settings.BROKER_BACKEND = "memory"
settings.BROKER_URL = None
settings.CELERY_ALWAYS_EAGER = False
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
settings.DISABLE_RAVEN = True
settings.CACHES = {"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}
if not hasattr(settings, "SENTRY_OPTIONS"):
settings.SENTRY_OPTIONS = {}
settings.SENTRY_OPTIONS.update(
{
"redis.clusters": {"default": {"hosts": {0: {"db": 9}}}},
"mail.backend": "django.core.mail.backends.locmem.EmailBackend",
"system.url-prefix": "http://testserver",
}
)
# django mail uses socket.getfqdn which doesn't play nice if our
# networking isn't stable
patcher = mock.patch("socket.getfqdn", return_value="localhost")
patcher.start()
from sentry.runner.initializer import bootstrap_options, initialize_receivers, fix_south, bind_cache_to_option_store
bootstrap_options(settings)
fix_south(settings)
bind_cache_to_option_store()
initialize_receivers()
from sentry.utils.redis import clusters
with clusters.get("default").all() as client:
client.flushdb()
#.........这里部分代码省略.........
开发者ID:pombredanne,项目名称:django-sentry,代码行数:101,代码来源:pytest.py
示例14: pytest_configure
def pytest_configure(config):
# HACK: Only needed for testing!
os.environ.setdefault('_SENTRY_SKIP_CONFIGURATION', '1')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'sentry.conf.server')
# override docs which are typically synchronized from an upstream server
# to ensure tests are consistent
os.environ.setdefault(
'INTEGRATION_DOC_FOLDER',
os.path.join(
TEST_ROOT,
'fixtures',
'integration-docs'))
from sentry.utils import integrationdocs
integrationdocs.DOC_FOLDER = os.environ['INTEGRATION_DOC_FOLDER']
if not settings.configured:
# only configure the db if its not already done
test_db = os.environ.get('DB', 'postgres')
if test_db == 'mysql':
settings.DATABASES['default'].update(
{
'ENGINE': 'django.db.backends.mysql',
'NAME': 'sentry',
'USER': 'root',
'HOST': '127.0.0.1',
}
)
# mysql requires running full migration all the time
elif test_db == 'postgres':
settings.DATABASES['default'].update(
{
'ENGINE': 'sentry.db.postgres',
'USER': 'postgres',
'NAME': 'sentry',
'HOST': '127.0.0.1',
}
)
# postgres requires running full migration all the time
# since it has to install stored functions which come from
# an actual migration.
elif test_db == 'sqlite':
settings.DATABASES['default'].update(
{
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
)
else:
raise RuntimeError('oops, wrong database: %r' % test_db)
settings.TEMPLATE_DEBUG = True
# Disable static compiling in tests
settings.STATIC_BUNDLES = {}
# override a few things with our test specifics
settings.INSTALLED_APPS = tuple(settings.INSTALLED_APPS) + ('tests', )
# Need a predictable key for tests that involve checking signatures
settings.SENTRY_PUBLIC = False
if not settings.SENTRY_CACHE:
settings.SENTRY_CACHE = 'sentry.cache.django.DjangoCache'
settings.SENTRY_CACHE_OPTIONS = {}
# This speeds up the tests considerably, pbkdf2 is by design, slow.
settings.PASSWORD_HASHERS = [
'django.contrib.auth.hashers.MD5PasswordHasher',
]
settings.AUTH_PASSWORD_VALIDATORS = []
# Replace real sudo middleware with our mock sudo middleware
# to assert that the user is always in sudo mode
middleware = list(settings.MIDDLEWARE_CLASSES)
sudo = middleware.index('sentry.middleware.sudo.SudoMiddleware')
middleware[sudo] = 'sentry.testutils.middleware.SudoMiddleware'
settings.MIDDLEWARE_CLASSES = tuple(middleware)
settings.SENTRY_OPTIONS['cloudflare.secret-key'] = 'cloudflare-secret-key'
# enable draft features
settings.SENTRY_OPTIONS['mail.enable-replies'] = True
settings.SENTRY_ALLOW_ORIGIN = '*'
settings.SENTRY_TSDB = 'sentry.tsdb.inmemory.InMemoryTSDB'
settings.SENTRY_TSDB_OPTIONS = {}
if settings.SENTRY_NEWSLETTER == 'sentry.newsletter.base.Newsletter':
settings.SENTRY_NEWSLETTER = 'sentry.newsletter.dummy.DummyNewsletter'
settings.SENTRY_NEWSLETTER_OPTIONS = {}
settings.BROKER_BACKEND = 'memory'
settings.BROKER_URL = None
settings.CELERY_ALWAYS_EAGER = False
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
settings.DEBUG_VIEWS = True
#.........这里部分代码省略.........
开发者ID:Kayle009,项目名称:sentry,代码行数:101,代码来源:sentry.py
注：本文中的sentry.utils.redis.clusters.get函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论