• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python config.parse_args函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中st2common.config.parse_args函数的典型用法代码示例。如果您正苦于以下问题:Python parse_args函数的具体用法?Python parse_args怎么用?Python parse_args使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse_args函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

def main():
    """Purge action executions older than a cut-off timestamp.

    Registers the ``timestamp`` and ``action-ref`` CLI options, parses the
    config, resolves the cut-off, then connects to the database, purges the
    matching executions and always disconnects again.

    :raises ValueError: If the supplied ``timestamp`` does not match the
                        ``%Y-%m-%dT%H:%M:%S.%fZ`` format.
    """
    _monkey_patch()

    cli_opts = [
        cfg.StrOpt('timestamp', default=None,
                   help='Will delete data older than ' +
                   'this timestamp. (default 48 hours). ' +
                   'Example value: 2015-03-13T19:01:27.255542Z'),
        cfg.StrOpt('action-ref', default='',
                   help='action-ref to delete executions for.')
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    # Get config values
    timestamp = cfg.CONF.timestamp
    action_ref = cfg.CONF.action_ref
    db_conf = cfg.CONF.database
    username = getattr(db_conf, 'username', None)
    password = getattr(db_conf, 'password', None)

    # Resolve the cut-off before opening a DB connection so that a malformed
    # timestamp fails fast without leaving a connection behind.
    if not timestamp:
        # NOTE(review): the documented example value ends in "Z" (UTC) while
        # this default is based on local time - confirm which one is intended.
        timestamp = datetime.now() - timedelta(days=DEFAULT_TIMEDELTA_DAYS)
    else:
        timestamp = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')

    # Connect to db.
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=username, password=password)

    try:
        # Purge models.
        _purge_executions(timestamp=timestamp, action_ref=action_ref)
    finally:
        # Disconnect from db even when the purge fails.
        db_teardown()
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:35,代码来源:purge_executions.py


示例2: main

def main():
    """Parse the config, migrate rules and always release the db connection."""
    config.parse_args()

    # Connect to db.
    db_setup()

    try:
        # Migrate rules.
        migrate_rules()
    finally:
        # Disconnect from db even when the migration raises.
        db_teardown()
开发者ID:E-LLP,项目名称:st2,代码行数:11,代码来源:migrate_rules_to_include_pack.py


示例3: main

def main():
    """Inject trigger instances at a configurable rate for stress testing.

    Triggers are taken from the ``triggers`` CLI option or, when that is
    empty, from the top-level keys of the YAML file given via
    ``schema_file``. One green thread is spawned per trigger; the function
    returns once all of them have finished. If no trigger can be determined,
    a message is printed and the function returns without doing anything.
    """
    monkey_patch()

    cli_opts = [
        cfg.IntOpt('rate', default=100,
                   help='Rate of trigger injection measured in instances in per sec.' +
                   ' Assumes a default exponential distribution in time so arrival is poisson.'),
        cfg.ListOpt('triggers', required=False,
                    help='List of triggers for which instances should be fired.' +
                    ' Uniform distribution will be followed if there is more than one' +
                    ' trigger.'),
        cfg.StrOpt('schema_file', default=None,
                   help='Path to schema file defining trigger and payload.'),
        cfg.IntOpt('duration', default=60,
                   help='Duration of stress test in seconds.'),
        cfg.BoolOpt('max-throughput', default=False,
                   help='If True, "rate" argument will be ignored and this script will try to '
                   'saturize the CPU and achieve max utilization.')
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    # Get config values
    triggers = cfg.CONF.triggers
    trigger_payload_schema = {}

    if not triggers:
        schema_file = cfg.CONF.schema_file
        if not schema_file or not os.path.exists(schema_file):
            print('Either "triggers" need to be provided or a schema file containing' +
                  ' triggers should be provided.')
            return
        with open(schema_file) as fd:
            # An empty YAML document loads as None; treat it as "no triggers".
            trigger_payload_schema = yaml.safe_load(fd) or {}
        triggers = list(trigger_payload_schema.keys())
        print('Triggers=%s' % triggers)

        if not triggers:
            # Guard against division by zero and a zero-sized green pool below.
            print('Schema file "%s" does not define any triggers.' % schema_file)
            return

    duration = cfg.CONF.duration
    max_throughput = cfg.CONF.max_throughput

    # In max throughput mode the configured rate is ignored.
    if max_throughput:
        rate_per_trigger = 0
    else:
        rate_per_trigger = int(cfg.CONF.rate / len(triggers))

    dispatcher_pool = eventlet.GreenPool(len(triggers))

    for trigger in triggers:
        payload = trigger_payload_schema.get(trigger, {})
        dispatcher_pool.spawn(_inject_instances, trigger, rate_per_trigger, duration,
                              payload=payload, max_throughput=max_throughput)
        # Random pause between spawns so the dispatchers do not start in lockstep.
        eventlet.sleep(random.uniform(0, 1))
    dispatcher_pool.waitall()
开发者ID:nzlosh,项目名称:st2,代码行数:54,代码来源:st2-inject-trigger-instances.py


示例4: _setup

def _setup():
    """Parse the config, configure logging and open the database connection."""
    # 1. parse config.
    config.parse_args()

    # 2. setup logging.
    log_format = '%(asctime)s %(levelname)s [-] %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)

    # 3. everything else which needs a parsed config and working logging.
    db_conf = cfg.CONF.database
    # Credentials are optional; fall back to None when the option is absent.
    db_username = getattr(db_conf, 'username', None)
    db_password = getattr(db_conf, 'password', None)
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=db_username, password=db_password)
开发者ID:bjoernbessert,项目名称:st2,代码行数:13,代码来源:bootstrap.py


示例5: main

def main():
    """Parse the config, migrate rules and always release the db connection."""
    config.parse_args()

    # Connect to db. Credentials are optional and default to None.
    db_conf = cfg.CONF.database
    username = getattr(db_conf, 'username', None)
    password = getattr(db_conf, 'password', None)
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=username, password=password)

    try:
        # Migrate rules.
        migrate_rules()
    finally:
        # Disconnect from db even when the migration raises.
        db_teardown()
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:14,代码来源:migrate_rules_to_include_pack.py


示例6: main

def main():
    """Diff sensors, actions and/or rules between disk and the database.

    Which content types are diffed is selected through the ``sensors``,
    ``actions``, ``rules`` and ``all`` CLI flags; ``all`` implies the other
    three. The database connection is always closed before returning.
    """
    _monkey_patch()

    cli_opts = [
        cfg.BoolOpt("sensors", default=False, help="diff sensor alone."),
        cfg.BoolOpt("actions", default=False, help="diff actions alone."),
        cfg.BoolOpt("rules", default=False, help="diff rules alone."),
        cfg.BoolOpt("all", default=False, help="diff sensors, actions and rules."),
        cfg.BoolOpt("verbose", default=False),
        cfg.BoolOpt(
            "simple",
            default=False,
            help="In simple mode, tool only tells you if content is missing."
            + " It doesn't show you content diff between disk and db.",
        ),
        cfg.StrOpt("pack-dir", default=None, help="Path to specific pack to diff."),
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    db_conf = cfg.CONF.database
    username = getattr(db_conf, "username", None)
    password = getattr(db_conf, "password", None)

    # Connect to db.
    db_setup(db_conf.db_name, db_conf.host, db_conf.port, username=username, password=password)

    try:
        # Diff content
        diff_kwargs = {
            "pack_dir": cfg.CONF.pack_dir or None,
            "verbose": cfg.CONF.verbose,
            "content_diff": not cfg.CONF.simple,
        }
        diff_all = cfg.CONF.all

        if diff_all or cfg.CONF.sensors:
            _diff_sensors(**diff_kwargs)

        if diff_all or cfg.CONF.actions:
            _diff_actions(**diff_kwargs)

        if diff_all or cfg.CONF.rules:
            _diff_rules(**diff_kwargs)
    finally:
        # Disconnect from db on every path (previously skipped for --all).
        db_teardown()
开发者ID:azamsheriff,项目名称:st2,代码行数:49,代码来源:diff-db-disk.py


示例7: main

def main():
    """Diff sensors, actions and/or rules between disk and the database.

    Which content types are diffed is selected through the ``sensors``,
    ``actions``, ``rules`` and ``all`` CLI flags; ``all`` implies the other
    three. The database connection is always closed before returning.
    """
    monkey_patch()

    cli_opts = [
        cfg.BoolOpt('sensors', default=False,
                    help='diff sensor alone.'),
        cfg.BoolOpt('actions', default=False,
                    help='diff actions alone.'),
        cfg.BoolOpt('rules', default=False,
                    help='diff rules alone.'),
        cfg.BoolOpt('all', default=False,
                    help='diff sensors, actions and rules.'),
        cfg.BoolOpt('verbose', default=False),
        cfg.BoolOpt('simple', default=False,
                    help='In simple mode, tool only tells you if content is missing.' +
                         ' It doesn\'t show you content diff between disk and db.'),
        cfg.StrOpt('pack-dir', default=None, help='Path to specific pack to diff.')
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    db_conf = cfg.CONF.database
    username = getattr(db_conf, 'username', None)
    password = getattr(db_conf, 'password', None)

    # Connect to db.
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=username, password=password)

    try:
        # Diff content
        diff_kwargs = {
            'pack_dir': cfg.CONF.pack_dir or None,
            'verbose': cfg.CONF.verbose,
            'content_diff': not cfg.CONF.simple,
        }
        diff_all = cfg.CONF.all

        if diff_all or cfg.CONF.sensors:
            _diff_sensors(**diff_kwargs)

        if diff_all or cfg.CONF.actions:
            _diff_actions(**diff_kwargs)

        if diff_all or cfg.CONF.rules:
            _diff_rules(**diff_kwargs)
    finally:
        # Disconnect from db on every path (previously skipped for --all).
        db_teardown()
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:49,代码来源:diff-db-disk.py


示例8: main

def main():
    """Analyze rule links starting from an action and write them out as a graph.

    Registers the ``action_ref``, ``link_trigger_ref`` and ``out_file`` CLI
    options, parses the config, connects to the database, then feeds the
    analyzer result into the grapher.
    """
    monkey_patch()

    action_ref_opt = cfg.StrOpt('action_ref', default=None,
                                help='Root action to begin analysis.')
    link_trigger_ref_opt = cfg.StrOpt('link_trigger_ref',
                                      default='core.st2.generic.actiontrigger',
                                      help='Root action to begin analysis.')
    out_file_opt = cfg.StrOpt('out_file', default='pipeline')
    do_register_cli_opts([action_ref_opt, link_trigger_ref_opt, out_file_opt])

    config.parse_args()
    db_setup()

    analyzer = LinksAnalyzer()
    links = analyzer.analyze(cfg.CONF.action_ref, cfg.CONF.link_trigger_ref)

    grapher = Grapher()
    grapher.generate_graph(links, cfg.CONF.out_file)
开发者ID:lyandut,项目名称:st2,代码行数:15,代码来源:st2-analyze-links.py


示例9: _setup

def _setup(argv):
    """Parse config, configure logging, connect to the db and register exchanges.

    :param argv: Not used by this function.
    """
    config.parse_args()

    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s [-] %(message)s')

    verbose = cfg.CONF.verbose
    if not verbose:
        # Without --verbose, AUDIT and DEBUG records are filtered out while
        # INFO, ERROR and CRITICAL are still printed.
        suppressed_levels = [logging.AUDIT, logging.DEBUG]
        root_handlers = logging.getLoggerClass().manager.root.handlers

        for root_handler in root_handlers:
            level_filter = LogLevelFilter(log_levels=suppressed_levels)
            root_handler.addFilter(level_filter)

    db_setup()
    register_exchanges()
开发者ID:ruslantum,项目名称:st2,代码行数:16,代码来源:bootstrap.py


示例10: main

def main():
    """Inject trigger instances at a configurable rate for stress testing.

    Triggers are taken from the ``triggers`` CLI option or, when that is
    empty, from the top-level keys of the YAML file given via
    ``schema_file``. One green thread is spawned per trigger; the function
    returns once all of them have finished. If no trigger can be determined,
    a message is printed and the function returns without doing anything.
    """
    _monkey_patch()

    cli_opts = [
        cfg.IntOpt(
            "rate",
            default=100,
            help="Rate of trigger injection measured in instances in per sec."
            + " Assumes a default exponential distribution in time so arrival is poisson.",
        ),
        cfg.ListOpt(
            "triggers",
            required=False,
            help="List of triggers for which instances should be fired."
            + " Uniform distribution will be followed if there is more than one"
            + " trigger.",
        ),
        cfg.StrOpt("schema_file", default=None, help="Path to schema file defining trigger and payload."),
        cfg.IntOpt("duration", default=1, help="Duration of stress test in minutes."),
    ]
    do_register_cli_opts(cli_opts)
    config.parse_args()

    # Get config values
    triggers = cfg.CONF.triggers
    trigger_payload_schema = {}

    if not triggers:
        schema_file = cfg.CONF.schema_file
        if not schema_file or not os.path.exists(schema_file):
            print('Either "triggers" need to be provided or a schema file containing' + " triggers should be provided.")
            return
        with open(schema_file) as fd:
            # An empty YAML document loads as None; treat it as "no triggers".
            trigger_payload_schema = yaml.safe_load(fd) or {}
        # Materialize the keys so the printed value is a plain list on Python 3 too.
        triggers = list(trigger_payload_schema.keys())
        print("Triggers=%s" % triggers)

        if not triggers:
            # Guard against division by zero and a zero-sized green pool below.
            print('Schema file "%s" does not define any triggers.' % schema_file)
            return

    rate_per_trigger = int(cfg.CONF.rate / len(triggers))
    duration = cfg.CONF.duration

    dispatcher_pool = eventlet.GreenPool(len(triggers))

    for trigger in triggers:
        payload = trigger_payload_schema.get(trigger, {})
        dispatcher_pool.spawn(_inject_instances, trigger, rate_per_trigger, duration, payload=payload)
        # Random pause between spawns so the dispatchers do not start in lockstep.
        eventlet.sleep(random.uniform(0, 1))
    dispatcher_pool.waitall()
开发者ID:ipv1337,项目名称:st2,代码行数:47,代码来源:st2-inject-trigger-instances.py


示例11: __init__

    def __init__(self, pack, file_path, config=None, parameters=None, user=None, parent_args=None,
                 log_level=PYTHON_RUNNER_DEFAULT_LOG_LEVEL):
        """
        :param pack: Name of the pack this action belongs to.
        :type pack: ``str``

        :param file_path: Path to the action module.
        :type file_path: ``str``

        :param config: Pack config.
        :type config: ``dict``

        :param parameters: action parameters.
        :type parameters: ``dict`` or ``None``

        :param user: Name of the user who triggered this action execution.
        :type user: ``str``

        :param parent_args: Command line arguments passed to the parent process.
        :type parent_args: ``list``

        :param log_level: Log level stored on the wrapper. Defaults to
                          ``PYTHON_RUNNER_DEFAULT_LOG_LEVEL``.
        """

        self._pack = pack
        self._file_path = file_path
        self._config = config or {}
        self._parameters = parameters or {}
        self._user = user
        self._parent_args = parent_args or []
        self._log_level = log_level

        self._class_name = None
        self._logger = logging.getLogger('PythonActionWrapper')

        # Config parsing is best effort: a failure is logged and otherwise ignored.
        try:
            st2common_config.parse_args(args=self._parent_args)
        except Exception as e:
            # Pass the values as logging arguments so the message is only
            # formatted when debug logging is actually enabled.
            LOG.debug('Failed to parse config using parent args (parent_args=%s): %s',
                      str(self._parent_args), str(e))

        # Note: We can only set a default user value if one is not provided after parsing the
        # config
        if not self._user:
            # Note: We use late import to avoid performance overhead
            from oslo_config import cfg
            self._user = cfg.CONF.system_user.user
开发者ID:lyandut,项目名称:st2,代码行数:45,代码来源:python_action_wrapper.py


示例12: main

def main():
    """Migrate datastore items and exit with 0 on success or 1 on failure.

    The database connection is always closed before the process exits.
    """
    config.parse_args()

    # Connect to db.
    db_setup()

    # Migrate datastore items.
    try:
        migrate_datastore()
    except Exception as e:
        # Only real errors are treated as a failed migration; KeyboardInterrupt
        # and SystemExit propagate instead of being reported as "ABORTED".
        print('ABORTED: Datastore migration aborted on first failure.')
        # Surface the reason instead of silently discarding it.
        sys.stderr.write('Reason: %s\n' % (e,))
        exit_code = 1
    else:
        print('SUCCESS: Datastore items migrated successfully.')
        exit_code = 0
    finally:
        # Disconnect from db.
        db_teardown()

    sys.exit(exit_code)
开发者ID:StackStorm,项目名称:st2,代码行数:18,代码来源:st2-migrate-datastore-scopes.py


示例13: _setup

def _setup(argv):
    """Parse config, configure logging and open the database connection.

    :param argv: Not used by this function.
    """
    # 1. parse config
    config.parse_args()

    # 2. setup logging
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s [-] %(message)s')

    verbose = cfg.CONF.verbose
    if not verbose:
        # Without --verbose, AUDIT and DEBUG records are filtered out while
        # INFO, ERROR and CRITICAL are still printed.
        suppressed_levels = [logging.AUDIT, logging.DEBUG]
        root_handlers = logging.getLoggerClass().manager.root.handlers

        for root_handler in root_handlers:
            level_filter = LogLevelFilter(log_levels=suppressed_levels)
            root_handler.addFilter(level_filter)

    # 3. everything else which needs a parsed config and working logging.
    db_conf = cfg.CONF.database
    # Credentials are optional; fall back to None when the option is absent.
    db_username = getattr(db_conf, 'username', None)
    db_password = getattr(db_conf, 'password', None)
    db_setup(db_conf.db_name, db_conf.host, db_conf.port,
             username=db_username, password=db_password)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:21,代码来源:bootstrap.py


示例14: _setup

def _setup():
    """Parse the config and call ``logging.basicConfig`` with the DEBUG level."""
    # 1. parse config.
    config.parse_args()

    # 2. setup logging.
    log_format = '%(asctime)s %(levelname)s [-] %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)
开发者ID:StackStorm,项目名称:st2,代码行数:6,代码来源:bootstrap.py


示例15: setup_logging

}

# Options which should be removed from the st2 config
# NOTE(review): keys look like config section names and values like the option
# names inside that section - confirm against the code which consumes this.
ST2_CONF_OPTIONS_TO_REMOVE = {
    'database': ['username', 'password'],
    'messaging': ['url']
}

# NOTE(review): presumably the placeholder written in place of a removed
# option value - confirm against the code which consumes this.
REMOVE_VALUE_NAME = '**removed**'

# %-style template for the output archive name; expects "hostname" and "date" keys.
OUTPUT_FILENAME_TEMPLATE = 'st2-debug-output-%(hostname)s-%(date)s.tar.gz'

# strftime-style format string.
# NOTE(review): presumably used for the "date" key above - confirm.
DATE_FORMAT = '%Y-%m-%d-%H%M%S'

# Best-effort config parsing at import time with an explicit empty argument
# list; any failure is deliberately ignored.
try:
    config.parse_args(args=[])
except Exception:
    pass


def setup_logging():
    """Set ``LOG`` to INFO and attach a stdout stream handler to it."""
    LOG.setLevel(logging.INFO)

    # The handler accepts DEBUG and above; the logger level set above is the
    # effective threshold for records emitted through LOG.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.setFormatter(
        logging.Formatter('%(asctime)s  %(levelname)s - %(message)s'))

    LOG.addHandler(stdout_handler)

开发者ID:lyandut,项目名称:st2,代码行数:29,代码来源:submit_debug_info.py


示例16: print

# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

from st2common.runners import get_available_backends
from st2common.runners import get_backend_instance

from st2common import config
# Parse the st2 config before querying the runner backends.
config.parse_args()

runner_names = get_available_backends()

print('Available / installed action runners:')
for name in runner_names:
    # Resolve the backend, then fetch its runner instance and metadata.
    runner_driver = get_backend_instance(name)
    runner_instance = runner_driver.get_runner()
    runner_metadata = runner_driver.get_metadata()

    runner_module = runner_metadata['runner_module']
    runner_cls = runner_instance.__class__
    print('- %s (runner_module=%s,cls=%s)' % (name, runner_module, runner_cls))
开发者ID:nzlosh,项目名称:st2,代码行数:30,代码来源:enumerate-runners.py


示例17: initialize

 def initialize(self):
     """Parse the st2 config on a best-effort basis, ignoring any failure."""
     # NOTE(review): the bare ``except`` also swallows SystemExit and
     # KeyboardInterrupt. Argument parsers commonly raise SystemExit on
     # unknown arguments, so this may be intentional - confirm before
     # narrowing the exception type.
     try:
         config.parse_args()
     except:
         pass
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:5,代码来源:delete.py


示例18: main

from kombu import Connection, Exchange
from oslo_config import cfg

from st2common import config

from st2common.transport.publishers import PoolPublisher


def main(exchange, routing_key, payload):
    """Publish ``payload`` to a topic exchange using the given routing key.

    :param exchange: Name of the exchange to publish to.
    :param routing_key: Routing key to publish the message with.
    :param payload: Message payload.
    """
    topic_exchange = Exchange(exchange, type='topic')
    messaging_publisher = PoolPublisher(cfg.CONF.messaging.url)

    publish_kwargs = {
        'payload': payload,
        'exchange': topic_exchange,
        'routing_key': routing_key,
    }

    # The publish call happens while a Connection context for the configured
    # messaging URL is open.
    with Connection(cfg.CONF.messaging.url):
        messaging_publisher.publish(**publish_kwargs)


if __name__ == '__main__':
    # Parse the st2 config with an explicit, empty argument *list* (the
    # original passed a dict). NOTE(review): presumably this keeps the config
    # parser away from this script's own flags, which argparse handles below.
    config.parse_args(args=[])

    parser = argparse.ArgumentParser(description='Queue producer')
    parser.add_argument('--exchange', required=True,
                        help='Exchange to publish the message to')
    parser.add_argument('--routing-key', required=True,
                        help='Routing key to use')
    parser.add_argument('--payload', required=True,
                        help='Message payload')
    args = parser.parse_args()

    main(exchange=args.exchange, routing_key=args.routing_key,
         payload=args.payload)
开发者ID:ipv1337,项目名称:st2,代码行数:30,代码来源:queue_producer.py


示例19: _setup

 def _setup():
     """Parse the st2 config on a best-effort basis, ignoring any failure."""
     # NOTE(review): the bare ``except`` also swallows SystemExit and
     # KeyboardInterrupt. Argument parsers commonly raise SystemExit on
     # unknown arguments, so this may be intentional - confirm before
     # narrowing the exception type.
     try:
         config.parse_args()
     except:
         pass
开发者ID:bjoernbessert,项目名称:st2,代码行数:5,代码来源:unload.py



注:本文中的st2common.config.parse_args函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python config.register_opts函数代码示例发布时间:2022-05-27
下一篇:
Python runnersregistrar.register_runners函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap