本文整理汇总了 Python 中 pyndn.Face 类的典型用法代码示例。如果您正苦于以下问题:Python Face 类的具体用法?Python Face 怎么用?Python Face 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Face类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: main
def main():
    """Fetch /localhost/nfd/faces/list from the local NFD and print it.

    The Interest is handed to SegmentFetcher; the Face event loop is then
    pumped until either the completion or the error callback fires.  The
    Face is always shut down before returning, even if the loop is
    interrupted by an exception.
    """
    # Silence the warning from Interest wire encode.
    Interest.setDefaultCanBePrefix(True)

    # The default Face connects to the local NFD.
    face = Face()

    interest = Interest(Name("/localhost/nfd/faces/list"))
    interest.setInterestLifetimeMilliseconds(4000)
    dump("Express interest", interest.getName().toUri())

    # One-element list so the nested callbacks can clear the flag by mutation.
    enabled = [True]

    def onComplete(content):
        enabled[0] = False
        printFaceStatuses(content)

    def onError(errorCode, message):
        enabled[0] = False
        dump(message)

    try:
        SegmentFetcher.fetch(face, interest, None, onComplete, onError)

        # Loop calling processEvents until a callback sets enabled[0] = False.
        while enabled[0]:
            face.processEvents()
            # We need to sleep for a few milliseconds so we don't use 100% of
            # the CPU.
            time.sleep(0.01)
    finally:
        # Release the connection to the forwarder.
        face.shutdown()
开发者ID:named-data,项目名称:PyNDN2,代码行数:29,代码来源:test_list_faces.py
示例2: main
def main():
    """Fetch /localhost/nfd/rib/list from the local NFD and print it.

    The Interest is handed to SegmentFetcher with
    SegmentFetcher.DontVerifySegment; the Face event loop is then pumped
    until either the completion or the error callback fires.  The Face is
    always shut down before returning.
    """
    # The default Face connects to the local NFD.
    face = Face()

    interest = Interest(Name("/localhost/nfd/rib/list"))
    interest.setInterestLifetimeMilliseconds(4000)
    dump("Express interest", interest.getName().toUri())

    # One-element list so the nested callbacks can clear the flag by mutation.
    enabled = [True]

    def onComplete(content):
        enabled[0] = False
        printRibEntries(content)

    def onError(errorCode, message):
        enabled[0] = False
        dump(message)

    try:
        SegmentFetcher.fetch(
            face, interest, SegmentFetcher.DontVerifySegment, onComplete,
            onError)

        # Loop calling processEvents until a callback sets enabled[0] = False.
        while enabled[0]:
            face.processEvents()
            # We need to sleep for a few milliseconds so we don't use 100% of
            # the CPU.
            time.sleep(0.01)
    finally:
        # Release the connection to the forwarder.
        face.shutdown()
开发者ID:mjycom,项目名称:PyNDN2,代码行数:27,代码来源:test_list_rib.py
示例3: main
def main():
    """Decode, re-encode and sign example Interests, dumping every stage.

    First the pre-encoded TlvInterest is decoded, re-encoded and decoded
    again.  Then a fresh Interest is built with selectors, signed as a
    command interest through a Face, round-tripped through the wire format
    and handed to the key chain for verification.
    """
    decoded = Interest()
    decoded.wireDecode(TlvInterest)
    dump("Interest:")
    dumpInterest(decoded)

    # Set the name again to clear the cached encoding so we encode again.
    decoded.setName(decoded.getName())
    wire = decoded.wireEncode()
    dump("")
    dump("Re-encoded interest", wire.toHex())

    roundTripped = Interest()
    roundTripped.wireDecode(wire)
    dump("Re-decoded Interest:")
    dumpInterest(roundTripped)

    freshInterest = Interest(Name("/ndn/abc"))
    freshInterest.setMustBeFresh(False)
    dump(freshInterest.toUri())
    freshInterest.setMinSuffixComponents(4)
    freshInterest.setMaxSuffixComponents(6)

    keyLocator = freshInterest.getKeyLocator()
    keyLocator.setType(KeyLocatorType.KEY_LOCATOR_DIGEST)
    # The 32 bytes 0x00 .. 0x1F.
    keyLocator.setKeyData(bytearray(range(32)))

    exclude = freshInterest.getExclude()
    exclude.appendComponent(Name("abc")[0])
    exclude.appendAny()

    freshInterest.setInterestLifetimeMilliseconds(30000)
    freshInterest.setChildSelector(1)
    freshInterest.setMustBeFresh(True)
    freshInterest.setScope(2)

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    identityManager = IdentityManager(identityStorage, privateKeyStorage)
    keyChain = KeyChain(
        identityManager, SelfVerifyPolicyManager(identityStorage))

    # Initialize the storage.
    keyName = Name("/testname/DSK-123")
    # <key name minus last component>/KEY/<last component>/ID-CERT/0
    certificateName = keyName.getSubName(0, keyName.size() - 1)
    for component in ("KEY", keyName[-1], "ID-CERT", "0"):
        certificateName.append(component)
    identityStorage.addKey(
        keyName, KeyType.RSA, Blob(DEFAULT_RSA_PUBLIC_KEY_DER))
    privateKeyStorage.setKeyPairForKeyName(
        keyName, KeyType.RSA, DEFAULT_RSA_PUBLIC_KEY_DER,
        DEFAULT_RSA_PRIVATE_KEY_DER)

    # Make a Face just so that we can sign the interest.
    signingFace = Face("localhost")
    signingFace.setCommandSigningInfo(keyChain, certificateName)
    signingFace.makeCommandInterest(freshInterest)

    reDecodedFreshInterest = Interest()
    reDecodedFreshInterest.wireDecode(freshInterest.wireEncode())
    dump("")
    dump("Re-decoded fresh Interest:")
    dumpInterest(reDecodedFreshInterest)

    label = "Freshly-signed Interest"
    keyChain.verifyInterest(
        reDecodedFreshInterest, makeOnVerified(label),
        makeOnVerifyFailed(label))
开发者ID:WeiqiJust,项目名称:NDN-total,代码行数:59,代码来源:test_encode_decode_interest.py
示例4: Consumer
class Consumer(object):
    '''Hello World consumer.

    Expresses one Interest for the given prefix, re-expresses it on
    timeout, and stops after the first Data packet or once the per-name
    timeout counter exceeds 3.
    '''

    def __init__(self, prefix):
        '''Store the prefix as a Name and create a Face to 127.0.0.1.'''
        self.prefix = Name(prefix)
        # Maps Interest name URI -> counter, started at 1 on first send and
        # incremented on every timeout.
        self.outstanding = dict()
        self.isDone = False
        self.face = Face("127.0.0.1")

    def run(self):
        '''Send the first Interest and pump the Face until isDone is set.'''
        try:
            self._sendNextInterest(self.prefix)

            while not self.isDone:
                self.face.processEvents()
                time.sleep(0.01)

        except RuntimeError as e:
            print("ERROR: %s" % e)

    def _sendNextInterest(self, name):
        '''Express an Interest for name and start tracking it if it is new.'''
        interest = Interest(name)
        uri = name.toUri()

        interest.setInterestLifetimeMilliseconds(4000)
        interest.setMustBeFresh(True)

        # Only initialise the counter on first use; retries keep their count.
        self.outstanding.setdefault(uri, 1)

        self.face.expressInterest(interest, self._onData, self._onTimeout)
        print("Sent Interest for %s" % uri)

    def _onData(self, interest, data):
        '''Print the payload, stop tracking the Interest and finish.'''
        payload = data.getContent()

        print("Received data:  %s" % payload.toRawStr())
        # Entries are keyed by the *Interest* name.  The Data name may be
        # longer than the Interest name, so looking it up by the Data name
        # could raise KeyError inside the callback.
        self.outstanding.pop(interest.getName().toUri(), None)
        self.isDone = True

    def _onTimeout(self, interest):
        '''Count the timeout and retry until the counter exceeds 3.'''
        name = interest.getName()
        uri = name.toUri()

        print("TIMEOUT #%d: %s" % (self.outstanding[uri], uri))
        self.outstanding[uri] += 1

        if self.outstanding[uri] <= 3:
            self._sendNextInterest(name)
        else:
            self.isDone = True
开发者ID:dibenede,项目名称:ndn-tutorial-gec21,代码行数:58,代码来源:hello_consumer.py
示例5: main
def main():
    """Decode, re-encode and sign example Interests, dumping every stage.

    First the pre-encoded TlvInterest is decoded, re-encoded and decoded
    again.  Then a fresh Interest is built with selectors and a forwarding
    hint, signed as a command interest through a Face, round-tripped
    through the wire format and passed to a Validator.
    """
    # Silence the warning from Interest wire encode.
    Interest.setDefaultCanBePrefix(True)

    decoded = Interest()
    decoded.wireDecode(TlvInterest)
    dump("Interest:")
    dumpInterest(decoded)

    # Set the name again to clear the cached encoding so we encode again.
    decoded.setName(decoded.getName())
    wire = decoded.wireEncode()
    dump("")
    dump("Re-encoded interest", wire.toHex())

    roundTripped = Interest()
    roundTripped.wireDecode(wire)
    dump("Re-decoded Interest:")
    dumpInterest(roundTripped)

    # Same setter sequence as a fluent chain, written out one call per line.
    freshInterest = Interest(Name("/ndn/abc"))
    freshInterest.setMustBeFresh(False)
    freshInterest.setMinSuffixComponents(4)
    freshInterest.setMaxSuffixComponents(6)
    freshInterest.setInterestLifetimeMilliseconds(30000)
    freshInterest.setChildSelector(1)
    freshInterest.setMustBeFresh(True)

    keyLocator = freshInterest.getKeyLocator()
    keyLocator.setType(KeyLocatorType.KEY_LOCATOR_DIGEST)
    # The 32 bytes 0x00 .. 0x1F.
    keyLocator.setKeyData(bytearray(range(32)))

    exclude = freshInterest.getExclude()
    exclude.appendComponent(Name("abc")[0])
    exclude.appendAny()

    freshInterest.getForwardingHint().add(1, Name("/A"))
    dump(freshInterest.toUri())

    # Set up the KeyChain.
    keyChain = KeyChain("pib-memory:", "tpm-memory:")
    safeBag = SafeBag(
        Name("/testname/KEY/123"),
        Blob(DEFAULT_RSA_PRIVATE_KEY_DER, False),
        Blob(DEFAULT_RSA_PUBLIC_KEY_DER, False))
    keyChain.importSafeBag(safeBag)
    policy = ValidationPolicyFromPib(keyChain.getPib())
    validator = Validator(policy)

    # Make a Face just so that we can sign the interest.
    signingFace = Face("localhost")
    signingFace.setCommandSigningInfo(
        keyChain, keyChain.getDefaultCertificateName())
    signingFace.makeCommandInterest(freshInterest)

    reDecodedFreshInterest = Interest()
    reDecodedFreshInterest.wireDecode(freshInterest.wireEncode())
    dump("")
    dump("Re-decoded fresh Interest:")
    dumpInterest(reDecodedFreshInterest)

    label = "Freshly-signed Interest"
    validator.validate(
        reDecodedFreshInterest, makeSuccessCallback(label),
        makeFailureCallback(label))
开发者ID:named-data,项目名称:PyNDN2,代码行数:57,代码来源:test_encode_decode_interest.py
示例6: main
def main():
    """Express one Interest for /home/test1/data and pump events forever.

    The replies are handled by a Counter.  The loop has no exit condition,
    so the Face is shut down in a ``finally`` clause; otherwise the
    shutdown call after the loop could never run.
    """
    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    dataPrefix = "/home/test1/data"
    repoDataPrefix = "/home/test1/data"

    # Set up repo-ng, register prefix for repo-ng's fetch prefix
    # Per configuration file in /usr/local/etc/ndn/repo-ng.conf
    # memCache is not used for now; repo is hoping that the piece of data in
    # question is still being held at nfd
    #memCache = MemoryContentCache(face, 100000)
    #memCache.registerPrefix(Name(repoDataPrefix), onRegisterFailed, onDataNotFound)

    counter = Counter(face, repoDataPrefix)

    interest = Interest(Name(dataPrefix))
    interest.setChildSelector(1)
    interest.setInterestLifetimeMilliseconds(defaultInterestLifetime)

    try:
        face.expressInterest(interest, counter.onData, counter.onTimeout)

        while True:
            face.processEvents()
            # Sleep between polls (one second here) so we don't use 100% of
            # the CPU.
            time.sleep(1)
    finally:
        # Reached when the loop is interrupted (e.g. Ctrl-C) or raises.
        face.shutdown()
开发者ID:zhehaowang,项目名称:ndn-pi,代码行数:26,代码来源:test_udp_multicast_consumer_outstanding.py
示例7: setUp
def setUp(self):
    """Create the storages, key chain, policy manager and Face for a test.

    Removes any SQLite file left at the database path, registers an RSA
    key pair under /TestConfigPolicyManager/temp, self-signs it and
    installs the certificate as the default.
    """
    certDirectory = 'policy_config/certs'
    self.testCertFile = os.path.join(certDirectory, 'test.cert')

    # Reuse the policy_config subdirectory for the temporary SQLite file.
    dbPath = "policy_config/test-public-info.db"
    self.databaseFilePath = dbPath
    try:
        os.remove(dbPath)
    except OSError:
        # no such file
        pass

    identityStorage = BasicIdentityStorage(dbPath)
    self.identityStorage = identityStorage
    privateKeyStorage = MemoryPrivateKeyStorage()
    self.privateKeyStorage = privateKeyStorage
    identityManager = IdentityManager(identityStorage, privateKeyStorage)
    self.identityManager = identityManager
    self.policyManager = ConfigPolicyManager(
        'policy_config/simple_rules.conf')

    self.identityName = Name('/TestConfigPolicyManager/temp')
    # to match the anchor cert
    keyName = Name(self.identityName).append('ksk-1416010123')

    privateKeyStorage.setKeyPairForKeyName(
        keyName, KeyType.RSA, TEST_RSA_PUBLIC_KEY_DER,
        TEST_RSA_PRIVATE_KEY_DER)
    identityStorage.addKey(
        keyName, KeyType.RSA, Blob(TEST_RSA_PUBLIC_KEY_DER))

    selfSignedCert = identityManager.selfSign(keyName)
    identityStorage.setDefaultKeyNameForIdentity(keyName)
    identityManager.addCertificateAsDefault(selfSignedCert)

    self.keyChain = KeyChain(identityManager, self.policyManager)
    self.keyName = keyName

    self.face = Face()
开发者ID:mjycom,项目名称:PyNDN2,代码行数:34,代码来源:test_policy_manager.py
示例8: __init__
def __init__(self, prefix, pipeline):
    """Set up consumer state and a Face to 127.0.0.1.

    :param prefix: converted to a Name and kept as self.prefix.
    :param pipeline: stored unchanged as self.pipeline.
    """
    self.prefix = Name(prefix)
    self.pipeline = pipeline

    # Bookkeeping, all starting empty.
    self.isDone = False
    self.nextSegment = 0
    self.outstanding = {}

    self.face = Face("127.0.0.1")
开发者ID:dibenede,项目名称:ndn-tutorial-gec21,代码行数:8,代码来源:hello_ext_consumer.py
示例9: __init__
def __init__(self, bld_root, keychain, cert_name, symkey, timestamp):
    """Initialise the thread, store its inputs and create a Face.

    :param bld_root: stored unchanged as self.bld_root.
    :param keychain: stored unchanged as self.keychain.
    :param cert_name: stored unchanged as self.cert_name.
    :param symkey: raw key bytes; kept hex-encoded as self.symkey.
    :param timestamp: stored unchanged as self.timestamp.
    """
    Thread.__init__(self)

    self.bld_root, self.keychain = bld_root, keychain
    self.cert_name, self.timestamp = cert_name, timestamp

    # Keep the symmetric key as a hex string rather than raw bytes.
    hexKey = binascii.hexlify(symkey)
    self.symkey = hexKey

    self.face = Face("localhost")
开发者ID:wentaoshang,项目名称:ndn-sensor,代码行数:8,代码来源:kds.py
示例10: test_interest_timestamp
def test_interest_timestamp(self):
    """Check a newer command interest verifies and an older one is rejected.

    Two command interests for the same name are signed 0.1 s apart.  The
    newer one is verified first and must succeed; the older one is then
    verified and must fail.
    """
    interestName = Name('/ndn/ucla/edu/something')
    identity = self.keyChain.getPib().getIdentity(self.identityName)
    certName = identity.getKey(self.keyName).getDefaultCertificate().getName()

    signingFace = Face("localhost")
    signingFace.setCommandSigningInfo(self.keyChain, certName)

    staleInterest = Interest(interestName)
    signingFace.makeCommandInterest(staleInterest)

    time.sleep(0.1)  # make sure timestamps are different

    currentInterest = Interest(interestName)
    signingFace.makeCommandInterest(currentInterest)

    certKnownMessage = (
        "ConfigPolicyManager returned ValidationRequest but certificate is known")

    # The newer interest goes first and must pass.
    result = doVerify(self.policyManager, currentInterest)
    self.assertFalse(result.hasFurtherSteps, certKnownMessage)
    self.assertEqual(
        result.failureCount, 0, "Verification of valid interest failed")
    self.assertEqual(
        result.successCount, 1,
        "Verification success called {} times instead of 1".format(
            result.successCount))

    # The older interest is verified afterwards and must be rejected.
    result = doVerify(self.policyManager, staleInterest)
    self.assertFalse(result.hasFurtherSteps, certKnownMessage)
    self.assertEqual(
        result.successCount, 0, "Verification of stale interest succeeded")
    self.assertEqual(
        result.failureCount, 1,
        "Failure callback called {} times instead of 1".format(
            result.failureCount))
开发者ID:named-data,项目名称:PyNDN2,代码行数:33,代码来源:test_policy_manager_v2.py
示例11: main
def main():
    """Prompt for a word, express /testecho/<word> and wait for one callback.

    The Face event loop is pumped until the Counter records at least one
    callback, then the Face is shut down.
    """
    # Silence the warning from Interest wire encode.
    Interest.setDefaultCanBePrefix(True)

    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    counter = Counter()

    # raw_input only exists on Python 2; the conditional expression keeps
    # the name from being evaluated on Python 3.
    prompt = "Enter a word to echo: "
    word = raw_input(prompt) if sys.version_info[0] <= 2 else input(prompt)

    name = Name("/testecho").append(word)
    dump("Express name ", name.toUri())
    face.expressInterest(name, counter.onData, counter.onTimeout)

    while True:
        if counter._callbackCount >= 1:
            break
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of the
        # CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:named-data,项目名称:PyNDN2,代码行数:25,代码来源:test_echo_consumer.py
示例12: main
def main():
    """Register /testecho and serve Interests until one response is sent.

    Builds an in-memory key chain holding the default RSA key pair, hands
    it to an Echo handler, registers the prefix and pumps the Face until
    the handler has counted at least one response.
    """
    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    identityManager = IdentityManager(identityStorage, privateKeyStorage)
    keyChain = KeyChain(identityManager, None)
    keyChain.setFace(face)

    # Initialize the storage.
    keyName = Name("/testname/DSK-123")
    # <key name minus last component>/KEY/<last component>/ID-CERT/0
    certificateName = keyName.getSubName(0, keyName.size() - 1)
    for component in ("KEY", keyName[-1], "ID-CERT", "0"):
        certificateName.append(component)
    identityStorage.addKey(
        keyName, KeyType.RSA, Blob(DEFAULT_RSA_PUBLIC_KEY_DER))
    privateKeyStorage.setKeyPairForKeyName(
        keyName, KeyType.RSA, DEFAULT_RSA_PUBLIC_KEY_DER,
        DEFAULT_RSA_PRIVATE_KEY_DER)

    echo = Echo(keyChain, certificateName)
    prefix = Name("/testecho")
    dump("Register prefix", prefix.toUri())
    face.registerPrefix(prefix, echo.onInterest, echo.onRegisterFailed)

    while True:
        if echo._responseCount >= 1:
            break
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of the
        # CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:bboalimoe,项目名称:PyNDN2,代码行数:29,代码来源:test_publish_async_ndnx.py
示例13: main
def main():
    """Register /ndn/ucla.edu/bms and serve Interests with a RepoServer.

    The event loop has no exit condition, so the Face is shut down in a
    ``finally`` clause; otherwise the shutdown call after the loop could
    never run.
    """
    face = Face("localhost")

    identityStorage = MemoryIdentityStorage()
    privateKeyStorage = MemoryPrivateKeyStorage()
    keyChain = KeyChain(
        IdentityManager(identityStorage, privateKeyStorage), None)
    keyChain.setFace(face)

    # Initialize the storage.
    keyName = Name("/testname/DSK-reposerver")
    certificateName = keyName.getSubName(0, keyName.size() - 1).append(
        "KEY").append(keyName[-1]).append("ID-CERT").append("0")
    identityStorage.addKey(keyName, KeyType.RSA, Blob(DEFAULT_PUBLIC_KEY_DER))
    # NOTE(review): called without a KeyType argument here - confirm this
    # matches the setKeyPairForKeyName signature of the PyNDN version in use.
    privateKeyStorage.setKeyPairForKeyName(
        keyName, DEFAULT_PUBLIC_KEY_DER, DEFAULT_PRIVATE_KEY_DER)

    echo = RepoServer(keyChain, certificateName)
    prefix = Name("/ndn/ucla.edu/bms")
    dump("Register prefix", prefix.toUri())

    try:
        face.registerPrefix(prefix, echo.onInterest, echo.onRegisterFailed)

        while True:
            face.processEvents()
            # We need to sleep for a few milliseconds so we don't use 100% of
            # the CPU.
            time.sleep(0.01)
    finally:
        # Reached when the loop is interrupted (e.g. Ctrl-C) or raises.
        face.shutdown()
开发者ID:remap,项目名称:BMS-REPO,代码行数:28,代码来源:server.py
示例14: main
def main():
    """
    Call requestInsert and register a prefix so that ProduceSegments will answer
    interests from the repo to send the data packets. This assumes that repo-ng
    is already running (e.g. `sudo ndn-repo-ng`).
    """
    repoCommandPrefix = Name("/example/repo/1")
    repoDataPrefix = Name("/example/data/1")

    # Version the fetch prefix with the current time in milliseconds.
    nowMilliseconds = int(time.time() * 1000.0)
    fetchPrefix = Name(repoDataPrefix)
    fetchPrefix.append("testinsert")
    fetchPrefix.appendVersion(nowMilliseconds)

    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    # Use the system default key chain and certificate name to sign commands.
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    # Register the prefix and send the repo insert command at the same time.
    startBlockId, endBlockId = 0, 1
    # One-element list so the nested callbacks can clear the flag by mutation.
    enabled = [True]

    def stopLoop():
        enabled[0] = False

    def onFinished():
        dump("All data was inserted.")
        stopLoop()

    def onRegisterFailed(prefix):
        dump("Register failed for prefix", prefix.toUri())
        stopLoop()

    def onInsertStarted():
        dump("Insert started for", fetchPrefix.toUri())

    def onFailed():
        stopLoop()

    produceSegments = ProduceSegments(
        keyChain, keyChain.getDefaultCertificateName(), startBlockId,
        endBlockId, onFinished)

    dump("Register prefix", fetchPrefix.toUri())
    face.registerPrefix(
        fetchPrefix, produceSegments.onInterest, onRegisterFailed)

    requestInsert(
        face, repoCommandPrefix, fetchPrefix, onInsertStarted, onFailed,
        startBlockId, endBlockId)

    # Run until all the data is sent.
    while enabled[0]:
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of the
        # CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:named-data,项目名称:PyNDN2,代码行数:50,代码来源:basic_insertion.py
示例15: start
def start(self):
    """Create the Face, register self.prefix and pump events.

    The loop runs while self.face is not None and ends early, after
    calling self.stop(), once self.registerFailed is truthy.
    """
    face = Face()
    self.face = face
    face.setCommandSigningInfo(self.keychain, self.certificateName)
    face.registerPrefix(
        self.prefix, self.onLightingCommand, self.onRegisterFailed)

    # Re-read self.face each iteration: the loop ends once it is None.
    while self.face is not None:
        self.face.processEvents()
        if not self.registerFailed:
            continue
        self.stop()
        break
开发者ID:thecodemaiden,项目名称:nfd-timing,代码行数:9,代码来源:fixture.py
示例16: __init__
def __init__(self, prefix, pipeline, count):
    """Store the constructor arguments and create a Face to 127.0.0.1.

    :param prefix: stored unchanged as self.prefix.
    :param pipeline: stored unchanged as self.pipeline.
    :param count: stored unchanged as self.count.
    """
    self.prefix, self.pipeline, self.count = prefix, pipeline, count

    # Bookkeeping, all starting empty.
    self.isDone = False
    self.nextSegment = 0
    self.outstanding = {}

    self.face = Face("127.0.0.1")
开发者ID:dibenede,项目名称:ndn-tutorial-gec21,代码行数:9,代码来源:consumer.py
示例17: main
def main():
    """
    Call startRepoWatch and register a prefix so that SendSegments will answer
    interests from the repo to send data packets for the watched prefix. When
    all the data is sent (or an error), call stopRepoWatch. This assumes that
    repo-ng is already running (e.g. `sudo ndn-repo-ng`).
    """
    repoCommandPrefix = Name("/example/repo/1")
    repoDataPrefix = Name("/example/data/1")

    # Version the watch prefix with the current time in milliseconds.
    nowMilliseconds = int(time.time() * 1000.0)
    watchPrefix = Name(repoDataPrefix)
    watchPrefix.append("testwatch")
    watchPrefix.appendVersion(nowMilliseconds)

    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()
    # Use the system default key chain and certificate name to sign commands.
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    # Register the prefix and start the repo watch at the same time.
    enabled = [True]

    def stopAndQuit():
        stopRepoWatchAndQuit(face, repoCommandPrefix, watchPrefix, enabled)

    def onFinishedSending():
        stopAndQuit()

    def onRegisterFailed(prefix):
        dump("Register failed for prefix", prefix.toUri())
        enabled[0] = False

    def onRepoWatchStarted():
        dump("Watch started for", watchPrefix.toUri())

    def onStartFailed():
        dump("startRepoWatch failed.")
        stopAndQuit()

    sendSegments = SendSegments(
        keyChain, keyChain.getDefaultCertificateName(), onFinishedSending)

    dump("Register prefix", watchPrefix.toUri())
    face.registerPrefix(
        watchPrefix, sendSegments.onInterest, onRegisterFailed)

    startRepoWatch(
        face, repoCommandPrefix, watchPrefix, onRepoWatchStarted,
        onStartFailed)

    # Run until someone sets enabled[0] = False.
    enabled[0] = True
    while enabled[0]:
        face.processEvents()
        # We need to sleep for a few milliseconds so we don't use 100% of the
        # CPU.
        time.sleep(0.01)

    face.shutdown()
开发者ID:named-data,项目名称:PyNDN2,代码行数:48,代码来源:watched_insertion.py
示例18: Consumer
class Consumer(object):
    '''Sends Interest, listens for data.

    Expresses a single Interest for the given name and stops after the
    first Data packet or the first timeout.
    '''

    def __init__(self, name):
        '''Store the name as a Name and create a default Face.'''
        self.name = Name(name)
        self.face = Face()
        self.isDone = False

    def run(self):
        '''Express the Interest and pump the Face until isDone is set.'''
        try:
            interest = Interest(self.name)
            uri = self.name.toUri()

            interest.setInterestLifetimeMilliseconds(4000)
            interest.setMustBeFresh(True)

            self.face.expressInterest(interest, self._onData, self._onTimeout)
            # Report the send right away, before waiting for the reply, so
            # the log reads in the order things happened.
            print("Sent Interest for %s" % uri)

            while not self.isDone:
                self.face.processEvents()
                time.sleep(0.01)

        except RuntimeError as e:
            print("ERROR: %s" % e)

    def _onData(self, interest, data):
        '''Print the payload of the received Data and finish.'''
        payload = data.getContent()
        print("Received data: %s\n" % payload.toRawStr())
        self.isDone = True

    def _onTimeout(self, interest):
        '''Print the name of the timed-out Interest and finish.'''
        uri = interest.getName().toUri()
        print("TIMEOUT  %s" % uri)
        self.isDone = True
开发者ID:dibenede,项目名称:nfd-strategies,代码行数:43,代码来源:consumer.py
示例19: main
def main():
    """Query the local NFD for the face matching a remote URI.

    Builds a /localhost/nfd/faces/query Interest carrying a FaceQueryFilter
    for the URI, fetches the reply with SegmentFetcher and passes it to
    processFaceStatus.  The Face event loop is pumped until a callback
    clears ``enabled[0]``; the Face is always shut down before returning.
    """
    prefix = Name("/nfd/edu/ucla/remap/test")
    # Route to aleph.ndn.ucla.edu. Have to use the canonical name with an IP
    # address and port.
    uri = "udp4://128.97.98.7:6363"

    # The default Face connects to the local NFD.
    face = Face()
    # Use the system default key chain and certificate name to sign commands.
    keyChain = KeyChain()
    face.setCommandSigningInfo(keyChain, keyChain.getDefaultCertificateName())

    # Create the /localhost/nfd/faces/query command interest, including the
    # FaceQueryFilter. Construct the FaceQueryFilter using the structure in
    # face_query_filter_pb2 which was produced by protoc.
    # The locals are named so they shadow neither the builtin `filter` nor
    # the `message` parameter of onError below.
    filterMessage = face_query_filter_pb2.FaceQueryFilterMessage()
    queryFilter = filterMessage.face_query_filter.add()
    queryFilter.uri = uri
    encodedFilter = ProtobufTlv.encode(filterMessage)

    interest = Interest(Name("/localhost/nfd/faces/query"))
    interest.getName().append(encodedFilter)

    # One-element list so the callbacks can clear the flag by mutation.
    enabled = [True]

    def onComplete(content):
        processFaceStatus(content, prefix, uri, face, enabled)

    def onError(errorCode, message):
        enabled[0] = False
        dump(message)

    try:
        SegmentFetcher.fetch(
            face, interest, SegmentFetcher.DontVerifySegment, onComplete,
            onError)

        # Loop calling processEvents until a callback sets enabled[0] = False.
        while enabled[0]:
            face.processEvents()
            # We need to sleep for a few milliseconds so we don't use 100% of
            # the CPU.
            time.sleep(0.01)
    finally:
        # Release the connection to the forwarder.
        face.shutdown()
开发者ID:MAHIS,项目名称:PyNDN2,代码行数:42,代码来源:test_register_route.py
示例20: main
def main():
    """Fetch segmented content under /Prefix/SAMPLE/Content and dump it.

    Attaches a NacConsumerHandler with a decryption key to the Namespace,
    starts a SegmentStream and pumps the Face until the content of the
    root namespace has been set.  The Face is always shut down before
    returning.
    """
    # The default Face will connect using a Unix socket, or to "localhost".
    face = Face()

    groupName = Name("/Prefix/READ")
    userName = Name("/U")
    userKeyName = Name("/U/Key")
    contentPrefix = Name("/Prefix/SAMPLE")
    contentName = Name(contentPrefix).append("Content")

    keyChain = createVerifyKeyChain()

    # Imitate test_consumer from the PyNDN integration tests.
    databaseFilePath = "test.db"
    try:
        os.remove(databaseFilePath)
    except OSError:
        # Nothing was removed (os.remove raised OSError); carry on.
        pass

    namespace = Namespace(contentName)
    namespace.setFace(face)
    handler = NacConsumerHandler(
        namespace, keyChain, groupName, userName,
        Sqlite3ConsumerDb(databaseFilePath))
    fixtureUserDKeyBlob = Blob(FIXTURE_USER_D_KEY)
    handler.addDecryptionKey(userKeyName, fixtureUserDKeyBlob)

    # One-element list so the callback can clear the flag by mutation.
    enabled = [True]

    # The `namespace` parameter deliberately shadows the outer variable, as
    # in the original callback.
    def onContentSet(namespace, contentNamespace, callbackId):
        if contentNamespace == namespace:
            dump("Got segmented content", contentNamespace.content.toRawStr())
            enabled[0] = False

    try:
        namespace.addOnContentSet(onContentSet)

        segmentStream = SegmentStream(namespace)
        # Constructed for its effect on the stream; the result is not kept.
        SegmentedContent(segmentStream)
        segmentStream.start()

        while enabled[0]:
            face.processEvents()
            # We need to sleep for a few milliseconds so we don't use 100% of
            # the CPU.
            time.sleep(0.01)
    finally:
        # Release the connection to the forwarder.
        face.shutdown()
开发者ID:named-data,项目名称:PyCNL,代码行数:41,代码来源:test_nac_consumer.py
注:本文中的 pyndn.Face 类示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论