• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python json.dumps函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中temba.utils.json.dumps函数的典型用法代码示例。如果您正苦于以下问题:Python dumps函数的具体用法?Python dumps怎么用?Python dumps使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了dumps函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_claim

    def test_claim(self, mock_post):
        url = reverse("channels.types.viber_public.claim")

        self.login(self.admin)

        # check that claim page URL appears on claim list page
        response = self.client.get(reverse("channels.channel_claim"))
        self.assertContains(response, url)

        # try submitting with invalid token
        mock_post.return_value = MockResponse(400, json.dumps({"status": 3, "status_message": "Invalid token"}))
        response = self.client.post(url, {"auth_token": "invalid"})

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Error validating authentication token")

        # ok this time claim with a success
        mock_post.side_effect = [
            MockResponse(200, json.dumps({"status": 0, "status_message": "ok", "id": "viberId", "uri": "viberName"})),
            MockResponse(200, json.dumps({"status": 0, "status_message": "ok", "id": "viberId", "uri": "viberName"})),
            MockResponse(200, json.dumps({"status": 0, "status_message": "ok"})),
        ]

        self.client.post(url, {"auth_token": "123456"}, follow=True)

        # assert our channel got created
        channel = Channel.objects.get(address="viberId")
        self.assertEqual(channel.config["auth_token"], "123456")
        self.assertEqual(channel.name, "viberName")
        self.assertTrue(channel.get_type().has_attachment_support(channel))

        # should have been called with our webhook URL
        self.assertEqual(mock_post.call_args[0][0], "https://chatapi.viber.com/pa/set_webhook")
开发者ID:teehamaral,项目名称:rapidpro,代码行数:33,代码来源:tests.py


示例2: refresh_access_token

    def refresh_access_token(self, channel_id):
        r = get_redis_connection()
        lock_name = self.TOKEN_REFRESH_LOCK % self.channel_uuid

        if not r.get(lock_name):
            with r.lock(lock_name, timeout=30):
                key = self.TOKEN_STORE_KEY % self.channel_uuid

                post_data = dict(grant_type="client_credentials", client_id=self.app_id, client_secret=self.app_secret)
                url = self.TOKEN_URL

                event = HttpEvent("POST", url, json.dumps(post_data))
                start = time.time()

                response = self._request(url, post_data, access_token=None)
                event.status_code = response.status_code

                if response.status_code != 200:
                    event.response_body = response.content
                    ChannelLog.log_channel_request(
                        channel_id, "Got non-200 response from %s" % self.API_NAME, event, start, True
                    )
                    return

                response_json = response.json()
                event.response_body = json.dumps(response_json)
                ChannelLog.log_channel_request(
                    channel_id, "Successfully fetched access token from %s" % self.API_NAME, event, start
                )

                access_token = response_json["access_token"]
                expires = response_json.get("expires_in", 7200)
                r.set(key, access_token, ex=int(expires))
                return access_token
开发者ID:teehamaral,项目名称:rapidpro,代码行数:34,代码来源:access_token.py


示例3: form_valid

        def form_valid(self, form):
            self.form = form
            user = self.request.user
            org = user.get_org()
            simulation = self.request.GET.get("simulation", "false") == "true"

            omnibox = self.form.cleaned_data["omnibox"]
            has_schedule = self.form.cleaned_data["schedule"]
            step_uuid = self.form.cleaned_data.get("step_node", None)
            text = self.form.cleaned_data["text"]

            groups = list(omnibox["groups"])
            contacts = list(omnibox["contacts"])
            urns = list(omnibox["urns"])

            if step_uuid:
                from .tasks import send_to_flow_node

                get_params = {k: v for k, v in self.request.GET.items()}
                get_params.update({"s": step_uuid})
                send_to_flow_node.delay(org.pk, user.pk, text, **get_params)
                if "_format" in self.request.GET and self.request.GET["_format"] == "json":
                    return HttpResponse(json.dumps(dict(status="success")), content_type="application/json")
                else:
                    return HttpResponseRedirect(self.get_success_url())

            # if simulating only use the test contact
            if simulation:
                groups = []
                urns = []
                for contact in contacts:
                    if contact.is_test:
                        contacts = [contact]
                        break

            schedule = Schedule.objects.create(created_by=user, modified_by=user) if has_schedule else None
            broadcast = Broadcast.create(
                org, user, text, groups=groups, contacts=contacts, urns=urns, schedule=schedule, status=QUEUED
            )

            if not has_schedule:
                self.post_save(broadcast)
                super().form_valid(form)

            analytics.track(
                self.request.user.username,
                "temba.broadcast_created",
                dict(contacts=len(contacts), groups=len(groups), urns=len(urns)),
            )

            if "_format" in self.request.GET and self.request.GET["_format"] == "json":
                data = dict(status="success", redirect=reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
                return HttpResponse(json.dumps(data), content_type="application/json")
            else:
                if self.form.cleaned_data["schedule"]:
                    return HttpResponseRedirect(reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
                return HttpResponseRedirect(self.get_success_url())
开发者ID:teehamaral,项目名称:rapidpro,代码行数:57,代码来源:views.py


示例4: start_call

    def start_call(self, call, to, from_, status_callback):
        if not settings.SEND_CALLS:
            raise ValueError("SEND_CALLS set to False, skipping call start")

        params = dict(to=to, from_=call.channel.address, url=status_callback, status_callback=status_callback)

        try:
            twilio_call = self.api.calls.create(**params)
            call.external_id = str(twilio_call.sid)

            # the call was successfully sent to the IVR provider
            call.status = IVRCall.WIRED
            call.save()

            for event in self.events:
                ChannelLog.log_ivr_interaction(call, "Started call", event)

        except TwilioRestException as twilio_error:
            message = "Twilio Error: %s" % twilio_error.msg
            if twilio_error.code == 20003:
                message = _("Could not authenticate with your Twilio account. Check your token and try again.")

            event = HttpEvent("POST", "https://api.nexmo.com/v1/calls", json.dumps(params), response_body=str(message))
            ChannelLog.log_ivr_interaction(call, "Call start failed", event, is_error=True)

            call.status = IVRCall.FAILED
            call.save()

            raise IVRException(message)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:29,代码来源:clients.py


示例5: form_invalid

 def form_invalid(self, form):
     if "_format" in self.request.GET and self.request.GET["_format"] == "json":  # pragma: needs cover
         return HttpResponse(
             json.dumps(dict(status="error", errors=form.errors)), content_type="application/json", status=400
         )
     else:
         return super().form_invalid(form)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:7,代码来源:views.py


示例6: test_release

    def test_release(self, mock_delete):
        mock_delete.return_value = MockResponse(200, json.dumps({"success": True}))
        self.channel.release()

        mock_delete.assert_called_once_with(
            "https://graph.facebook.com/v2.12/me/subscribed_apps", params={"access_token": "09876543"}
        )
开发者ID:mxabierto,项目名称:rapidpro,代码行数:7,代码来源:tests.py


示例7: push_task

def push_task(org, queue, task_name, args, priority=DEFAULT_PRIORITY):
    """
    Adds a task to queue_name with the supplied arguments.

    Ex: push_task(nyaruka, 'flows', 'start_flow', [1,2,3,4,5,6,7,8,9,10])
    """
    r = get_redis_connection("default")

    # calculate our score from the current time and priority, this could get us in trouble
    # if things are queued for more than ~100 days, but otherwise gives us the properties of prioritizing
    # first based on priority, then insertion order.
    score = time.time() + priority

    # push our task onto the right queue and make sure it is in the active list (atomically)
    with r.pipeline() as pipe:
        org_id = org if isinstance(org, int) else org.id
        pipe.zadd("%s:%d" % (task_name, org_id), score, json.dumps(args))

        # and make sure this key is in our list of queues so this job will get worked on
        pipe.zincrby("%s:active" % task_name, org_id, 0)
        pipe.execute()

    # if we were given a queue to schedule on, then add this task to celery.
    #
    # note that the task that is fired needs no arguments as it should just use pop_task with the
    # task name to determine what to work on.
    if queue:
        if getattr(settings, "CELERY_ALWAYS_EAGER", False):
            task_function = lookup_task_function(task_name)
            task_function()
        else:  # pragma: needs cover
            current_app.send_task(task_name, args=[], kwargs={}, queue=queue)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:32,代码来源:queues.py


示例8: serialize_run

def serialize_run(run):
    serialized = {
        "uuid": str(run.uuid),
        "status": "completed" if run.exited_on else "active",
        "created_on": run.created_on.isoformat(),
        "exited_on": run.exited_on.isoformat() if run.exited_on else None,
        "expires_on": run.expires_on.isoformat() if run.expires_on else None,
        "flow": {"uuid": str(run.flow.uuid), "name": run.flow.name},
        "path": run.path,
    }

    if run.results:
        serialized["results"] = run.results
    if run.events:
        serialized["events"] = run.events
    if run.parent_id and run.parent.contact == run.contact:
        serialized["parent_uuid"] = str(run.parent.uuid)

    msg_in = run.get_last_msg()
    if msg_in:
        serialized["input"] = serialize_input(msg_in)

    # things in @extra might not have come from a webhook call but at least they'll be accessible if we make one
    if run.fields:
        payload = json.dumps(run.fields)
        serialized["webhook"] = {
            "request": "GET / HTTP/1.1\r\nHost: fakewebhooks.com\r\nUser-Agent: goflow-trials\r\n\r\n",
            "response": "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\n\r\n" + payload,
            "status": "success",
            "status_code": 200,
            "url": "http://fakewebhooks.com/",
        }

    return serialized
开发者ID:mxabierto,项目名称:rapidpro,代码行数:34,代码来源:resumes.py


示例9: assertStatus

 def assertStatus(sms, event_type, assert_status):
     data["event_type"] = event_type
     response = self.client.post(
         reverse("handlers.junebug_handler", args=["event", self.channel.uuid]),
         data=json.dumps(data),
         content_type="application/json",
     )
     self.assertEqual(200, response.status_code)
     sms = Msg.objects.get(pk=sms.id)
     self.assertEqual(assert_status, sms.status)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:10,代码来源:tests.py


示例10: put_jsonl

    def put_jsonl(self, bucket, key, records):
        stream = io.BytesIO()
        gz = gzip.GzipFile(fileobj=stream, mode="wb")

        for record in records:
            gz.write(json.dumps(record).encode("utf-8"))
            gz.write(b"\n")
        gz.close()

        self.objects[(bucket, key)] = stream
开发者ID:mxabierto,项目名称:rapidpro,代码行数:10,代码来源:s3.py


示例11: _request

    def _request(self, endpoint, payload):
        if logger.isEnabledFor(logging.DEBUG):  # pragma: no cover
            logger.debug("=============== %s request ===============" % endpoint)
            logger.debug(json.dumps(payload, indent=2))
            logger.debug("=============== /%s request ===============" % endpoint)

        response = requests.post("%s/mr/%s" % (self.base_url, endpoint), json=payload, headers=self.headers)
        resp_json = response.json()

        if logger.isEnabledFor(logging.DEBUG):  # pragma: no cover
            logger.debug("=============== %s response ===============" % endpoint)
            logger.debug(json.dumps(resp_json, indent=2))
            logger.debug("=============== /%s response ===============" % endpoint)

        if 400 <= response.status_code < 500:
            raise MailroomException(endpoint, payload, resp_json)

        response.raise_for_status()

        return resp_json
开发者ID:teehamaral,项目名称:rapidpro,代码行数:20,代码来源:client.py


示例12: setUp

 def setUp(self):
     responses.add(
         responses.GET,
         "https://api.github.com/repos/nyaruka/posm-extracts/git/trees/master",
         body=json.dumps({"tree": [{"path": "geojson", "sha": "the-sha"}]}),
         content_type="application/json",
     )
     responses.add(
         responses.GET,
         "https://api.github.com/repos/nyaruka/posm-extracts/git/trees/the-sha",
         body=json.dumps({"tree": [{"path": "R12345_simplified.json"}, {"path": "R45678_simplified.json"}]}),
         content_type="application/json",
     )
     responses.add(
         responses.GET,
         "https://raw.githubusercontent.com/nyaruka/posm-extracts/master/geojson/R12345_simplified.json",
         body="the-relation-json",
         content_type="application/json",
     )
     self.testdir = tempfile.mkdtemp()
开发者ID:teehamaral,项目名称:rapidpro,代码行数:20,代码来源:tests.py


示例13: _request

    def _request(self, endpoint, payload):
        if self.debug:
            print("[GOFLOW]=============== %s request ===============" % endpoint)
            print(json.dumps(payload, indent=2))
            print("[GOFLOW]=============== /%s request ===============" % endpoint)

        response = requests.post("%s/flow/%s" % (self.base_url, endpoint), json=payload, headers=self.headers)
        resp_json = response.json()

        if self.debug:
            print("[GOFLOW]=============== %s response ===============" % endpoint)
            print(json.dumps(resp_json, indent=2))
            print("[GOFLOW]=============== /%s response ===============" % endpoint)

        if 400 <= response.status_code < 500:
            raise FlowServerException(endpoint, payload, resp_json)

        response.raise_for_status()

        return resp_json
开发者ID:mxabierto,项目名称:rapidpro,代码行数:20,代码来源:client.py


示例14: test_receive_with_session_id

    def test_receive_with_session_id(self):
        from temba.ussd.models import USSDSession

        data = self.mk_ussd_msg(content="événement", session_id="session-id", to=self.starcode)
        callback_url = reverse("handlers.junebug_handler", args=["inbound", self.channel.uuid])
        self.client.post(callback_url, json.dumps(data), content_type="application/json")

        # load our message
        inbound_msg, outbound_msg = Msg.objects.all().order_by("pk")
        self.assertEqual(outbound_msg.connection.status, USSDSession.TRIGGERED)
        self.assertEqual(outbound_msg.connection.external_id, "session-id")
        self.assertEqual(inbound_msg.connection.external_id, "session-id")
开发者ID:mxabierto,项目名称:rapidpro,代码行数:12,代码来源:tests.py


示例15: test_receive_ussd_no_session

    def test_receive_ussd_no_session(self):
        from temba.channels.handlers import JunebugUSSDHandler

        # Delete the trigger to prevent the sesion from being created
        self.trigger.delete()

        data = self.mk_ussd_msg(content="événement", to=self.starcode)
        callback_url = reverse("handlers.junebug_handler", args=["inbound", self.channel.uuid])
        response = self.client.post(callback_url, json.dumps(data), content_type="application/json")

        self.assertEqual(response.status_code, 400)
        self.assertEqual(response.json()["status"], JunebugUSSDHandler.NACK)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:12,代码来源:tests.py


示例16: get_db_prep_value

    def get_db_prep_value(self, value, *args, **kwargs):
        # if the value is falsy we will save is as null
        if self.null and value in (None, {}, []) and not kwargs.get("force"):
            return None

        if value is None:
            return None

        if type(value) not in (list, dict, OrderedDict):
            raise ValueError("JSONAsTextField should be a dict or a list, got %s => %s" % (type(value), value))

        return json.dumps(value)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:12,代码来源:models.py


示例17: post

    def post(self, request, *args, **kwargs):
        user = self.request.user
        org = user.get_org()

        form = MsgActionForm(self.request.POST, org=org, user=user)

        if form.is_valid():
            response = form.execute()

            # shouldn't get in here in normal operation
            if response and "error" in response:  # pragma: no cover
                return HttpResponse(json.dumps(response), content_type="application/json", status=400)

        return self.get(request, *args, **kwargs)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:14,代码来源:views.py


示例18: send_message_auto_complete_processor

def send_message_auto_complete_processor(request):
    """
    Adds completions for the expression auto-completion to the request context
    """
    completions = []
    user = request.user
    org = None

    if hasattr(user, "get_org"):
        org = request.user.get_org()

    if org:
        completions.append(dict(name="contact", display=str(_("Contact Name"))))
        completions.append(dict(name="contact.first_name", display=str(_("Contact First Name"))))
        completions.append(dict(name="contact.groups", display=str(_("Contact Groups"))))
        completions.append(dict(name="contact.language", display=str(_("Contact Language"))))
        completions.append(dict(name="contact.name", display=str(_("Contact Name"))))
        completions.append(dict(name="contact.tel", display=str(_("Contact Phone"))))
        completions.append(dict(name="contact.tel_e164", display=str(_("Contact Phone - E164"))))
        completions.append(dict(name="contact.uuid", display=str(_("Contact UUID"))))

        completions.append(dict(name="date", display=str(_("Current Date and Time"))))
        completions.append(dict(name="date.now", display=str(_("Current Date and Time"))))
        completions.append(dict(name="date.today", display=str(_("Current Date"))))
        completions.append(dict(name="date.tomorrow", display=str(_("Tomorrow's Date"))))
        completions.append(dict(name="date.yesterday", display=str(_("Yesterday's Date"))))

        for scheme, label in ContactURN.SCHEME_CHOICES:
            if scheme != TEL_SCHEME and scheme in org.get_schemes(Channel.ROLE_SEND):
                completions.append(dict(name="contact.%s" % scheme, display=str(_("Contact %s" % label))))

        for field in org.contactfields(manager="user_fields").filter(is_active=True).order_by("label"):
            display = str(_("Contact Field: %(label)s")) % {"label": field.label}
            completions.append(dict(name="contact.%s" % str(field.key), display=display))

    function_completions = get_function_listing()
    return dict(completions=json.dumps(completions), function_completions=json.dumps(function_completions))
开发者ID:teehamaral,项目名称:rapidpro,代码行数:37,代码来源:views.py


示例19: __str__

    def __str__(self):

        object_len = len(self.document)
        for idx in range(object_len):
            action_dict = self.document[idx]

            if action_dict["action"] in ["talk", "stream"]:
                if idx == object_len - 1:
                    self.document[idx]["bargeIn"] = False
                elif idx <= object_len - 2:
                    next_action_dict = self.document[idx + 1]
                    if next_action_dict["action"] != "input":
                        self.document[idx]["bargeIn"] = False

        return json.dumps(self.document)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:15,代码来源:nexmo.py


示例20: push_courier_msgs

def push_courier_msgs(channel, msgs, high_priority=False):
    """
    Adds the passed in msgs to our courier queue for channel
    """
    r = get_redis_connection("default")
    priority = COURIER_HIGH_PRIORITY if high_priority else COURIER_LOW_PRIORITY
    tps = channel.tps if channel.tps else COURIER_DEFAULT_TPS

    # create our payload
    payload = []
    for msg in msgs:
        payload.append(msg_as_task(msg))

    # call our lua script
    get_script(r)(keys=(time.time(), "msgs", channel.uuid, tps, priority, json.dumps(payload)), client=r)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:15,代码来源:courier.py



注:本文中的temba.utils.json.dumps函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python choreography.Choreography类代码示例发布时间:2022-05-27
下一篇:
Python analytics.track函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap