本文整理汇总了Python中zenoss.protocols.jsonformat.from_dict函数的典型用法代码示例。如果您正苦于以下问题:Python from_dict函数的具体用法?Python from_dict怎么用?Python from_dict使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了from_dict函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: getEventSummariesGenerator
def getEventSummariesGenerator(self, filter=None, exclude=None, sort=None, archive=False, timeout=None):
if isinstance(exclude, dict):
exclude = from_dict(EventFilter, exclude)
if isinstance(filter, dict):
filter = from_dict(EventFilter, filter)
if sort is not None:
sort = tuple(self._getEventSort(s) for s in safeTuple(sort))
searchid = self.client.createSavedSearch(event_filter=filter, exclusion_filter=exclude, sort=sort,
archive=archive, timeout=timeout)
log.debug("created saved search %s", searchid)
eventSearchFn = partial(self.client.savedSearch, searchid, archive=archive)
offset = 0
limit = 1000
try:
while offset is not None:
result = self._getEventSummaries(eventSearchFn, offset, limit)
for evt in result['events']:
yield evt
offset = result['next_offset']
finally:
try:
log.debug("closing saved search %s", searchid)
self.client.deleteSavedSearch(searchid, archive=archive)
except Exception as e:
log.debug("error closing saved search %s (%s) - %s", searchid, type(e), e)
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:25,代码来源:zepfacade.py
示例2: updateEventSummaries
def updateEventSummaries(self, update, eventFilter=None, exclusionFilter=None, limit=None, timeout=None):
update_pb = from_dict(EventSummaryUpdate, update)
event_filter_pb = None if (eventFilter is None) else from_dict(EventFilter, eventFilter)
exclusion_filter_pb = None if (exclusionFilter is None) else from_dict(EventFilter, exclusionFilter)
status, response = self.client.updateEventSummaries(update_pb, event_filter_pb, exclusion_filter_pb,
limit=limit, timeout=timeout)
return status, to_dict(response)
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:7,代码来源:zepfacade.py
示例3: reopenEventSummaries
def reopenEventSummaries(self, eventFilter=None, exclusionFilter=None, limit=None, userName=None, timeout=None):
if eventFilter:
eventFilter = from_dict(EventFilter, eventFilter)
if exclusionFilter:
exclusionFilter = from_dict(EventFilter, exclusionFilter)
if not userName:
userUuid, userName = self._findUserInfo()
else:
userUuid = self._getUserUuid(userName)
status, response = self.client.reopenEventSummaries(
userUuid, userName, eventFilter, exclusionFilter, limit, timeout=timeout)
return status, to_dict(response)
开发者ID:SteelHouseLabs,项目名称:zenoss-prodbin,代码行数:13,代码来源:zepfacade.py
示例4: load
def load(self, configData):
"""
configData is a JSON dict that contains a key of the same
name as specified in this class (ie self.key).
The value from this key is expected to be an arrary of dictionaries
as used by the ZEP system. See the documentation in the
ZenModel/ZenPackTemplate/CONTENT/zep/zep.json.example file.
"""
if configData:
self.zep = getFacade('zep')
items = configData.get(EventDetailItemHandler.key)
if items is None:
log.warn("Expected key '%s' for details is missing from the zep.json file",
self.key)
return
detailItemSet = from_dict(EventDetailItemSet, dict(
# An empty array in details casues ZEP to puke
details = items
))
try:
self.zep.addIndexedDetails(detailItemSet)
except Exception as ex:
log.error("ZEP %s error adding detailItemSet data: %s\nconfigData= %s",
getattr(ex, 'status', 'unknown'), detailItemSet, configData)
log.error("See the ZEP logs for more information.")
开发者ID:c0ns0le,项目名称:zenoss-4,代码行数:26,代码来源:ZenPackLoader.py
示例5: updateTrigger
def updateTrigger(self, **data):
user = getSecurityManager().getUser()
triggerObj = self._guidManager.getObject(data['uuid'])
log.debug('Trying to update trigger: %s' % triggerObj.id)
if self.triggerPermissions.userCanManageTrigger(user, triggerObj):
if 'globalRead' in data:
triggerObj.globalRead = data.get('globalRead', False)
log.debug('setting globalRead %s' % triggerObj.globalRead)
if 'globalWrite' in data:
triggerObj.globalWrite = data.get('globalWrite', False)
log.debug('setting globalWrite %s' % triggerObj.globalWrite)
if 'globalManage' in data:
triggerObj.globalManage = data.get('globalManage', False)
log.debug('setting globalManage %s' % triggerObj.globalManage)
triggerObj.users = data.get('users', [])
self.triggerPermissions.clearPermissions(triggerObj)
self.triggerPermissions.updatePermissions(self._guidManager, triggerObj)
if self.triggerPermissions.userCanUpdateTrigger(user, triggerObj):
if "name" in data:
triggerObj.setTitle(data["name"])
trigger = from_dict(zep.EventTrigger, data)
response, content = self.triggers_service.updateTrigger(trigger)
return content
开发者ID:c0ns0le,项目名称:zenoss-4,代码行数:30,代码来源:triggersfacade.py
示例6: _processArgs
def _processArgs(self, eventFilter, exclusionFilter, userName):
if eventFilter:
eventFilter = from_dict(EventFilter, eventFilter)
if exclusionFilter:
exclusionFilter = from_dict(EventFilter, exclusionFilter)
if not userName:
userUuid, userName = self._findUserInfo()
else:
userUuid = self._getUserUuid(userName)
if eventFilter is None and exclusionFilter is None:
raise NoFiltersException("Cannot modify event summaries without at least one filter specified.")
return {'eventFilter': eventFilter, 'exclusionFilter': exclusionFilter,
'userName': userName, 'userUuid': userUuid}
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:16,代码来源:zepfacade.py
示例7: get_alert
def get_alert(alrt_evid, alrt_type, alrt_urls):
message = None
try:
filter_zep = zep.createEventFilter(uuid=alrt_evid)
for summary in zep.getEventSummariesGenerator(filter=filter_zep):
sync()
evt = EventSummaryProxy(from_dict(EventSummary, summary))
if evt.severity == 5:
severity_string='Critical'
elif evt.severity == 4:
severity_string='Error'
elif evt.severity == 3:
severity_string='Warning'
elif evt.severity == 2:
severity_string='Info'
elif evt.severity == 1:
severity_string='Debug'
if alrt_type == 'incident':
message = """[casc-zenoss] SYSTEMS - {1} {2} <br /> Device: {1} <br /> Component: {3} <br /> Severity: {0} <br /> Time: {4} <br /> Message: <br /> {5} <br /> <a href="{6}">Event Detail</a> <br /> <a href="{7}">Acknowledge</a> <br /> <a href="{8}">Close</a> <br /> <a href="{9}">Device Events</a>""".format(severity_string, evt.device, evt.summary, evt.component, evt.lastTime, evt.message, alrt_urls[0], alrt_urls[1], alrt_urls[2], alrt_urls[3])
else:
message = """[casc-zenoss] SYSTEMS - CLEAR: {0} {7} <br /> Event: {1} <br /> Cleared by: {7} <br /> At: {2} <br /> Device: {0} <br /> Component: {3} <br /> Severity: {4} <br /> Message: <br /> {5} <br /> <a href="{6}">Undelete</a>""".format(evt.device, evt.summary, alrt_urls[2], evt.component, severity_string, evt.message, alrt_urls[0], alrt_urls[1])
if message != None:
post_alert(alrt_type, severity_string, message)
else:
raise Exception("Invalid region name: {0}".format(regions[x]))
开发者ID:damascopaul,项目名称:zenoss-hippy,代码行数:31,代码来源:zenoss-hipv2.py
示例8: setConfigValues
def setConfigValues(self, values):
"""
@type values: Dictionary
@param values: Key Value pairs of config values
"""
zepConfigProtobuf = from_dict(ZepConfig, values)
self.configClient.setConfigValues(zepConfigProtobuf)
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:7,代码来源:zepfacade.py
示例9: upgrade
def upgrade(self, configData):
if configData:
self.zep = getFacade('zep')
items = configData.get(EventDetailItemHandler.key, [])
for item in items:
log.info("Upgrading the following to be indexed by ZEP: %s" % item)
detailItem = from_dict(EventDetailItem, item)
self.zep.updateIndexedDetailItem(detailItem)
开发者ID:c0ns0le,项目名称:zenoss-4,代码行数:8,代码来源:ZenPackLoader.py
示例10: getEventSummaries
def getEventSummaries(self, offset, limit=1000, sort=None, filter=None, exclusion_filter=None):
client_fn = self.client.getEventSummaries
if filter is not None and isinstance(filter, dict):
filter = from_dict(EventFilter, filter)
if exclusion_filter is not None and isinstance(exclusion_filter, dict):
exclusion_filter = from_dict(EventFilter, exclusion_filter)
if sort is not None:
sort = tuple(self._getEventSort(s) for s in safeTuple(sort))
result = self._getEventSummaries(source=partial(client_fn,
filter=filter,
exclusion_filter=exclusion_filter,
sort=sort),
offset=offset, limit=limit)
return result
开发者ID:TifaSky,项目名称:zenoss_midrest,代码行数:17,代码来源:zepservice.py
示例11: updateTrigger
def updateTrigger(self, **data):
user = getSecurityManager().getUser()
triggerObj = self._guidManager.getObject(data['uuid'])
log.debug('Trying to update trigger: %s' % triggerObj.id)
if self.triggerPermissions.userCanManageTrigger(user, triggerObj):
if 'globalRead' in data:
triggerObj.globalRead = data.get('globalRead', False)
log.debug('setting globalRead %s' % triggerObj.globalRead)
if 'globalWrite' in data:
triggerObj.globalWrite = data.get('globalWrite', False)
log.debug('setting globalWrite %s' % triggerObj.globalWrite)
if 'globalManage' in data:
triggerObj.globalManage = data.get('globalManage', False)
log.debug('setting globalManage %s' % triggerObj.globalManage)
triggerObj.users = data.get('users', [])
self.triggerPermissions.clearPermissions(triggerObj)
self.triggerPermissions.updatePermissions(
self._guidManager, triggerObj
)
if self.triggerPermissions.userCanUpdateTrigger(user, triggerObj):
if "name" in data:
triggerObj.setTitle(cgi.escape(data["name"]))
parent = triggerObj.getPrimaryParent()
path = triggerObj.absolute_url_path()
oldId = triggerObj.getId()
newId = triggerObj.title
if not isinstance(newId, unicode):
newId = Utils.prepId(newId)
newId = newId.strip()
if not newId:
raise Exception("New trigger id cannot be empty.")
if newId != oldId:
# rename the trigger id since its title changed
try:
if triggerObj.getDmd().Triggers.findObject(newId):
message = 'Trigger %s already exists' % newId
# Duplicate trigger found
raise Exception(message)
except AttributeError as ex:
# the newId is not a duplicate
pass
try:
parent.manage_renameObject(oldId, newId)
triggerObj.id = newId
except CopyError:
raise Exception("Trigger rename failed.")
trigger = from_dict(zep.EventTrigger, data)
response, content = self.triggers_service.updateTrigger(trigger)
return content
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:58,代码来源:triggersfacade.py
示例12: reopenEventSummaries
def reopenEventSummaries(self, userUuid, userName=None, event_filter=None, exclusionFilter=None, limit=None,
timeout=None):
update = from_dict(EventSummaryUpdate, dict(
status = STATUS_NEW,
current_user_uuid = userUuid,
current_user_name = userName,
))
return self.updateEventSummaries(update, event_filter=event_filter, exclusion_filter=exclusionFilter,
limit=limit, timeout=timeout)
开发者ID:jhanson,项目名称:zenoss-protocols,代码行数:9,代码来源:zep.py
示例13: _getEventSort
def _getEventSort(self, sortParam):
eventSort = {}
if isinstance(sortParam, (list, tuple)):
field, direction = sortParam
eventSort['direction'] = self.SORT_DIRECTIONAL_MAP[direction.lower()]
else:
field = sortParam
eventSort.update(getDetailsInfo().getSortMap()[field.lower()])
return from_dict(EventSort, eventSort)
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:9,代码来源:zepfacade.py
示例14: getEventSummaries
def getEventSummaries(self, offset, limit=1000, sort=None, filter=None, exclusion_filter=None, client_fn=None,
use_permissions=False):
if client_fn is None:
client_fn = self.client.getEventSummaries
if filter is not None and isinstance(filter,dict):
filter = from_dict(EventFilter, filter)
if exclusion_filter is not None and isinstance(exclusion_filter, dict):
exclusion_filter = from_dict(EventFilter, exclusion_filter)
if sort is not None:
sort = tuple(self._getEventSort(s) for s in safeTuple(sort))
result = None
if use_permissions:
user = getSecurityManager().getUser()
userSettings = self._dmd.ZenUsers._getOb(user.getId())
hasGlobalRoles = not userSettings.hasNoGlobalRoles()
if not hasGlobalRoles:
adminRoles = userSettings.getAllAdminRoles()
if adminRoles:
# get ids for the objects they have permission to access
# and add to filter
ids = [IGlobalIdentifier(x.managedObject()).getGUID() for x in adminRoles]
if filter is None:
filter = EventFilter()
tf = filter.tag_filter.add()
tf.tag_uuids.extend(ids)
else:
# no permission to see events, return 0
result = {
'total' : 0,
'events' : [],
}
if not result:
result = self._getEventSummaries(source=partial(client_fn,
filter=filter,
exclusion_filter=exclusion_filter,
sort=sort
),
offset=offset, limit=limit
)
return result
开发者ID:SteelHouseLabs,项目名称:zenoss-prodbin,代码行数:43,代码来源:zepfacade.py
示例15: getEventSummaries
def getEventSummaries(self, offset, limit=1000, sort=None, filter=None, exclusion_filter=None, client_fn=None,
use_permissions=False):
if client_fn is None:
client_fn = self.client.getEventSummaries
if filter is not None and isinstance(filter,dict):
filter = from_dict(EventFilter, filter)
if exclusion_filter is not None and isinstance(exclusion_filter, dict):
exclusion_filter['operator'] = 1 # Set operator to OR
exclusion_filter = from_dict(EventFilter, exclusion_filter)
if sort is not None:
sort = tuple(self._getEventSort(s) for s in safeTuple(sort))
result = None
if use_permissions:
user = getSecurityManager().getUser()
userSettings = self._dmd.ZenUsers._getOb(user.getId())
hasGlobalRoles = not userSettings.hasNoGlobalRoles()
if not hasGlobalRoles:
# get guids for the objects user has permission to access
# and add to filter
guids = userSettings.getAllAdminGuids(returnChildrenForRootObj=True)
if guids:
if filter is None:
filter = EventFilter()
tf = filter.tag_filter.add()
tf.tag_uuids.extend(guids)
else:
# no permission to see events, return 0
result = {
'total' : 0,
'events' : [],
}
if not result:
result = self._getEventSummaries(source=partial(client_fn,
filter=filter,
exclusion_filter=exclusion_filter,
sort=sort
),
offset=offset, limit=limit
)
return result
开发者ID:zenoss,项目名称:zenoss-prodbin,代码行数:43,代码来源:zepfacade.py
示例16: addNoteBulkAsync
def addNoteBulkAsync(self, uuids, message, userUuid=None, userName=None):
"""
Add a note (what used to be called log) to several event summaries.
"""
note = from_dict(EventNote, dict(
user_uuid = userUuid,
user_name = userName,
message = message
))
return self.client.post('notes_async', body=note, params={'uuid': uuids})
开发者ID:malysoun,项目名称:zenoss-protocols,代码行数:12,代码来源:zep.py
示例17: addNote
def addNote(self, uuid, message, userUuid=None, userName=None):
"""
Add a note (what used to be called log) to an event summary.
"""
note = from_dict(EventNote, dict(
user_uuid = userUuid,
user_name = userName,
message = message
))
return self.client.post('%s/notes' % uuid, body=note)
开发者ID:jhanson,项目名称:zenoss-protocols,代码行数:12,代码来源:zep.py
示例18: _getEventTagSeverities
def _getEventTagSeverities(self, eventClass=(), severity=(), status=(), tags=()):
if not severity:
severity = (SEVERITY_CRITICAL, SEVERITY_ERROR, SEVERITY_WARNING)
if not status:
status = (STATUS_NEW, STATUS_ACKNOWLEDGED)
eventFilter = self.createEventFilter(
severity=severity,
status=status,
event_class=eventClass,
tags=tags,
)
response, content = self.client.getEventTagSeverities(from_dict(EventFilter, eventFilter))
return content.severities
开发者ID:SteelHouseLabs,项目名称:zenoss-prodbin,代码行数:14,代码来源:zepfacade.py
示例19: updateNotificationSubscriptions
def updateNotificationSubscriptions(self, notification):
triggerSubscriptions = []
notification_guid = IGlobalIdentifier(notification).getGUID()
for subscription in notification.subscriptions:
triggerSubscriptions.append(dict(
delay_seconds = notification.delay_seconds,
repeat_seconds = notification.repeat_seconds,
subscriber_uuid = notification_guid,
send_initial_occurrence = notification.send_initial_occurrence,
trigger_uuid = subscription,
))
subscriptionSet = from_dict(zep.EventTriggerSubscriptionSet, dict(
subscriptions = triggerSubscriptions
))
self.triggers_service.updateSubscriptions(notification_guid, subscriptionSet)
开发者ID:c0ns0le,项目名称:zenoss-4,代码行数:15,代码来源:triggersfacade.py
示例20: test_1_DetailUpdates
def test_1_DetailUpdates(self):
eventproto = from_dict(Event, self.initialEvent)
proxy = EventProxy(eventproto)
proxy.details['A'] = 1000
proxy.details['B'] = (1000,)
self.assertEqual(len(proxy.details), 2)
self.assertEqual(proxy.details['A'], '1000')
self.assertRaises(KeyError, proxy.details.__getitem__, 'C')
del proxy.details['B']
self.assertEqual(len(proxy.details), 1)
self.assertRaises(KeyError, proxy.details.__getitem__, 'B')
# but this does not raise a KeyError
del proxy.details['B']
del proxy.details['A']
self.assertEqual(len(proxy.details), 0)
开发者ID:SteelHouseLabs,项目名称:zenoss-prodbin,代码行数:17,代码来源:testProxies.py
注:本文中的zenoss.protocols.jsonformat.from_dict函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论