本文整理汇总了Python中pyndn.security.KeyChain类的典型用法代码示例。如果您正苦于以下问题:Python KeyChain类的具体用法?Python KeyChain怎么用?Python KeyChain使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了KeyChain类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: benchmarkDecodeDataSeconds
def benchmarkDecodeDataSeconds(nIterations, useCrypto, keyType, encoding):
    """
    Decode the same wire encoding nIterations times and time the whole run.

    :param int nIterations: The number of iterations.
    :param bool useCrypto: If true, pass each decoded packet to the
      validator. If false, only decode.
    :param KeyType keyType: KeyType.EC selects the default EC key pair; any
      other value selects the default RSA key pair.
    :param Blob encoding: The wire encoding to decode.
    :return: The number of seconds for all iterations.
    :rtype: float
    """
    # Set up an in-memory KeyChain in case useCrypto is true.
    keyChain = KeyChain("pib-memory:", "tpm-memory:")

    if keyType == KeyType.EC:
        privateKeyDer = DEFAULT_EC_PRIVATE_KEY_DER
        publicKeyDer = DEFAULT_EC_PUBLIC_KEY_DER
    else:
        privateKeyDer = DEFAULT_RSA_PRIVATE_KEY_DER
        publicKeyDer = DEFAULT_RSA_PUBLIC_KEY_DER

    # Import the key pair so the PIB-based validation policy can find it.
    safeBag = SafeBag(
        Name("/testname/KEY/123"),
        Blob(privateKeyDer, False),
        Blob(publicKeyDer, False))
    keyChain.importSafeBag(safeBag)
    validator = Validator(ValidationPolicyFromPib(keyChain.getPib()))

    startSeconds = getNowSeconds()
    for _ in range(nIterations):
        packet = Data()
        packet.wireDecode(encoding)
        if useCrypto:
            validator.validate(packet, onVerifySuccess, onVerifyFailed)

    return getNowSeconds() - startSeconds
开发者ID:named-data,项目名称:PyNDN2,代码行数:34,代码来源:test_encode_decode_benchmark.py
示例2: main
def main():
    """
    Answer interests under /ndn/ucla.edu/bms with a RepoServer.

    Builds an in-memory KeyChain holding the default key pair under
    /testname/DSK-reposerver, registers the prefix on a Face connected to
    "localhost", and then processes events in an endless loop. The loop only
    ends when an exception (e.g. KeyboardInterrupt) propagates, at which
    point the face is shut down.
    """
    face = Face("localhost")

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    keyChain = KeyChain(
        IdentityManager(identityStorage, privateKeyStorage), None)
    keyChain.setFace(face)

    # Initialize the storage.
    keyName = Name("/testname/DSK-reposerver")
    # Certificate name: <key name minus last component>/KEY/<last>/ID-CERT/0
    certificateName = keyName.getSubName(0, keyName.size() - 1).append(
        "KEY").append(keyName[-1]).append("ID-CERT").append("0")
    identityStorage.addKey(keyName, KeyType.RSA, Blob(DEFAULT_PUBLIC_KEY_DER))
    privateKeyStorage.setKeyPairForKeyName(
        keyName, DEFAULT_PUBLIC_KEY_DER, DEFAULT_PRIVATE_KEY_DER)

    echo = RepoServer(keyChain, certificateName)
    prefix = Name("/ndn/ucla.edu/bms")
    dump("Register prefix", prefix.toUri())
    face.registerPrefix(prefix, echo.onInterest, echo.onRegisterFailed)

    try:
        while True:
            face.processEvents()
            # We need to sleep for a few milliseconds so we don't use 100%
            # of the CPU.
            time.sleep(0.01)
    finally:
        # The loop never exits normally, so without the finally clause this
        # call would be unreachable.
        face.shutdown()
开发者ID:remap,项目名称:BMS-REPO,代码行数:28,代码来源:server.py
示例3: main
def main():
    """
    Check HMAC-SHA256 signatures on a hard-coded and a freshly signed interest.

    Decodes TlvInterest and verifies it against a fixed 32-byte key, then
    signs a new interest named /ndn/abc with the same key and verifies that
    too. The outcome of each verification is reported through dump().
    """
    # Silence the warning from Interest wire encode.
    Interest.setDefaultCanBePrefix(True)

    decodedInterest = Interest()
    decodedInterest.wireDecode(TlvInterest)

    # Use a hard-wired secret for testing. In a real application the signer
    # ensures that the verifier knows the shared key and its keyName.
    # The key is the 32 byte values 0 through 31.
    sharedKey = Blob(bytearray(range(32)))

    if KeyChain.verifyInterestWithHmacWithSha256(decodedInterest, sharedKey):
        dump("Hard-coded interest signature verification: VERIFIED")
    else:
        dump("Hard-coded interest signature verification: FAILED")

    newInterest = Interest(Name("/ndn/abc"))
    newInterest.setMustBeFresh(False)
    signingKeyName = Name("key1")
    dump("Signing fresh interest", newInterest.getName().toUri())
    KeyChain.signWithHmacWithSha256(newInterest, sharedKey, signingKeyName)

    if KeyChain.verifyInterestWithHmacWithSha256(newInterest, sharedKey):
        dump("Freshly-signed interest signature verification: VERIFIED")
    else:
        dump("Freshly-signed interest signature verification: FAILED")
开发者ID:named-data,项目名称:PyNDN2,代码行数:29,代码来源:test_sign_verify_interest_hmac.py
示例4: test_self_verification
def test_self_verification(self):
    """
    Check SelfVerifyPolicyManager on one signed and one unsigned Data packet.

    The identity-signed packet must produce exactly one success and no
    failure; the unsigned packet must produce exactly one failure and no
    success. Neither may report further verification steps.
    """
    verifier = SelfVerifyPolicyManager(self.identityStorage)
    keyChain = KeyChain(self.identityManager, verifier)

    signerIdentity = Name('TestValidator/RsaSignatureVerification')
    keyChain.createIdentityAndCertificate(signerIdentity)

    # Case 1: data signed by the identity created above.
    signedData = Data(Name('/TestData/1'))
    keyChain.signByIdentity(signedData, signerIdentity)
    signedResult = doVerify(verifier, signedData)

    self.assertFalse(
        signedResult.hasFurtherSteps,
        "SelfVerifyPolicyManager returned a ValidationRequest")
    self.assertEqual(
        signedResult.failureCount, 0,
        "Verification of identity-signed data failed")
    self.assertEqual(
        signedResult.successCount, 1,
        "Verification success called {} times instead of 1".format(
            signedResult.successCount))

    # Case 2: data that was never signed.
    unsignedData = Data(Name('/TestData/2'))
    unsignedResult = doVerify(verifier, unsignedData)

    self.assertFalse(
        unsignedResult.hasFurtherSteps,
        "SelfVerifyPolicyManager returned a ValidationRequest")
    self.assertEqual(
        unsignedResult.successCount, 0,
        "Verification of unsigned data succeeded")
    self.assertEqual(
        unsignedResult.failureCount, 1,
        "Verification failure callback called {} times instead of 1".format(
            unsignedResult.failureCount))
开发者ID:mjycom,项目名称:PyNDN2,代码行数:32,代码来源:test_policy_manager.py
示例5: main
def main():
    """
    Express one interest for /home/test1/data and process events forever.

    The interest uses child selector 1 and the module-level
    defaultInterestLifetime; its data and timeout callbacks are handled by
    a Counter. The event loop only ends when an exception (e.g.
    KeyboardInterrupt) propagates, at which point the face is shut down.
    """
    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    dataPrefix = "/home/test1/data"
    repoDataPrefix = "/home/test1/data"

    # Set up repo-ng, register prefix for repo-ng's fetch prefix
    # Per configuration file in /usr/local/etc/ndn/repo-ng.conf
    # memCache is not used for now; repo is hoping that the piece of data in
    # question is still being held at nfd
    #memCache = MemoryContentCache(face, 100000)
    #memCache.registerPrefix(Name(repoDataPrefix), onRegisterFailed, onDataNotFound)

    counter = Counter(face, repoDataPrefix)

    interest = Interest(Name(dataPrefix))
    interest.setChildSelector(1)
    interest.setInterestLifetimeMilliseconds(defaultInterestLifetime)
    face.expressInterest(interest, counter.onData, counter.onTimeout)

    try:
        while True:
            face.processEvents()
            # Sleep between polls so we don't use 100% of the CPU. Note that
            # this loop waits a full second, not a few milliseconds.
            time.sleep(1)
    finally:
        # The loop never exits normally, so without the finally clause this
        # call would be unreachable.
        face.shutdown()
开发者ID:zhehaowang,项目名称:ndn-pi,代码行数:26,代码来源:test_udp_multicast_consumer_outstanding.py
示例6: wrap_content
def wrap_content(self, name, content, key=None, key_locator=None):
    """
    Wrap name and content into a signed Data packet and return its encoding.

    The packet gets a freshness period of 5000 and a final block ID taken
    from the first component of "/%00%09". It is signed with a certificate
    name derived from /ndn/bms/DSK-default, using a KeyChain built here from
    in-memory storage and the default key pair.

    @param name - name of the data
    @param content - data to be wrapped
    @param key - not used by this implementation
    @param key_locator - not used by this implementation
    @return the wire encoding of the signed packet, as given by toRawStr()
    """
    packet = Data(Name(name))
    packet.setContent(content)
    metaInfo = packet.getMetaInfo()
    metaInfo.setFreshnessPeriod(5000)
    packet.getMetaInfo().setFinalBlockID(Name("/%00%09")[0])

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    keyChain = KeyChain(
        IdentityManager(identityStorage, privateKeyStorage), None)

    # Initialize the storage.
    keyName = Name("/ndn/bms/DSK-default")
    # Certificate name: <key name minus last component>/KEY/<last>/ID-CERT/0
    identityPrefix = keyName.getSubName(0, keyName.size() - 1)
    certificateName = identityPrefix.append("KEY").append(
        keyName[-1]).append("ID-CERT").append("0")
    identityStorage.addKey(keyName, KeyType.RSA, Blob(DEFAULT_PUBLIC_KEY_DER))
    privateKeyStorage.setKeyPairForKeyName(
        keyName, DEFAULT_PUBLIC_KEY_DER, DEFAULT_PRIVATE_KEY_DER)

    keyChain.sign(packet, certificateName)
    return packet.wireEncode().toRawStr()
开发者ID:remap,项目名称:BMS-REPO,代码行数:31,代码来源:repo.py
示例7: main
def main():
    """
    Check HMAC-SHA256 signatures on a hard-coded and a freshly signed Data.

    Decodes TlvData and verifies it against a fixed 32-byte key, then builds
    a new Data packet named /ndn/abc, signs it with the same key and
    verifies that too. The outcome of each verification is reported through
    dump().
    """
    decodedData = Data()
    decodedData.wireDecode(TlvData)

    # Use a hard-wired secret for testing. In a real application the signer
    # ensures that the verifier knows the shared key and its keyName.
    # The key is the 32 byte values 0 through 31.
    sharedKey = Blob(bytearray(range(32)))

    if KeyChain.verifyDataWithHmacWithSha256(decodedData, sharedKey):
        dump("Hard-coded data signature verification: VERIFIED")
    else:
        dump("Hard-coded data signature verification: FAILED")

    newData = Data(Name("/ndn/abc"))
    # Attach an HMAC signature object whose key locator names "key1".
    hmacSignature = HmacWithSha256Signature()
    hmacSignature.getKeyLocator().setType(KeyLocatorType.KEYNAME)
    hmacSignature.getKeyLocator().setKeyName(Name("key1"))
    newData.setSignature(hmacSignature)
    newData.setContent("SUCCESS!")
    dump("Signing fresh data packet", newData.getName().toUri())
    KeyChain.signWithHmacWithSha256(newData, sharedKey)

    if KeyChain.verifyDataWithHmacWithSha256(newData, sharedKey):
        dump("Freshly-signed data signature verification: VERIFIED")
    else:
        dump("Freshly-signed data signature verification: FAILED")
开发者ID:MAHIS,项目名称:PyNDN2,代码行数:29,代码来源:test_sign_verify_data_hmac.py
示例8: benchmarkDecodeDataSeconds
def benchmarkDecodeDataSeconds(nIterations, useCrypto, encoding):
    """
    Decode the same wire encoding nIterations times and time the whole run.

    :param int nIterations: The number of iterations.
    :param bool useCrypto: If true, call keyChain.verifyData on each decoded
      packet. If false, only decode.
    :param Blob encoding: The wire encoding to decode.
    :return: The difference between getNowSeconds() after and before the
      loop.
    """
    # Set up the key storage in case useCrypto is true. Only the public key
    # is registered; nothing is signed here.
    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    identityManager = IdentityManager(identityStorage, privateKeyStorage)
    keyChain = KeyChain(
        identityManager, SelfVerifyPolicyManager(identityStorage))

    keyName = Name("/testname/DSK-123")
    # certificateName is computed but not used later in this function.
    certificateName = keyName.getSubName(0, keyName.size() - 1).append(
        "KEY").append(keyName[-1]).append("ID-CERT").append("0")
    identityStorage.addKey(
        keyName, KeyType.RSA, Blob(DEFAULT_RSA_PUBLIC_KEY_DER))

    startSeconds = getNowSeconds()
    for _ in range(nIterations):
        packet = Data()
        packet.wireDecode(encoding)
        if useCrypto:
            keyChain.verifyData(packet, onVerified, onVerifyFailed)

    return getNowSeconds() - startSeconds
开发者ID:WeiqiJust,项目名称:NDN-total,代码行数:30,代码来源:test_encode_decode_benchmark.py
示例9: main
def main():
    """
    Register /testecho and process events until Echo has responded once.

    Builds an in-memory KeyChain holding the default RSA key pair under
    /testname/DSK-123, registers the prefix with an Echo handler, then polls
    the face until echo._responseCount reaches 1 and shuts the face down.
    """
    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    identityManager = IdentityManager(identityStorage, privateKeyStorage)
    keyChain = KeyChain(identityManager, None)
    keyChain.setFace(face)

    # Initialize the storage.
    keyName = Name("/testname/DSK-123")
    # Certificate name: <key name minus last component>/KEY/<last>/ID-CERT/0
    identityPrefix = keyName.getSubName(0, keyName.size() - 1)
    certificateName = identityPrefix.append("KEY").append(
        keyName[-1]).append("ID-CERT").append("0")
    identityStorage.addKey(
        keyName, KeyType.RSA, Blob(DEFAULT_RSA_PUBLIC_KEY_DER))
    privateKeyStorage.setKeyPairForKeyName(
        keyName, KeyType.RSA, DEFAULT_RSA_PUBLIC_KEY_DER,
        DEFAULT_RSA_PRIVATE_KEY_DER)

    echo = Echo(keyChain, certificateName)
    prefix = Name("/testecho")
    dump("Register prefix", prefix.toUri())
    face.registerPrefix(prefix, echo.onInterest, echo.onRegisterFailed)

    while echo._responseCount < 1:
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of
        # the CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:bboalimoe,项目名称:PyNDN2,代码行数:29,代码来源:test_publish_async_ndnx.py
示例10: main
def main():
    """
    Decode, re-encode and dump TlvInterest, then sign and verify a new one.

    The second half builds an interest named /ndn/abc with a range of
    selectors set, turns it into a command interest through a Face, decodes
    its wire encoding again and asks the KeyChain to verify it.
    """
    decodedInterest = Interest()
    decodedInterest.wireDecode(TlvInterest)
    dump("Interest:")
    dumpInterest(decodedInterest)

    # Set the name again to clear the cached encoding so we encode again.
    decodedInterest.setName(decodedInterest.getName())
    encoding = decodedInterest.wireEncode()
    dump("")
    dump("Re-encoded interest", encoding.toHex())

    reDecodedInterest = Interest()
    reDecodedInterest.wireDecode(encoding)
    dump("Re-decoded Interest:")
    dumpInterest(reDecodedInterest)

    freshInterest = Interest(Name("/ndn/abc"))
    freshInterest.setMustBeFresh(False)
    dump(freshInterest.toUri())
    freshInterest.setMinSuffixComponents(4)
    freshInterest.setMaxSuffixComponents(6)
    keyLocator = freshInterest.getKeyLocator()
    keyLocator.setType(KeyLocatorType.KEY_LOCATOR_DIGEST)
    # The key data is the 32 byte values 0x00 through 0x1F.
    freshInterest.getKeyLocator().setKeyData(bytearray(range(0x20)))
    freshInterest.getExclude().appendComponent(Name("abc")[0]).appendAny()
    freshInterest.setInterestLifetimeMilliseconds(30000)
    freshInterest.setChildSelector(1)
    freshInterest.setMustBeFresh(True)
    freshInterest.setScope(2)

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    identityManager = IdentityManager(identityStorage, privateKeyStorage)
    keyChain = KeyChain(
        identityManager, SelfVerifyPolicyManager(identityStorage))

    # Initialize the storage.
    keyName = Name("/testname/DSK-123")
    # Certificate name: <key name minus last component>/KEY/<last>/ID-CERT/0
    certificateName = keyName.getSubName(0, keyName.size() - 1).append(
        "KEY").append(keyName[-1]).append("ID-CERT").append("0")
    identityStorage.addKey(
        keyName, KeyType.RSA, Blob(DEFAULT_RSA_PUBLIC_KEY_DER))
    privateKeyStorage.setKeyPairForKeyName(
        keyName, KeyType.RSA, DEFAULT_RSA_PUBLIC_KEY_DER,
        DEFAULT_RSA_PRIVATE_KEY_DER)

    # Make a Face just so that we can sign the interest.
    face = Face("localhost")
    face.setCommandSigningInfo(keyChain, certificateName)
    face.makeCommandInterest(freshInterest)

    reDecodedFreshInterest = Interest()
    reDecodedFreshInterest.wireDecode(freshInterest.wireEncode())
    dump("")
    dump("Re-decoded fresh Interest:")
    dumpInterest(reDecodedFreshInterest)

    onVerified = makeOnVerified("Freshly-signed Interest")
    onVerifyFailed = makeOnVerifyFailed("Freshly-signed Interest")
    keyChain.verifyInterest(
        reDecodedFreshInterest, onVerified, onVerifyFailed)
开发者ID:WeiqiJust,项目名称:NDN-total,代码行数:59,代码来源:test_encode_decode_interest.py
示例11: Producer
class Producer(object):
    """
    Serve "Hello <name>" Data packets for interests under a registered prefix.

    Output goes through single-argument print(...) calls so the class parses
    on both Python 2 and Python 3 and prints the same text on each.
    """

    def __init__(self, delay=None):
        """
        :param delay: If not None, the number of seconds onInterest sleeps
          before replying.
        """
        self.delay = delay
        self.nDataServed = 0
        # run() loops until this becomes True; nothing in this class sets it.
        self.isDone = False

    def run(self, prefix):
        """
        Register prefix on a default Face and process events until isDone.

        :param prefix: The prefix to register; it is passed to Name().
        """
        self.keyChain = KeyChain()

        self.consoleThread = ConsoleThread()
        self.consoleThread.start()

        # The default Face will connect using a Unix socket
        face = Face()
        prefix = Name(prefix)

        # Use the system default key chain and certificate name to sign
        # commands.
        face.setCommandSigningInfo(
            self.keyChain, self.keyChain.getDefaultCertificateName())

        # Also use the default certificate name to sign data packets.
        face.registerPrefix(prefix, self.onInterest, self.onRegisterFailed)

        print("Registering prefix %s" % prefix.toUri())

        while not self.isDone:
            face.processEvents()
            time.sleep(0.01)

    def onInterest(self, prefix, interest, transport, registeredPrefixId):
        """
        Reply to interest with a signed Data packet carrying a greeting.

        If the module-level stopServing flag is set, refuse to serve: join
        the console thread and return without replying.
        """
        global stopServing
        if stopServing:
            print("refusing to serve " + interest.getName().toUri())
            self.consoleThread.join()
            print("join'd thread")
            return

        if self.delay is not None:
            time.sleep(self.delay)

        interestName = interest.getName()

        data = Data(interestName)
        data.setContent("Hello " + interestName.toUri())
        data.getMetaInfo().setFreshnessPeriod(3600 * 1000)

        self.keyChain.sign(data, self.keyChain.getDefaultCertificateName())

        transport.send(data.wireEncode().toBuffer())

        self.nDataServed += 1
        print("Replied to: %s (#%d)" % (interestName.toUri(), self.nDataServed))

    def onRegisterFailed(self, prefix):
        """Report that registering prefix failed."""
        print("Register failed for prefix %s" % prefix.toUri())
开发者ID:dibenede,项目名称:nfd-strategies,代码行数:59,代码来源:producer.py
示例12: onSetupFailed
def onSetupFailed(msg):
    """
    Report a setup failure, then try to publish with the default certificate.

    :param msg: The failure message to print.

    A SecurityException raised while getting the default certificate name or
    starting the producers is caught and its message is printed.
    """
    print(msg)
    print("In this test, try start publishing with default keychain certificate anyway")
    keyChain = KeyChain()
    try:
        defaultCertificateName = keyChain.getDefaultCertificateName()
        startProducers(defaultCertificateName, keyChain)
    except SecurityException as e:
        # Call form, like the two prints above, so the function also parses
        # on Python 3.
        print(str(e))
开发者ID:remap,项目名称:ndn-flow,代码行数:9,代码来源:rpi_helper.py
示例13: main
def main():
    """
    Decode, re-encode and dump TlvInterest, then sign and validate a new one.

    The second half builds an interest named /ndn/abc with a range of
    selectors and a forwarding hint, turns it into a command interest
    through a Face, decodes its wire encoding again and validates it against
    the KeyChain's PIB.
    """
    # Silence the warning from Interest wire encode.
    Interest.setDefaultCanBePrefix(True)

    decodedInterest = Interest()
    decodedInterest.wireDecode(TlvInterest)
    dump("Interest:")
    dumpInterest(decodedInterest)

    # Set the name again to clear the cached encoding so we encode again.
    decodedInterest.setName(decodedInterest.getName())
    encoding = decodedInterest.wireEncode()
    dump("")
    dump("Re-encoded interest", encoding.toHex())

    reDecodedInterest = Interest()
    reDecodedInterest.wireDecode(encoding)
    dump("Re-decoded Interest:")
    dumpInterest(reDecodedInterest)

    # The setters are chained, so the value kept is whatever the last one
    # returns.
    freshInterest = (
        Interest(Name("/ndn/abc"))
        .setMustBeFresh(False)
        .setMinSuffixComponents(4)
        .setMaxSuffixComponents(6)
        .setInterestLifetimeMilliseconds(30000)
        .setChildSelector(1)
        .setMustBeFresh(True)
    )
    keyLocator = freshInterest.getKeyLocator()
    keyLocator.setType(KeyLocatorType.KEY_LOCATOR_DIGEST)
    # The key data is the 32 byte values 0x00 through 0x1F.
    freshInterest.getKeyLocator().setKeyData(bytearray(range(0x20)))
    freshInterest.getExclude().appendComponent(Name("abc")[0]).appendAny()
    freshInterest.getForwardingHint().add(1, Name("/A"))
    dump(freshInterest.toUri())

    # Set up the KeyChain.
    keyChain = KeyChain("pib-memory:", "tpm-memory:")
    safeBag = SafeBag(
        Name("/testname/KEY/123"),
        Blob(DEFAULT_RSA_PRIVATE_KEY_DER, False),
        Blob(DEFAULT_RSA_PUBLIC_KEY_DER, False))
    keyChain.importSafeBag(safeBag)
    validator = Validator(ValidationPolicyFromPib(keyChain.getPib()))

    # Make a Face just so that we can sign the interest.
    face = Face("localhost")
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())
    face.makeCommandInterest(freshInterest)

    reDecodedFreshInterest = Interest()
    reDecodedFreshInterest.wireDecode(freshInterest.wireEncode())
    dump("")
    dump("Re-decoded fresh Interest:")
    dumpInterest(reDecodedFreshInterest)

    onSuccess = makeSuccessCallback("Freshly-signed Interest")
    onFailure = makeFailureCallback("Freshly-signed Interest")
    validator.validate(reDecodedFreshInterest, onSuccess, onFailure)
开发者ID:named-data,项目名称:PyNDN2,代码行数:57,代码来源:test_encode_decode_interest.py
示例14: start
def start(self):
    """
    Run sendNextInsertRequest() to completion on a new asyncio event loop.

    Creates the loop and a ThreadsafeFace bound to it, configures command
    signing with the default KeyChain certificate, and always shuts the face
    down once the run finishes or raises.
    """
    self.isStopped = False
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    self.face = ThreadsafeFace(self.loop, '')

    keyChain = KeyChain()
    defaultCertificateName = keyChain.getDefaultCertificateName()
    self.face.setCommandSigningInfo(keyChain, defaultCertificateName)

    def hasStopped():
        # Read the flag each time it is called, not once when registered.
        return self.isStopped

    self.face.stopWhen(hasStopped)

    try:
        self.loop.run_until_complete(self.sendNextInsertRequest())
    finally:
        self.face.shutdown()
开发者ID:thecodemaiden,项目名称:ndn-repo-3,代码行数:12,代码来源:bms_ping.py
示例15: main
def main():
    """
    Call requestInsert and register a prefix so that ProduceSegments will answer
    interests from the repo to send the data packets. This assumes that repo-ng
    is already running (e.g. `sudo ndn-repo-ng`).
    """
    repoCommandPrefix = Name("/example/repo/1")
    repoDataPrefix = Name("/example/data/1")

    # The fetch prefix ends in a version made from the current time in
    # milliseconds.
    nowMilliseconds = int(time.time() * 1000.0)
    fetchPrefix = Name(repoDataPrefix).append("testinsert")
    fetchPrefix = fetchPrefix.appendVersion(nowMilliseconds)

    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    # Use the system default key chain and certificate name to sign commands.
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    # Register the prefix and send the repo insert command at the same time.
    startBlockId = 0
    endBlockId = 1
    # A one-element list so the nested callbacks can clear the flag.
    enabled = [True]

    def onFinished():
        dump("All data was inserted.")
        enabled[0] = False

    produceSegments = ProduceSegments(
        keyChain, keyChain.getDefaultCertificateName(), startBlockId,
        endBlockId, onFinished)
    dump("Register prefix", fetchPrefix.toUri())

    def onRegisterFailed(prefix):
        dump("Register failed for prefix", prefix.toUri())
        enabled[0] = False

    face.registerPrefix(
        fetchPrefix, produceSegments.onInterest, onRegisterFailed)

    def onInsertStarted():
        dump("Insert started for", fetchPrefix.toUri())

    def onFailed():
        enabled[0] = False

    requestInsert(
        face, repoCommandPrefix, fetchPrefix, onInsertStarted, onFailed,
        startBlockId, endBlockId)

    # Run until all the data is sent.
    while enabled[0]:
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of
        # the CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:named-data,项目名称:PyNDN2,代码行数:50,代码来源:basic_insertion.py
示例16: status_put
def status_put(name, data):
    """
    Register name on the module-level face with an Echo handler for data.

    :param name: The prefix to register; it is passed to Name().
    :param data: Passed through to the Echo constructor.

    Calls face.processEvents() once after registering.
    """
    # Use the system default key chain and certificate name to sign commands.
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    # Also use the default certificate name to sign data packets.
    handler = Echo(keyChain, keyChain.getDefaultCertificateName(), data)

    registeredPrefix = Name(name)
    dump("Register prefix", registeredPrefix.toUri())
    logging.debug('Register prefix ' + registeredPrefix.toUri())
    face.registerPrefix(
        registeredPrefix, handler.onInterest, handler.onRegisterFailed)

    face.processEvents()
开发者ID:agawande,项目名称:ndn-status,代码行数:14,代码来源:process.py
示例17: main
def main():
    """
    Create a LocalDiscoveree and a CecTv on a ThreadsafeFace and run forever.

    The face connects to "localhost" on the current asyncio event loop and
    signs commands with the default KeyChain certificate.
    """
    eventLoop = asyncio.get_event_loop()
    face = ThreadsafeFace(eventLoop, "localhost")

    keyChain = KeyChain()
    defaultCertificateName = keyChain.getDefaultCertificateName()
    face.setCommandSigningInfo(keyChain, defaultCertificateName)

    # TODO: NFD hack: uncomment once NFD forwarding fixed
    # discoveree = Discoveree(loop, face, keyChain)
    # TODO: NFD hack: remove once NFD forwarding fixed
    discoveree = LocalDiscoveree(eventLoop, face, keyChain)
    # Keep the CecTv bound to a name so it stays referenced while the loop
    # runs.
    cecTv = CecTv(eventLoop, face, keyChain, discoveree)

    eventLoop.run_forever()
    face.shutdown()
开发者ID:RayneHwang,项目名称:ndn-pi,代码行数:15,代码来源:hdmi_cec_node.py
示例18: main
def main():
    """
    Call startRepoWatch and register a prefix so that SendSegments will answer
    interests from the repo to send data packets for the watched prefix. When
    all the data is sent (or an error), call stopRepoWatch. This assumes that
    repo-ng is already running (e.g. `sudo ndn-repo-ng`).
    """
    repoCommandPrefix = Name("/example/repo/1")
    repoDataPrefix = Name("/example/data/1")

    # The watch prefix ends in a version made from the current time in
    # milliseconds.
    nowMilliseconds = int(time.time() * 1000.0)
    watchPrefix = Name(repoDataPrefix).append("testwatch")
    watchPrefix = watchPrefix.appendVersion(nowMilliseconds)

    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    # Use the system default key chain and certificate name to sign commands.
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    # Register the prefix and start the repo watch at the same time.
    # A one-element list so the nested callbacks can clear the flag.
    enabled = [True]

    def onFinishedSending():
        stopRepoWatchAndQuit(face, repoCommandPrefix, watchPrefix, enabled)

    sendSegments = SendSegments(
        keyChain, keyChain.getDefaultCertificateName(), onFinishedSending)

    def onRegisterFailed(prefix):
        dump("Register failed for prefix", prefix.toUri())
        enabled[0] = False

    dump("Register prefix", watchPrefix.toUri())
    face.registerPrefix(
        watchPrefix, sendSegments.onInterest, onRegisterFailed)

    def onRepoWatchStarted():
        dump("Watch started for", watchPrefix.toUri())

    def onStartFailed():
        dump("startRepoWatch failed.")
        stopRepoWatchAndQuit(face, repoCommandPrefix, watchPrefix, enabled)

    startRepoWatch(
        face, repoCommandPrefix, watchPrefix, onRepoWatchStarted,
        onStartFailed)

    # Run until someone sets enabled[0] = False.
    enabled[0] = True
    while enabled[0]:
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of
        # the CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:named-data,项目名称:PyNDN2,代码行数:48,代码来源:watched_insertion.py
示例19: Producer
class Producer(object):
    """
    Serve "Hello, <name>" Data packets for interests under a namespace.

    Output goes through single-argument print(...) calls so the class parses
    on both Python 2 and Python 3 and prints the same text on each.
    """

    def __init__(self):
        self.keyChain = KeyChain()
        # run() loops until this becomes True; onRegisterFailed sets it.
        self.isDone = False

    def run(self, namespace):
        """
        Register namespace on a default Face and process events until isDone.

        :param namespace: The prefix to register; it is passed to Name().
        """
        # Create a connection to the local forwarder over a Unix socket
        face = Face()

        prefix = Name(namespace)

        # Use the system default key chain and certificate name to sign
        # commands.
        face.setCommandSigningInfo(
            self.keyChain, self.keyChain.getDefaultCertificateName())

        # Also use the default certificate name to sign Data packets.
        face.registerPrefix(prefix, self.onInterest, self.onRegisterFailed)

        print("Registering prefix %s" % prefix.toUri())

        # Run the event loop until isDone is set. Use a short sleep to
        # prevent the Producer from using 100% of the CPU.
        while not self.isDone:
            face.processEvents()
            time.sleep(0.01)

    def onInterest(self, prefix, interest, transport, registeredPrefixId):
        """Reply to interest with a signed Data packet carrying a greeting."""
        interestName = interest.getName()

        data = Data(interestName)
        data.setContent("Hello, " + interestName.toUri())

        hourMilliseconds = 3600 * 1000
        data.getMetaInfo().setFreshnessPeriod(hourMilliseconds)

        self.keyChain.sign(data, self.keyChain.getDefaultCertificateName())

        transport.send(data.wireEncode().toBuffer())

        print("Replied to: %s" % interestName.toUri())

    def onRegisterFailed(self, prefix):
        """Report that registering prefix failed and stop the run() loop."""
        print("Register failed for prefix %s" % prefix.toUri())
        self.isDone = True
开发者ID:dibenede,项目名称:ndn-tutorial-gec21,代码行数:48,代码来源:hello_producer.py
示例20: setUp
def setUp(self):
    """
    Build the identity storage, key pair, KeyChain and Face used by the tests.

    Any previous SQLite file at policy_config/test-public-info.db is removed
    first. A test RSA key pair is then registered under
    /TestConfigPolicyManager/temp/ksk-1416010123, self-signed, and made the
    default.
    """
    certDirectory = 'policy_config/certs'
    self.testCertFile = os.path.join(certDirectory, 'test.cert')

    # Reuse the policy_config subdirectory for the temporary SQLite file.
    self.databaseFilePath = "policy_config/test-public-info.db"
    try:
        os.remove(self.databaseFilePath)
    except OSError:
        # no such file
        pass

    self.identityStorage = BasicIdentityStorage(self.databaseFilePath)
    self.privateKeyStorage = MemoryPrivateKeyStorage()
    self.identityManager = IdentityManager(
        self.identityStorage, self.privateKeyStorage)
    self.policyManager = ConfigPolicyManager(
        'policy_config/simple_rules.conf')

    self.identityName = Name('/TestConfigPolicyManager/temp')
    # to match the anchor cert
    testKeyName = Name(self.identityName).append('ksk-1416010123')
    self.privateKeyStorage.setKeyPairForKeyName(
        testKeyName, KeyType.RSA, TEST_RSA_PUBLIC_KEY_DER,
        TEST_RSA_PRIVATE_KEY_DER)
    self.identityStorage.addKey(
        testKeyName, KeyType.RSA, Blob(TEST_RSA_PUBLIC_KEY_DER))

    selfSignedCert = self.identityManager.selfSign(testKeyName)
    self.identityStorage.setDefaultKeyNameForIdentity(testKeyName)
    self.identityManager.addCertificateAsDefault(selfSignedCert)

    self.keyChain = KeyChain(self.identityManager, self.policyManager)
    self.keyName = testKeyName

    self.face = Face()
开发者ID:mjycom,项目名称:PyNDN2,代码行数:34,代码来源:test_policy_manager.py
注:本文中的pyndn.security.KeyChain类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论