• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python queue.queue_json_publish函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 zerver.lib.queue.queue_json_publish 函数的典型用法代码示例。如果您正苦于以下问题:Python queue_json_publish 函数的具体用法?Python queue_json_publish 怎么用?Python queue_json_publish 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了queue_json_publish函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: email_on_new_login

def email_on_new_login(sender: Any, user: UserProfile, request: Any, **kwargs: Any) -> None:
    """Queue a "new login" notification email for ``user``.

    Nothing is queued when ``settings.SEND_LOGIN_EMAILS`` is falsy, when
    ``request`` is falsy, or when ``user`` carries a truthy
    ``just_registered`` attribute.  Otherwise the email context (login time,
    IP address, OS and browser derived from the user agent) is built and
    published to the ``email_senders`` queue.
    """
    # Imported lazily to keep this module's dependencies minimal, since it
    # runs as part of `manage.py` initialization.
    from zerver.context_processors import common_context

    if not settings.SEND_LOGIN_EMAILS:
        return
    if not request:
        return
    # An account that was just created should not get a login email.
    if getattr(user, "just_registered", False):
        return

    agent = request.META.get('HTTP_USER_AGENT', "").lower()

    context = common_context(user)
    context['user_email'] = user.email

    # Users without a configured timezone get the current default one.
    tz_name = user.timezone
    if tz_name == '':
        tz_name = timezone_get_current_timezone_name()
    login_moment = timezone_now().astimezone(get_timezone(tz_name))
    context['login_time'] = login_moment.strftime('%A, %B %d, %Y at %I:%M%p ') + tz_name

    remote_addr = request.META.get('REMOTE_ADDR')
    context['device_ip'] = remote_addr or _("Unknown IP address")
    context['device_os'] = get_device_os(agent)
    context['device_browser'] = get_device_browser(agent)

    queue_json_publish("email_senders", {
        'template_prefix': 'zerver/emails/notify_new_login',
        'to_user_id': user.id,
        'from_name': 'Zulip Account Security',
        'from_address': FromAddress.NOREPLY,
        'context': context,
    })
开发者ID:284928489,项目名称:zulip,代码行数:33,代码来源:signals.py


示例2: emit

    def emit(self, record):
        # type: (ExceptionReporter) -> None
        """Turn a log record into an error report and dispatch it.

        Request and user details are collected on a best-effort basis: if
        anything fails while gathering them, a minimal report holding only
        the node name and the log message is used instead.  The report is
        handed directly to ``notify_server_error`` when
        ``settings.STAGING_ERROR_NOTIFICATIONS`` is set; otherwise it is
        published to the ``error_reports`` queue.  Failures while
        dispatching are logged as warnings and never propagated.
        """
        try:
            request = record.request  # type: HttpRequest
            exception_filter = get_exception_reporter_filter(request)

            stack_trace = None
            if record.exc_info:
                stack_trace = ''.join(traceback.format_exception(*record.exc_info))

            # Both user fields stay None unless both can be read; the tuple
            # assignment only binds them once every lookup has succeeded.
            user_full_name = None
            user_email = None
            try:
                user_profile = request.user
                user_full_name, user_email = user_profile.full_name, user_profile.email
            except Exception:
                # Presumably the error was triggered by an anonymous user.
                traceback.print_exc()

            if request.method == 'GET':
                data = request.GET
            else:
                data = exception_filter.get_post_parameters(request)

            report = {
                'node': platform.node(),
                'method': request.method,
                'path': request.path,
                'data': data,
                'remote_addr': request.META.get('REMOTE_ADDR', None),
                'query_string': request.META.get('QUERY_STRING', None),
                'server_name': request.META.get('SERVER_NAME', None),
                'message': record.getMessage(),
                'stack_trace': stack_trace,
                'user_full_name': user_full_name,
                'user_email': user_email,
            }
        except Exception:
            traceback.print_exc()
            # Fall back to the bare minimum that needs no request at all.
            report = {
                'node': platform.node(),
                'message': record.getMessage(),
            }

        try:
            if not settings.STAGING_ERROR_NOTIFICATIONS:
                payload = {'type': "server", 'report': report}
                queue_json_publish('error_reports', payload, lambda x: None)
            else:
                # On staging, process the report directly so it can happen inside this
                # try/except to prevent looping
                from zilencer.error_notify import notify_server_error
                notify_server_error(report)
        except Exception:
            # If this breaks, complain loudly but don't pass the traceback up the stream
            # However, we *don't* want to use logging.exception since that could trigger a loop.
            logging.warning("Reporting an exception triggered an exception!", exc_info=True)
开发者ID:acemaster,项目名称:zulip,代码行数:60,代码来源:logging_handlers.py


示例3: handle

    def handle(self, *args, **options):
        # type: (*Any, **str) -> None
        """Read payloads line by line and publish each one to a queue.

        ``options['queue_name']`` names the target queue and
        ``options['file_name']`` the input file; ``'-'`` means standard
        input.  Each line is stripped; when it contains a tab, the second
        tab-separated field is taken as the payload, otherwise the whole
        line is.  Every payload is parsed with ``ujson.loads`` (which raises
        on invalid JSON) before being published.

        A file opened here is closed when processing ends, including when a
        payload fails to parse; standard input is left open.
        """
        queue_name = options['queue_name']
        file_name = options['file_name']

        reading_stdin = file_name == '-'
        if reading_stdin:
            f = sys.stdin # type: IO[str]
        else:
            f = open(file_name)

        try:
            # readline() returns '' only at end of input, which ends the loop.
            for line in iter(f.readline, ''):
                line = line.strip()
                try:
                    payload = line.split('\t')[1]
                except IndexError:
                    payload = line

                print('Queueing to queue %s: %s' % (queue_name, payload))

                # Verify that payload is valid json.
                data = ujson.loads(payload)

                queue_json_publish(queue_name, data, error)
        finally:
            # Only close what this method opened; stdin belongs to the process.
            if not reading_stdin:
                f.close()
开发者ID:150vb,项目名称:zulip,代码行数:27,代码来源:enqueue_file.py


示例4: json_report_error

def json_report_error(request, user_profile, message=REQ(), stacktrace=REQ(),
                      ui_message=REQ(validator=check_bool), user_agent=REQ(),
                      href=REQ(), log=REQ(),
                      more_info=REQ(validator=check_dict([]), default=None)):
    # type: (HttpRequest, UserProfile, text_type, text_type, bool, text_type, text_type, text_type, Dict[str, Any]) -> HttpResponse
    """Queue a browser error report and return a success response.

    When ``settings.ERROR_REPORTING`` is falsy the report is dropped and
    success is returned immediately.  Otherwise the stack trace is annotated
    through ``js_source_map`` (when that is truthy), the output of
    ``git log HEAD^..HEAD --oneline`` is attached as ``version`` (``None`` if
    the command fails), and the report is published to ``error_reports``.
    """
    if not settings.ERROR_REPORTING:
        return json_success()

    if js_source_map:
        stacktrace = js_source_map.annotate_stacktrace(stacktrace)

    # Best effort: a missing git binary or checkout must not fail the report.
    git_command = ["git", "log", "HEAD^..HEAD", "--oneline"]
    try:
        version = subprocess.check_output(git_command, universal_newlines=True)
    except Exception:
        version = None

    browser_report = {
        'user_email': user_profile.email,
        'user_full_name': user_profile.full_name,
        'user_visible': ui_message,
        'server_path': settings.DEPLOY_ROOT,
        'version': version,
        'user_agent': user_agent,
        'href': href,
        'message': message,
        'stacktrace': stacktrace,
        'log': log,
        'more_info': more_info,
    }
    queue_json_publish(
        'error_reports',
        {'type': "browser", 'report': browser_report},
        lambda x: None,
    )

    return json_success()
开发者ID:150vb,项目名称:zulip,代码行数:34,代码来源:report.py


示例5: test_register_consumer_nack

    def test_register_consumer_nack(self) -> None:
        """Check that an event whose consumer raises once is consumed again.

        The consumer raises on its first invocation and records the event on
        the second, so it must run twice while recording exactly one event.
        """
        received = []
        attempts = 0

        queue_client = get_queue_client()

        def collect(event: Dict[str, Any]) -> None:
            nonlocal attempts
            queue_client.stop_consuming()
            attempts += 1
            if attempts == 1:
                raise Exception("Make me nack!")
            received.append(event)

        queue_client.register_json_consumer("test_suite", collect)
        queue_json_publish("test_suite", {"event": "my_event"})

        try:
            queue_client.start_consuming()
        except Exception:
            # The first delivery blew up; register again and consume once more.
            queue_client.register_json_consumer("test_suite", collect)
            queue_client.start_consuming()

        # Confirm that we processed the event fully once
        self.assertEqual(attempts, 2)
        self.assertEqual(len(received), 1)
        self.assertEqual(received[0]['event'], 'my_event')
开发者ID:BakerWang,项目名称:zulip,代码行数:27,代码来源:test_queue.py


示例6: mirror_email_message

def mirror_email_message(data: Dict[str, str]) -> Dict[str, str]:
    """Validate the recipient of an incoming email and queue it for mirroring.

    ``data['recipient']`` is checked with the validator matching its address
    kind.  If that validator raises ``ZulipEmailForwardError`` an error dict
    with a ``5.1.1`` message is returned and nothing is queued; otherwise
    ``data['msg_text']`` and the recipient are published to the
    ``email_mirror`` queue and a success dict is returned.
    """
    rcpt_to = data['recipient']

    # Pick the validator and the matching rejection detail up front so a
    # single try/except covers both address kinds.
    if is_missed_message_address(rcpt_to):
        validate = mark_missed_message_address_as_used
        detail = "Bad or expired missed message address."
    else:
        validate = extract_and_validate
        detail = "Please use the address specified in your Streams page."

    try:
        validate(rcpt_to)
    except ZulipEmailForwardError:
        return {
            "status": "error",
            "msg": "5.1.1 Bad destination mailbox address: " + detail,
        }

    event = {"message": data['msg_text'], "rcpt_to": rcpt_to}
    queue_json_publish("email_mirror", event)
    return {"status": "success"}
开发者ID:deltay,项目名称:zulip,代码行数:28,代码来源:email_mirror.py


示例7: handle

    def handle(self, *args: Any, **options: str) -> None:
        """Read payloads line by line and publish each one to a queue.

        ``options['queue_name']`` names the target queue and
        ``options['file_name']`` the input file; ``'-'`` selects standard
        input.  Lines are stripped, and when a line contains a tab only its
        second tab-separated field is used as the payload.  Each payload
        must parse with ``ujson.loads``; a parse failure propagates.

        A file opened by this method is always closed before returning or
        raising; standard input is never closed.
        """
        queue_name = options['queue_name']
        file_name = options['file_name']

        use_stdin = file_name == '-'
        if use_stdin:
            f = sys.stdin  # type: IO[str]
        else:
            f = open(file_name)

        try:
            # readline() yields '' only at end of input, ending the loop.
            for line in iter(f.readline, ''):
                line = line.strip()
                try:
                    payload = line.split('\t')[1]
                except IndexError:
                    payload = line

                print('Queueing to queue %s: %s' % (queue_name, payload))

                # Verify that payload is valid json.
                data = ujson.loads(payload)

                # This is designed to use the `error` method rather than
                # the call_consume_in_tests flow.
                queue_json_publish(queue_name, data, error)
        finally:
            # Release the handle we opened; leave the process's stdin alone.
            if not use_stdin:
                f.close()
开发者ID:brockwhittaker,项目名称:zulip,代码行数:28,代码来源:enqueue_file.py


示例8: report_error

def report_error(request: HttpRequest, user_profile: UserProfile, message: str=REQ(),
                 stacktrace: str=REQ(), ui_message: bool=REQ(validator=check_bool),
                 user_agent: str=REQ(), href: str=REQ(), log: str=REQ(),
                 more_info: Optional[Dict[str, Any]]=REQ(validator=check_dict([]), default=None)
                 ) -> HttpResponse:
    """Accepts an error report and stores in a queue for processing.  The
    actual error reports are later handled by do_report_error (below)

    Returns a success response in every case.  When
    ``settings.BROWSER_ERROR_REPORTING`` is falsy the report is discarded.
    Otherwise the stack trace is annotated through the JS source map (if one
    is available), the current git revision line is attached as ``version``
    (``None`` when the git command fails), and the report is published to
    the ``error_reports`` queue.
    """
    if not settings.BROWSER_ERROR_REPORTING:
        return json_success()
    if more_info is None:
        more_info = {}

    js_source_map = get_js_source_map()
    if js_source_map:
        stacktrace = js_source_map.annotate_stacktrace(stacktrace)

    # Best effort: failing to determine the version must not lose the report.
    try:
        version = subprocess.check_output(["git", "log", "HEAD^..HEAD", "--oneline"],
                                          universal_newlines=True)  # type: Optional[str]
    except Exception:
        version = None

    # Get the IP address of the request, preferring the X-Real-IP header.
    remote_ip = request.META.get('HTTP_X_REAL_IP')
    if remote_ip is None:
        remote_ip = request.META['REMOTE_ADDR']

    # For the privacy of our users, we remove any actual text content
    # in draft_content (from drafts rendering exceptions).  See the
    # comment on privacy_clean_markdown for more details.
    if more_info.get('draft_content'):
        more_info['draft_content'] = privacy_clean_markdown(more_info['draft_content'])

    if user_profile.is_authenticated:
        email = user_profile.delivery_email
        full_name = user_profile.full_name
    else:
        # Syntactically valid placeholder address for reporters who are
        # not logged in.
        email = "unauthenticated@example.com"
        full_name = "Anonymous User"

    queue_json_publish('error_reports', dict(
        type = "browser",
        report = dict(
            # Host name without any ":port" suffix.
            host = request.get_host().split(":")[0],
            ip_address = remote_ip,
            user_email = email,
            user_full_name = full_name,
            user_visible = ui_message,
            server_path = settings.DEPLOY_ROOT,
            version = version,
            user_agent = user_agent,
            href = href,
            message = message,
            stacktrace = stacktrace,
            log = log,
            more_info = more_info,
        )
    ))

    return json_success()
开发者ID:akashnimare,项目名称:zulip,代码行数:60,代码来源:report.py


示例9: missedmessage_hook

def missedmessage_hook(user_profile_id, queue, last_for_client):
    """Queue missed-message notifications for unread mentions left in ``queue``.

    Does nothing unless ``last_for_client`` is truthy.  Otherwise every
    ``message`` event in ``queue.event_queue.contents()`` whose flags include
    ``mentioned`` but not ``read`` yields an offline notification, which is
    published to the mobile-push queue and/or the email queue depending on
    the event's ``push_notified`` / ``email_notified`` markers.
    """
    # Only process missedmessage hook when the last queue for a
    # client has been garbage collected
    if not last_for_client:
        return

    # First pass: collect (message id, wants push, wants email) triples.
    pending = []
    for event in queue.event_queue.contents():
        if event["type"] != "message" or not event["flags"]:
            continue

        flags = event["flags"]
        if "mentioned" in flags and "read" not in flags:
            message_id = event["message"]["id"]
            wants_push = not event.get("push_notified", False)
            wants_email = not event.get("email_notified", False)
            pending.append((message_id, wants_push, wants_email))

    # Second pass: build each notice once and publish it where needed.
    for message_id, wants_push, wants_email in pending:
        notice = build_offline_notification(user_profile_id, message_id)
        if wants_push:
            queue_json_publish("missedmessage_mobile_notifications", notice, lambda notice: None)
        if wants_email:
            queue_json_publish("missedmessage_emails", notice, lambda notice: None)
开发者ID:RomiPierre,项目名称:zulip,代码行数:27,代码来源:event_queue.py


示例10: missedmessage_hook

def missedmessage_hook(user_profile_id, queue, last_for_client):
    """Publish missed-message notices for unread mentions still in ``queue``.

    Returns immediately unless ``last_for_client`` is truthy.  For every
    ``message`` event in ``queue.event_queue.contents()`` flagged
    ``mentioned`` and not ``read``, an offline notification is built and
    published to the mobile-push queue unless the event is marked
    ``push_notified``, and to the email queue unless it is marked
    ``email_notified``.
    """
    # Only process missedmessage hook when the last queue for a
    # client has been garbage collected
    if not last_for_client:
        return

    to_notify = []
    for event in queue.event_queue.contents():
        is_flagged_message = event['type'] == 'message' and event['flags']
        if not is_flagged_message:
            continue
        if 'mentioned' not in event['flags'] or 'read' in event['flags']:
            continue

        info = {'message_id': event['message']['id']}
        if not event.get('push_notified', False):
            info['send_push'] = True
        if not event.get('email_notified', False):
            info['send_email'] = True
        to_notify.append(info)

    for info in to_notify:
        notice = build_offline_notification(user_profile_id, info['message_id'])
        if info.get('send_push', False):
            queue_json_publish("missedmessage_mobile_notifications", notice, lambda notice: None)
        if info.get('send_email', False):
            queue_json_publish("missedmessage_emails", notice, lambda notice: None)
开发者ID:seanly,项目名称:zulip,代码行数:27,代码来源:event_queue.py


示例11: test_queue_basics_json

    def test_queue_basics_json(self) -> None:
        """Publish one JSON event and check that draining the queue returns it."""
        payload = {"event": "my_event"}
        queue_json_publish("test_suite", payload)

        drained = get_queue_client().drain_queue("test_suite", json=True)
        self.assertEqual(len(drained), 1)
        self.assertEqual(drained[0]['event'], 'my_event')
开发者ID:BakerWang,项目名称:zulip,代码行数:7,代码来源:test_queue.py


示例12: on_message

    def on_message(self, msg_raw):
        # type: (str) -> None
        """Handle one raw JSON message received on the socket.

        Every message is acknowledged first.  An ``auth`` message is passed
        to ``authenticate_client`` and handled entirely here.  Any other
        message from a client that is not yet authenticated is rejected with
        an error response.  Otherwise the request is marked as received in
        redis and published to the ``message_sender`` queue.
        """
        log_data = dict(extra='[transport=%s' % (self.session.transport_name,))
        record_request_start_data(log_data)
        msg = ujson.loads(msg_raw)

        def reject(path, error_msg):
            # Send an error response to the client and log it as a 403.
            response = {'result': 'error', 'msg': error_msg}
            self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
                                       'response': response})
            write_log_line(log_data, path=path, method='SOCKET',
                           remote_ip=self.session.conn_info.ip,
                           email='unknown', client_name='?',
                           status_code=403, error_content=ujson.dumps(response))

        if self.did_close:
            # Only logged; the message is still processed below.
            transport = self.session.transport_name
            if self.session.user_profile is not None:
                who = self.session.user_profile.email
            else:
                who = 'unknown'
            logger.info("Received message on already closed socket! transport=%s user=%s client_id=%s"
                        % (transport, who, self.client_id))

        self.session.send_message({'req_id': msg['req_id'], 'type': 'ack'})

        if msg['type'] == 'auth':
            log_data['extra'] += ']'
            try:
                self.authenticate_client(msg)
                # TODO: Fill in the correct client
                write_log_line(log_data, path='/socket/auth', method='SOCKET',
                               remote_ip=self.session.conn_info.ip,
                               email=self.session.user_profile.email,
                               client_name='?')
            except SocketAuthError as e:
                reject('/socket/auth', e.msg)
            return

        if not self.authenticated:
            reject('/socket/service_request', "Not yet authenticated")
            return

        # Mark the request as received; the key expires after 24 hours.
        redis_key = req_redis_key(msg['req_id'])
        with redis_client.pipeline() as pipeline:
            pipeline.hmset(redis_key, {'status': 'received'})
            pipeline.expire(redis_key, 60 * 60 * 24)
            pipeline.execute()

        record_request_stop_data(log_data)

        request_payload = msg['request']
        req_id = msg['req_id']
        server_meta = dict(user_id=self.session.user_profile.id,
                           client_id=self.client_id,
                           return_queue="tornado_return",
                           log_data=log_data,
                           request_environ=dict(REMOTE_ADDR=self.session.conn_info.ip))
        queue_json_publish("message_sender",
                           dict(request=request_payload,
                                req_id=req_id,
                                server_meta=server_meta),
                           fake_message_sender)
开发者ID:aakash-cr7,项目名称:zulip,代码行数:59,代码来源:socket.py


示例13: send_event

def send_event(event, users):
    # type: (Mapping[str, Any], Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None
    """Publish ``event`` for ``users`` to the ``notify_tornado`` queue.

    `users` is a list of user IDs, or in the case of `message` type
    events, a list of dicts describing the users and metadata about
    the user/message pair.
    """
    notification = dict(event=event, users=users)
    queue_json_publish("notify_tornado", notification, send_notification_http)
开发者ID:zulip,项目名称:zulip,代码行数:8,代码来源:event_queue.py


示例14: send_event

def send_event(realm: Realm, event: Mapping[str, Any],
               users: Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None:
    """Publish ``event`` for ``users`` to the notify queue of ``realm``'s Tornado port.

    `users` is a list of user IDs, or in the case of `message` type
    events, a list of dicts describing the users and metadata about
    the user/message pair.
    """
    def notify_over_http(*args, **kwargs):
        # Same call as the queue path, with the realm bound in.
        return send_notification_http(realm, *args, **kwargs)

    port = get_tornado_port(realm)
    notification = dict(event=event, users=users)
    queue_json_publish(notify_tornado_queue_name(port), notification, notify_over_http)
开发者ID:BakerWang,项目名称:zulip,代码行数:9,代码来源:event_queue.py


示例15: request_retry

def request_retry(event, failure_message):
    # type: (Dict[str, Any], Text) -> None
    """Count a failed attempt for ``event`` and either requeue it or give up.

    ``event['failed_tries']`` is incremented in place.  While it does not
    exceed ``MAX_REQUEST_RETRIES`` the event is republished to the
    ``outgoing_webhooks`` queue; beyond that the failure is reported through
    ``fail_with_message`` and a warning is logged.
    """
    event['failed_tries'] += 1

    if event['failed_tries'] <= MAX_REQUEST_RETRIES:
        queue_json_publish("outgoing_webhooks", event, lambda x: None)
        return

    bot_user = get_user_profile_by_id(event['user_profile_id'])
    failure_message = "Maximum retries exceeded! " + failure_message
    fail_with_message(event, failure_message)
    warning_text = "Maximum retries exceeded for trigger:%s event:%s" % (bot_user.email, event['command'])
    logging.warning(warning_text)
开发者ID:christi3k,项目名称:zulip,代码行数:10,代码来源:outgoing_webhook.py


示例16: consume

    def consume(self, event):
        # type: (Mapping[str, Any]) -> None
        """Run a socket-originated request through the handler and return the result.

        A synthetic WSGI environment is built for ``event['request']``, the
        request is run through ``self.handler`` as the user given by
        ``server_meta['user_id']``, the response is stored in redis under
        the request's key, and the result is published to
        ``server_meta['return_queue']``.
        """
        server_meta = event['server_meta']
        request_data = event['request']

        environ = {
            'REQUEST_METHOD': 'SOCKET',
            'SCRIPT_NAME': '',
            'PATH_INFO': '/json/messages',
            'SERVER_NAME': '127.0.0.1',
            'SERVER_PORT': 9993,
            'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
            'wsgi.version': (1, 0),
            'wsgi.input': StringIO(),
            'wsgi.errors': sys.stderr,
            'wsgi.multithread': False,
            'wsgi.multiprocess': True,
            'wsgi.run_once': False,
            'zulip.emulated_method': 'POST'
        }

        # The user agent travels inside the request payload; move it into
        # the environment and drop it from the payload.
        if 'socket_user_agent' in request_data:
            environ['HTTP_USER_AGENT'] = request_data['socket_user_agent']
            del request_data['socket_user_agent']

        # We're mostly using a WSGIRequest for convenience
        environ.update(server_meta['request_environ'])
        request = WSGIRequest(environ)
        # Note: If we ever support non-POST methods, we'll need to change this.
        request._post = event['request']
        request.csrf_processing_done = True
        request._cached_user = get_user_profile_by_id(server_meta['user_id'])

        resp = self.handler.get_response(request)
        server_meta['time_request_finished'] = time.time()
        server_meta['worker_log_data'] = request._log_data

        resp_content = resp.content.decode('utf-8')
        response_data = ujson.loads(resp_content)
        if response_data['result'] == 'error':
            check_and_send_restart_signal()

        result = {
            'response': response_data,
            'req_id': event['req_id'],
            'server_meta': server_meta,
        }

        redis_key = req_redis_key(event['req_id'])
        completion = {'status': 'complete', 'response': resp_content}
        self.redis_client.hmset(redis_key, completion)

        # Since this sends back to Tornado, we can't use
        # call_consume_in_tests here.
        queue_json_publish(server_meta['return_queue'], result, lambda e: None)
开发者ID:brockwhittaker,项目名称:zulip,代码行数:53,代码来源:queue_processors.py


示例17: emit

    def emit(self, record: logging.LogRecord) -> None:
        """Assemble an error report from ``record`` and dispatch it.

        The report is filled in incrementally; if assembling it raises, the
        message and stack trace are replaced with a notice and the failure
        is logged as a warning.  The report is then handed to
        ``notify_server_error`` when ``settings.STAGING_ERROR_NOTIFICATIONS``
        is set, or published to the ``error_reports`` queue otherwise.
        Dispatch failures are logged as warnings and never propagated.
        """
        report = {}  # type: Dict[str, Any]

        try:
            for key in ('node', 'host'):
                report[key] = platform.node()

            add_deployment_metadata(report)

            exc_info = record.exc_info
            if exc_info:
                stack_trace = ''.join(traceback.format_exception(*exc_info))
                message = str(exc_info[1])
            else:
                message = record.getMessage()
                stack_trace = 'No stack trace available'
                if '\n' in message:
                    # Some exception code paths in queue processors
                    # seem to result in super-long messages
                    stack_trace = message
                    message = message.split('\n', 1)[0]
            report['stack_trace'] = stack_trace
            report['message'] = message

            report['logger_name'] = record.name
            report['log_module'] = find_log_caller_module(record)
            report['log_lineno'] = record.lineno

            if hasattr(record, "request"):
                add_request_metadata(report, record.request)  # type: ignore  # record.request is added dynamically

        except Exception:
            report['message'] = "Exception in preparing exception report!"
            logging.warning(report['message'], exc_info=True)
            report['stack_trace'] = "See /var/log/zulip/errors.log"

        try:
            if not settings.STAGING_ERROR_NOTIFICATIONS:
                payload = {'type': "server", 'report': report}
                queue_json_publish('error_reports', payload)
            else:
                # On staging, process the report directly so it can happen inside this
                # try/except to prevent looping
                from zerver.lib.error_notify import notify_server_error
                notify_server_error(report)
        except Exception:
            # If this breaks, complain loudly but don't pass the traceback up the stream
            # However, we *don't* want to use logging.exception since that could trigger a loop.
            logging.warning("Reporting an exception triggered an exception!", exc_info=True)
开发者ID:gnprice,项目名称:zulip,代码行数:50,代码来源:logging_handlers.py


示例18: handle

    def handle(self, *args, **options):
        # type: (*Any, **str) -> None
        """Queue one piped email for the email mirror, or start batch processing.

        Under Python 3 (``six.PY3``) only a warning is printed.  Otherwise
        the recipient comes from ``$ORIGINAL_RECIPIENT``, falling back to
        ``options['recipient']``:

        * With a recipient, the address is validated, the message is read
          from standard input (at most 25MiB) and published to the
          ``email_mirror`` queue.  Invalid addresses and oversized messages
          print an SMTP-style status line and exit with a ``posix.EX_*``
          code.
        * Without a recipient, the email gateway settings must all be
          configured; the Twisted reactor is then started with ``main``
          scheduled on it.

        Raises:
            SystemExit: on a rejected address, an oversized message, or
                missing gateway configuration.
        """
        if six.PY3:
            print(py3_warning)
            return
        rcpt_to = os.environ.get("ORIGINAL_RECIPIENT", options['recipient'])
        if rcpt_to is not None:
            if is_missed_message_address(rcpt_to):
                try:
                    mark_missed_message_address_as_used(rcpt_to)
                except ZulipEmailForwardError:
                    print("5.1.1 Bad destination mailbox address: Bad or expired missed message address.")
                    sys.exit(posix.EX_NOUSER) # type: ignore # There are no stubs for posix in python 3
            else:
                try:
                    extract_and_validate(rcpt_to)
                except ZulipEmailForwardError:
                    print("5.1.1 Bad destination mailbox address: Please use the address specified "
                          "in your Streams page.")
                    sys.exit(posix.EX_NOUSER) # type: ignore # There are no stubs for posix in python 3

            # Read in the message, at most 25MiB. This is the limit enforced by
            # Gmail, which we use here as a decent metric.
            message = sys.stdin.read(25*1024*1024)

            if len(sys.stdin.read(1)) != 0:
                # We're not at EOF, reject large mail.
                print("5.3.4 Message too big for system: Max size is 25MiB")
                sys.exit(posix.EX_DATAERR) # type: ignore # There are no stubs for posix in python 3

            queue_json_publish(
                    "email_mirror",
                    {
                        "message": message,
                        "rcpt_to": rcpt_to
                    },
                    lambda x: None
            )
        else:
            # We're probably running from cron, try to batch-process mail
            if (not settings.EMAIL_GATEWAY_BOT or not settings.EMAIL_GATEWAY_LOGIN or
                not settings.EMAIL_GATEWAY_PASSWORD or not settings.EMAIL_GATEWAY_IMAP_SERVER or
                not settings.EMAIL_GATEWAY_IMAP_PORT or not settings.EMAIL_GATEWAY_IMAP_FOLDER):
                print("Please configure the Email Mirror Gateway in /etc/zulip/, "
                      "or specify $ORIGINAL_RECIPIENT if piping a single mail.")
                sys.exit(1)
            reactor.callLater(0, main)
            reactor.run()
开发者ID:150vb,项目名称:zulip,代码行数:48,代码来源:email-mirror.py


示例19: update_user_activity

def update_user_activity(request, user_profile):
    """Publish a ``user_activity`` event describing this request.

    Requests to ``/json/users/me/presence`` are skipped.  The recorded query
    is ``request._query`` when that attribute exists, and the request's
    ``PATH_INFO`` otherwise.
    """
    path_info = request.META["PATH_INFO"]
    # update_active_status also pushes to rabbitmq, and it seems
    # redundant to log that here as well.
    if path_info == '/json/users/me/presence':
        return

    query = request._query if hasattr(request, '_query') else request.META['PATH_INFO']

    event = {
        'query': query,
        'user_profile_id': user_profile.id,
        'time': datetime_to_timestamp(now()),
        'client': request.client.name,
    }
    queue_json_publish("user_activity", event, lambda event: None)
开发者ID:anteq,项目名称:zulip,代码行数:16,代码来源:decorator.py


示例20: test_register_consumer

    def test_register_consumer(self) -> None:
        """Check that a registered JSON consumer receives a published event."""
        received = []

        queue_client = get_queue_client()

        def collect(event: Dict[str, Any]) -> None:
            # Record the event, then stop so start_consuming() returns.
            received.append(event)
            queue_client.stop_consuming()

        queue_client.register_json_consumer("test_suite", collect)
        queue_json_publish("test_suite", {"event": "my_event"})

        queue_client.start_consuming()

        self.assertEqual(len(received), 1)
        self.assertEqual(received[0]['event'], 'my_event')
开发者ID:BakerWang,项目名称:zulip,代码行数:16,代码来源:test_queue.py



注:本文中的 zerver.lib.queue.queue_json_publish 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python queue.SimpleQueueClient类代码示例发布时间:2022-05-26
下一篇:
Python openapi.validate_against_openapi_schema函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap