本文整理汇总了Python中pysnmp.debug.logger函数的典型用法代码示例。如果您正苦于以下问题:Python logger函数的具体用法?Python logger怎么用?Python logger使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了logger函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: processIncomingMsg
def processIncomingMsg(
self,
snmpEngine,
messageProcessingModel,
maxMessageSize,
securityParameters,
securityModel,
securityLevel,
wholeMsg,
msg
):
# rfc2576: 5.2.1
( communityName, transportInformation ) = securityParameters
scope = dict(communityName=communityName,
transportInformation=transportInformation)
snmpEngine.observer.storeExecutionContext(
snmpEngine,
'rfc2576.processIncomingMsg:writable',
scope
)
try:
securityName, contextEngineId, contextName = self._com2sec(
snmpEngine,
scope.get('communityName', communityName),
scope.get('transportInformation', transportInformation)
)
except error.StatusInformation:
snmpInBadCommunityNames, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMPv2-MIB', 'snmpInBadCommunityNames')
snmpInBadCommunityNames.syntax = snmpInBadCommunityNames.syntax+1
raise error.StatusInformation(
errorIndication = errind.unknownCommunityName
)
snmpEngineID, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineID')
securityEngineID = snmpEngineID.syntax
debug.logger & debug.flagSM and debug.logger('processIncomingMsg: looked up securityName %r securityModel %r contextEngineId %r contextName %r by communityName %r AND transportInformation %r' % (securityName, self.securityModelID, contextEngineId, contextName, communityName, transportInformation))
stateReference = self._cache.push(
communityName=communityName
)
scopedPDU = (
contextEngineId, contextName,
msg.getComponentByPosition(2).getComponent()
)
maxSizeResponseScopedPDU = maxMessageSize - 128
securityStateReference = stateReference
debug.logger & debug.flagSM and debug.logger('processIncomingMsg: generated maxSizeResponseScopedPDU %s securityStateReference %s' % (maxSizeResponseScopedPDU, securityStateReference))
return ( securityEngineID,
securityName,
scopedPDU,
maxSizeResponseScopedPDU,
securityStateReference )
开发者ID:imron,项目名称:scalyr-agent-2,代码行数:60,代码来源:rfc2576.py
示例2: exportSymbols
def exportSymbols(self, modName, *anonymousSyms, **namedSyms):
if not self.mibSymbols.has_key(modName):
self.mibSymbols[modName] = {}
mibSymbols = self.mibSymbols[modName]
for symObj in anonymousSyms:
debug.logger & debug.flagBld and debug.logger('exportSymbols: anonymous symbol %s::__pysnmp_%ld' % (modName, self._autoName))
mibSymbols['__pysnmp_%ld' % self._autoName] = symObj
self._autoName = self._autoName + 1
for symName, symObj in namedSyms.items():
if mibSymbols.has_key(symName):
raise error.SmiError(
'Symbol %s already exported at %s' % (symName, modName)
)
if hasattr(symObj, 'label'):
symName = symObj.label or symName # class
if type(symObj) == types.InstanceType:
symName = symObj.getLabel() or symName # class instance
mibSymbols[symName] = symObj
debug.logger & debug.flagBld and debug.logger('exportSymbols: symbol %s::%s' % (modName, symName))
self.lastBuildId = self.lastBuildId + 1
开发者ID:rolfschr,项目名称:testerman,代码行数:25,代码来源:builder.py
示例3: generateRequestMsg
def generateRequestMsg(self, snmpEngine, messageProcessingModel,
globalData, maxMessageSize, securityModel,
securityEngineId, securityName, securityLevel,
scopedPDU):
msg, = globalData
contextEngineId, contextName, pdu = scopedPDU
# rfc2576: 5.2.3
communityName = self._sec2com(snmpEngine, securityName,
contextEngineId, contextName)
debug.logger & debug.flagSM and debug.logger('generateRequestMsg: using community %r for securityModel %r, securityName %r, contextEngineId %r contextName %r' % (communityName, securityModel, securityName, contextEngineId, contextName))
securityParameters = communityName
msg.setComponentByPosition(1, securityParameters)
msg.setComponentByPosition(2)
msg.getComponentByPosition(2).setComponentByType(
pdu.tagSet, pdu, verifyConstraints=False
)
debug.logger & debug.flagMP and debug.logger('generateRequestMsg: %s' % (msg.prettyPrint(),))
try:
return securityParameters, encoder.encode(msg)
except PyAsn1Error:
debug.logger & debug.flagMP and debug.logger('generateRequestMsg: serialization failure: %s' % sys.exc_info()[1])
raise error.StatusInformation(errorIndication=errind.serializationError)
开发者ID:437049211,项目名称:PyQYT,代码行数:29,代码来源:rfc2576.py
示例4: generateResponseMsg
def generateResponseMsg(
self,
snmpEngine,
messageProcessingModel,
globalData,
maxMessageSize,
securityModel,
securityEngineID,
securityName,
securityLevel,
scopedPDU,
securityStateReference
):
# rfc2576: 5.2.2
msg, = globalData
contextEngineId, contextName, pdu = scopedPDU
cachedSecurityData = self._cache.pop(securityStateReference)
communityName = cachedSecurityData['communityName']
debug.logger & debug.flagSM and debug.logger('generateResponseMsg: recovered community %r by securityStateReference %s' % (communityName, securityStateReference))
msg.setComponentByPosition(1, communityName)
msg.setComponentByPosition(2)
msg.getComponentByPosition(2).setComponentByType(
pdu.tagSet, pdu, verifyConstraints=False
)
debug.logger & debug.flagMP and debug.logger('generateResponseMsg: %s' % (msg.prettyPrint(),))
wholeMsg = encoder.encode(msg)
return ( communityName, wholeMsg )
开发者ID:chilikk,项目名称:snmp-course-project,代码行数:31,代码来源:rfc2576.py
示例5: generateResponseMsg
def generateResponseMsg(self, snmpEngine, messageProcessingModel,
globalData, maxMessageSize, securityModel,
securityEngineID, securityName, securityLevel,
scopedPDU, securityStateReference):
# rfc2576: 5.2.2
msg, = globalData
contextEngineId, contextName, pdu = scopedPDU
cachedSecurityData = self._cache.pop(securityStateReference)
communityName = cachedSecurityData['communityName']
debug.logger & debug.flagSM and debug.logger('generateResponseMsg: recovered community %r by securityStateReference %s' % (communityName, securityStateReference))
msg.setComponentByPosition(1, communityName)
msg.setComponentByPosition(2)
msg.getComponentByPosition(2).setComponentByType(
pdu.tagSet, pdu, verifyConstraints=False
)
debug.logger & debug.flagMP and debug.logger('generateResponseMsg: %s' % (msg.prettyPrint(),))
try:
return communityName, encoder.encode(msg)
except PyAsn1Error:
debug.logger & debug.flagMP and debug.logger('generateResponseMsg: serialization failure: %s' % sys.exc_info()[1])
raise error.StatusInformation(errorIndication=errind.serializationError)
开发者ID:437049211,项目名称:PyQYT,代码行数:26,代码来源:rfc2576.py
示例6: processResponseVarBinds
def processResponseVarBinds(self,
snmpEngine,
sendRequestHandle,
errorIndication,
PDU,
cbCtx):
targetName, contextEngineId, contextName, reqPDU, cbFun, cbCtx = cbCtx
if errorIndication:
cbFun(snmpEngine,
sendRequestHandle,
errorIndication,
0, 0, (),
cbCtx)
return
varBindTable = v2c.apiPDU.getVarBindTable(reqPDU, PDU)
if v2c.apiPDU.getErrorStatus(PDU):
errorIndication, varBinds = None, ()
elif not varBindTable:
errorIndication, varBinds = errind.emptyResponse, ()
else:
errorIndication, varBinds = getNextVarBinds(
varBindTable[-1], v2c.apiPDU.getVarBinds(reqPDU)
)
if not cbFun(snmpEngine,
sendRequestHandle,
errorIndication,
v2c.apiPDU.getErrorStatus(PDU),
v2c.apiPDU.getErrorIndex(PDU, muteErrors=True),
varBindTable,
cbCtx):
debug.logger & debug.flagApp and debug.logger('processResponseVarBinds: sendRequestHandle %s, app says to stop walking' % sendRequestHandle)
return # app says enough
if not varBinds:
return # no more objects available
v2c.apiPDU.setRequestID(reqPDU, v2c.getNextRequestID())
v2c.apiPDU.setVarBinds(reqPDU, varBinds)
try:
self.sendPdu(snmpEngine,
targetName,
contextEngineId,
contextName,
reqPDU,
self.processResponseVarBinds,
(targetName, contextEngineId, contextName,
reqPDU, cbFun, cbCtx))
except StatusInformation:
statusInformation = sys.exc_info()[1]
debug.logger & debug.flagApp and debug.logger('sendVarBinds: sendPduHandle %s: sendPdu() failed with %r' % (sendRequestHandle, statusInformation))
cbFun(snmpEngine,
sendRequestHandle,
statusInformation['errorIndication'],
0, 0, (), cbCtx)
开发者ID:imron,项目名称:scalyr-agent-2,代码行数:60,代码来源:cmdgen.py
示例7: handle_read
def handle_read(self):
try:
incomingMessage, transportAddress = self._recvfrom(self.socket, 65535)
transportAddress = self.normalizeAddress(transportAddress)
debug.logger & debug.FLAG_IO and debug.logger(
'handle_read: transportAddress %r -> %r incomingMessage (%d '
'octets) %s' % (transportAddress, transportAddress.getLocalAddress(),
len(incomingMessage), debug.hexdump(incomingMessage)))
if not incomingMessage:
self.handle_close()
return
else:
self._cbFun(self, transportAddress, incomingMessage)
return
except socket.error as exc:
if exc.args[0] in SOCK_ERRORS:
debug.logger & debug.FLAG_IO and debug.logger(
'handle_read: known socket error %s' % exc)
SOCK_ERRORS[exc.args[0]] and self.handle_close()
return
else:
raise error.CarrierError('recvfrom() failed: %s' % exc)
开发者ID:etingof,项目名称:pysnmp,代码行数:28,代码来源:base.py
示例8: exportSymbols
def exportSymbols(self, modName, *anonymousSyms, **namedSyms):
if modName not in self.mibSymbols:
self.mibSymbols[modName] = {}
mibSymbols = self.mibSymbols[modName]
for symObj in anonymousSyms:
debug.logger & debug.flagBld and debug.logger(
'exportSymbols: anonymous symbol %s::__pysnmp_%ld' % (modName, self._autoName))
mibSymbols['__pysnmp_%ld' % self._autoName] = symObj
self._autoName += 1
for symName, symObj in namedSyms.items():
if symName in mibSymbols:
raise error.SmiError(
'Symbol %s already exported at %s' % (symName, modName)
)
if symName != self.moduleID and \
not isinstance(symObj, classTypes):
label = symObj.getLabel()
if label:
symName = label
else:
symObj.setLabel(symName)
mibSymbols[symName] = symObj
debug.logger & debug.flagBld and debug.logger('exportSymbols: symbol %s::%s' % (modName, symName))
self.lastBuildId += 1
开发者ID:rbreesems,项目名称:pysnmp,代码行数:29,代码来源:builder.py
示例9: __init__
def __init__(self, sock=None, sockMap=None):
asyncore.dispatcher.__init__(self)
if sock is None:
if self.sockFamily is None:
raise error.CarrierError(
'Address family %s not supported' % self.__class__.__name__
)
if self.sockType is None:
raise error.CarrierError(
'Socket type %s not supported' % self.__class__.__name__
)
try:
sock = socket.socket(self.sockFamily, self.sockType)
except socket.error:
raise error.CarrierError('socket() failed: %s' % sys.exc_info()[1])
try:
for b in socket.SO_RCVBUF, socket.SO_SNDBUF:
bsize = sock.getsockopt(socket.SOL_SOCKET, b)
if bsize < self.bufferSize:
sock.setsockopt(socket.SOL_SOCKET, b, self.bufferSize)
debug.logger & debug.flagIO and debug.logger('%s: socket %d buffer size increased from %d to %d for buffer %d' % (self.__class__.__name__, sock.fileno(), bsize, self.bufferSize, b))
except Exception:
debug.logger & debug.flagIO and debug.logger('%s: socket buffer size option mangling failure for buffer: %s' % (self.__class__.__name__, sys.exc_info()[1]))
# The socket map is managed by the AsyncoreDispatcher on
# which this transport is registered. Here we just prepare
# socket and postpone transport registration at dispatcher
# till AsyncoreDispatcher invokes registerSocket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
self.set_socket(sock)
开发者ID:bbmorten,项目名称:pysnmp,代码行数:33,代码来源:base.py
示例10: returnResponsePdu
def returnResponsePdu(
self,
snmpEngine,
messageProcessingModel,
securityModel,
securityName,
securityLevel,
contextEngineId,
contextName,
pduVersion,
PDU,
maxSizeResponseScopedPDU,
stateReference,
statusInformation,
):
"""PDU dispatcher -- prepare and serialize a response"""
# Extract input values and initialize defaults
k = int(messageProcessingModel)
if k in snmpEngine.messageProcessingSubsystems:
mpHandler = snmpEngine.messageProcessingSubsystems[k]
else:
raise error.StatusInformation(errorIndication=errind.unsupportedMsgProcessingModel)
debug.logger & debug.flagDsp and debug.logger(
"returnResponsePdu: PDU %s" % (PDU and PDU.prettyPrint() or "<empty>",)
)
# 4.1.2.2
try:
(destTransportDomain, destTransportAddress, outgoingMessage) = mpHandler.prepareResponseMessage(
snmpEngine,
messageProcessingModel,
securityModel,
securityName,
securityLevel,
contextEngineId,
contextName,
pduVersion,
PDU,
maxSizeResponseScopedPDU,
stateReference,
statusInformation,
)
debug.logger & debug.flagDsp and debug.logger("returnResponsePdu: MP suceeded")
except error.StatusInformation:
# 4.1.2.3
raise
# Handle oversized messages XXX transport constrains?
snmpEngineMaxMessageSize, = self.mibInstrumController.mibBuilder.importSymbols(
"__SNMP-FRAMEWORK-MIB", "snmpEngineMaxMessageSize"
)
if snmpEngineMaxMessageSize.syntax and len(outgoingMessage) > snmpEngineMaxMessageSize.syntax:
snmpSilentDrops, = self.mibInstrumController.mibBuilder.importSymbols("__SNMPv2-MIB", "snmpSilentDrops")
snmpSilentDrops.syntax = snmpSilentDrops.syntax + 1
raise error.StatusInformation(errorIndication=errind.tooBig)
# 4.1.2.4
snmpEngine.transportDispatcher.sendMessage(outgoingMessage, destTransportDomain, destTransportAddress)
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:59,代码来源:rfc3412.py
示例11: sendPdu
def sendPdu(self, snmpEngine, targetName, contextEngineId,
contextName, pdu, cbFun=None, cbCtx=None):
(transportDomain, transportAddress, timeout,
retryCount, params) = config.getTargetAddr(snmpEngine, targetName)
(messageProcessingModel, securityModel, securityName,
securityLevel) = config.getTargetParams(snmpEngine, params)
# User-side API assumes SMIv2
if messageProcessingModel == 0:
reqPDU = rfc2576.v2ToV1(pdu)
pduVersion = 0
else:
reqPDU = pdu
pduVersion = 1
# 3.3.5
if reqPDU.tagSet in rfc3411.CONFIRMED_CLASS_PDUS:
# Convert timeout in seconds into timeout in timer ticks
timeoutInTicks = (float(timeout) / 100 /
snmpEngine.transportDispatcher.getTimerResolution())
sendRequestHandle = getNextHandle()
# 3.3.6a
sendPduHandle = snmpEngine.msgAndPduDsp.sendPdu(
snmpEngine, transportDomain, transportAddress,
messageProcessingModel, securityModel, securityName,
securityLevel, contextEngineId, contextName,
pduVersion, reqPDU, True, timeoutInTicks,
self.processResponsePdu, (sendRequestHandle, cbFun, cbCtx)
)
debug.logger & debug.FLAG_APP and debug.logger(
'sendPdu: sendPduHandle %s, timeout %d' % (sendPduHandle, timeout))
# 3.3.6b
self.__pendingReqs[sendPduHandle] = (
transportDomain, transportAddress, messageProcessingModel,
securityModel, securityName, securityLevel, contextEngineId,
contextName, pdu, timeout, retryCount, 0, 0
)
snmpEngine.transportDispatcher.jobStarted(id(self))
else:
snmpEngine.msgAndPduDsp.sendPdu(
snmpEngine, transportDomain, transportAddress,
messageProcessingModel, securityModel,
securityName, securityLevel, contextEngineId,
contextName, pduVersion, reqPDU, False
)
sendRequestHandle = None
debug.logger & debug.FLAG_APP and debug.logger('sendPdu: message sent')
return sendRequestHandle
开发者ID:etingof,项目名称:pysnmp,代码行数:59,代码来源:ntforg.py
示例12: processResponseVarBinds
def processResponseVarBinds(self, snmpEngine, sendRequestHandle,
errorIndication, PDU, cbCtx):
(targetName, nonRepeaters, maxRepetitions,
contextEngineId, contextName, reqPDU, cbFun, cbCtx) = cbCtx
if errorIndication:
cbFun(snmpEngine, sendRequestHandle, errorIndication,
0, 0, (), cbCtx)
return
varBindTable = v2c.apiBulkPDU.getVarBindTable(reqPDU, PDU)
if v2c.apiBulkPDU.getErrorStatus(PDU):
errorIndication, varBinds = None, ()
elif not varBindTable:
errorIndication, varBinds = errind.emptyResponse, ()
else:
errorIndication, varBinds = v2c.apiBulkPDU.getNextVarBinds(varBindTable[-1])
nonRepeaters = v2c.apiBulkPDU.getNonRepeaters(reqPDU)
if nonRepeaters:
varBinds = v2c.apiBulkPDU.getVarBinds(reqPDU)[:int(nonRepeaters)] + varBinds[int(nonRepeaters):]
if not cbFun(
snmpEngine, sendRequestHandle, errorIndication,
v2c.apiBulkPDU.getErrorStatus(PDU),
v2c.apiBulkPDU.getErrorIndex(PDU, muteErrors=True),
varBindTable, cbCtx):
debug.logger & debug.FLAG_APP and debug.logger(
'processResponseVarBinds: sendRequestHandle %s, app says to stop walking' % sendRequestHandle)
return # app says it's enough
if not varBinds:
return # no more objects available
v2c.apiBulkPDU.setRequestID(reqPDU, v2c.getNextRequestID())
v2c.apiBulkPDU.setVarBinds(reqPDU, varBinds)
try:
self.sendPdu(
snmpEngine, targetName, contextEngineId, contextName, reqPDU,
self.processResponseVarBinds,
(targetName, nonRepeaters, maxRepetitions,
contextEngineId, contextName, reqPDU, cbFun, cbCtx))
except StatusInformation as exc:
statusInformation = exc
debug.logger & debug.FLAG_APP and debug.logger(
'processResponseVarBinds: sendPduHandle %s: _sendPdu() failed '
'with %r' % (sendRequestHandle, statusInformation))
cbFun(snmpEngine, sendRequestHandle,
statusInformation['errorIndication'], 0, 0, (), cbCtx)
开发者ID:etingof,项目名称:pysnmp,代码行数:58,代码来源:cmdgen.py
示例13: flipFlopFsm
def flipFlopFsm(self, fsmTable, inputNameVals, acInfo):
self.__indexMib()
debug.logger & debug.flagIns and debug.logger('flipFlopFsm: inputNameVals %r' % (inputNameVals,))
mibTree, = self.mibBuilder.importSymbols('SNMPv2-SMI', 'iso')
outputNameVals = []
state, status = 'start', 'ok'
origExc = None
while 1:
k = (state, status)
if k in fsmTable:
fsmState = fsmTable[k]
else:
k = ('*', status)
if k in fsmTable:
fsmState = fsmTable[k]
else:
raise error.SmiError(
'Unresolved FSM state %s, %s' % (state, status)
)
debug.logger & debug.flagIns and debug.logger('flipFlopFsm: state %s status %s -> fsmState %s' % (state, status, fsmState))
state = fsmState
status = 'ok'
if state == 'stop':
break
idx = 0
for name, val in inputNameVals:
f = getattr(mibTree, state, None)
if f is None:
raise error.SmiError(
'Unsupported state handler %s at %s' % (state, self)
)
try:
# Convert to tuple to avoid ObjectName instantiation
# on subscription
rval = f(tuple(name), val, idx, acInfo)
except error.SmiError:
exc_t, exc_v, exc_tb = sys.exc_info()
debug.logger & debug.flagIns and debug.logger('flipFlopFsm: fun %s exception %s for %s=%r with traceback: %s' % (f, exc_t, name, val, traceback.format_exception(exc_t, exc_v, exc_tb)))
if origExc is None: # Take the first exception
origExc, origTraceback = exc_v, exc_tb
status = 'err'
break
else:
debug.logger & debug.flagIns and debug.logger('flipFlopFsm: fun %s suceeded for %s=%r' % (f, name, val))
if rval is not None:
outputNameVals.append((rval[0], rval[1]))
idx = idx + 1
if origExc:
if sys.version_info[0] <= 2:
raise origExc
else:
try:
raise origExc.with_traceback(origTraceback)
finally:
# Break cycle between locals and traceback object
# (seems to be irrelevant on Py3 but just in case)
del origTraceback
return outputNameVals
开发者ID:BoundaryDev,项目名称:boundary-plugin-mongodb-enterprise-dev,代码行数:58,代码来源:instrum.py
示例14: startProtocol
def startProtocol(self):
debug.logger & debug.flagIO and debug.logger('startProtocol: invoked')
while self._writeQ:
outgoingMessage, transportAddress = self._writeQ.pop(0)
debug.logger & debug.flagIO and debug.logger('startProtocol: transportAddress %r outgoingMessage %s' % (transportAddress, debug.hexdump(outgoingMessage)))
try:
self.transport.write(outgoingMessage, transportAddress)
except Exception:
raise error.CarrierError('Twisted exception: %s' % (sys.exc_info()[1],))
开发者ID:BoundaryDev,项目名称:boundary-plugin-mongodb-enterprise-dev,代码行数:9,代码来源:base.py
示例15: startProtocol
def startProtocol(self):
debug.logger & debug.flagIO and debug.logger('startProtocol: invoked')
while self._writeQ:
outgoingMessage, transportAddress = self._writeQ.pop(0)
debug.logger & debug.flagIO and debug.logger('startProtocol: transportAddress %s outgoingMessage %s' % (transportAddress, repr(outgoingMessage)))
try:
self.transport.write(outgoingMessage, transportAddress)
except Exception, why:
raise error.CarrierError('Twisted exception: %s' % (why,))
开发者ID:rolfschr,项目名称:testerman,代码行数:9,代码来源:base.py
示例16: getMibInstrum
def getMibInstrum(self, contextName):
contextName = univ.OctetString(contextName).asOctets()
if contextName not in self.contextNames:
debug.logger & debug.flagIns and debug.logger('getMibInstrum: contextName \"%s\" not registered' % contextName)
raise error.PySnmpError(
'Missing contextName %s' % contextName
)
else:
debug.logger & debug.flagIns and debug.logger('getMibInstrum: contextName \"%s\", mibInstum %s' % (contextName, self.contextNames[contextName]))
return self.contextNames[contextName]
开发者ID:carmackjia,项目名称:pysnmp,代码行数:10,代码来源:context.py
示例17: handle_write
def handle_write(self):
outgoingMessage, transportAddress = self.__outQueue.pop()
debug.logger & debug.flagIO and debug.logger('handle_write: transportAddress %r outgoingMessage %r' % (transportAddress, outgoingMessage))
try:
self.socket.sendto(outgoingMessage, transportAddress)
except socket.error:
if sys.exc_info()[1].args[0] in sockErrors:
debug.logger & debug.flagIO and debug.logger('handle_write: ignoring socket error %s' % (sys.exc_info()[1],))
else:
raise socket.error(sys.exc_info()[1])
开发者ID:chilikk,项目名称:snmp-course-project,代码行数:10,代码来源:base.py
示例18: loadModules
def loadModules(self, *modNames, **userCtx):
# Build a list of available modules
if not modNames:
modNames = {}
for mibSource in self.__mibSources:
for modName in mibSource.listdir():
modNames[modName] = None
modNames = list(modNames.keys())
if not modNames:
raise error.SmiError(
'No MIB module to load at %s' % (self,)
)
for modName in modNames:
for mibSource in self.__mibSources:
debug.logger & debug.flagBld and debug.logger('loadModules: trying %s at %s' % (modName, mibSource))
try:
modData, sfx = mibSource.read(modName)
except IOError:
debug.logger & debug.flagBld and debug.logger('loadModules: read %s from %s failed: %s' % (modName, mibSource, sys.exc_info()[1]))
continue
modPath = mibSource.fullPath(modName, sfx)
if modPath in self.__modPathsSeen:
debug.logger & debug.flagBld and debug.logger('loadModules: seen %s' % modPath)
break
else:
self.__modPathsSeen[modPath] = 1
debug.logger & debug.flagBld and debug.logger('loadModules: evaluating %s' % modPath)
g = { 'mibBuilder': self,
'userCtx': userCtx }
try:
exec(modData, g)
except Exception:
del self.__modPathsSeen[modPath]
raise error.SmiError(
'MIB module \"%s\" load error: %s' % (modPath, sys.exc_info()[1])
)
self.__modSeen[modName] = modPath
debug.logger & debug.flagBld and debug.logger('loadModules: loaded %s' % modPath)
break
if modName not in self.__modSeen:
raise error.SmiError(
'MIB file \"%s\" not found in search path' % (modName and modName + ".py[co]")
)
return self
开发者ID:carmackjia,项目名称:pysnmp,代码行数:55,代码来源:builder.py
示例19: connection_made
def connection_made(self, transport):
self.transport = transport
debug.logger & debug.flagIO and debug.logger('connection_made: invoked')
while self._writeQ:
outgoingMessage, transportAddress = self._writeQ.pop(0)
debug.logger & debug.flagIO and debug.logger('connection_made: transportAddress %r outgoingMessage %s' %
(transportAddress, debug.hexdump(outgoingMessage)))
try:
self.transport.sendto(outgoingMessage, self.normalizeAddress(transportAddress))
except Exception:
raise error.CarrierError(';'.join(traceback.format_exception(*sys.exc_info())))
开发者ID:437049211,项目名称:PyQYT,代码行数:11,代码来源:base.py
示例20: loadModule
def loadModule(self, modName, **userCtx):
"""Load and execute MIB modules as Python code"""
for mibSource in self._mibSources:
debug.logger & debug.FLAG_BLD and debug.logger(
'loadModule: trying %s at %s' % (modName, mibSource))
try:
codeObj, sfx = mibSource.read(modName)
except IOError as exc:
debug.logger & debug.FLAG_BLD and debug.logger(
'loadModule: read %s from %s failed: '
'%s' % (modName, mibSource, exc))
continue
modPath = mibSource.fullPath(modName, sfx)
if modPath in self._modPathsSeen:
debug.logger & debug.FLAG_BLD and debug.logger(
'loadModule: seen %s' % modPath)
break
else:
self._modPathsSeen.add(modPath)
debug.logger & debug.FLAG_BLD and debug.logger(
'loadModule: evaluating %s' % modPath)
g = {'mibBuilder': self,
'userCtx': userCtx}
try:
exec(codeObj, g)
except Exception:
self._modPathsSeen.remove(modPath)
raise error.MibLoadError(
'MIB module "%s" load error: '
'%s' % (modPath, traceback.format_exception(*sys.exc_info())))
self._modSeen[modName] = modPath
debug.logger & debug.FLAG_BLD and debug.logger(
'loadModule: loaded %s' % modPath)
break
if modName not in self._modSeen:
raise error.MibNotFoundError(
'MIB file "%s" not found in search path '
'(%s)' % (modName and modName + ".py[co]", ', '.join(
[str(x) for x in self._mibSources])))
return self
开发者ID:etingof,项目名称:pysnmp,代码行数:54,代码来源:builder.py
注:本文中的pysnmp.debug.logger函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论