• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python schema.validate函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中st2common.util.schema.validate函数的典型用法代码示例。如果您正苦于以下问题：Python validate函数的具体用法？Python validate怎么用？Python validate使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了validate函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _validate_runner

def _validate_runner(runner_schema, result):
    """
    Validate runner output against the runner output schema.

    The provided property definitions are wrapped into an object schema which doesn't allow
    additional properties and the result is validated against it using the "custom" validator.
    """
    LOG.debug('Validating runner output: %s', runner_schema)

    validator_cls = schema.get_validator('custom')
    object_schema = {
        "type": "object",
        "properties": runner_schema,
        "additionalProperties": False
    }
    schema.validate(result, object_schema, cls=validator_cls)
开发者ID:nzlosh,项目名称:st2,代码行数:10,代码来源:output_schema.py


示例2: test_allow_default_explicit_none

    def test_allow_default_explicit_none(self):
        """
        Validate an instance where every argument is explicitly set to ``None`` with
        ``allow_default_none=True``.
        """
        arg_names = ('arg_optional_default', 'arg_optional_default_none',
                     'arg_optional_no_default')
        # Every argument is explicitly passed in as None
        payload = dict.fromkeys(arg_names)

        util_schema.validate(instance=payload, schema=TEST_SCHEMA_3,
                             cls=util_schema.get_validator(), use_default=True,
                             allow_default_none=True)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:11,代码来源:test_json_schema.py


示例3: _validate_action

def _validate_action(action_schema, result, output_key):
    """
    Validate action output stored under ``output_key`` in ``result``.

    The provided property definitions are wrapped into an object schema which doesn't allow
    additional properties and the output is validated against it using the "custom" validator.
    """
    LOG.debug('Validating action output: %s', action_schema)

    action_output = result[output_key]
    validator_cls = schema.get_validator('custom')
    object_schema = {
        "type": "object",
        "properties": action_schema,
        "additionalProperties": False
    }
    schema.validate(action_output, object_schema, cls=validator_cls)
开发者ID:nzlosh,项目名称:st2,代码行数:12,代码来源:output_schema.py


示例4: __init__

    def __init__(self, **kw):
        """
        Validate the keyword arguments against ``self.schema`` and set every property declared
        in the schema as an attribute on this object.

        Properties which are not present in ``kw`` are set to ``None``. Items of the ``chain``
        property are converted to ``Node`` objects.
        """
        util_schema.validate(instance=kw, schema=self.schema, cls=util_schema.CustomValidator,
                             use_default=False, allow_default_none=True)

        # NOTE: The default value needs to be a dict - six.iterkeys() fails on a list so a
        # schema without "properties" would result in AttributeError.
        for prop in six.iterkeys(self.schema.get('properties', {})):
            value = kw.get(prop, None)
            # special handling for chain property to create the Node object
            if prop == 'chain':
                value = [Node(**node) for node in value]
            setattr(self, prop, value)
开发者ID:azamsheriff,项目名称:st2,代码行数:13,代码来源:actionchain.py


示例5: validate_config_against_schema

def validate_config_against_schema(config_schema, config_object, config_path,
                                  pack_name=None):
    """
    Validate provided config dictionary against the provided config schema
    dictionary.

    :param config_schema: Config schema used to build the schema the config is validated
                          against (additional properties are allowed).
    :param config_object: Config to validate.
    :param config_path: Path to the config, only used in the error message.
    :param pack_name: Name of the pack, only used in the error message ("unknown" is used if
                      not provided).

    :return: Value returned by ``util_schema.validate``.
    :raises jsonschema.ValidationError: If validation fails. The message includes the failing
                                        attribute, pack name and config path.
    """
    # NOTE: Lazy import to avoid performance overhead of importing this module when it's not used
    import jsonschema

    pack_name = pack_name or 'unknown'

    schema = util_schema.get_schema_for_resource_parameters(parameters_schema=config_schema,
                                                            allow_additional_properties=True)
    instance = config_object

    try:
        cleaned = util_schema.validate(instance=instance, schema=schema,
                                       cls=util_schema.CustomValidator, use_default=True,
                                       allow_default_none=True)
    except jsonschema.ValidationError as e:
        # NOTE: "collections.Iterable" alias has been removed in Python 3.10, the ABC lives in
        # "collections.abc". Fallback is only needed for Python 2.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        attribute = getattr(e, 'path', [])

        if isinstance(attribute, (tuple, list, Iterable)):
            # Path items can be non-string values so we cast them before joining
            attribute = '.'.join([str(item) for item in attribute])
        else:
            attribute = str(attribute)

        msg = ('Failed validating attribute "%s" in config for pack "%s" (%s): %s' %
               (attribute, pack_name, config_path, six.text_type(e)))
        raise jsonschema.ValidationError(msg)

    return cleaned
开发者ID:StackStorm,项目名称:st2,代码行数:33,代码来源:pack.py


示例6: _validate_config_values_against_schema

    def _validate_config_values_against_schema(self):
        """
        Validate ``self.values`` against the config schema registered for ``self.pack``.

        :return: Value returned by ``util_schema.validate`` or ``None`` if the pack has no
                 config schema.
        :raises jsonschema.ValidationError: If validation fails. The message includes the
                                            failing attribute, pack name and config path.
        """
        try:
            config_schema_db = ConfigSchema.get_by_pack(value=self.pack)
        except StackStormDBObjectNotFoundError:
            # Config schema is optional
            return

        # Note: We are doing optional validation so for now, we do allow additional properties
        instance = self.values or {}
        schema = config_schema_db.attributes
        schema = util_schema.get_schema_for_resource_parameters(parameters_schema=schema,
                                                                allow_additional_properties=True)

        try:
            cleaned = util_schema.validate(instance=instance, schema=schema,
                                           cls=util_schema.CustomValidator, use_default=True,
                                           allow_default_none=True)
        except jsonschema.ValidationError as e:
            attribute = getattr(e, 'path', [])
            # NOTE: Path can contain non-string items (e.g. integer array indexes) so each item
            # needs to be cast to string, otherwise join() throws TypeError and masks the
            # actual validation error.
            attribute = '.'.join([str(item) for item in attribute])
            configs_path = os.path.join(cfg.CONF.system.base_path, 'configs/')
            config_path = os.path.join(configs_path, '%s.yaml' % (self.pack))

            msg = ('Failed validating attribute "%s" in config for pack "%s" (%s): %s' %
                   (attribute, self.pack, config_path, str(e)))
            raise jsonschema.ValidationError(msg)

        return cleaned
开发者ID:janjaapbos,项目名称:st2,代码行数:28,代码来源:pack.py


示例7: validate_trigger_parameters

def validate_trigger_parameters(trigger_type_ref, parameters):
    """
    Validate parameters of a system trigger (e.g. webhook and timers).

    Note: Parameters of user defined triggers are not validated for now, even if the trigger
    type specifies a JSON schema for them.

    :param trigger_type_ref: Reference of a trigger type.
    :type trigger_type_ref: ``str``

    :param parameters: Trigger parameters.
    :type parameters: ``dict``

    :return: Cleaned parameters, None if validation is not performed.
    """
    # No reference or not a system trigger - validation is skipped for now
    if not trigger_type_ref or trigger_type_ref not in SYSTEM_TRIGGER_TYPES:
        return None

    trigger_type = SYSTEM_TRIGGER_TYPES[trigger_type_ref]
    validated = util_schema.validate(instance=parameters,
                                     schema=trigger_type['parameters_schema'],
                                     cls=util_schema.CustomValidator,
                                     use_default=True,
                                     allow_default_none=True)

    # TODO: If we need to add more checks like this we should consider abstracting this out.
    if trigger_type_ref == CRON_TIMER_TRIGGER_REF:
        # JSON schema allows arbitrary strings, but not every string is a valid CronTrigger
        # argument so we also try to instantiate the trigger.
        # Note: Constructor throws ValueError on invalid parameters
        CronTrigger(**parameters)

    return validated
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:34,代码来源:reactor.py


示例8: validate_trigger_parameters

def validate_trigger_parameters(trigger_type_ref, parameters):
    """
    Validate parameters of a system trigger (e.g. webhook and timers).

    Note: Parameters of user defined triggers are not validated for now, even if the trigger
    type specifies a JSON schema for them.

    :param trigger_type_ref: Reference of a trigger type.
    :type trigger_type_ref: ``str``

    :param parameters: Trigger parameters.
    :type parameters: ``dict``

    :return: Cleaned parameters, None if validation is not performed.
    """
    # No reference or not a system trigger - validation is skipped for now
    if not trigger_type_ref or trigger_type_ref not in SYSTEM_TRIGGER_TYPES:
        return None

    trigger_type = SYSTEM_TRIGGER_TYPES[trigger_type_ref]
    return util_schema.validate(instance=parameters,
                                schema=trigger_type['parameters_schema'],
                                cls=util_schema.CustomValidator,
                                use_default=True,
                                allow_default_none=True)
开发者ID:automotola,项目名称:st2,代码行数:25,代码来源:reactor.py


示例9: validate_response

def validate_response(inquiry, response):
    """
    Validate inquiry response against the schema of the inquiry.

    :raises InvalidInquiryResponse: If validating the response throws any exception. The
                                    original exception is logged.
    """
    inquiry_schema = inquiry.schema

    LOG.debug('Validating inquiry response: %s against schema: %s' % (response, inquiry_schema))

    try:
        schema_utils.validate(instance=response, schema=inquiry_schema,
                              cls=schema_utils.CustomValidator, use_default=True,
                              allow_default_none=True)
    except Exception as e:
        LOG.exception(
            'Response for inquiry "%s" did not pass schema validation.' % str(inquiry.id)
        )
        raise inquiry_exceptions.InvalidInquiryResponse(str(inquiry.id), six.text_type(e))
开发者ID:nzlosh,项目名称:st2,代码行数:17,代码来源:inquiry.py


示例10: _add_job_to_scheduler

    def _add_job_to_scheduler(self, trigger):
        """
        Validate parameters of the provided timer trigger, build the IntervalTrigger /
        DateTrigger / CronTrigger object which matches the timer type and add the job.

        The job is not added if the built object has a ``run_date`` which is already in the
        past.

        :return: Built trigger object (None if the timer type name is not recognized).
        """
        timer_type = TIMER_TRIGGER_TYPES[trigger['type']]
        try:
            util_schema.validate(instance=trigger['parameters'],
                                 schema=timer_type['parameters_schema'],
                                 cls=util_schema.CustomValidator,
                                 use_default=True,
                                 allow_default_none=True)
        except jsonschema.ValidationError as e:
            LOG.error('Exception scheduling timer: %s, %s',
                      trigger['parameters'], e, exc_info=True)
            raise  # Or should we just return?

        params = trigger['parameters']
        tz = aps_utils.astimezone(params.get('timezone'))
        timer_name = timer_type['name']

        job_trigger = None

        if timer_name == 'st2.IntervalTimer':
            interval_kwargs = {params.get('unit', None): params.get('delta', None),
                               'timezone': tz}
            job_trigger = IntervalTrigger(**interval_kwargs)
        elif timer_name == 'st2.DateTimer':
            # Raises an exception if date string isn't a valid one.
            run_at = date_parser.parse(params.get('date', None))
            job_trigger = DateTrigger(run_at, timezone=tz)
        elif timer_name == 'st2.CronTimer':
            cron_kwargs = params.copy()
            cron_kwargs['timezone'] = tz

            job_trigger = CronTrigger(**cron_kwargs)

        now = date_utils.get_datetime_utc_now()
        is_expired = hasattr(job_trigger, 'run_date') and now > job_trigger.run_date
        if not is_expired:
            self._add_job(trigger, job_trigger)
        else:
            LOG.warning('Not scheduling expired timer: %s : %s',
                        trigger['parameters'], job_trigger.run_date)
        return job_trigger
开发者ID:LindsayHill,项目名称:st2,代码行数:40,代码来源:base.py


示例11: test_use_default_value

    def test_use_default_value(self):
        """
        Exercise ``util_schema.validate`` with ``use_default=True`` against TEST_SCHEMA_1 and
        TEST_SCHEMA_2, with and without a value provided.
        """
        # NOTE: "assertRaisesRegexp" alias has been removed in Python 3.12 so we prefer the new
        # name and only fall back to the old one where the new one is not available.
        assert_raises_regex = getattr(self, 'assertRaisesRegex', None) or self.assertRaisesRegexp

        # No default, no value provided, should fail
        instance = {}
        validator = util_schema.get_validator()

        expected_msg = '\'cmd_no_default\' is a required property'
        assert_raises_regex(ValidationError, expected_msg, util_schema.validate,
                            instance=instance, schema=TEST_SCHEMA_1, cls=validator,
                            use_default=True)

        # No default, value provided
        instance = {'cmd_no_default': 'foo'}
        util_schema.validate(instance=instance, schema=TEST_SCHEMA_1, cls=validator,
                             use_default=True)

        # default value provided, no value, should pass
        instance = {}
        validator = util_schema.get_validator()
        util_schema.validate(instance=instance, schema=TEST_SCHEMA_2, cls=validator,
                             use_default=True)

        # default value provided, value provided, should pass
        instance = {'cmd_default': 'foo'}
        validator = util_schema.get_validator()
        util_schema.validate(instance=instance, schema=TEST_SCHEMA_2, cls=validator,
                             use_default=True)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:26,代码来源:test_json_schema.py


示例12: validate_trigger_parameters

def validate_trigger_parameters(trigger_type_ref, parameters):
    """
    This function validates parameters for system and user-defined triggers.

    Non-system triggers are only validated if the trigger type exists in the database, it
    defines a parameters schema and ``cfg.CONF.system.validate_trigger_parameters`` is set.

    :param trigger_type_ref: Reference of a trigger type.
    :type trigger_type_ref: ``str``

    :param parameters: Trigger parameters.
    :type parameters: ``dict``

    :return: Cleaned parameters on success, None if validation is not performed.
    """
    if not trigger_type_ref:
        return None

    is_system_trigger = trigger_type_ref in SYSTEM_TRIGGER_TYPES
    if is_system_trigger:
        # System trigger
        parameters_schema = SYSTEM_TRIGGER_TYPES[trigger_type_ref]['parameters_schema']
    else:
        trigger_type_db = triggers.get_trigger_type_db(trigger_type_ref)
        if not trigger_type_db:
            # Trigger doesn't exist in the database
            return None

        parameters_schema = getattr(trigger_type_db, 'parameters_schema', {})
        if not parameters_schema:
            # Parameters schema not defined for this trigger
            return None

    # We only validate non-system triggers if config option is set (enabled)
    if not is_system_trigger and not cfg.CONF.system.validate_trigger_parameters:
        # NOTE: First literal needs to end with a space, otherwise implicit concatenation
        # results in "non-systemtriggers".
        LOG.debug('Got non-system trigger "%s", but trigger parameter validation for non-system '
                  'triggers is disabled, skipping validation.', trigger_type_ref)
        return None

    cleaned = util_schema.validate(instance=parameters, schema=parameters_schema,
                                   cls=util_schema.CustomValidator, use_default=True,
                                   allow_default_none=True)

    # Additional validation for CronTimer trigger
    # TODO: If we need to add more checks like this we should consider abstracting this out.
    if trigger_type_ref == CRON_TIMER_TRIGGER_REF:
        # Validate that the user provided parameters are valid. This is required since JSON schema
        # allows arbitrary strings, but not any arbitrary string is a valid CronTrigger argument
        # Note: Constructor throws ValueError on invalid parameters
        CronTrigger(**parameters)

    return cleaned
开发者ID:peak6,项目名称:st2,代码行数:49,代码来源:reactor.py


示例13: validate

    def validate(self):
        """
        Validate attributes of this object against its schema.

        Note: The object is not changed in place - a new instance of the same class is built
        from the cleaned attributes and returned.

        :return: Cleaned / validated object.
        """
        object_schema = getattr(self, "schema", {})
        validated = util_schema.validate(
            instance=vars(self),
            schema=object_schema,
            cls=util_schema.CustomValidator,
            use_default=True,
        )

        return self.__class__(**validated)
开发者ID:KenMercusLai,项目名称:st2,代码行数:16,代码来源:base.py


示例14: validate

    def validate(self):
        """
        Validate attributes of this object against its schema.

        Note: The object is not changed in place - a new instance of the same class is built
        from the cleaned attributes and returned.

        :return: Cleaned / validated object.
        """
        object_schema = getattr(self, 'schema', {})
        validated = util_schema.validate(instance=vars(self),
                                         schema=object_schema,
                                         cls=util_schema.CustomValidator,
                                         use_default=True,
                                         allow_default_none=True)

        # Note: type() is used instead of self.__class__ since self.__class__ confuses pylint
        api_cls = type(self)
        return api_cls(**validated)
开发者ID:maniacs-ops,项目名称:st2,代码行数:17,代码来源:base.py


示例15: validate

    def validate(self):
        """
        Validate the policy itself and then its parameters against the parameters defined by
        the referenced policy type.

        :return: Cleaned object with ``parameters`` replaced by the cleaned parameters.
        :raises ValueError: If the referenced policy type doesn't exist.
        """
        # Policy itself is validated by the parent class
        validated = super(PolicyAPI, self).validate()

        policy_type_db = PolicyType.get_by_ref(validated.policy_type)
        if not policy_type_db:
            raise ValueError('Referenced policy_type "%s" doesnt exist' % (validated.policy_type))

        # Policy parameters are validated against the schema built from the policy type
        type_parameters = policy_type_db.parameters
        policy_parameters = getattr(validated, 'parameters', {})
        full_schema = util_schema.get_schema_for_resource_parameters(
            parameters_schema=type_parameters)
        validated.parameters = util_schema.validate(policy_parameters, full_schema,
                                                    util_schema.get_validator(),
                                                    use_default=True,
                                                    allow_default_none=True)

        return validated
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:20,代码来源:policy.py


示例16: validate_trigger_payload

def validate_trigger_payload(trigger_type_ref, payload):
    """
    This function validates trigger payload parameters for system and user-defined triggers.

    Non-system triggers are only validated if the trigger type exists in the database, it
    defines a payload schema and ``cfg.CONF.system.validate_trigger_payload`` is set.

    :param trigger_type_ref: Reference of a trigger type.
    :type trigger_type_ref: ``str``

    :param payload: Trigger payload.
    :type payload: ``dict``

    :return: Cleaned payload on success, None if validation is not performed.
    """
    if not trigger_type_ref:
        return None

    is_system_trigger = trigger_type_ref in SYSTEM_TRIGGER_TYPES
    if is_system_trigger:
        # System trigger
        payload_schema = SYSTEM_TRIGGER_TYPES[trigger_type_ref]['payload_schema']
    else:
        trigger_type_db = triggers.get_trigger_type_db(trigger_type_ref)
        if not trigger_type_db:
            # Trigger doesn't exist in the database
            return None

        payload_schema = getattr(trigger_type_db, 'payload_schema', {})
        if not payload_schema:
            # Payload schema not defined for this trigger
            return None

    # We only validate non-system triggers if config option is set (enabled)
    if not is_system_trigger and not cfg.CONF.system.validate_trigger_payload:
        # NOTE: First literal needs to end with a space, otherwise implicit concatenation
        # results in "non-systemtriggers".
        LOG.debug('Got non-system trigger "%s", but trigger payload validation for non-system '
                  'triggers is disabled, skipping validation.', trigger_type_ref)
        return None

    cleaned = util_schema.validate(instance=payload, schema=payload_schema,
                                   cls=util_schema.CustomValidator, use_default=True,
                                   allow_default_none=True)

    return cleaned
开发者ID:peak6,项目名称:st2,代码行数:41,代码来源:reactor.py


示例17: _validate_config_values_against_schema

    def _validate_config_values_against_schema(self):
        """
        Validate ``self.values`` against the config schema registered for ``self.pack``.

        :return: Value returned by ``util_schema.validate`` or ``None`` if the pack has no
                 config schema.
        :raises jsonschema.ValidationError: If validation fails. The message includes the
                                            pack name.
        """
        try:
            config_schema_db = ConfigSchema.get_by_pack(value=self.pack)
        except StackStormDBObjectNotFoundError:
            # Pack doesn't need to have a config schema
            return

        # Note: Validation is optional so, for now, additional properties are allowed
        values = self.values or {}
        full_schema = util_schema.get_schema_for_resource_parameters(
            parameters_schema=config_schema_db.attributes,
            allow_additional_properties=True)

        try:
            return util_schema.validate(instance=values, schema=full_schema,
                                        cls=util_schema.CustomValidator, use_default=True,
                                        allow_default_none=True)
        except jsonschema.ValidationError as e:
            raise jsonschema.ValidationError(
                'Failed validating config for pack "%s": %s' % (self.pack, str(e)))
开发者ID:sameer7234,项目名称:st2,代码行数:22,代码来源:pack.py


示例18: request

def request(liveaction):
    """
    Request an action execution.

    The referenced action and the provided parameters are validated, the liveaction is marked
    as requested and stored, an execution object is created for it and the creation of both
    objects is published.

    :param liveaction: Liveaction to request. ``parameters``, ``context['user']``, ``notify``,
                       ``status`` and ``start_timestamp`` may be set on it by this function.

    :raises ValueError: If the action can't be found, it is disabled or an immutable parameter
                        is being overridden.
    :raises TraceNotFoundException: If looking up the trace for the liveaction throws
                                    StackStormDBObjectNotFoundError.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Use the user context from the parent action execution. Subtasks in a workflow
    # action can be invoked by a system user and so we want to use the user context
    # from the original workflow action.
    if getattr(liveaction, 'context', None) and 'parent' in liveaction.context:
        parent_user = liveaction.context['parent'].get('user', None)
        if parent_user:
            liveaction.context['user'] = parent_user

    # Validate action.
    action_db = action_utils.get_action_by_ref(liveaction.action)
    if not action_db:
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
    if not action_db.enabled:
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)

    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    # Parameters are optional, default to an empty dict so the code below can rely on them.
    if not hasattr(liveaction, 'parameters'):
        liveaction.parameters = dict()

    # Validate action parameters.
    schema = util_schema.get_parameter_schema(action_db)
    validator = util_schema.get_validator()
    util_schema.validate(liveaction.parameters, schema, validator, use_default=True)

    # Validate that no immutable params are being overridden. Although possible to
    # ignore the override it is safer to inform the user to avoid surprises.
    immutables = _get_immutable_params(action_db.parameters)
    immutables.extend(_get_immutable_params(runnertype_db.runner_parameters))
    overridden_immutables = [p for p in six.iterkeys(liveaction.parameters) if p in immutables]
    if len(overridden_immutables) > 0:
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
                         % str(overridden_immutables))

    # Set notification settings for action.
    # XXX: There are cases when we don't want notifications to be sent for a particular
    # execution. So we should look at liveaction.parameters['notify']
    # and not set liveaction.notify.
    if action_db.notify:
        liveaction.notify = action_db.notify

    # Write to database and send to message queue.
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()

    # Publish creation after both liveaction and actionexecution are created.
    liveaction = LiveAction.add_or_update(liveaction, publish=False)

    # Get trace_db if it exists. This could throw. If it throws, we have to cleanup
    # liveaction object so we don't see things in requested mode.
    trace_db = None
    try:
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
    except StackStormDBObjectNotFoundError as e:
        _cleanup_liveaction(liveaction)
        raise TraceNotFoundException(str(e))

    execution = executions.create_execution_object(liveaction, publish=False)

    # Associate the new execution with the trace (if there is one).
    if trace_db:
        trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            action_executions=[str(execution.id)])

    # Assume that this is a creation.
    LiveAction.publish_create(liveaction)
    LiveAction.publish_status(liveaction)
    ActionExecution.publish_create(execution)

    extra = {'liveaction_db': liveaction, 'execution_db': execution}
    LOG.audit('Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' %
              (liveaction.id, execution.id), extra=extra)

    return liveaction, execution
开发者ID:jonico,项目名称:st2,代码行数:81,代码来源:action.py


示例19: validate_trigger_payload

def validate_trigger_payload(trigger_type_ref, payload, throw_on_inexistent_trigger=False):
    """
    This function validates trigger payload parameters for system and user-defined triggers.

    Non-system triggers are only validated if the trigger type exists in the database, it
    defines a payload schema and ``cfg.CONF.system.validate_trigger_payload`` is set.

    :param trigger_type_ref: Reference of a trigger type / trigger / trigger dictionary object.
    :type trigger_type_ref: ``str``

    :param payload: Trigger payload.
    :type payload: ``dict``

    :param throw_on_inexistent_trigger: True to throw ValueError instead of skipping the
                                        validation when the trigger type doesn't exist in
                                        the database.
    :type throw_on_inexistent_trigger: ``bool``

    :raises ValueError: If the trigger type doesn't exist in the database and
                        ``throw_on_inexistent_trigger`` is set.

    :return: Cleaned payload on success, None if validation is not performed.
    """
    if not trigger_type_ref:
        return None

    # NOTE: Due to the awful code in some other places we also need to support a scenario where
    # this variable is a dictionary and contains various TriggerDB object attributes.
    if isinstance(trigger_type_ref, dict):
        if trigger_type_ref.get('type', None):
            trigger_type_ref = trigger_type_ref['type']
        else:
            trigger_db = triggers.get_trigger_db_by_ref_or_dict(trigger_type_ref)

            if not trigger_db:
                # Corresponding TriggerDB not found, likely a corrupted database, skip the
                # validation.
                return None

            trigger_type_ref = trigger_db.type

    is_system_trigger = trigger_type_ref in SYSTEM_TRIGGER_TYPES
    if is_system_trigger:
        # System trigger
        payload_schema = SYSTEM_TRIGGER_TYPES[trigger_type_ref]['payload_schema']
    else:
        # We assume Trigger ref and not TriggerType ref is passed in if second
        # part (trigger name) is a valid UUID version 4
        try:
            trigger_uuid = uuid.UUID(trigger_type_ref.split('.')[-1])
        except ValueError:
            is_trigger_db = False
        else:
            is_trigger_db = (trigger_uuid.version == 4)

        if is_trigger_db:
            trigger_db = triggers.get_trigger_db_by_ref(trigger_type_ref)

            if trigger_db:
                trigger_type_ref = trigger_db.type

        trigger_type_db = triggers.get_trigger_type_db(trigger_type_ref)

        if not trigger_type_db:
            # Trigger doesn't exist in the database
            if throw_on_inexistent_trigger:
                msg = ('Trigger type with reference "%s" doesn\'t exist in the database' %
                       (trigger_type_ref))
                raise ValueError(msg)

            return None

        payload_schema = getattr(trigger_type_db, 'payload_schema', {})
        if not payload_schema:
            # Payload schema not defined for this trigger
            return None

    # We only validate non-system triggers if config option is set (enabled)
    if not is_system_trigger and not cfg.CONF.system.validate_trigger_payload:
        # NOTE: First literal needs to end with a space, otherwise implicit concatenation
        # results in "non-systemtriggers".
        LOG.debug('Got non-system trigger "%s", but trigger payload validation for non-system '
                  'triggers is disabled, skipping validation.', trigger_type_ref)
        return None

    cleaned = util_schema.validate(instance=payload, schema=payload_schema,
                                   cls=util_schema.CustomValidator, use_default=True,
                                   allow_default_none=True)

    return cleaned
开发者ID:StackStorm,项目名称:st2,代码行数:77,代码来源:reactor.py


示例20: request

def request(liveaction):
    """
    Request an action execution.

    The referenced action and the provided parameters are validated, the liveaction is marked
    as requested and stored, an execution object is created for it and the creation of both
    objects is published.

    :param liveaction: Liveaction to request. ``parameters``, ``context["user"]``, ``notify``,
                       ``status`` and ``start_timestamp`` may be set on it by this function.

    :raises ValueError: If the action can't be found, it is disabled or an immutable parameter
                        is being overridden.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Use the user context from the parent action execution. Subtasks in a workflow
    # action can be invoked by a system user and so we want to use the user context
    # from the original workflow action.
    if getattr(liveaction, "context", None) and "parent" in liveaction.context:
        parent_user = liveaction.context["parent"].get("user", None)
        if parent_user:
            liveaction.context["user"] = parent_user

    # Validate action.
    action_db = action_utils.get_action_by_ref(liveaction.action)
    if not action_db:
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
    if not action_db.enabled:
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)

    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type["name"])

    # Parameters are optional, default to an empty dict so the code below can rely on them.
    if not hasattr(liveaction, "parameters"):
        liveaction.parameters = dict()

    # Validate action parameters.
    schema = util_schema.get_parameter_schema(action_db)
    validator = util_schema.get_validator()
    util_schema.validate(liveaction.parameters, schema, validator, use_default=True)

    # Validate that no immutable params are being overridden. Although possible to
    # ignore the override it is safer to inform the user to avoid surprises.
    immutables = _get_immutable_params(action_db.parameters)
    immutables.extend(_get_immutable_params(runnertype_db.runner_parameters))
    overridden_immutables = [p for p in six.iterkeys(liveaction.parameters) if p in immutables]
    if len(overridden_immutables) > 0:
        raise ValueError("Override of immutable parameter(s) %s is unsupported." % str(overridden_immutables))

    # Set notification settings for action.
    # XXX: There are cases when we don't want notifications to be sent for a particular
    # execution. So we should look at liveaction.parameters['notify']
    # and not set liveaction.notify.
    if action_db.notify:
        liveaction.notify = action_db.notify

    # Write to database and send to message queue.
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()

    # Publish creation after both liveaction and actionexecution are created.
    liveaction = LiveAction.add_or_update(liveaction, publish=False)
    execution = executions.create_execution_object(liveaction, publish=False)

    # Assume that this is a creation.
    LiveAction.publish_create(liveaction)
    LiveAction.publish_status(liveaction)
    ActionExecution.publish_create(execution)

    extra = {"liveaction_db": liveaction, "execution_db": execution}
    LOG.audit(
        "Action execution requested. LiveAction.id=%s, ActionExecution.id=%s" % (liveaction.id, execution.id),
        extra=extra,
    )

    return liveaction, execution
开发者ID:ruslantum,项目名称:st2,代码行数:67,代码来源:action.py



注:本文中的st2common.util.schema.validate函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python secrets.get_secret_parameters函数代码示例发布时间:2022-05-27
下一篇:
Python schema.get_validator函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap