本文整理汇总了Python中solar.core.signals.connect函数的典型用法代码示例。如果您正苦于以下问题：Python connect函数的具体用法？Python connect怎么用？Python connect使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了connect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_input_dict_type
def test_input_dict_type(self):
    # A dict-typed input must be propagated by connect() and by later
    # updates of the emitter.
    # NOTE(review): the YAML literal below carries no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm against
    # the upstream test before relying on it.
    meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
values:
schema: {a: int, b: int}
value: {}
""")
    emitter = self.create_resource(
        'sample1', meta_dir, {'values': {'a': 1, 'b': 2}})
    receiver = self.create_resource('sample2', meta_dir)

    xs.connect(emitter, receiver)
    self.assertEqual(emitter.args['values'], receiver.args['values'])

    # Check update: the receiver is expected to mirror the new value.
    emitter.update({'values': {'a': 2}})
    self.assertEqual(emitter.args['values'], {'a': 2})
    self.assertEqual(emitter.args['values'], receiver.args['values'])

    # Check disconnect: only the emitter's own value is asserted afterwards.
    # TODO: should sample2.value be reverted to original value?
    emitter.disconnect(receiver)
    emitter.update({'values': {'a': 3}})
    self.assertEqual(emitter.args['values'], {'a': 3})
开发者ID:aglarendil,项目名称:solar,代码行数:28,代码来源:test_signals.py
示例2: connect_list_to_each
def connect_list_to_each(self, resources, args=None, events=None):
    """Connect each resource in self.resources to each resource in resources.

    args -- optional arguments, formatted per pair through
            ``self.args_fmt`` and passed to ``signals.connect`` as its
            ``mapping``; defaults to an empty dict
        "{emitter_num}" -- substitutes for emitter's index in args (from
                           self.resources)
        "{receiver_num}" -- substitutes for receiver's index in args (from
                            resources argument)
    events -- forwarded unchanged to ``signals.connect``
    """
    # A ``None`` sentinel replaces the former ``args={}`` default so that no
    # dict object is shared between calls.
    if args is None:
        args = {}

    for emitter_num, emitter in enumerate(self.resources):
        for receiver_num, receiver in enumerate(resources.resources):
            indices = {
                'emitter_num': emitter_num,
                'receiver_num': receiver_num,
            }
            signals.connect(
                emitter,
                receiver,
                mapping=self.args_fmt(args, indices),
                events=events
            )
开发者ID:CGenie,项目名称:solar,代码行数:25,代码来源:template.py
示例3: setup_resources
def setup_resources():
    """Create, connect and validate the node and bootstrap resources.

    Calls ``ModelMeta.remove_all()``, creates ``node2`` and
    ``solar_bootstrap2`` through ``vr.create``, connects the former to the
    latter, then runs ``validation.validate_resource`` on every local that is
    a ``resource.Resource``. Exits the process with status 1 when any
    validation returns errors.
    """
    ModelMeta.remove_all()

    node2 = vr.create('node2', 'resources/ro_node/', {
        'ip': '10.0.0.4',
        'ssh_key': '/vagrant/.vagrant/machines/solar-dev2/virtualbox/private_key',
        'ssh_user': 'vagrant'
    })[0]
    solar_bootstrap2 = vr.create(
        'solar_bootstrap2',
        'resources/solar_bootstrap',
        {'master_ip': '10.0.0.2'}
    )[0]

    signals.connect(node2, solar_bootstrap2)

    has_errors = False
    # Snapshot the local values first so that names bound inside the loop
    # cannot influence what is being iterated.
    for r in list(locals().values()):
        if not isinstance(r, resource.Resource):
            continue

        # Single-argument call form: prints the same text under Python 2 and
        # is valid syntax under Python 3.
        print('Validating {}'.format(r.name))
        errors = validation.validate_resource(r)
        if errors:
            has_errors = True
            print('ERROR: %s: %s' % (r.name, errors))

    if has_errors:
        sys.exit(1)
开发者ID:aglarendil,项目名称:solar,代码行数:26,代码来源:example-bootstrap.py
示例4: deploy
def deploy(filename):
    """Create resources and connections described by a YAML config file.

    The config is expected to provide ``workdir``, ``resource-save-path``,
    ``resources`` and ``connections``; an optional ``test-suite`` entry names
    a module whose ``main()`` is run at the end.
    """
    # NOTE(review): yaml.load can construct arbitrary Python objects; if
    # `filename` can come from an untrusted source, yaml.safe_load should be
    # considered instead.
    with open(filename) as config_file:
        config = yaml.load(config_file)

    workdir = config['workdir']
    save_path = os.path.join(workdir, config['resource-save-path'])

    # Clean stuff first
    db.clear()
    xs.Connections.clear()
    shutil.rmtree(save_path, ignore_errors=True)
    os.makedirs(save_path)

    # Create resources first
    for definition in config['resources']:
        name = definition['name']
        model = os.path.join(workdir, definition['model'])
        args = definition.get('args', {})
        log.debug('Creating %s %s %s %s', name, model, save_path, args)
        xr.create(name, model, save_path, args=args)

    # Create resource connections
    for link in config['connections']:
        emitter = db.get_resource(link['emitter'])
        receiver = db.get_resource(link['receiver'])
        mapping = link.get('mapping')
        log.debug('Connecting %s %s %s', emitter.name, receiver.name, mapping)
        xs.connect(emitter, receiver, mapping=mapping)

    # Run all tests
    if 'test-suite' not in config:
        return
    log.debug('Running tests from %s', config['test-suite'])
    suite_module = __import__(config['test-suite'], {}, {}, ['main'])
    suite_module.main()
开发者ID:sand8080,项目名称:solar,代码行数:34,代码来源:deployment.py
示例5: create_virtual_resource
def create_virtual_resource(vr_name, template):
    """Create every resource listed in ``template`` and wire "::" references.

    For non-virtual resources, a string value of the form
    ``"<emitter>::<input>"`` is recorded as a connection from that emitter
    input to the argument's key; the connections are made after all
    resources exist. Returns the list of created resources.
    """
    specs = template["resources"]
    pending = []
    created_resources = []
    cwd = os.getcwd()

    for spec in specs:
        name = spec["id"]
        base_path = os.path.join(cwd, spec["from"])
        args = spec["values"]
        created_resources += create(name, base_path, args, vr_name)

        if is_virtual(base_path):
            continue
        for key, arg in args.items():
            if isinstance(arg, basestring) and "::" in arg:
                emitter_name, src = arg.split("::")
                pending.append((emitter_name, name, {src: key}))

    # Connections are resolved only now, once every resource can be loaded.
    db = load_all()
    for emitter_name, receiver_name, mapping in pending:
        signals.connect(db[emitter_name], db[receiver_name], mapping)

    return created_resources
开发者ID:CGenie,项目名称:solar,代码行数:26,代码来源:virtual_resource.py
示例6: connect_with_events
def connect_with_events(self, receiver, mapping=None, events=None,
                        use_defaults=False):
    """Connect this resource to ``receiver`` and optionally register events.

    ``mapping`` is forwarded to ``signals.connect``. Default events are added
    when ``use_defaults`` is true; ``events`` is registered under this
    resource's name when it is truthy.
    """
    signals.connect(self, receiver, mapping=mapping)

    if use_defaults:
        api.add_default_events(self, receiver)

    if events:
        api.add_events(self.name, events)
开发者ID:rustyrobot,项目名称:solar,代码行数:7,代码来源:resource.py
示例7: test_revert_removed_child
def test_revert_removed_child():
    """Reverting the removal of a connected child restores its input value."""
    parent = orm.DBResource(id='test1', name='test1', base_path='x')
    parent.save()
    parent.add_input('a', 'str', '9')

    child = orm.DBResource(id='test2', name='test2', base_path='x')
    child.save()
    child.add_input('a', 'str', 0)

    parent = resource.load('test1')
    child = resource.load('test2')
    signals.connect(parent, child)

    # Both resources are staged; commit each staged item.
    staged_log = change.stage_changes()
    assert len(staged_log) == 2
    for item in staged_log:
        operations.move_to_commited(item.log_action)

    # Removing the child stages exactly one more change.
    child.remove()
    staged_log = change.stage_changes()
    assert len(staged_log) == 1
    logitem = next(staged_log.collection())
    operations.move_to_commited(logitem.log_action)

    with mock.patch.object(resource, 'read_meta') as mread:
        mread.return_value = {'input': {'a': {'schema': 'str!'}}}
        change.revert(logitem.uid)

    child = resource.load('test2')
    assert child.args == {'a': '9'}
开发者ID:rustyrobot,项目名称:solar,代码行数:30,代码来源:test_system_log_api.py
示例8: test_no_cycles
def test_no_cycles(self):
    # Connecting two resources in both directions must be rejected.
    # NOTE(review): the YAML literal below carries no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm upstream.
    meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
value:
schema: str!
value:
""")
    first = self.create_resource('sample1', meta_dir, {'value': 'x'})
    second = self.create_resource('sample2', meta_dir, {'value': 'y'})

    xs.connect(first, second)

    # The reverse connection would close a loop first -> second -> first.
    with self.assertRaisesRegexp(Exception, 'Prevented creating a cycle'):
        xs.connect(second, first)
开发者ID:rustyrobot,项目名称:solar,代码行数:25,代码来源:test_signals.py
示例9: test_revert_removed_child
def test_revert_removed_child():
    """Reverting the removal of a connected child restores its input value."""
    parent = orm.DBResource(id="test1", name="test1", base_path="x")  # NOQA
    parent.save()
    parent.add_input("a", "str", "9")

    child = orm.DBResource(id="test2", name="test2", base_path="x")  # NOQA
    child.save()
    child.add_input("a", "str", 0)

    parent = resource.load("test1")
    child = resource.load("test2")
    signals.connect(parent, child)

    # Both resources are staged; commit each staged item.
    staged_log = change.stage_changes()
    assert len(staged_log) == 2
    for item in staged_log:
        operations.move_to_commited(item.log_action)

    # Removing the child stages exactly one more change.
    child.remove()
    staged_log = change.stage_changes()
    assert len(staged_log) == 1
    logitem = next(staged_log.collection())
    operations.move_to_commited(logitem.log_action)

    with mock.patch.object(resource, "read_meta") as mread:
        mread.return_value = {"input": {"a": {"schema": "str!"}}}
        change.revert(logitem.uid)

    child = resource.load("test2")
    assert child.args == {"a": "9"}
开发者ID:aglarendil,项目名称:solar,代码行数:30,代码来源:test_system_log_api.py
示例10: prepare_nodes
def prepare_nodes(nodes_count):
    """Create nodes, their network counterparts and one librarian per node.

    Returns a dict mapping resource name to resource for every node, network
    node and librarian that was created.
    """
    created = vr.create(
        "nodes", "templates/nodes_with_transports.yaml",
        {"count": nodes_count})
    nodes = [res for res in created if res.name.startswith("node")]

    created = vr.create(
        "nodes_network", "templates/nodes_network.yaml",
        {"count": nodes_count})
    nodes_sdn = [res for res in created if res.name.startswith("node")]

    r = {}
    for node, node_sdn in zip(nodes, nodes_sdn):
        r[node.name] = node
        r[node_sdn.name] = node_sdn

        # LIBRARIAN
        librarian = vr.create(
            "librarian_{}".format(node.name), "resources/librarian", {})[0]
        r[librarian.name] = librarian
        node.connect(librarian, {})

        # NETWORKING
        # TODO(bogdando) node's IPs should be populated as br-mgmt IPs, but
        # now are hardcoded in templates
        signals.connect(node, node_sdn)
        node_sdn.connect_with_events(librarian, {"module": "modules"}, {})
        evapi.add_dep(
            librarian.name, node_sdn.name, actions=("run", "update"))

        # NOTE(review): the three calls above are repeated verbatim below;
        # this looks like an accidental duplicate, but it is kept because
        # the effect of removing it cannot be confirmed from here.
        signals.connect(node, node_sdn)
        node_sdn.connect_with_events(librarian, {"module": "modules"}, {})
        evapi.add_dep(
            librarian.name, node_sdn.name, actions=("run", "update"))

    return r
开发者ID:prmtl,项目名称:solar,代码行数:28,代码来源:openstack.py
示例11: connect_list_to_each
def connect_list_to_each(self, resources, mapping=None, events=None):
    """Connect every resource of this list to every resource of ``resources``.

    mapping -- optional mapping, formatted per pair via ``self.args_fmt``
        "{emitter_num}" -- replaced with the emitter's index in
                           self.resources
        "{receiver_num}" -- replaced with the receiver's index in the
                            resources argument
    events -- forwarded unchanged to ``signals.connect``
    """
    if not mapping:
        mapping = {}

    for emitter_num, emitter in enumerate(self.resources):
        for receiver_num, receiver in enumerate(resources.resources):
            indices = {
                'emitter_num': emitter_num,
                'receiver_num': receiver_num,
            }
            signals.connect(
                emitter,
                receiver,
                mapping=self.args_fmt(mapping, indices),
                events=events
            )
开发者ID:rustyrobot,项目名称:solar,代码行数:26,代码来源:template.py
示例12: prepare_nodes
def prepare_nodes(nodes_count):
    """Create nodes, their network counterparts and one librarian per node.

    Returns a dict mapping resource name to resource for every node, network
    node and librarian that was created.
    """
    created = cr.create('nodes', 'templates/nodes', {"count": nodes_count})
    nodes = created.like('node')

    created = cr.create(
        'nodes_network', 'templates/nodes_network', {"count": nodes_count})
    nodes_sdn = created.like('node')

    r = {}
    for node, node_sdn in zip(nodes, nodes_sdn):
        r[node.name] = node
        r[node_sdn.name] = node_sdn

        # LIBRARIAN
        librarian = cr.create(
            'librarian_{}'.format(node.name), 'resources/librarian', {})[0]
        r[librarian.name] = librarian
        node.connect(librarian, {})

        # NETWORKING
        # TODO(bogdando) node's IPs should be populated as br-mgmt IPs, but
        # now are hardcoded in templates
        signals.connect(node, node_sdn)
        node_sdn.connect_with_events(librarian, {'module': 'modules'}, {})
        evapi.add_dep(
            librarian.name, node_sdn.name, actions=('run', 'update'))

        # NOTE(review): the three calls above are repeated verbatim below;
        # this looks like an accidental duplicate, but it is kept because
        # the effect of removing it cannot be confirmed from here.
        signals.connect(node, node_sdn)
        node_sdn.connect_with_events(librarian, {'module': 'modules'}, {})
        evapi.add_dep(
            librarian.name, node_sdn.name, actions=('run', 'update'))

    return r
开发者ID:pigmej,项目名称:solar-resources,代码行数:28,代码来源:openstack.py
示例13: test_hash_input_with_multiple_connections
def test_hash_input_with_multiple_connections(self):
    # One emitter input mapped onto two receiver inputs at once: the plain
    # 'ip' input and the 'ip' key of the 'server' hash.
    # NOTE(review): the YAML literals below carry no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm upstream.
    emitter_meta = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
ip:
schema: str!
value:
""")
    receiver_meta = self.make_resource_meta("""
id: receiver
handler: ansible
version: 1.0.0
input:
ip:
schema: str!
value:
server:
schema: {ip: str!}
""")
    sample = self.create_resource(
        'sample', emitter_meta, args={'ip': '10.0.0.1'})
    receiver = self.create_resource('receiver', receiver_meta)

    xs.connect(sample, receiver, mapping={'ip': ['ip', 'server:ip']})

    self.assertEqual(sample.args['ip'], receiver.args['ip'])
    self.assertDictEqual(
        {'ip': sample.args['ip']}, receiver.args['server'])
开发者ID:aglarendil,项目名称:solar,代码行数:30,代码来源:test_signals.py
示例14: test_simple_observer_unsubscription
def test_simple_observer_unsubscription(self):
    # Connecting a second emitter to the same input is expected to make the
    # receiver follow the newer emitter.
    # NOTE(review): the YAML literal below carries no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm upstream.
    meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
ip:
schema: str
value:
""")
    sample = self.create_resource('sample', meta_dir, {'ip': None})
    sample1 = self.create_resource('sample1', meta_dir, {'ip': '10.0.0.1'})
    sample2 = self.create_resource('sample2', meta_dir, {'ip': '10.0.0.2'})

    xs.connect(sample1, sample)
    self.assertEqual(sample1.args['ip'], sample.args['ip'])

    xs.connect(sample2, sample)
    self.assertEqual(sample2.args['ip'], sample.args['ip'])

    # sample should be unsubscribed from sample1 and subscribed to sample2
    sample2.update({'ip': '10.0.0.3'})
    self.assertEqual(sample2.args['ip'], sample.args['ip'])
开发者ID:aglarendil,项目名称:solar,代码行数:26,代码来源:test_signals.py
示例15: test_hash_input_with_list
def test_hash_input_with_list(self):
    # A list-of-hashes input collects one entry per connected emitter and
    # loses that entry again when the emitter is disconnected.
    # NOTE(review): the YAML literals below carry no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm upstream.
    emitter_meta = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
ip:
schema: str!
value:
port:
schema: int!
value:
""")
    receiver_meta = self.make_resource_meta("""
id: receiver
handler: ansible
version: 1.0.0
input:
server:
schema: [{ip: str!, port: int!}]
""")
    sample1 = self.create_resource(
        'sample1', emitter_meta, args={'ip': '10.0.0.1', 'port': 5000})
    receiver = self.create_resource('receiver', receiver_meta)

    # First emitter: the list holds exactly its ip/port pair.
    xs.connect(
        sample1, receiver,
        mapping={'ip': 'server:ip', 'port': 'server:port'})
    self.assertItemsEqual(
        [{'ip': sample1.args['ip'], 'port': sample1.args['port']}],
        receiver.args['server'])

    # Second emitter: both pairs are present, in any order.
    sample2 = self.create_resource(
        'sample2', emitter_meta, args={'ip': '10.0.0.2', 'port': 5001})
    xs.connect(
        sample2, receiver,
        mapping={'ip': 'server:ip', 'port': 'server:port'})
    self.assertItemsEqual(
        [
            {'ip': sample1.args['ip'], 'port': sample1.args['port']},
            {'ip': sample2.args['ip'], 'port': sample2.args['port']},
        ],
        receiver.args['server'])

    # Disconnecting the first emitter leaves only the second pair.
    sample1.disconnect(receiver)
    self.assertItemsEqual(
        [{'ip': sample2.args['ip'], 'port': sample2.args['port']}],
        receiver.args['server'])
开发者ID:aglarendil,项目名称:solar,代码行数:57,代码来源:test_signals.py
示例16: creating_process
def creating_process():
    # Relies on `self` and `sample_meta_dir` from the enclosing scope, which
    # is not visible in this excerpt.
    emitter = self.create_resource('sample1', sample_meta_dir, {'value': 1})
    receiver = self.create_resource('sample2', sample_meta_dir, {})

    signals.connect(emitter, receiver)
    self.assertEqual(emitter.args['value'], receiver.args['value'])
开发者ID:CGenie,项目名称:solar,代码行数:9,代码来源:test_resource.py
示例17: update_inputs
def update_inputs(child, args):
    """Connect ``child`` to the parents named in ``args`` and assign the rest.

    ``parse_inputs(args)`` splits the arguments into connections and plain
    assignments; each connection links one parent input to one child input,
    and the assignments are then applied with ``child.update``.
    """
    child = load_resource(child)
    connections, assignments = parse_inputs(args)

    for conn in connections:
        parent = load_resource(conn['parent'])
        events = conn['events']
        mapping = {conn['parent_input']: conn['child_input']}
        signals.connect(parent, child, mapping, events)

    child.update(assignments)
开发者ID:michalskalski,项目名称:solar,代码行数:11,代码来源:virtual_resource.py
示例18: _revert_remove
def _revert_remove(logitem):
    """Recreate a removed resource together with all previous connections."""
    commited = orm.DBCommitedState.load(logitem.res)

    # Apply the log item's diffs in reverse on top of the committed state.
    args = dictdiffer.revert(logitem.diff, commited.inputs)
    connections = dictdiffer.revert(
        logitem.signals_diff, sorted(commited.connections))

    # The instance itself is discarded; presumably constructing it is what
    # recreates the resource -- confirm against resource.Resource.
    resource.Resource(
        logitem.res, logitem.base_path, args=args, tags=commited.tags)

    for emitter, emitter_input, receiver, receiver_input in connections:
        signals.connect(
            resource.load(emitter),
            resource.load(receiver),
            {emitter_input: receiver_input})
开发者ID:rustyrobot,项目名称:solar,代码行数:11,代码来源:change.py
示例19: test_multiple_resource_disjoint_connect
def test_multiple_resource_disjoint_connect(self):
    # Two emitters feeding different inputs of the same receiver must not
    # interfere with each other.
    # NOTE(review): the YAML literals below carry no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm upstream.
    sample_meta = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
ip:
schema: string
value:
port:
schema: int
value:
""")
    ip_meta = self.make_resource_meta("""
id: sample-ip
handler: ansible
version: 1.0.0
input:
ip:
schema: string
value:
""")
    port_meta = self.make_resource_meta("""
id: sample-port
handler: ansible
version: 1.0.0
input:
port:
schema: int
value:
""")
    sample = self.create_resource(
        'sample', sample_meta, {'ip': None, 'port': None})
    sample_ip = self.create_resource(
        'sample-ip', ip_meta, {'ip': '10.0.0.1'})
    sample_port = self.create_resource(
        'sample-port', port_meta, {'port': '8000'})

    xs.connect(sample_ip, sample)
    xs.connect(sample_port, sample)

    # Values arrive from the matching emitter...
    self.assertEqual(sample.args['ip'], sample_ip.args['ip'])
    self.assertEqual(sample.args['port'], sample_port.args['port'])

    # ...and each input records that emitter's input as its source.
    self.assertEqual(sample.args['ip'].emitter, sample_ip.args['ip'])
    self.assertEqual(sample.args['port'].emitter, sample_port.args['port'])
开发者ID:CGenie,项目名称:solar,代码行数:53,代码来源:test_signals.py
示例20: test_backtrack_list
def test_backtrack_list(self):
    # backtrack_value_emitter() on a list input should report the inputs of
    # every emitter currently connected to it.
    # NOTE(review): the YAML literals below carry no nesting in this copy;
    # the indentation was presumably lost in extraction -- confirm upstream.
    item_meta = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
value:
schema: str!
value:
""")
    list_meta = self.make_resource_meta("""
id: sample_list
handler: ansible
version: 1.0.0
input:
values:
schema: [str!]
value:
""")
    sample_list = self.create_resource('sample_list', list_meta)
    vi = sample_list.resource_inputs()['values']

    sample1 = self.create_resource('sample1', item_meta, {'value': 'x'})
    sample2 = self.create_resource('sample2', item_meta, {'value': 'y'})
    sample3 = self.create_resource('sample3', item_meta, {'value': 'z'})

    # With nothing connected the input backtracks to itself.
    self.assertEqual(vi.backtrack_value_emitter(), vi)

    # [sample1] -> sample_list
    signals.connect(sample1, sample_list, {'value': 'values'})
    self.assertEqual(
        vi.backtrack_value_emitter(),
        [sample1.resource_inputs()['value']])

    # [sample3, sample1] -> sample_list
    signals.connect(sample3, sample_list, {'value': 'values'})
    actual = set(vi.backtrack_value_emitter())
    expected = set([sample1.resource_inputs()['value'],
                    sample3.resource_inputs()['value']])
    self.assertSetEqual(actual, expected)

    # sample1 disconnected; only sample3 remains
    signals.disconnect(sample1, sample_list)
    self.assertEqual(
        vi.backtrack_value_emitter(),
        [sample3.resource_inputs()['value']])
开发者ID:torgartor21,项目名称:solar,代码行数:50,代码来源:test_orm.py
注：本文中的solar.core.signals.connect函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论