本文整理汇总了Python中streamsx.topology.tester.Tester类的典型用法代码示例。如果您正苦于以下问题:Python Tester类的具体用法?Python Tester怎么用?Python Tester使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Tester类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_TopologyIndirectPackage
def test_TopologyIndirectPackage(self):
topo = Topology("test_TopologyIndirectPackage")
hw = topo.source(["Hello", "World!"])
hwf = hw.transform(test2_pkg_helpers.imported_package)
tester = Tester(topo)
tester.contents(hwf, ["HelloIP"])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:ejpring,项目名称:streamsx.topology,代码行数:7,代码来源:test2_pkg.py
示例2: test_TopologyImportPackage
def test_TopologyImportPackage(self):
topo = Topology("test_TopologyImportPackage")
hw = topo.source(test_package.test_subpackage.test_module.SourceTuples(["Hello", "World!"]))
hwf = hw.filter(test_package.test_subpackage.test_module.filter)
tester = Tester(topo)
tester.contents(hwf, ["Hello"])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:ejpring,项目名称:streamsx.topology,代码行数:7,代码来源:test2_pkg.py
示例3: test_app_log
def test_app_log(self):
topo = Topology()
s = topo.source(['logmsg1', 'logmsg2你好'])
s.for_each(_log_msg)
tester = Tester(topo)
tester.tuple_count(s, 2)
tester.test(self.test_ctxtype, self.test_config)
开发者ID:wmarshall484,项目名称:streamsx.topology,代码行数:7,代码来源:test2_ec.py
示例4: test_TopologySourceItertools
def test_TopologySourceItertools(self):
topo = Topology('test_TopologySourceItertools')
hw = topo.source(itertools.repeat(9, 3))
hw = hw.filter(test_functions.check_asserts_disabled)
tester = Tester(topo)
tester.contents(hw, [9, 9, 9])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:ejpring,项目名称:streamsx.topology,代码行数:7,代码来源:test2.py
示例5: test_feedback_loop
def test_feedback_loop(self):
topo = Topology()
data = ['A','B', 'A', 'A', 'X', 'C', 'C', 'D', 'A', 'A', 'E']
expected = ['B', 'X', 'C', 'C', 'D', 'A', 'A', 'E']
s = topo.source(data)
s = s.filter(lambda t : time.sleep(1) or True).as_string();
feedback = PendingStream(topo)
df = op.Invoke(topo, 'spl.utility::DynamicFilter',
inputs = [s, feedback.stream],
schemas= [schema.CommonSchema.String])
df.params['key'] = df.attribute(s, 'string')
df.params['addKey'] = df.attribute(feedback.stream, 'string')
delayed_out = op.Map('spl.utility::Delay', df.outputs[0], params={'delay': 0.05}).stream
x = delayed_out.filter(lambda s : s == 'X').map(lambda s : 'A').as_string()
i = topo.source(['B', 'X', 'C', 'D', 'E']).as_string()
x = x.union({i})
feedback.complete(x)
result = delayed_out
result.print()
#streamsx.topology.context.submit('TOOLKIT', topo)
tester = Tester(topo)
tester.contents(result, expected)
tester.test(self.test_ctxtype, self.test_config)
开发者ID:vdogaru,项目名称:streamsx.topology,代码行数:31,代码来源:test2_pending.py
示例6: test_verify_no_pint
def test_verify_no_pint(self):
""" Verify pint is not installed on the service """
topo = Topology()
s = topo.source(down_a_pint_source)
tester = Tester(topo)
tester.contents(s, ['NoPintsForYou'])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:IBMStreams,项目名称:streamsx.topology,代码行数:7,代码来源:test_splpy_pip.py
示例7: test_opdriven_source
def test_opdriven_source(self):
topo = Topology()
s = topo.source(TimeCounter(iterations=30, period=0.1))
s.set_consistent(ConsistentRegionConfig.operator_driven(drain_timeout=40, reset_timeout=40, max_consecutive_attempts=3))
tester = Tester(topo)
self.assertFalse(tester.test(self.test_ctxtype, self.test_config, assert_on_fail=False))
开发者ID:IBMStreams,项目名称:streamsx.topology,代码行数:7,代码来源:test2_consistent.py
示例8: _run_app
def _run_app(self, kind, opi='M'):
schema = 'tuple<rstring a, int32 b>'
topo = Topology('TESPL' + str(uuid.uuid4().hex))
streamsx.spl.toolkit.add_toolkit(topo, stu._tk_dir('testtkpy'))
if opi == 'M':
data = [1,2,3]
se = topo.source(data)
se = se.map(lambda x : {'a':'hello', 'b':x} , schema=schema)
prim = op.Map(
"com.ibm.streamsx.topology.pytest.pyexceptions::" + kind,
se, params={'tf':self.tf})
res = prim.stream
elif opi == 'S':
prim = op.Source(
topo,
"com.ibm.streamsx.topology.pytest.pyexceptions::" + kind,
schema=schema, params={'tf':self.tf})
res = prim.stream
elif opi == 'E':
data = [1,2,3]
se = topo.source(data)
se = se.map(lambda x : {'a':'hello', 'b':x} , schema=schema)
prim = op.Sink(
"com.ibm.streamsx.topology.pytest.pyexceptions::" + kind,
se, params={'tf':self.tf})
res = None
tester = Tester(topo)
tester.run_for(3)
ok = tester.test(self.test_ctxtype, self.test_config, assert_on_fail=False)
self.assertFalse(ok)
开发者ID:IBMStreams,项目名称:streamsx.topology,代码行数:32,代码来源:test_splpy_exceptions.py
示例9: test_SPLHashFunc
def test_SPLHashFunc(self):
"""
Test hashing works when the schema is a general SPL one
using an explicit hash function.
"""
raw = []
for v in range(20):
raw.append(''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(v)))
data = []
for v in range(7):
data.extend(raw)
random.shuffle(data)
for width in (1,4):
with self.subTest(width=width):
topo = Topology("test_SPLHash" + str(width))
s = topo.source(data)
s = s.as_string()
f = op.Map('spl.relational::Functor', s,
schema = 'tuple<rstring string, rstring s2>')
f.s2 = f.output('string + "_1234"')
s = f.stream
s = s.parallel(width, Routing.HASH_PARTITIONED, s2_hash)
s = s.map(AddChannel())
s = s.end_parallel()
s = s.map(CheckSameChannel(lambda t : t[0]['s2']))
expected = []
for v in data:
expected.append(v + '_1234')
tester = Tester(topo)
tester.contents(s, expected, ordered=width==1)
tester.test(self.test_ctxtype, self.test_config)
print(tester.result)
开发者ID:ejpring,项目名称:streamsx.topology,代码行数:35,代码来源:test2_udp.py
示例10: test_StringHash
def test_StringHash(self):
"""
Test hashing works when the schema is tuple<rstring string>.
"""
raw = []
for v in range(20):
raw.append(''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(v)))
data = []
for v in range(7):
data.extend(raw)
random.shuffle(data)
for width in (1,3):
with self.subTest(width=width):
topo = Topology("test_StringHash" + str(width))
s = topo.source(data)
s = s.as_string()
s = s.parallel(width, Routing.HASH_PARTITIONED)
s = s.map(AddChannel())
s = s.end_parallel()
s = s.map(CheckSameChannel())
tester = Tester(topo)
tester.contents(s, data, ordered=width==1)
tester.test(self.test_ctxtype, self.test_config)
print(tester.result)
开发者ID:ejpring,项目名称:streamsx.topology,代码行数:26,代码来源:test2_udp.py
示例11: test_WindowPunctuation
def test_WindowPunctuation(self):
"""Trigger an aggregation 4 times. Ensure that window punctuations are submitted each time
by writing them to an output file, and then verifying that the file contains the correct
contents."""
topo = Topology()
s = topo.source([1,2,3,4])
# Aggregate and write to file.
s = s.last(1).trigger(1).aggregate(lambda x: x[0]+7)
# Ensure map/flat_map/filter passes window marks through.
s = s.flat_map(lambda x : [x])
s = s.filter(lambda x : True)
s = s.map(lambda x : (x,), schema='tuple<int32 z>')
op_params = {'file' : 'punct_file', 'writePunctuations' : True, 'flushOnPunctuation' : True}
op.Sink("spl.adapter::FileSink", s, params = op_params)
# Copy the config, since it's shared across all tests, and not every test needs a data
# directory.
cfg = self.test_config.copy()
jc = context.JobConfig(data_directory=os.getcwd())
jc.add(cfg)
tester = Tester(topo)
tester.test(self.test_ctxtype, cfg)
path = os.path.join(os.getcwd(), 'punct_file')
# Validate the contents of the file.
with open(path, 'r') as f:
file_contents = f.read()
self.assertEqual(expected_contents, file_contents)
os.remove(path)
开发者ID:vdogaru,项目名称:streamsx.topology,代码行数:33,代码来源:test2_python_window.py
示例12: test_blob_type
def test_blob_type(self):
topo = Topology()
streamsx.spl.toolkit.add_toolkit(topo, '../testtkpy')
data = ['Hello', 'Blob', 'Did', 'you', 'reset' ]
s = topo.source(data)
s = s.as_string()
toBlob = op.Map(
"com.ibm.streamsx.topology.pytest.pytypes::ToBlob",
s,
'tuple<blob b>')
toBlob = op.Map(
"com.ibm.streamsx.topology.pysamples.positional::Noop",
toBlob.stream,
'tuple<blob b>')
bt = op.Map(
"com.ibm.streamsx.topology.pytest.pytypes::BlobTest",
toBlob.stream,
'tuple<rstring string>',
{'keep': True})
bt2 = op.Map(
"com.ibm.streamsx.topology.pytest.pytypes::BlobTest",
toBlob.stream,
'tuple<rstring string>',
{'keep': False})
tester = Tester(topo)
tester.contents(bt.stream, data)
self.test_config['topology.keepArtifacts'] = True;
tester.test(self.test_ctxtype, self.test_config)
开发者ID:ejpring,项目名称:streamsx.topology,代码行数:33,代码来源:test_types.py
示例13: test_mixed_toolkits
def test_mixed_toolkits(self):
topo = Topology()
streamsx.spl.toolkit.add_toolkit(topo, stu._tk_dir('testtkpy'))
streamsx.spl.toolkit.add_toolkit(topo, stu._tk_dir('tk17'))
data = ['A']
bop = op.Source(topo, "spl.utility::Beacon",
'tuple<rstring a>',
{'iterations': 1})
bop.a = bop.output('"A"')
sv = op.Map(
"com.ibm.streamsx.topology.pytest.pyvers::StreamsxVersion",
bop.stream,
'tuple<rstring a, rstring v1, rstring v2>')
m17f = op.Map(
"com.ibm.streamsx.topology.pytest.tk17::M17F",
sv.stream,
'tuple<rstring a, rstring v1, rstring v2, rstring f1, rstring f2>')
m17c = op.Map(
"com.ibm.streamsx.topology.pytest.tk17::M17C",
m17f.stream,
'tuple<rstring a, rstring v1, rstring v2, rstring f1, rstring f2, rstring c1, rstring c2, int32 x>',
{'x': 27})
tester = Tester(topo)
tester.contents(m17c.stream, [{'a':'A', 'f1':'1.7', 'f2':'F', 'v1':'aggregate', 'v2':'True', 'c1':'1.7', 'c2':'C', 'x':27}])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:IBMStreams,项目名称:streamsx.topology,代码行数:29,代码来源:test_splpy_vers.py
示例14: setUp
def setUp(self):
Tester.setup_distributed(self)
# Tester.setup_standalone(self)
"""TEST: test_basic validate that the operator can talk
开发者ID:joergboe,项目名称:streamsx.inet,代码行数:7,代码来源:tupleTests.py
示例15: test_json_to_schema
def test_json_to_schema(self):
topo = Topology()
s = topo.source([{'a':7}, {'b':8}, {'c':9}]).as_json()
st = s.map(lambda x : (next(iter(x)), x[next(iter(x))]), schema='tuple<rstring y, int32 x>')
tester = Tester(topo)
tester.contents(st, [{'y':'a', 'x':7}, {'y':'b', 'x':8}, {'y':'c', 'x':9}])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:wmarshall484,项目名称:streamsx.topology,代码行数:8,代码来源:test2_python2spl.py
示例16: test_string_to_json
def test_string_to_json(self):
topo = Topology()
s = topo.source(['a', 79, 'c']).as_string()
st = s.map(lambda x: x if x == 'c' else {'v': x + 'd'}, schema=CommonSchema.Json)
tester = Tester(topo)
tester.contents(st, [{'v': 'ad'}, {'v': '79d'}, 'c'])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:wmarshall484,项目名称:streamsx.topology,代码行数:8,代码来源:test2_python2spl.py
示例17: test_string_to_string
def test_string_to_string(self):
topo = Topology()
s = topo.source([False, 'b', 19]).as_string()
st = s.map(lambda x: x + '3', schema=CommonSchema.String)
tester = Tester(topo)
tester.contents(st, ['False3', 'b3', '193'])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:wmarshall484,项目名称:streamsx.topology,代码行数:8,代码来源:test2_python2spl.py
示例18: test_string_to_schema_dict
def test_string_to_schema_dict(self):
topo = Topology()
s = topo.source(['a', 'b', 'c']).as_string()
st = s.map(lambda x : {'z': x+'dict!'}, schema='tuple<rstring z>')
tester = Tester(topo)
tester.contents(st, [{'z':'adict!'}, {'z':'bdict!'}, {'z':'cdict!'}])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:wmarshall484,项目名称:streamsx.topology,代码行数:8,代码来源:test2_python2spl.py
示例19: test_string_to_schema
def test_string_to_schema(self):
topo = Topology()
s = topo.source(['a', 'b', 'c']).as_string()
st = s.map(lambda x : (x+'struct!',), schema='tuple<rstring y>')
tester = Tester(topo)
tester.contents(st, [{'y':'astruct!'}, {'y':'bstruct!'}, {'y':'cstruct!'}])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:wmarshall484,项目名称:streamsx.topology,代码行数:8,代码来源:test2_python2spl.py
示例20: test_as_tuple_flat_map
def test_as_tuple_flat_map(self):
topo = Topology()
s = self._create_stream(topo)
st = s.flat_map(check_is_tuple_flat_map(self.is_named()))
tester = Tester(topo)
tester.contents(st, [(9,'2Hi!-FlatMap'), (18,'4Hi!-FlatMap'), (27,'6Hi!-FlatMap')])
tester.test(self.test_ctxtype, self.test_config)
开发者ID:vdogaru,项目名称:streamsx.topology,代码行数:8,代码来源:test2_schema_tuple.py
注:本文中的streamsx.topology.tester.Tester类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论