def __init__(self, *args, **kwargs):
BaseContainerAgent.__init__(self, *args, **kwargs)
# set id and name (as they are set in base class call)
self.id = string.replace('%s_%d' % (os.uname()[1], os.getpid()), ".", "_")
self.name = "cc_agent_%s" % self.id
Container.instance = self
# TODO: Bug: Replacing CFG instance not work because references are already public. Update directly
dict_merge(CFG, kwargs)
from pyon.core import bootstrap
bootstrap.sys_name = CFG.system.name or bootstrap.sys_name
log.debug("Container (sysname=%s) initializing ..." % bootstrap.sys_name)
# Keep track of the overrides from the command-line, so they can trump app/rel file data
self.spawn_args = DictModifier(CFG, kwargs)
# Load object and service registry etc.
bootstrap_pyon()
# Create this Container's specific ExchangeManager instance
self.ex_manager = ExchangeManager(self)
# Create this Container's specific ProcManager instance
self.proc_manager = ProcManager(self)
# Create this Container's specific AppManager instance
self.app_manager = AppManager(self)
# DatastoreManager - controls access to Datastores (both mock and couch backed)
self.datastore_manager = DatastoreManager()
log.debug("Container initialized, OK.")
开发者ID:wfrench,项目名称:pyon,代码行数:34,代码来源:cc.py
示例2: setUp
def setUp(self):
self.ex_manager = ExchangeManager(Mock())
self.ex_manager._transport = Mock(BaseTransport)
# mock out _client, which is a property, to return a sentinel
propmock = Mock()
propmock.__get__ = Mock(return_value=sentinel.client)
patcher = patch.object(ExchangeManager, '_client', propmock)
patcher.start()
self.addCleanup(patcher.stop)
def __init__(self, *args, **kwargs):
BaseContainerAgent.__init__(self, *args, **kwargs)
self._is_started = False
# set id and name (as they are set in base class call)
self.id = string.replace('%s_%d' % (os.uname()[1], os.getpid()), ".", "_")
self.name = "cc_agent_%s" % self.id
Container.instance = self
# TODO: Bug: Replacing CFG instance not work because references are already public. Update directly
dict_merge(CFG, kwargs, inplace=True)
from pyon.core import bootstrap
bootstrap.container_instance = self
bootstrap.assert_configuration(CFG)
log.debug("Container (sysname=%s) initializing ..." % bootstrap.get_sys_name())
# Keep track of the overrides from the command-line, so they can trump app/rel file data
self.spawn_args = kwargs
# Load object and service registry etc.
bootstrap_pyon()
# Create this Container's specific ExchangeManager instance
self.ex_manager = ExchangeManager(self)
# Create this Container's specific ProcManager instance
self.proc_manager = ProcManager(self)
# Create this Container's specific AppManager instance
self.app_manager = AppManager(self)
# DatastoreManager - controls access to Datastores (both mock and couch backed)
self.datastore_manager = DatastoreManager()
# File System - Interface to the OS File System, using correct path names and setups
self.file_system = FileSystem(CFG)
# Governance Controller - manages the governance related interceptors
self.governance_controller = GovernanceController(self)
# sFlow manager - controls sFlow stat emission
self.sflow_manager = SFlowManager(self)
# Coordinates the container start
self._is_started = False
self._capabilities = []
self._status = "INIT"
# protection for when the container itself is used as a Process for clients
self.container = self
log.debug("Container initialized, OK.")
开发者ID:ooici-dm,项目名称:pyon,代码行数:54,代码来源:cc.py
示例4: __init__
def __init__(self, *args, **kwargs):
BaseContainerAgent.__init__(self, *args, **kwargs)
self._is_started = False
# set container id and cc_agent name (as they are set in base class call)
self.id = get_default_container_id()
self.name = "cc_agent_%s" % self.id
self._capabilities = []
from pyon.core import bootstrap
bootstrap.container_instance = self
Container.instance = self
log.debug("Container (sysname=%s) initializing ..." % bootstrap.get_sys_name())
# Keep track of the overrides from the command-line, so they can trump app/rel file data
self.spawn_args = kwargs
# DatastoreManager - controls access to Datastores (both mock and couch backed)
self.datastore_manager = DatastoreManager()
# TODO: Do not start a capability here. Symmetric start/stop
self.datastore_manager.start()
self._capabilities.append("DATASTORE_MANAGER")
# Instantiate Directory
self.directory = Directory()
# Create this Container's specific ExchangeManager instance
self.ex_manager = ExchangeManager(self)
# Create this Container's specific ProcManager instance
self.proc_manager = ProcManager(self)
# Create this Container's specific AppManager instance
self.app_manager = AppManager(self)
# File System - Interface to the OS File System, using correct path names and setups
self.file_system = FileSystem(CFG)
# Governance Controller - manages the governance related interceptors
self.governance_controller = GovernanceController(self)
# sFlow manager - controls sFlow stat emission
self.sflow_manager = SFlowManager(self)
# Coordinates the container start
self._status = "INIT"
# protection for when the container itself is used as a Process for clients
self.container = self
log.debug("Container initialized, OK.")
开发者ID:swarbhanu,项目名称:pyon,代码行数:53,代码来源:cc.py
示例5: setUp
def setUp(self):
self.ex_manager = ExchangeManager(Mock())
self.pt = Mock(spec=BaseTransport)
self.ex_manager.get_transport = Mock(return_value=self.pt)
# set up some nodes
self.ex_manager._nodes = {'primary': Mock(), 'priviledged': Mock()}
# patch for setUp and test
self.patch_cfg('pyon.ion.exchange.CFG', {'container':{'exchange':{'auto_register':False}}, 'messaging':{'server':{}}})
# start ex manager
self.ex_manager.start()
def begin(self):
self._active_queues = set()
self._test_changes = {}
self._queues_declared = [] # ordered list of queues declared
self._queues = defaultdict(list) # queue name -> list of accesses
from pyon.ion.exchange import ExchangeManager
from pyon.util.containers import DotDict
from pyon.core.bootstrap import CFG
from mock import Mock
containermock = Mock()
containermock.resource_registry.find_resources.return_value = ([], None)
self.ex_manager = ExchangeManager(containermock) # needs to be able to setattr
self.ex_manager._nodes['priviledged'] = DotDict(client=DotDict(parameters=DotDict(host=CFG.get_safe('server.amqp.host', 'localhost'))))
class Container(BaseContainerAgent):
"""
The Capability Container. Its purpose is to spawn/monitor processes and services
that do the bulk of the work in the ION system. It also manages connections to the Exchange
and the various forms of datastores in the systems.
"""
# Singleton static variables
#node = None
id = None
name = None
pidfile = None
instance = None
def __init__(self, *args, **kwargs):
BaseContainerAgent.__init__(self, *args, **kwargs)
self._is_started = False
# set container id and cc_agent name (as they are set in base class call)
self.id = get_default_container_id()
self.name = "cc_agent_%s" % self.id
self._capabilities = []
bootstrap.container_instance = self
Container.instance = self
log.debug("Container (sysname=%s) initializing ..." % bootstrap.get_sys_name())
# Keep track of the overrides from the command-line, so they can trump app/rel file data
self.spawn_args = kwargs
# DatastoreManager - controls access to Datastores (both mock and couch backed)
self.datastore_manager = DatastoreManager()
# TODO: Do not start a capability here. Symmetric start/stop
self.datastore_manager.start()
self._capabilities.append("DATASTORE_MANAGER")
# Instantiate Directory
self.directory = Directory()
# internal router
self.local_router = None
# Create this Container's specific ExchangeManager instance
self.ex_manager = ExchangeManager(self)
# Create this Container's specific ProcManager instance
self.proc_manager = ProcManager(self)
# Create this Container's specific AppManager instance
self.app_manager = AppManager(self)
# File System - Interface to the OS File System, using correct path names and setups
self.file_system = FileSystem(CFG)
# Governance Controller - manages the governance related interceptors
self.governance_controller = GovernanceController(self)
# sFlow manager - controls sFlow stat emission
self.sflow_manager = SFlowManager(self)
# Coordinates the container start
self._status = "INIT"
# protection for when the container itself is used as a Process for clients
self.container = self
# publisher, initialized in start()
self.event_pub = None
# context-local storage
self.context = LocalContextMixin()
log.debug("Container initialized, OK.")
def start(self):
log.debug("Container starting...")
if self._is_started:
raise ContainerError("Container already started")
# Check if this UNIX process already runs a Container.
self.pidfile = "cc-pid-%d" % os.getpid()
if os.path.exists(self.pidfile):
raise ContainerError("Container.on_start(): Container is a singleton per UNIX process. Existing pid file found: %s" % self.pidfile)
# write out a PID file containing our agent messaging name
with open(self.pidfile, 'w') as f:
pid_contents = {'messaging': dict(CFG.server.amqp),
'container-agent': self.name,
'container-xp': bootstrap.get_sys_name()}
f.write(msgpack.dumps(pid_contents))
atexit.register(self._cleanup_pid)
self._capabilities.append("PID_FILE")
# set up abnormal termination handler for this container
def handl(signum, frame):
try:
self._cleanup_pid() # cleanup the pidfile first
self.quit() # now try to quit - will not error on second cleanup pidfile call
#.........这里部分代码省略.........
class QueueBlame(Plugin):
name = 'queueblame'
def __init__(self):
Plugin.__init__(self)
import uuid
self.ds_name = "queueblame-%s" % str(uuid.uuid4())[0:6]
self.queues_by_test = defaultdict(lambda: defaultdict(dict))
def options(self, parser, env):
super(QueueBlame, self).options(parser, env=env)
parser.add_option('--queueblame-purge', action='store_true', dest='queueblame_purge', help='Purge queues with leftover messages and remove all bindings')
def configure(self, options, conf):
"""Configure the plugin and system, based on selected options."""
super(QueueBlame, self).configure(options, conf)
self._queueblame_purge = options.queueblame_purge
def begin(self):
self._active_queues = set()
self._test_changes = {}
self._queues_declared = [] # ordered list of queues declared
self._queues = defaultdict(list) # queue name -> list of accesses
from pyon.ion.exchange import ExchangeManager
from pyon.util.containers import DotDict
from pyon.core.bootstrap import CFG
from mock import Mock
containermock = Mock()
containermock.resource_registry.find_resources.return_value = ([], None)
self.ex_manager = ExchangeManager(containermock) # needs to be able to setattr
self.ex_manager._nodes['priviledged'] = DotDict(client=DotDict(parameters=DotDict(host=CFG.get_safe('server.amqp.host', 'localhost'))))
def finalize(self, result):
pass
def beforeTest(self, test):
self._pre_defs = self.ex_manager.get_definitions()
import os
os.environ['QUEUE_BLAME'] = str(test.id())
def afterTest(self, test):
import os
from pyon.core.bootstrap import get_sys_name # can't guarantee exclusive access
#os.environ.pop('QUEUE_BLAME')
tid = test.id()
post_defs = self.ex_manager.get_definitions()
# diff the defs
pre_queues = {str(x['name']) for x in self._pre_defs['queues']}
post_queues = {str(x['name']) for x in post_defs['queues']}
pre_exchanges = {str(x['name']) for x in self._pre_defs['exchanges']}
post_exchanges = {str(x['name']) for x in post_defs['exchanges']}
pre_binds = { (x['source'], x['destination'], x['routing_key']) for x in self._pre_defs['bindings'] if x['destination_type'] == 'queue' }
post_binds = { (x['source'], x['destination'], x['routing_key']) for x in post_defs['bindings'] if x['destination_type'] == 'queue' }
queue_diff_add = post_queues.difference(pre_queues)
exchange_diff_add = post_exchanges.difference(pre_exchanges)
binds_diff_add = post_binds.difference(pre_binds)
queue_diff_sub = pre_queues.difference(post_queues)
exchange_diff_sub = pre_exchanges.difference(post_exchanges)
binds_diff_sub = pre_binds.difference(post_binds)
# maintain active queue set
map(self._active_queues.add, queue_diff_add)
map(self._active_queues.discard, queue_diff_sub)
# maintain changelog for tests
self._test_changes[tid] = (queue_diff_add, queue_diff_sub, exchange_diff_add, exchange_diff_sub, binds_diff_add, binds_diff_sub)
# add any new leftover queues to the list
for q in queue_diff_add:
if not q in self._queues_declared:
self._queues_declared.append(q)
# get stats about each leftover queue and record the access
raw_queues_list = self.ex_manager._list_queues()
raw_queues = { str(x['name']) : x for x in raw_queues_list }
for q in self._queues_declared:
# detect if queue has been deleted (and not readded)
if len(self._queues[q]) > 0 and isinstance(self._queues[q][-1], str) and not q in queue_diff_add:
continue
# did we just delete it this test? add the sentinel
if q in queue_diff_sub:
self._queues[q].append(tid)
#.........这里部分代码省略.........
class Container(BaseContainerAgent):
"""
The Capability Container. Its purpose is to spawn/monitor processes and services
that do the bulk of the work in the ION system.
"""
# Singleton static variables
node = None
id = None
name = None
pidfile = None
instance = None
def __init__(self, *args, **kwargs):
BaseContainerAgent.__init__(self, *args, **kwargs)
self._is_started = False
# set id and name (as they are set in base class call)
self.id = string.replace('%s_%d' % (os.uname()[1], os.getpid()), ".", "_")
self.name = "cc_agent_%s" % self.id
Container.instance = self
# TODO: Bug: Replacing CFG instance not work because references are already public. Update directly
dict_merge(CFG, kwargs, inplace=True)
from pyon.core import bootstrap
bootstrap.container_instance = self
bootstrap.assert_configuration(CFG)
log.debug("Container (sysname=%s) initializing ..." % bootstrap.get_sys_name())
# Keep track of the overrides from the command-line, so they can trump app/rel file data
self.spawn_args = kwargs
# Load object and service registry etc.
bootstrap_pyon()
# Create this Container's specific ExchangeManager instance
self.ex_manager = ExchangeManager(self)
# Create this Container's specific ProcManager instance
self.proc_manager = ProcManager(self)
# Create this Container's specific AppManager instance
self.app_manager = AppManager(self)
# DatastoreManager - controls access to Datastores (both mock and couch backed)
self.datastore_manager = DatastoreManager()
# File System - Interface to the OS File System, using correct path names and setups
self.file_system = FileSystem(CFG)
# Governance Controller - manages the governance related interceptors
self.governance_controller = GovernanceController(self)
# sFlow manager - controls sFlow stat emission
self.sflow_manager = SFlowManager(self)
# Coordinates the container start
self._is_started = False
self._capabilities = []
self._status = "INIT"
# protection for when the container itself is used as a Process for clients
self.container = self
log.debug("Container initialized, OK.")
def start(self):
log.debug("Container starting...")
if self._is_started:
raise ContainerError("Container already started")
# Check if this UNIX process already runs a Container.
self.pidfile = "cc-pid-%d" % os.getpid()
if os.path.exists(self.pidfile):
raise ContainerError("Container.on_start(): Container is a singleton per UNIX process. Existing pid file found: %s" % self.pidfile)
# write out a PID file containing our agent messaging name
with open(self.pidfile, 'w') as f:
pid_contents = {'messaging': dict(CFG.server.amqp),
'container-agent': self.name,
'container-xp': bootstrap.get_sys_name() }
f.write(msgpack.dumps(pid_contents))
atexit.register(self._cleanup_pid)
self._capabilities.append("PID_FILE")
# set up abnormal termination handler for this container
def handl(signum, frame):
try:
self._cleanup_pid() # cleanup the pidfile first
self.quit() # now try to quit - will not error on second cleanup pidfile call
finally:
signal.signal(signal.SIGTERM, self._normal_signal)
os.kill(os.getpid(), signal.SIGTERM)
self._normal_signal = signal.signal(signal.SIGTERM, handl)
self._capabilities.append("EXCHANGE_CONNECTION")
#.........这里部分代码省略.........
开发者ID:ooici-dm,项目名称:pyon,代码行数:101,代码来源:cc.py
示例14: Container
class Container(BaseContainerAgent):
"""
The Capability Container. Its purpose is to spawn/monitor processes and services
that do the bulk of the work in the ION system.
"""
node = None
id = None
name = None
pidfile = None
instance = None
def __init__(self, *args, **kwargs):
BaseContainerAgent.__init__(self, *args, **kwargs)
# set id and name (as they are set in base class call)
self.id = string.replace('%s_%d' % (os.uname()[1], os.getpid()), ".", "_")
self.name = "cc_agent_%s" % self.id
Container.instance = self
# TODO: Bug: Replacing CFG instance not work because references are already public. Update directly
dict_merge(CFG, kwargs)
from pyon.core import bootstrap
bootstrap.sys_name = CFG.system.name or bootstrap.sys_name
log.debug("Container (sysname=%s) initializing ..." % bootstrap.sys_name)
# Keep track of the overrides from the command-line, so they can trump app/rel file data
self.spawn_args = DictModifier(CFG, kwargs)
# Load object and service registry
bootstrap_pyon()
# Create this Container's specific ExchangeManager instance
self.ex_manager = ExchangeManager(self)
# Create this Container's specific ProcManager instance
self.proc_manager = ProcManager(self)
# Create this Container's specific AppManager instance
self.app_manager = AppManager(self)
log.debug("Container initialized, OK.")
def start(self):
log.debug("Container starting...")
# Check if this UNIX process already runs a Container.
self.pidfile = "cc-pid-%d" % os.getpid()
if os.path.exists(self.pidfile):
raise Exception("Container.on_start(): Container is a singleton per UNIX process. Existing pid file found: %s" % self.pidfile)
# write out a PID file containing our agent messaging name
with open(self.pidfile, 'w') as f:
from pyon.core.bootstrap import sys_name
pid_contents = {'messaging': dict(CFG.server.amqp),
'container-agent': self.name,
'container-xp': sys_name }
f.write(msgpack.dumps(pid_contents))
atexit.register(self._cleanup_pid)
# set up abnormal termination handler for this container
def handl(signum, frame):
try:
self._cleanup_pid() # cleanup the pidfile first
self.quit() # now try to quit - will not error on second cleanup pidfile call
finally:
signal.signal(signal.SIGTERM, self._normal_signal)
os.kill(os.getpid(), signal.SIGTERM)
self._normal_signal = signal.signal(signal.SIGTERM, handl)
# Start ExchangeManager. In particular establish broker connection
self.ex_manager.start()
# TODO: Move this in ExchangeManager - but there is an error
self.node, self.ioloop = messaging.make_node() # TODO: shortcut hack
# Instantiate Directory singleton and self-register
# TODO: At this point, there is no special config override
self.directory = Directory()
self.directory.register("/Containers", self.id, cc_agent=self.name)
self.proc_manager.start()
self.app_manager.start()
# Start the CC-Agent API
rsvc = ProcessRPCServer(node=self.node, name=self.name, service=self, process=self)
# Start an ION process with the right kind of endpoint factory
self.proc_manager.proc_sup.spawn((CFG.cc.proctype or 'green', None), listener=rsvc)
rsvc.get_ready_event().wait(timeout=10) # @TODO: no hardcode
log.info("Container started, OK.")
def serve_forever(self):
""" Run the container until killed. """
log.debug("In Container.serve_forever")
#.........这里部分代码省略.........
请发表评论