本文整理汇总了Python中socorro.external.elasticsearch.crashstorage.ElasticSearchCrashStorage类的典型用法代码示例。如果您正苦于以下问题:Python ElasticSearchCrashStorage类的具体用法?Python ElasticSearchCrashStorage怎么用?Python ElasticSearchCrashStorage使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ElasticSearchCrashStorage类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_sending_many_emails
def test_sending_many_emails(self, exacttarget_mock):
"""Test that we can send emails to a lot of users in the same run. """
# First add a lot of emails.
now = utc_now() - datetime.timedelta(minutes=30)
config_manager = self._setup_storage_config()
with config_manager.context() as config:
storage = ElasticSearchCrashStorage(config)
for i in range(21):
storage.save_processed({
'uuid': 'fake-%s' % i,
'email': 'fake-%[email protected]' % i,
'product': 'WaterWolf',
'version': '20.0',
'release_channel': 'Release',
'date_processed': now,
})
storage.es.refresh()
config_manager = self._setup_simple_config()
with config_manager.context() as config:
job = automatic_emails.AutomaticEmailsCronApp(config, '')
job.run(utc_now())
et_mock = exacttarget_mock.return_value
# Verify that we have the default 4 results + the 21 we added.
self.assertEqual(et_mock.trigger_send.call_count, 25)
开发者ID:pkucoin,项目名称:socorro,代码行数:30,代码来源:test_automatic_emails.py
示例2: test_success_after_limited_retry
def test_success_after_limited_retry(self):
mock_logging = mock.Mock()
required_config = ElasticSearchCrashStorage.required_config
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'submission_url': 'http://elasticsearch_host/%s',
'timeout': 0,
'backoff_delays': [0, 0, 0],
'transaction_executor_class': TransactionExecutorWithLimitedBackoff
}]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
urllib_str = 'socorro.external.elasticsearch.crashstorage.urllib2'
m_request = mock.Mock()
m_urlopen = mock.Mock()
with mock.patch(urllib_str) as mocked_urllib:
mocked_urllib.Request = m_request
m_request.return_value = 17
mocked_urllib.urlopen = m_urlopen
urlopen_results = [urllib2.socket.timeout,
urllib2.socket.timeout]
def urlopen_fn(*args, **kwargs):
try:
r = urlopen_results.pop(0)
raise r
except IndexError:
return m_urlopen
m_urlopen.side_effect = urlopen_fn
es_storage.save_processed(a_processed_crash)
expected_request_args = (
'http://elasticsearch_host/9120408936ce666-ff3b-4c7a-9674-'
'367fe2120408',
{},
)
m_request.assert_called_with(*expected_request_args)
self.assertEqual(m_urlopen.call_count, 3)
expected_urlopen_args = (17,)
expected_urlopen_kwargs = {'timeout': 0}
m_urlopen.assert_called_with(*expected_urlopen_args,
**expected_urlopen_kwargs)
开发者ID:Meghashyamt,项目名称:socorro,代码行数:54,代码来源:test_crashstorage.py
示例3: test_success_after_limited_retry
def test_success_after_limited_retry(self, pyes_mock):
mock_logging = mock.Mock()
mock_es = mock.Mock()
pyes_mock.ElasticSearch.return_value = mock_es
required_config = ElasticSearchCrashStorage.required_config
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'elasticsearch_urls': 'http://elasticsearch_host:9200',
'timeout': 0,
'backoff_delays': [0, 0, 0],
'transaction_executor_class':
TransactionExecutorWithLimitedBackoff
}]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
esindex_results = [pyelasticsearch.exceptions.Timeout,
pyelasticsearch.exceptions.Timeout]
def esindex_fn(*args, **kwargs):
try:
r = esindex_results.pop(0)
raise r
except IndexError:
return mock_es.index
mock_es.index.side_effect = esindex_fn
es_storage.save_processed(a_processed_crash)
expected_request_args = (
'socorro201214',
'crash_reports',
a_processed_crash
)
expected_request_kwargs = {
'replication': 'async',
'id': a_processed_crash['uuid'],
}
mock_es.index.assert_called_with(
*expected_request_args,
**expected_request_kwargs
)
开发者ID:GabiThume,项目名称:socorro,代码行数:54,代码来源:test_crashstorage.py
示例4: test_success
def test_success(self, pyes_mock):
mock_logging = mock.Mock()
mock_es = mock.Mock()
pyes_mock.ElasticSearch.return_value = mock_es
required_config = ElasticSearchCrashStorage.get_required_config()
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'elasticsearch_urls': 'http://elasticsearch_host:9200',
}],
argv_source=[]
)
with config_manager.context() as config:
crash_id = a_processed_crash['uuid']
es_storage = ElasticSearchCrashStorage(config)
es_storage.save_raw_and_processed(
a_raw_crash,
None,
a_processed_crash.copy(),
crash_id,
)
expected_crash = {
'crash_id': crash_id,
'processed_crash': a_processed_crash.copy(),
'raw_crash': a_raw_crash
}
expected_request_args = (
'socorro201214',
'crash_reports',
expected_crash
)
expected_request_kwargs = {
'id': crash_id,
'replication': 'async',
}
mock_es.index.assert_called_with(
*expected_request_args,
**expected_request_kwargs
)
开发者ID:FishingCactus,项目名称:socorro,代码行数:51,代码来源:test_crashstorage.py
示例5: test_failure_limited_retry
def test_failure_limited_retry(self, pyes_mock):
mock_logging = mock.Mock()
mock_es = mock.Mock()
pyes_mock.ElasticSearch.return_value = mock_es
required_config = ElasticSearchCrashStorage.get_required_config()
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'elasticsearch_urls': 'http://elasticsearch_host:9200',
'timeout': 0,
'backoff_delays': [0, 0, 0],
'transaction_executor_class':
TransactionExecutorWithLimitedBackoff
}],
argv_source=[]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
failure_exception = pyelasticsearch.exceptions.Timeout
mock_es.index.side_effect = failure_exception
crash_id = a_processed_crash['uuid']
assert_raises(
pyelasticsearch.exceptions.Timeout,
es_storage.save_raw_and_processed,
a_raw_crash,
None,
a_processed_crash.copy(),
crash_id,
)
expected_crash = {
'crash_id': crash_id,
'processed_crash': a_processed_crash.copy(),
'raw_crash': a_raw_crash
}
expected_request_args = (
'socorro201214',
'crash_reports',
expected_crash
)
expected_request_kwargs = {
'id': crash_id,
}
mock_es.index.assert_called_with(
*expected_request_args,
**expected_request_kwargs
)
开发者ID:vivekkiran,项目名称:socorro-1,代码行数:60,代码来源:test_crashstorage.py
示例6: __init__
def __init__(self, *args, **kwargs):
super(
IntegrationTestIndexCleaner,
self
).__init__(*args, **kwargs)
storage_config = self._setup_config()
with storage_config.context() as config:
self.storage = ElasticSearchCrashStorage(config)
开发者ID:abudulemusa,项目名称:socorro,代码行数:9,代码来源:test_index_cleaner.py
示例7: test_indexing
def test_indexing(self, pyes_mock):
mock_logging = mock.Mock()
mock_es = mock.Mock()
pyes_mock.exceptions.ElasticHttpNotFoundError = \
pyelasticsearch.exceptions.ElasticHttpNotFoundError
pyes_mock.ElasticSearch.return_value = mock_es
required_config = ElasticSearchCrashStorage.required_config
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'elasticsearch_urls': 'http://elasticsearch_host:9200',
}]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
crash_report = a_processed_crash.copy()
crash_report['date_processed'] = '2013-01-01 10:56:41.558922'
def status_fn(index):
assert 'socorro20130' in index
if index == 'socorro201300':
raise pyelasticsearch.exceptions.ElasticHttpNotFoundError()
mock_es.status = status_fn
# The index does not exist and is created
es_storage.save_processed(crash_report)
self.assertEqual(mock_es.create_index.call_count, 1)
# The index exists and is not created
crash_report['date_processed'] = '2013-01-10 10:56:41.558922'
es_storage.save_processed(crash_report)
self.assertEqual(mock_es.create_index.call_count, 1)
开发者ID:GabiThume,项目名称:socorro,代码行数:42,代码来源:test_crashstorage.py
示例8: _setup_storage_config
def _setup_storage_config(self):
mock_logging = mock.Mock()
storage_conf = ElasticSearchCrashStorage.get_required_config()
storage_conf.add_option('logger', default=mock_logging)
return ConfigurationManager(
[storage_conf],
values_source_list=[os.environ],
argv_source=[]
)
开发者ID:Earth4,项目名称:socorro,代码行数:11,代码来源:test_elasticsearch_cleanup.py
示例9: test_success
def test_success(self):
mock_logging = mock.Mock()
required_config = ElasticSearchCrashStorage.required_config
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'submission_url': 'http://elasticsearch_host/%s'
}]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
urllib_str = 'socorro.external.elasticsearch.crashstorage.urllib2'
m_request = mock.Mock()
m_urlopen = mock.Mock()
with mock.patch(urllib_str) as mocked_urllib:
mocked_urllib.Request = m_request
m_request.return_value = 17
mocked_urllib.urlopen = m_urlopen
es_storage.save_processed(a_processed_crash)
expected_request_args = (
'http://elasticsearch_host/9120408936ce666-ff3b-4c7a-9674-'
'367fe2120408',
{},
)
m_request.assert_called_with(*expected_request_args)
expected_urlopen_args = (17,)
expected_urlopen_kwargs = {'timeout': 2}
m_urlopen.assert_called_with(*expected_urlopen_args,
**expected_urlopen_kwargs)
开发者ID:Meghashyamt,项目名称:socorro,代码行数:38,代码来源:test_crashstorage.py
示例10: _setup_storage_config
def _setup_storage_config(self):
required_config = ElasticSearchCrashStorage.get_required_config()
overrides = {
'elasticsearch_index': 'socorro_integration_test',
'elasticsearch_emails_index': 'socorro_integration_test_emails',
'elasticsearch_timeout': 5,
'backoff_delays': [1],
}
return get_config_manager_for_crontabber(
more_definitions=required_config,
overrides=overrides
)
开发者ID:Earth4,项目名称:socorro,代码行数:14,代码来源:test_automatic_emails.py
示例11: test_indexing
def test_indexing(self, pyes_mock):
mock_logging = mock.Mock()
mock_es = mock.Mock()
pyes_mock.exceptions.ElasticHttpNotFoundError = \
pyelasticsearch.exceptions.ElasticHttpNotFoundError
pyes_mock.ElasticSearch.return_value = mock_es
required_config = ElasticSearchCrashStorage.get_required_config()
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'elasticsearch_urls': 'http://elasticsearch_host:9200',
}],
argv_source=[]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
crash_report = a_processed_crash.copy()
crash_report['date_processed'] = '2013-01-01 10:56:41.558922'
def create_index_fn(index, **kwargs):
assert 'socorro20130' in index
if index == 'socorro201301':
raise IndexAlreadyExistsError()
mock_es.create_index.side_effect = create_index_fn
# The index does not exist and is created
es_storage.save_processed(crash_report)
eq_(mock_es.create_index.call_count, 1)
call_args = [
args for args, kwargs in mock_logging.info.call_args_list
]
ok_(
('created new elasticsearch index: %s', 'socorro201300')
in call_args
)
# The index exists and is not created
crash_report['date_processed'] = '2013-01-10 10:56:41.558922'
es_storage.save_processed(crash_report)
eq_(mock_es.create_index.call_count, 2)
call_args = [
args for args, kwargs in mock_logging.info.call_args_list
]
ok_(
('created new elasticsearch index: %s', 'socorro201301')
not in call_args
)
开发者ID:vivekkiran,项目名称:socorro-1,代码行数:57,代码来源:test_crashstorage.py
示例12: _setup_storage_config
def _setup_storage_config(self):
storage_conf = ElasticSearchCrashStorage.get_required_config()
storage_conf.add_option('logger', default=mock.Mock())
values_source_list = {
'elasticsearch_index': 'socorro_integration_test',
'elasticsearch_emails_index': 'socorro_integration_test_emails',
'elasticsearch_timeout': 5,
'backoff_delays': [1],
}
return ConfigurationManager(
[storage_conf],
values_source_list=[os.environ, values_source_list],
argv_source=[]
)
开发者ID:pkucoin,项目名称:socorro,代码行数:16,代码来源:test_automatic_emails.py
示例13: test_failure_no_retry
def test_failure_no_retry(self, pyes_mock):
mock_logging = mock.Mock()
mock_es = mock.Mock()
pyes_mock.ElasticSearch.return_value = mock_es
required_config = ElasticSearchCrashStorage.get_required_config()
required_config.add_option('logger', default=mock_logging)
config_manager = ConfigurationManager(
[required_config],
app_name='testapp',
app_version='1.0',
app_description='app description',
values_source_list=[{
'logger': mock_logging,
'elasticsearch_urls': 'http://elasticsearch_host:9200',
}]
)
with config_manager.context() as config:
es_storage = ElasticSearchCrashStorage(config)
failure_exception = Exception('horrors')
mock_es.index.side_effect = failure_exception
self.assertRaises(Exception,
es_storage.save_processed,
a_processed_crash)
expected_request_args = (
'socorro201214',
'crash_reports',
a_processed_crash
)
expected_request_kwargs = {
'replication': 'async',
'id': a_processed_crash['uuid'],
}
mock_es.index.assert_called_with(
*expected_request_args,
**expected_request_kwargs
)
开发者ID:plounze,项目名称:socorro,代码行数:43,代码来源:test_crashstorage.py
示例14: get_config_context
def get_config_context(self):
storage_config = ElasticSearchCrashStorage.get_required_config()
storage_config.add_option('logger', default=self.config.logger)
values_source = {
'resource.elasticsearch.elasticsearch_default_index': 'socorro_integration_test',
'resource.elasticsearch.elasticsearch_index': 'socorro_integration_test_reports',
'resource.elasticsearch.backoff_delays': [1],
'resource.elasticsearch.elasticsearch_timeout': 5,
'resource.elasticsearch.use_mapping_file': False,
}
config_manager = ConfigurationManager(
[storage_config],
app_name='test_elasticsearch_indexing',
app_version='1.0',
app_description=__doc__,
values_source_list=[os.environ, values_source],
argv_source=[],
)
return config_manager.get_config()
开发者ID:FrostburnStudios,项目名称:socorro,代码行数:21,代码来源:test_elasticsearch_storage_app.py
示例15: IntegrationTestElasticsearchCleanup
class IntegrationTestElasticsearchCleanup(IntegrationTestBase):
def _setup_config_manager(self):
return get_config_manager_for_crontabber(
jobs='socorro.cron.jobs.elasticsearch_cleanup.'
'ElasticsearchCleanupCronApp|30d',
)
def __init__(self, *args, **kwargs):
super(
IntegrationTestElasticsearchCleanup,
self
).__init__(*args, **kwargs)
storage_config = self._setup_storage_config()
with storage_config.context() as config:
self.storage = ElasticSearchCrashStorage(config)
def tearDown(self):
# Clean up created indices.
self.storage.es.delete_index('socorro*')
super(IntegrationTestElasticsearchCleanup, self).tearDown()
def _setup_storage_config(self):
mock_logging = mock.Mock()
storage_conf = ElasticSearchCrashStorage.get_required_config()
storage_conf.add_option('logger', default=mock_logging)
return ConfigurationManager(
[storage_conf],
values_source_list=[os.environ],
argv_source=[]
)
def test_right_indices_are_deleted(self):
config_manager = self._setup_config_manager()
with config_manager.context() as config:
# clear the indices cache so the index is created on every test
self.storage.indices_cache = set()
es = self.storage.es
# Create old indices to be deleted.
self.storage.create_index('socorro200142', {})
self.storage.create_index('socorro200000', {})
# Create an old aliased index.
self.storage.create_index('socorro200201_20030101', {})
es.update_aliases({
'actions': [{
'add': {
'index': 'socorro200201_20030101',
'alias': 'socorro200201'
}
}]
})
# Create a recent aliased index.
last_week_index = self.storage.get_index_for_crash(
utc_now() - datetime.timedelta(weeks=1)
)
self.storage.create_index('socorro_some_aliased_index', {})
es.update_aliases({
'actions': [{
'add': {
'index': 'socorro_some_aliased_index',
'alias': last_week_index
}
}]
})
# Create a recent index that should not be deleted.
now_index = self.storage.get_index_for_crash(utc_now())
self.storage.create_index(now_index, {})
# These will raise an error if an index was not correctly created.
es.status('socorro200142')
es.status('socorro200000')
es.status('socorro200201')
es.status(now_index)
es.status(last_week_index)
tab = CronTabber(config)
tab.run_all()
information = self._load_structure()
assert information['elasticsearch-cleanup']
assert not information['elasticsearch-cleanup']['last_error']
assert information['elasticsearch-cleanup']['last_success']
# Verify the recent index is still there.
es.status(now_index)
es.status(last_week_index)
# Verify the old indices are gone.
assert_raises(
pyelasticsearch.exceptions.ElasticHttpNotFoundError,
es.status,
'socorro200142'
#.........这里部分代码省略.........
开发者ID:Earth4,项目名称:socorro,代码行数:101,代码来源:test_elasticsearch_cleanup.py
示例16: IntegrationTestIndexCleaner
class IntegrationTestIndexCleaner(ElasticSearchTestCase):
def __init__(self, *args, **kwargs):
super(
IntegrationTestIndexCleaner,
self
).__init__(*args, **kwargs)
storage_config = self._setup_config()
with storage_config.context() as config:
self.storage = ElasticSearchCrashStorage(config)
def setUp(self):
self.indices = []
def tearDown(self):
# Clean up created indices.
for index in self.indices:
try:
self.storage.es.delete_index(index)
# "Missing" indices have already been deleted, no need to worry.
except pyelasticsearch.exceptions.ElasticHttpNotFoundError:
pass
super(IntegrationTestIndexCleaner, self).tearDown()
def _setup_config(self):
mock_logging = mock.Mock()
storage_conf = ElasticSearchCrashStorage.get_required_config()
storage_conf.add_option('logger', default=mock_logging)
cleaner_conf = IndexCleaner.get_required_config()
cleaner_conf.add_option('logger', default=mock_logging)
return ConfigurationManager(
[storage_conf, cleaner_conf],
values_source_list=[environment],
argv_source=[]
)
@maximum_es_version('0.90')
def test_correct_indices_are_deleted(self):
config_manager = self._setup_config()
with config_manager.context() as config:
# clear the indices cache so the index is created on every test
self.storage.indices_cache = set()
es = self.storage.es
# Create old indices to be deleted.
self.storage.create_index('socorro200142', {})
self.indices.append('socorro200142')
self.storage.create_index('socorro200000', {})
self.indices.append('socorro200000')
# Create an old aliased index.
self.storage.create_index('socorro200201_20030101', {})
self.indices.append('socorro200201_20030101')
es.update_aliases({
'actions': [{
'add': {
'index': 'socorro200201_20030101',
'alias': 'socorro200201'
}
}]
})
# Create a recent aliased index.
last_week_index = self.storage.get_index_for_crash(
utc_now() - datetime.timedelta(weeks=1)
)
self.storage.create_index('socorro_some_aliased_index', {})
self.indices.append('socorro_some_aliased_index')
es.update_aliases({
'actions': [{
'add': {
'index': 'socorro_some_aliased_index',
'alias': last_week_index
}
}]
})
# Create a recent index that should not be deleted.
now_index = self.storage.get_index_for_crash(utc_now())
self.storage.create_index(now_index, {})
self.indices.append(now_index)
# These will raise an error if an index was not correctly created.
es.status('socorro200142')
es.status('socorro200000')
es.status('socorro200201')
es.status(now_index)
es.status(last_week_index)
api = IndexCleaner(config)
api.delete_old_indices()
# Verify the recent index is still there.
#.........这里部分代码省略.........
开发者ID:abudulemusa,项目名称:socorro,代码行数:101,代码来源:test_index_cleaner.py
示例17: setUp
def setUp(self):
super(IntegrationTestAutomaticEmails, self).setUp()
# prep a fake table
now = utc_now() - datetime.timedelta(minutes=30)
last_month = now - datetime.timedelta(days=31)
config_manager = self._setup_storage_config()
with config_manager.context() as config:
storage = ElasticSearchCrashStorage(config)
# clear the indices cache so the index is created on every test
storage.indices_cache = set()
storage.save_processed({
'uuid': '1',
'email': '[email protected]',
'product': 'WaterWolf',
'version': '20.0',
'release_channel': 'Release',
'date_processed': now,
'classifications': {
'support': {
'classification': 'unknown'
}
}
})
storage.save_processed({
'uuid': '2',
'email': '"Quidam" <[email protected]>',
'product': 'WaterWolf',
'version': '20.0',
'release_channel': 'Release',
'date_processed': now,
'classifications': {
'support': {
'classification': None
}
}
})
storage.save_processed({
'uuid': '3',
'email': '[email protected]',
'product': 'WaterWolf',
'version': '20.0',
'release_channel': 'Release',
'date_processed': now,
'classifications': {
'support': {
'classification': 'bitguard'
}
}
})
storage.save_processed({
'uuid': '4',
'email': '[email protected]',
'product': 'NightlyTrain',
'version': '1.0',
'release_channel': 'Nightly',
'date_processed': now
})
storage.save_processed({
'uuid': '5',
'email': '[email protected]',
'product': 'NightlyTrain',
'version': '1.0',
'release_channel': 'Nightly',
'date_processed': now
})
storage.save_processed({
'uuid': '6',
'email': '[email protected]',
'product': 'NightlyTrain',
'version': '1.0',
'release_channel': 'Nightly',
'date_processed': now
})
storage.save_processed({
'uuid': '7',
'email': '[email protected]',
'product': 'NightlyTrain',
'version': '1.0',
'release_channel': 'Nightly',
'date_processed': now
})
storage.save_processed({
'uuid': '8',
'email': '[email protected]',
'product': 'NightlyTrain',
'version': '1.0',
'release_channel': 'Nightly',
'date_processed': now
})
storage.save_processed({
'uuid': '9',
'email': '[email protected]',
'product': 'EarthRaccoon',
'version': '1.0',
'release_channel': 'Nightly',
'date_processed': now
})
storage.save_processed({
#.........这里部分代码省略.........
开发者ID:pkucoin,项目名称:socorro,代码行数:101,代码来源:test_automatic_emails.py
示例18: test_email_after_delay
def test_email_after_delay(self, exacttarget_mock):
"""Test that a user will receive an email if he or she sends us a new
crash report after the delay is passed (but not before). """
config_manager = self._setup_config_manager(
delay_between_emails=1,
restrict_products=['EarthRaccoon']
)
email = '[email protected]'
list_service_mock = exacttarget_mock.return_value.list.return_value
list_service_mock.get_subscriber.return_value = {
'token': email
}
trigger_send_mock = exacttarget_mock.return_value.trigger_send
tomorrow = utc_now() + datetime.timedelta(days=1, hours=2)
twohourslater = utc_now() + datetime.timedelta(hours=2)
storage_config_manager = self._setup_storage_config()
with storage_config_manager.context() as storage_config:
storage = ElasticSearchCrashStorage(storage_config)
with config_manager.context() as config:
# 1. Send an email to the user and update emailing data
tab = crontabber.CronTabber(config)
tab.run_all()
information = self._load_structure()
assert information['automatic-emails']
assert not information['automatic-emails']['last_error']
assert information['automatic-emails']['last_success']
exacttarget_mock.return_value.trigger_send.assert_called_with(
'socorro_dev_test',
{
'EMAIL_ADDRESS_': email,
'EMAIL_FORMAT_': 'H',
'TOKEN': email
}
)
self.assertEqual(trigger_send_mock.call_count, 1)
# 2. Test that before 'delay' is passed user doesn't receive
# another email
# Insert a new crash report with the same email address
storage.save_processed({
'uuid': '50',
'email': email,
'product': 'EarthRaccoon',
'version': '20.0',
'release_channel': 'Release',
'date_processed': utc_now() + datetime.timedelta(hours=1)
})
storage.es.refresh()
# Run crontabber with time pushed by two hours
with mock.patch('socorro.cron.crontabber.utc_now') as cronutc_mock:
with mock.patch('socorro.cron.base.utc_now') as baseutc_mock:
cronutc_mock.return_value = twohourslater
baseutc_mock.return_value = twohourslater
tab.run_all()
information = self._load_structure()
assert information['automatic-emails']
assert not information['automatic-emails']['last_error']
assert information['automatic-emails']['last_success']
# No new email was sent
self.assertEqual(trigger_send_mock.call_count, 1)
# 3. Verify that, after 'delay' is passed, a new email is sent
# to our user
# Insert a new crash report with the same email address
storage.save_processed({
'uuid': '51',
'email': email,
'product': 'EarthRaccoon',
'version': '20.0',
'release_channel': 'Release',
'date_processed': utc_now() + datetime.timedelta(days=1)
})
storage.es.refresh()
# Run crontabber with time pushed by a day
with mock.patch('socorro.cron.crontabber.utc_now') as cronutc_mock:
with mock.patch('socorro.cron.base.utc_now') as baseutc_mock:
cronutc_mock.return_value = tomorrow
baseutc_mock.return_value = tomorrow
tab.run_all()
information = self._load_structure()
assert information['automatic-emails']
assert not information['automatic-emails']['last_error']
assert information['automatic-emails']['last_success']
# A new email was sent
self.assertEqual(trigger_send_mock.call_count, 2)
开发者ID:pkucoin,项目名称:socorro,代码行数:97,代码来源:test_automatic_emails.py
注:本文中的socorro.external.elasticsearch.crashstorage.ElasticSearchCrashStorage类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论