本文整理汇总了Python中st2common.config.parse_args函数的典型用法代码示例。如果您正苦于以下问题：Python parse_args函数的具体用法？Python parse_args怎么用？Python parse_args使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_args函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: main
def main():
    """Purge executions older than a cut-off timestamp.

    Registers the ``timestamp`` and ``action-ref`` CLI options, parses the
    st2 config, resolves the cut-off, connects to the database, calls
    ``_purge_executions`` and always disconnects again.

    :raises ValueError: If ``--timestamp`` does not match the format
                        ``%Y-%m-%dT%H:%M:%S.%fZ``.
    """
    _monkey_patch()

    cli_opts = [
        cfg.StrOpt('timestamp', default=None,
                   help='Will delete data older than ' +
                   'this timestamp. (default 48 hours). ' +
                   'Example value: 2015-03-13T19:01:27.255542Z'),
        cfg.StrOpt('action-ref', default='',
                   help='action-ref to delete executions for.')
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    # Get config values
    timestamp = cfg.CONF.timestamp
    action_ref = cfg.CONF.action_ref
    username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
    password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None

    # Resolve the cut-off before connecting so that a malformed timestamp
    # fails without leaving a database connection open.
    if not timestamp:
        # User supplied values carry a "Z" (UTC) suffix, so the default is
        # computed in UTC as well (naive, like the strptime() result below).
        now = datetime.utcnow()
        timestamp = now - timedelta(days=DEFAULT_TIMEDELTA_DAYS)
    else:
        timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')

    # Connect to db.
    db_setup(cfg.CONF.database.db_name, cfg.CONF.database.host, cfg.CONF.database.port,
             username=username, password=password)

    try:
        # Purge models.
        _purge_executions(timestamp=timestamp, action_ref=action_ref)
    finally:
        # Disconnect from db, even when the purge raised.
        db_teardown()
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:35,代码来源:purge_executions.py
示例2: main
def main():
    """Parse the st2 config, run ``migrate_rules`` and always disconnect from the db."""
    config.parse_args()

    # Connect to db.
    db_setup()

    try:
        # Migrate rules.
        migrate_rules()
    finally:
        # Disconnect from db, even when the migration raised.
        db_teardown()
开发者ID:E-LLP,项目名称:st2,代码行数:11,代码来源:migrate_rules_to_include_pack.py
示例3: main
def main():
    """Spawn one trigger-instance injector per trigger.

    Triggers come from the ``triggers`` CLI option or, when that is empty,
    from the keys of the YAML document at ``schema_file``. The configured
    ``rate`` is split evenly between the triggers; with ``max-throughput``
    set, a per-trigger rate of 0 is passed instead.

    Prints a message and returns without spawning anything when no triggers
    can be determined.
    """
    monkey_patch()

    cli_opts = [
        cfg.IntOpt('rate', default=100,
                   help='Rate of trigger injection measured in instances in per sec.' +
                   ' Assumes a default exponential distribution in time so arrival is poisson.'),
        cfg.ListOpt('triggers', required=False,
                    help='List of triggers for which instances should be fired.' +
                    ' Uniform distribution will be followed if there is more than one' +
                    ' trigger.'),
        cfg.StrOpt('schema_file', default=None,
                   help='Path to schema file defining trigger and payload.'),
        cfg.IntOpt('duration', default=60,
                   help='Duration of stress test in seconds.'),
        cfg.BoolOpt('max-throughput', default=False,
                    help='If True, "rate" argument will be ignored and this script will try to '
                    'saturize the CPU and achieve max utilization.')
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    # Get config values
    triggers = cfg.CONF.triggers
    trigger_payload_schema = {}

    if not triggers:
        schema_file = cfg.CONF.schema_file
        if not schema_file or not os.path.exists(schema_file):
            print('Either "triggers" need to be provided or a schema file containing' +
                  ' triggers should be provided.')
            return

        with open(schema_file) as fd:
            # An empty YAML document loads as None; treat it as "no triggers".
            trigger_payload_schema = yaml.safe_load(fd) or {}

        triggers = list(trigger_payload_schema.keys())

        if not triggers:
            # Guard against dividing the rate by zero further down.
            print('Schema file "%s" does not define any triggers.' % schema_file)
            return

    print('Triggers=%s' % triggers)

    duration = cfg.CONF.duration
    max_throughput = cfg.CONF.max_throughput

    # With max throughput requested the configured rate is ignored and 0 is
    # handed to the injectors, as before.
    rate_per_trigger = 0 if max_throughput else int(cfg.CONF.rate / len(triggers))

    dispatcher_pool = eventlet.GreenPool(len(triggers))

    for trigger in triggers:
        payload = trigger_payload_schema.get(trigger, {})
        dispatcher_pool.spawn(_inject_instances, trigger, rate_per_trigger, duration,
                              payload=payload, max_throughput=max_throughput)
        # Random pause of up to one second between spawning injectors.
        eventlet.sleep(random.uniform(0, 1))

    dispatcher_pool.waitall()
开发者ID:nzlosh,项目名称:st2,代码行数:54,代码来源:st2-inject-trigger-instances.py
示例4: _setup
def _setup():
    """Parse the st2 config, configure DEBUG logging and connect to the database."""
    # 1. parse config.
    config.parse_args()

    # 2. setup logging.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s [-] %(message)s')

    # 3. everything else, which needs the parsed config and working logging.
    db_conf = cfg.CONF.database
    # Credentials are optional: fall back to None when the option is absent.
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=getattr(db_conf, 'username', None),
             password=getattr(db_conf, 'password', None))
开发者ID:bjoernbessert,项目名称:st2,代码行数:13,代码来源:bootstrap.py
示例5: main
def main():
    """Parse the st2 config, run ``migrate_rules`` and always disconnect from the db."""
    config.parse_args()

    # Connect to db. Credentials are optional and default to None.
    username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
    password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None
    db_setup(cfg.CONF.database.db_name, cfg.CONF.database.host, cfg.CONF.database.port,
             username=username, password=password)

    try:
        # Migrate rules.
        migrate_rules()
    finally:
        # Disconnect from db, even when the migration raised.
        db_teardown()
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:14,代码来源:migrate_rules_to_include_pack.py
示例6: main
def main():
    """Diff sensors, actions and/or rules according to the CLI flags.

    ``--all`` runs all three diffs; otherwise each of ``--sensors``,
    ``--actions`` and ``--rules`` enables its own diff. The database
    connection is torn down on every path, including ``--all`` and errors.
    """
    _monkey_patch()

    cli_opts = [
        cfg.BoolOpt("sensors", default=False, help="diff sensor alone."),
        cfg.BoolOpt("actions", default=False, help="diff actions alone."),
        cfg.BoolOpt("rules", default=False, help="diff rules alone."),
        cfg.BoolOpt("all", default=False, help="diff sensors, actions and rules."),
        cfg.BoolOpt("verbose", default=False),
        cfg.BoolOpt(
            "simple",
            default=False,
            help="In simple mode, tool only tells you if content is missing. "
            + "It doesn't show you content diff between disk and db.",
        ),
        cfg.StrOpt("pack-dir", default=None, help="Path to specific pack to diff."),
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    username = cfg.CONF.database.username if hasattr(cfg.CONF.database, "username") else None
    password = cfg.CONF.database.password if hasattr(cfg.CONF.database, "password") else None

    # Connect to db.
    db_setup(
        cfg.CONF.database.db_name, cfg.CONF.database.host, cfg.CONF.database.port, username=username, password=password
    )

    try:
        # Diff content
        diff_kwargs = {
            "pack_dir": cfg.CONF.pack_dir or None,
            "verbose": cfg.CONF.verbose,
            "content_diff": not cfg.CONF.simple,
        }
        diff_all = cfg.CONF.all

        # "all" implies each individual diff, run in the same order as before.
        if diff_all or cfg.CONF.sensors:
            _diff_sensors(**diff_kwargs)
        if diff_all or cfg.CONF.actions:
            _diff_actions(**diff_kwargs)
        if diff_all or cfg.CONF.rules:
            _diff_rules(**diff_kwargs)
    finally:
        # Disconnect from db.
        db_teardown()
开发者ID:azamsheriff,项目名称:st2,代码行数:49,代码来源:diff-db-disk.py
示例7: main
def main():
    """Diff sensors, actions and/or rules according to the CLI flags.

    ``--all`` runs all three diffs; otherwise each of ``--sensors``,
    ``--actions`` and ``--rules`` enables its own diff. The database
    connection is torn down on every path, including ``--all`` and errors.
    """
    monkey_patch()

    cli_opts = [
        cfg.BoolOpt('sensors', default=False,
                    help='diff sensor alone.'),
        cfg.BoolOpt('actions', default=False,
                    help='diff actions alone.'),
        cfg.BoolOpt('rules', default=False,
                    help='diff rules alone.'),
        cfg.BoolOpt('all', default=False,
                    help='diff sensors, actions and rules.'),
        cfg.BoolOpt('verbose', default=False),
        cfg.BoolOpt('simple', default=False,
                    help='In simple mode, tool only tells you if content is missing. ' +
                    'It doesn\'t show you content diff between disk and db.'),
        cfg.StrOpt('pack-dir', default=None, help='Path to specific pack to diff.')
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
    password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None

    # Connect to db.
    db_setup(cfg.CONF.database.db_name, cfg.CONF.database.host, cfg.CONF.database.port,
             username=username, password=password)

    try:
        # Diff content
        diff_kwargs = {
            'pack_dir': cfg.CONF.pack_dir or None,
            'verbose': cfg.CONF.verbose,
            'content_diff': not cfg.CONF.simple,
        }
        diff_all = cfg.CONF.all

        # "all" implies each individual diff, run in the same order as before.
        if diff_all or cfg.CONF.sensors:
            _diff_sensors(**diff_kwargs)
        if diff_all or cfg.CONF.actions:
            _diff_actions(**diff_kwargs)
        if diff_all or cfg.CONF.rules:
            _diff_rules(**diff_kwargs)
    finally:
        # Disconnect from db.
        db_teardown()
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:49,代码来源:diff-db-disk.py
示例8: main
def main():
    """Analyze rule links for the configured action and hand them to the grapher.

    Registers the ``action_ref``, ``link_trigger_ref`` and ``out_file`` CLI
    options, parses the st2 config, connects to the database, then passes
    the result of ``LinksAnalyzer.analyze`` to ``Grapher.generate_graph``.
    """
    monkey_patch()

    do_register_cli_opts([
        cfg.StrOpt('action_ref', default=None,
                   help='Root action to begin analysis.'),
        # NOTE(review): this help text is identical to the one for
        # action_ref - looks like a copy/paste leftover; confirm the wording.
        cfg.StrOpt('link_trigger_ref', default='core.st2.generic.actiontrigger',
                   help='Root action to begin analysis.'),
        cfg.StrOpt('out_file', default='pipeline')
    ])
    config.parse_args()
    db_setup()

    analyzer = LinksAnalyzer()
    links = analyzer.analyze(cfg.CONF.action_ref, cfg.CONF.link_trigger_ref)

    grapher = Grapher()
    grapher.generate_graph(links, cfg.CONF.out_file)
开发者ID:lyandut,项目名称:st2,代码行数:15,代码来源:st2-analyze-links.py
示例9: _setup
def _setup(argv):
    """Parse config, configure logging, connect to the db and register exchanges.

    :param argv: Not used by this function.
    """
    config.parse_args()

    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s [-] %(message)s')

    if not cfg.CONF.verbose:
        # Non-verbose mode hides AUDIT and DEBUG records; INFO, ERROR and
        # CRITICAL are still printed.
        hidden_levels = [logging.AUDIT, logging.DEBUG]
        for root_handler in logging.getLoggerClass().manager.root.handlers:
            root_handler.addFilter(LogLevelFilter(log_levels=hidden_levels))

    db_setup()
    register_exchanges()
开发者ID:ruslantum,项目名称:st2,代码行数:16,代码来源:bootstrap.py
示例10: main
def main():
    """Spawn one trigger-instance injector per trigger.

    Triggers come from the ``triggers`` CLI option or, when that is empty,
    from the keys of the YAML document at ``schema_file``. The configured
    ``rate`` is split evenly between the triggers.

    Prints a message and returns without spawning anything when no triggers
    can be determined.
    """
    _monkey_patch()

    cli_opts = [
        cfg.IntOpt(
            "rate",
            default=100,
            help="Rate of trigger injection measured in instances in per sec."
            + " Assumes a default exponential distribution in time so arrival is poisson.",
        ),
        cfg.ListOpt(
            "triggers",
            required=False,
            help="List of triggers for which instances should be fired."
            + " Uniform distribution will be followed if there is more than one"
            + " trigger.",
        ),
        cfg.StrOpt("schema_file", default=None, help="Path to schema file defining trigger and payload."),
        cfg.IntOpt("duration", default=1, help="Duration of stress test in minutes."),
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    # Get config values
    triggers = cfg.CONF.triggers
    trigger_payload_schema = {}

    if not triggers:
        schema_file = cfg.CONF.schema_file
        if not schema_file or not os.path.exists(schema_file):
            print('Either "triggers" need to be provided or a schema file containing' + " triggers should be provided.")
            return

        with open(schema_file) as fd:
            # An empty YAML document loads as None; treat it as "no triggers".
            trigger_payload_schema = yaml.safe_load(fd) or {}

        # Materialize the keys so the value prints and behaves like a list.
        triggers = list(trigger_payload_schema.keys())

        if not triggers:
            # Guard against dividing the rate by zero further down.
            print('Schema file "%s" does not define any triggers.' % schema_file)
            return

    print("Triggers=%s" % triggers)

    rate_per_trigger = int(cfg.CONF.rate / len(triggers))
    duration = cfg.CONF.duration

    dispatcher_pool = eventlet.GreenPool(len(triggers))

    for trigger in triggers:
        payload = trigger_payload_schema.get(trigger, {})
        dispatcher_pool.spawn(_inject_instances, trigger, rate_per_trigger, duration, payload=payload)
        # Random pause of up to one second between spawning injectors.
        eventlet.sleep(random.uniform(0, 1))

    dispatcher_pool.waitall()
开发者ID:ipv1337,项目名称:st2,代码行数:47,代码来源:st2-inject-trigger-instances.py
示例11: __init__
def __init__(self, pack, file_path, config=None, parameters=None, user=None, parent_args=None,
             log_level=PYTHON_RUNNER_DEFAULT_LOG_LEVEL):
    """
    :param pack: Name of the pack this action belongs to.
    :type pack: ``str``

    :param file_path: Path to the action module.
    :type file_path: ``str``

    :param config: Pack config.
    :type config: ``dict``

    :param parameters: action parameters.
    :type parameters: ``dict`` or ``None``

    :param user: Name of the user who triggered this action execution. When
                 empty, falls back to ``cfg.CONF.system_user.user``.
    :type user: ``str``

    :param parent_args: Command line arguments passed to the parent process.
    :type parent_args: ``list``

    :param log_level: Log level stored on the wrapper as ``_log_level``.
    """
    self._pack = pack
    self._file_path = file_path
    self._config = config or {}
    self._parameters = parameters or {}
    self._user = user
    self._parent_args = parent_args or []
    self._log_level = log_level

    self._class_name = None
    self._logger = logging.getLogger('PythonActionWrapper')

    try:
        st2common_config.parse_args(args=self._parent_args)
    except Exception as e:
        # Best effort: a config parsing failure is logged and otherwise ignored.
        LOG.debug('Failed to parse config using parent args (parent_args=%s): %s',
                  str(self._parent_args), str(e))

    # Note: We can only set a default user value if one is not provided after parsing the
    # config
    if not self._user:
        # Note: We use late import to avoid performance overhead
        from oslo_config import cfg
        self._user = cfg.CONF.system_user.user
开发者ID:lyandut,项目名称:st2,代码行数:45,代码来源:python_action_wrapper.py
示例12: main
def main():
    """Migrate datastore items and exit with 0 on success or 1 on failure.

    The database connection is always torn down before exiting. On failure
    the traceback is printed so that the cause is not lost.
    """
    # Local import: only needed on the failure path of this script entry point.
    import traceback

    config.parse_args()

    # Connect to db.
    db_setup()

    # Migrate datastore items.
    try:
        migrate_datastore()
    except Exception:
        # Catch Exception rather than everything, so that KeyboardInterrupt
        # and SystemExit propagate (after the teardown below).
        traceback.print_exc()
        print('ABORTED: Datastore migration aborted on first failure.')
        exit_code = 1
    else:
        print('SUCCESS: Datastore items migrated successfully.')
        exit_code = 0
    finally:
        # Disconnect from db.
        db_teardown()

    sys.exit(exit_code)
开发者ID:StackStorm,项目名称:st2,代码行数:18,代码来源:st2-migrate-datastore-scopes.py
示例13: _setup
def _setup(argv):
    """Parse config, configure logging and connect to the database.

    :param argv: Not used by this function.
    """
    # 1. parse config
    config.parse_args()

    # 2. setup logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s [-] %(message)s')

    if not cfg.CONF.verbose:
        # Non-verbose mode hides AUDIT and DEBUG records; INFO, ERROR and
        # CRITICAL are still printed.
        hidden_levels = [logging.AUDIT, logging.DEBUG]
        for root_handler in logging.getLoggerClass().manager.root.handlers:
            root_handler.addFilter(LogLevelFilter(log_levels=hidden_levels))

    # 3. everything else, which needs the parsed config and working logging.
    db_conf = cfg.CONF.database
    # Credentials are optional: fall back to None when the option is absent.
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=getattr(db_conf, 'username', None),
             password=getattr(db_conf, 'password', None))
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:21,代码来源:bootstrap.py
示例14: _setup
def _setup():
    """Parse the st2 config and configure DEBUG level logging."""
    # 1. parse config.
    config.parse_args()

    # 2. setup logging.
    log_format = '%(asctime)s %(levelname)s [-] %(message)s'
    logging.basicConfig(format=log_format, level=logging.DEBUG)
开发者ID:StackStorm,项目名称:st2,代码行数:6,代码来源:bootstrap.py
示例15: setup_logging
}
# Options which should be removed from the st2 config
# (mapping of config section name -> option names in that section)
ST2_CONF_OPTIONS_TO_REMOVE = {
    'database': ['username', 'password'],
    'messaging': ['url']
}
# NOTE(review): presumably the placeholder written in place of removed option
# values - confirm against the code which uses it.
REMOVE_VALUE_NAME = '**removed**'
# Archive file name template; expects "hostname" and "date" keys.
OUTPUT_FILENAME_TEMPLATE = 'st2-debug-output-%(hostname)s-%(date)s.tar.gz'
# strftime style format, e.g. 2015-03-13-190127
DATE_FORMAT = '%Y-%m-%d-%H%M%S'
# Parse the st2 config with an empty argument list at import time.
# Best effort: any parsing error is deliberately ignored.
try:
    config.parse_args(args=[])
except Exception:
    pass
def setup_logging():
    """Set ``LOG`` to INFO level and attach a stdout handler with a timestamped format."""
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))

    LOG.setLevel(logging.INFO)
    LOG.addHandler(stdout_handler)
开发者ID:lyandut,项目名称:st2,代码行数:29,代码来源:submit_debug_info.py
示例16: print
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from st2common.runners import get_available_backends
from st2common.runners import get_backend_instance
from st2common import config
# Parse the st2 config before querying the runner backends.
config.parse_args()
runner_names = get_available_backends()
print('Available / installed action runners:')
# Print one line per backend: its name, the "runner_module" entry of its
# metadata and the class of the runner instance it returns.
for name in runner_names:
    runner_driver = get_backend_instance(name)
    runner_instance = runner_driver.get_runner()
    runner_metadata = runner_driver.get_metadata()
    print('- %s (runner_module=%s,cls=%s)' % (name, runner_metadata['runner_module'],
                                              runner_instance.__class__))
开发者ID:nzlosh,项目名称:st2,代码行数:30,代码来源:enumerate-runners.py
示例17: initialize
def initialize(self):
    """Parse the st2 config, ignoring any failure."""
    try:
        config.parse_args()
    except BaseException:
        # Same reach as a bare ``except:``.
        # NOTE(review): presumably this is also meant to swallow a SystemExit
        # raised while parsing arguments - confirm before narrowing it.
        pass
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:5,代码来源:delete.py
示例18: main
from kombu import Connection, Exchange
from oslo_config import cfg
from st2common import config
from st2common.transport.publishers import PoolPublisher
def main(exchange, routing_key, payload):
    """Publish ``payload`` to the topic exchange named ``exchange``.

    :param exchange: Name of the exchange to publish to.
    :param routing_key: Routing key to publish with.
    :param payload: Message payload.
    """
    topic_exchange = Exchange(exchange, type='topic')
    messaging_url = cfg.CONF.messaging.url
    publisher = PoolPublisher(messaging_url)

    # The publish call happens while a Connection to the same URL is open.
    with Connection(messaging_url):
        publisher.publish(payload=payload,
                          exchange=topic_exchange,
                          routing_key=routing_key)
if __name__ == '__main__':
    # Parse the st2 config with an empty argument *list* (not a dict), so the
    # script's own command line is left for argparse below.
    config.parse_args(args=[])

    parser = argparse.ArgumentParser(description='Queue producer')
    parser.add_argument('--exchange', required=True,
                        help='Exchange to publish the message to')
    parser.add_argument('--routing-key', required=True,
                        help='Routing key to use')
    parser.add_argument('--payload', required=True,
                        help='Message payload')
    args = parser.parse_args()

    main(exchange=args.exchange, routing_key=args.routing_key,
         payload=args.payload)
开发者ID:ipv1337,项目名称:st2,代码行数:30,代码来源:queue_producer.py
示例19: _setup
def _setup():
    """Parse the st2 config, ignoring any failure."""
    try:
        config.parse_args()
    except BaseException:
        # Same reach as a bare ``except:``.
        # NOTE(review): presumably this is also meant to swallow a SystemExit
        # raised while parsing arguments - confirm before narrowing it.
        pass
开发者ID:bjoernbessert,项目名称:st2,代码行数:5,代码来源:unload.py
注：本文中的st2common.config.parse_args函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论