• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python analytics.track函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 temba.utils.analytics.track 函数的典型用法代码示例。如果您正苦于以下问题:Python track 函数的具体用法?Python track 怎么用?Python track 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 track 函数的 15 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: form_valid

        def form_valid(self, form):
            """
            Replace the catch-all triggers of the selected groups with a new one.

            Archives every active catch-all trigger in the user's org that matches any
            of the submitted groups, creates a new catch-all trigger for the submitted
            flow, attaches the groups to it and records the analytics event.

            Returns the re-rendered form response with ``REDIRECT`` set to the
            success URL.
            """
            creator = self.request.user
            org = creator.get_org()
            selected_groups = form.cleaned_data["groups"]

            # archive the existing catch-all triggers that overlap with these groups
            overlapping = Trigger.objects.filter(
                org=org, groups__in=selected_groups, trigger_type=Trigger.TYPE_CATCH_ALL, is_active=True
            )
            overlapping.update(is_archived=True)

            # the replacement catch-all trigger
            new_trigger = Trigger.objects.create(
                created_by=creator,
                modified_by=creator,
                org=org,
                trigger_type=Trigger.TYPE_CATCH_ALL,
                flow=form.cleaned_data["flow"],
            )

            # attach each selected group to the new trigger
            for selected in selected_groups:
                new_trigger.groups.add(selected)

            analytics.track(creator.username, "temba.trigger_created_catchall")

            page_context = self.get_context_data(form=form)
            response = self.render_to_response(page_context)
            response["REDIRECT"] = self.get_success_url()
            return response
开发者ID:mxabierto,项目名称:rapidpro,代码行数:28,代码来源:views.py


示例2: render_to_response

        def render_to_response(self, context, **response_kwargs):
            """
            Start a contact export for the user's org and redirect to the contact list.

            The export is optionally limited to the group whose id is passed as the
            ``g`` request parameter (only if that group belongs to the org). A flash
            message tells the user either that an e-mail will follow, or - when the
            ``CELERY_ALWAYS_EAGER`` setting is truthy - where the export can be found.
            """
            from django.contrib import messages

            user = self.request.user
            analytics.track(user.username, 'temba.contact_exported')

            org = user.get_org()

            # resolve the optional group restriction
            group = None
            requested_id = self.request.REQUEST.get('g', None)
            if requested_id:
                matches = ContactGroup.user_groups.filter(pk=requested_id, org=org)
                group = matches[0] if matches else None

            host = self.request.branding['host']

            export = ExportContactsTask.objects.create(
                created_by=user, modified_by=user, org=org, group=group, host=host
            )
            export_contacts_task.delay(export.pk)

            if getattr(settings, 'CELERY_ALWAYS_EAGER', False):
                # the task was queued with the eager setting on, so point at the download
                export = ExportContactsTask.objects.get(id=export.pk)
                dl_url = reverse('assets.download', kwargs=dict(type='contact_export', identifier=export.pk))
                notice = _("Export complete, you can find it here: %s (production users will get an email)") % dl_url
                messages.info(self.request, notice)
            else:
                preparing = _("We are preparing your export. ")
                will_email = _("We will e-mail you at %s when it is ready.") % user.username
                messages.info(self.request, preparing + will_email)

            return HttpResponseRedirect(reverse('contacts.contact_list'))
开发者ID:austiine04,项目名称:rapidpro,代码行数:31,代码来源:views.py


示例3: get_context_data

        def get_context_data(self, **kwargs):
            """
            Build the template context for the contact import page.

            Always records the ``temba.contact_imported`` analytics event. When the
            ``task`` request parameter names an import task created by the current
            user, the task, its parsed results and the first group created by that
            import are added to the context, and the form is hidden unless the task
            produced no group and is no longer pending/running/started.
            """
            context = super(ContactCRUDL.Import, self).get_context_data(**kwargs)
            context['task'] = None
            context['group'] = None
            context['show_form'] = True

            analytics.track(self.request.user.username, 'temba.contact_imported')

            task_id = self.request.REQUEST.get('task', None)
            if not task_id:
                return context

            # only tasks created by the requesting user are considered
            owned_tasks = ImportTask.objects.filter(pk=task_id, created_by=self.request.user)
            if not owned_tasks:
                return context

            task = owned_tasks[0]
            context['task'] = task
            context['show_form'] = False

            if task.import_results:
                context['results'] = json.loads(task.import_results)
            else:
                context['results'] = dict()

            imported_groups = ContactGroup.user_groups.filter(import_task=task)
            if imported_groups:
                context['group'] = imported_groups[0]
            elif task.status() not in ['PENDING', 'RUNNING', 'STARTED']:
                # no group and the task is not in progress: show the form again
                context['show_form'] = True

            return context
开发者ID:austiine04,项目名称:rapidpro,代码行数:27,代码来源:views.py


示例4: form_valid

        def form_valid(self, form):
            """
            Create a broadcast (or hand off to the flow-node task) from the send form.

            * With a ``step_node`` value, the text is queued via ``send_to_flow_node``
              together with the GET parameters and the method returns immediately.
            * Otherwise a ``Broadcast`` is created for the omnibox groups, contacts
              and URNs, optionally attached to a newly created ``Schedule``.

            The response is JSON when the ``_format`` GET parameter is ``json``,
            otherwise a redirect.
            """
            self.form = form
            user = self.request.user
            org = user.get_org()
            query = self.request.GET
            simulation = query.get("simulation", "false") == "true"
            wants_json = query.get("_format") == "json"

            cleaned = self.form.cleaned_data
            omnibox = cleaned["omnibox"]
            has_schedule = cleaned["schedule"]
            step_uuid = cleaned.get("step_node", None)
            text = cleaned["text"]

            groups = list(omnibox["groups"])
            contacts = list(omnibox["contacts"])
            urns = list(omnibox["urns"])

            if step_uuid:
                from .tasks import send_to_flow_node

                # forward every GET parameter to the task, plus the node under "s"
                get_params = dict(query.items())
                get_params["s"] = step_uuid
                send_to_flow_node.delay(org.pk, user.pk, text, **get_params)

                if wants_json:
                    return HttpResponse(json.dumps(dict(status="success")), content_type="application/json")
                return HttpResponseRedirect(self.get_success_url())

            # if simulating only use the test contact
            if simulation:
                groups, urns = [], []
                test_contact = next((candidate for candidate in contacts if candidate.is_test), None)
                # NOTE(review): when no selected contact is a test contact the full
                # contact list is kept as-is - confirm this is intended
                if test_contact is not None:
                    contacts = [test_contact]

            schedule = None
            if has_schedule:
                schedule = Schedule.objects.create(created_by=user, modified_by=user)

            broadcast = Broadcast.create(
                org, user, text, groups=groups, contacts=contacts, urns=urns, schedule=schedule, status=QUEUED
            )

            if not has_schedule:
                self.post_save(broadcast)
                super().form_valid(form)

            recipient_counts = dict(contacts=len(contacts), groups=len(groups), urns=len(urns))
            analytics.track(self.request.user.username, "temba.broadcast_created", recipient_counts)

            if wants_json:
                data = dict(status="success", redirect=reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
                return HttpResponse(json.dumps(data), content_type="application/json")

            if self.form.cleaned_data["schedule"]:
                return HttpResponseRedirect(reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
            return HttpResponseRedirect(self.get_success_url())
开发者ID:teehamaral,项目名称:rapidpro,代码行数:57,代码来源:views.py


示例5: form_valid

        def form_valid(self, form):
            """
            Create a schedule trigger from the submitted form.

            A new ``Schedule`` is created and configured according to what the form
            reports (never / stopped / now / scheduled), then a trigger of type
            ``SCHEDULE_TRIGGER`` is created for the selected flow and the omnibox
            groups and contacts are attached to it.

            Returns the re-rendered form response with ``REDIRECT`` set to the
            success URL.
            """
            user = self.request.user
            analytics.track(user.username, 'temba.trigger_created_schedule')
            schedule = Schedule.objects.create(created_by=user, modified_by=user)

            if form.starts_never() or form.stopped():
                schedule.reset()

            elif form.starts_now():
                # next fire is set one day in the past, with a repeat period of 'O'
                schedule.next_fire = timezone.now() - timedelta(days=1)
                schedule.repeat_period = 'O'
                schedule.repeat_days = 0
                schedule.status = 'S'
                schedule.save()

            else:
                # Scheduled case
                schedule.status = 'S'
                schedule.repeat_period = form.cleaned_data['repeat_period']

                start_time = form.get_start_time()
                if start_time:
                    schedule.next_fire = start_time

                # create our recurrence
                if form.is_recurring():
                    schedule.repeat_days = form.cleaned_data.get('repeat_days')
                    schedule.repeat_hour_of_day = schedule.next_fire.hour
                    schedule.repeat_day_of_month = schedule.next_fire.day
                schedule.save()

            recipients = self.form.cleaned_data['omnibox']

            trigger = Trigger.objects.create(
                flow=self.form.cleaned_data['flow'],
                org=user.get_org(),
                schedule=schedule,
                trigger_type=SCHEDULE_TRIGGER,
                created_by=user,
                modified_by=user,
            )

            for recipient_group in recipients['groups']:
                trigger.groups.add(recipient_group)

            for recipient_contact in recipients['contacts']:
                trigger.contacts.add(recipient_contact)

            self.post_save(trigger)

            page_context = self.get_context_data(form=form)
            response = self.render_to_response(page_context)
            response['REDIRECT'] = self.get_success_url()
            return response
开发者ID:TextoCMR,项目名称:TexTo.cm,代码行数:56,代码来源:views.py


示例6: form_valid

        def form_valid(self, form):
            """
            Create a follow trigger for the submitted flow and channel.

            Records the ``temba.trigger_created_follow`` analytics event and returns
            the re-rendered form response with ``REDIRECT`` set to the success URL.
            """
            creator = self.request.user
            org = creator.get_org()

            Trigger.objects.create(
                created_by=creator,
                modified_by=creator,
                org=org,
                trigger_type=Trigger.TYPE_FOLLOW,
                flow=form.cleaned_data['flow'],
                channel=form.cleaned_data['channel'],
            )

            analytics.track(creator.username, 'temba.trigger_created_follow')

            page_context = self.get_context_data(form=form)
            response = self.render_to_response(page_context)
            response['REDIRECT'] = self.get_success_url()
            return response
开发者ID:daffyfeng,项目名称:rapidpro,代码行数:12,代码来源:views.py


示例7: pre_save

        def pre_save(self, obj):
            """
            Stamp the lead as created/modified by the user with id ``-1``.

            When the requesting user is anonymous, the lead's e-mail is also
            identified with the analytics backend (with a random ``segment`` between
            1 and 10 and the request's brand slug) and a ``temba.org_lead`` event is
            tracked.

            Returns the object produced by the parent ``pre_save``.
            """
            placeholder_user = User.objects.get(id=-1)
            obj = super(LeadCRUDL.Create, self).pre_save(obj)
            obj.created_by = obj.modified_by = placeholder_user

            if not self.request.user.is_anonymous():
                return obj

            traits = {
                'email': obj.email,
                'plan': 'None',
                'segment': randint(1, 10),
                'brand': self.request.branding['slug'],
            }
            analytics.identify(obj.email, traits)
            analytics.track(obj.email, 'temba.org_lead')
            return obj
开发者ID:Ebaneck,项目名称:rapidpro,代码行数:12,代码来源:views.py


示例8: pre_save

        def pre_save(self, obj):
            """
            Stamp the lead as created/modified by the configured anonymous user.

            The anonymous user is looked up by ``settings.ANONYMOUS_USER_NAME``. When
            the requesting user is anonymous, the lead's e-mail is also identified
            with the analytics backend (with a random ``segment`` between 1 and 10 and
            the request's brand slug) and a ``temba.org_lead`` event is tracked.

            Returns the object produced by the parent ``pre_save``.
            """
            placeholder_user = User.objects.get(username=settings.ANONYMOUS_USER_NAME)
            obj = super(LeadCRUDL.Create, self).pre_save(obj)
            obj.created_by = obj.modified_by = placeholder_user

            if not self.request.user.is_anonymous():
                return obj

            traits = {
                "email": obj.email,
                "plan": "None",
                "segment": randint(1, 10),
                "brand": self.request.branding["slug"],
            }
            analytics.identify(obj.email, traits)
            analytics.track(obj.email, "temba.org_lead")
            return obj
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:14,代码来源:views.py


示例9: form_valid

        def form_valid(self, form):
            """
            Create a new-conversation trigger for the submitted flow and channel.

            After creation the trigger archives its conflicts on behalf of the user
            and the channel's call-to-action payload is set to
            ``Channel.GET_STARTED``. Records the analytics event and returns the
            re-rendered form response with ``REDIRECT`` set to the success URL.
            """
            creator = self.request.user
            org = creator.get_org()

            new_trigger = Trigger.objects.create(
                created_by=creator,
                modified_by=creator,
                org=org,
                trigger_type=Trigger.TYPE_NEW_CONVERSATION,
                flow=form.cleaned_data['flow'],
                channel=form.cleaned_data['channel'],
            )
            new_trigger.archive_conflicts(creator)
            new_trigger.channel.set_fb_call_to_action_payload(Channel.GET_STARTED)

            analytics.track(creator.username, 'temba.trigger_created_new_conversation')

            page_context = self.get_context_data(form=form)
            response = self.render_to_response(page_context)
            response['REDIRECT'] = self.get_success_url()
            return response
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:14,代码来源:views.py


示例10: form_valid

        def form_valid(self, form):
            """
            Create a broadcast from the send form.

            Recipients are the omnibox groups, contacts and URNs (in that order);
            when the ``simulation`` request parameter is ``"true"`` only the selected
            contacts flagged ``is_test`` are used. A ``Schedule`` is created first
            when the form asks for one.

            The response is JSON when the ``_format`` request parameter is ``json``,
            otherwise a redirect.
            """
            self.form = form
            user = self.request.user
            simulation = self.request.REQUEST.get("simulation", "false") == "true"

            cleaned = self.form.cleaned_data
            omnibox = cleaned["omnibox"]
            has_schedule = cleaned["schedule"]

            groups = list(omnibox["groups"])
            contacts = list(omnibox["contacts"])
            urns = list(omnibox["urns"])

            if simulation:
                # when simulating make sure we only use test contacts
                recipients = [candidate for candidate in contacts if candidate.is_test]
            else:
                recipients = groups + contacts + urns

            schedule = None
            if has_schedule:
                schedule = Schedule.objects.create(created_by=user, modified_by=user)

            broadcast = Broadcast.create(
                user.get_org(), user, self.form.cleaned_data["text"], recipients, schedule=schedule
            )

            if not has_schedule:
                self.post_save(broadcast)
                super(BroadcastCRUDL.Send, self).form_valid(form)

            recipient_counts = dict(contacts=len(contacts), groups=len(groups), urns=len(urns))
            analytics.track(self.request.user.username, "temba.broadcast_created", recipient_counts)

            params = self.request.REQUEST
            if "_format" in params and params["_format"] == "json":
                data = dict(status="success", redirect=reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
                return HttpResponse(json.dumps(data), content_type="application/json")

            if self.form.cleaned_data["schedule"]:
                return HttpResponseRedirect(reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
            return HttpResponseRedirect(self.get_success_url())
开发者ID:mdheyab,项目名称:rapidpro,代码行数:48,代码来源:views.py


示例11: render_to_response

        def render_to_response(self, context, **response_kwargs):
            """
            Start a contact export for the user's org and redirect to the contact list.

            The export is optionally limited to the group whose id is passed as the
            ``g`` request parameter (only if that group belongs to the org). If the
            org already has an unfinished export created within the last 24 hours, no
            new export is started and the user is told who started the existing one.
            Otherwise the export task is queued and a flash message tells the user
            either that an e-mail will follow, or - when the ``CELERY_ALWAYS_EAGER``
            setting is truthy - where the export can be found.
            """
            analytics.track(self.request.user.username, 'temba.contact_exported')

            user = self.request.user
            org = user.get_org()

            group = None
            group_id = self.request.REQUEST.get('g', None)
            if group_id:
                groups = ContactGroup.user_groups.filter(pk=group_id, org=org)

                if groups:
                    group = groups[0]

            host = self.request.branding['host']

            # is there already an export taking place?
            existing = ExportContactsTask.objects.filter(org=org, is_finished=False,
                                                         created_on__gt=timezone.now() - timedelta(hours=24))\
                                                 .order_by('-created_on').first()

            # if there is an existing export, don't allow it
            if existing:
                # interpolate AFTER translating: the message id looked up in the
                # catalogue must be the constant template, not the formatted text
                messages.info(self.request,
                              _("There is already an export in progress, started by %s. You must wait "
                                "for that export to complete before starting another.")
                              % existing.created_by.username)

            # otherwise, off we go
            else:
                export = ExportContactsTask.objects.create(created_by=user, modified_by=user, org=org,
                                                           group=group, host=host)
                export_contacts_task.delay(export.pk)

                if not getattr(settings, 'CELERY_ALWAYS_EAGER', False):
                    messages.info(self.request,
                                  _("We are preparing your export. We will e-mail you at %s when it is ready.")
                                  % self.request.user.username)

                else:
                    # re-read the export before building its download link
                    export = ExportContactsTask.objects.get(id=export.pk)
                    dl_url = reverse('assets.download', kwargs=dict(type='contact_export', pk=export.pk))
                    messages.info(self.request,
                                  _("Export complete, you can find it here: %s (production users will get an email)")
                                  % dl_url)

            return HttpResponseRedirect(reverse('contacts.contact_list'))
开发者ID:thierhost,项目名称:rapidpro,代码行数:47,代码来源:views.py


示例12: send_to_flow_node

def send_to_flow_node(org_id, user_id, text, **kwargs):
    """
    Broadcast ``text`` to the contacts currently sitting at a flow node.

    The node is given by the ``s`` keyword argument. Target contacts are those of
    the org with an active run whose current node is that node, and which are
    active, not blocked, not stopped and whose ``is_test`` flag equals the
    ``simulation`` keyword argument (``"true"`` means test contacts).

    A ``temba.broadcast_created`` analytics event is tracked for the user.
    """
    from django.contrib.auth.models import User
    from temba.contacts.models import Contact
    from temba.orgs.models import Org
    from temba.flows.models import FlowRun

    org = Org.objects.get(pk=org_id)
    user = User.objects.get(pk=user_id)
    simulation = kwargs.get("simulation", "false") == "true"
    node_uuid = kwargs.get("s", None)

    # contacts with an active run parked at the requested node
    runs_at_node = FlowRun.objects.filter(org=org, current_node_uuid=node_uuid, is_active=True)
    run_contact_ids = runs_at_node.values_list("contact", flat=True)

    eligible = Contact.objects.filter(
        org=org, is_blocked=False, is_stopped=False, is_active=True, is_test=simulation
    )
    contact_ids = eligible.filter(id__in=run_contact_ids).values_list("id", flat=True)

    broadcast = Broadcast.create(org, user, text, contact_ids=contact_ids)
    broadcast.send(expressions_context={})

    recipient_counts = dict(contacts=len(contact_ids), groups=0, urns=0)
    analytics.track(user.username, "temba.broadcast_created", recipient_counts)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:23,代码来源:tasks.py


示例13: form_valid

        def form_valid(self, form):
            """
            Create, update or hide contact fields from the submitted form rows.

            Every ``field_<n>`` entry is processed together with its ``label_<n>``,
            ``show_<n>`` and ``type_<n>`` entries:

            * ``'__new_field'`` with a label creates a field keyed from the label
              (and tracks ``temba.contactfield_created``);
            * an existing field with a label is updated via ``get_or_create``;
            * an existing field without a label is hidden.

            Redirects to the success URL, or re-renders for PJAX requests. An
            ``IntegrityError`` is reported as a non-field form error.
            """
            try:
                cleaned_data = form.cleaned_data
                org = self.request.user.get_org()

                for form_key in cleaned_data:
                    if not form_key.startswith('field_'):
                        continue

                    # everything after the 'field_' prefix identifies the row
                    idx = form_key[6:]
                    label = cleaned_data["label_%s" % idx]
                    field = cleaned_data[form_key]
                    show_in_table = cleaned_data["show_%s" % idx]
                    value_type = cleaned_data['type_%s' % idx]

                    if field == '__new_field':
                        if label:
                            analytics.track(self.request.user.username, 'temba.contactfield_created')
                            new_key = ContactField.make_key(label)
                            ContactField.get_or_create(org, new_key, label,
                                                       show_in_table=show_in_table, value_type=value_type)
                    elif label:
                        ContactField.get_or_create(org, field.key, label,
                                                   show_in_table=show_in_table, value_type=value_type)
                    else:
                        ContactField.hide_field(org, field.key)

                if 'HTTP_X_PJAX' in self.request.META:  # pragma: no cover
                    return self.render_to_response(self.get_context_data(form=form,
                                                                         success_url=self.get_success_url(),
                                                                         success_script=getattr(self, 'success_script', None)))

                return HttpResponseRedirect(self.get_success_url())

            except IntegrityError as e:  # pragma: no cover
                message = str(e).capitalize()
                non_field_errors = self.form._errors.setdefault(forms.forms.NON_FIELD_ERRORS, forms.utils.ErrorList())
                non_field_errors.append(message)
                return self.render_to_response(self.get_context_data(form=form))
开发者ID:austiine04,项目名称:rapidpro,代码行数:36,代码来源:views.py


示例14: claim_number

    def claim_number(self, user, phone_number, country, role):
        """
        Claim a Twilio number (or short code) for the user's org as a channel.

        A new Twilio application is created whose SMS, voice and status callbacks
        point at this installation, then:

        * numbers of 6 characters or fewer are treated as short codes: the short
          code must already exist on the Twilio account (otherwise an
          ``Exception`` is raised), its SMS URL is updated, and ``role`` is
          replaced by send + receive;
        * other numbers are attached to the new application, creating the
          incoming phone number on the account if it is not found there.

        :param user: the user claiming the number; its org supplies the Twilio client
        :param phone_number: the number or short code to claim
        :param country: passed through to ``Channel.create``
        :param role: channel role to use for regular numbers (ignored for short codes)
        :return: the newly created channel
        """
        org = user.get_org()

        client = org.get_twilio_client()
        # stream of the account's incoming numbers matching phone_number; only
        # consumed further down, in the non short code branch
        twilio_phones = client.api.incoming_phone_numbers.stream(phone_number=phone_number)
        channel_uuid = uuid4()

        # create new TwiML app
        callback_domain = org.get_brand_domain()
        new_receive_url = "https://" + callback_domain + reverse("courier.t", args=[channel_uuid, "receive"])
        new_status_url = (
            "https://" + callback_domain + reverse("handlers.twilio_handler", args=["status", channel_uuid])
        )
        new_voice_url = "https://" + callback_domain + reverse("handlers.twilio_handler", args=["voice", channel_uuid])

        new_app = client.api.applications.create(
            friendly_name="%s/%s" % (callback_domain.lower(), channel_uuid),
            sms_url=new_receive_url,
            sms_method="POST",
            voice_url=new_voice_url,
            voice_fallback_url="https://" + settings.TEMBA_HOST+settings.MEDIA_URL+ "voice_unavailable.xml",
            voice_fallback_method="GET",
            status_callback=new_status_url,
            status_callback_method="POST",
        )

        # anything of 6 characters or fewer is handled as a short code
        is_short_code = len(phone_number) <= 6
        if is_short_code:
            short_codes = client.api.short_codes.stream(short_code=phone_number)
            short_code = next(short_codes, None)

            if short_code:
                number_sid = short_code.sid
                # same URL as new_receive_url above, set directly on the short code
                app_url = "https://" + callback_domain + "%s" % reverse("courier.t", args=[channel_uuid, "receive"])
                client.api.short_codes.get(number_sid).update(sms_url=app_url, sms_method="POST")

                # the caller supplied role is overridden for short codes
                role = Channel.ROLE_SEND + Channel.ROLE_RECEIVE
                phone = phone_number

            else:  # pragma: no cover
                raise Exception(
                    _(
                        "Short code not found on your Twilio Account. "
                        "Please check you own the short code and Try again"
                    )
                )
        else:
            # first matching incoming number on the account, if any
            twilio_phone = next(twilio_phones, None)
            if twilio_phone:

                # point the existing number at the new application
                client.api.incoming_phone_numbers.get(twilio_phone.sid).update(
                    voice_application_sid=new_app.sid, sms_application_sid=new_app.sid
                )

            else:  # pragma: needs cover
                # number not found on the account: create it, bound to the new application
                twilio_phone = client.api.incoming_phone_numbers.create(
                    phone_number=phone_number, voice_application_sid=new_app.sid, sms_application_sid=new_app.sid
                )

            # channel name is the number in national format
            phone = phonenumbers.format_number(
                phonenumbers.parse(phone_number, None), phonenumbers.PhoneNumberFormat.NATIONAL
            )

            number_sid = twilio_phone.sid

        # channel config combines the new application/number ids with the org's
        # stored Twilio account credentials
        org_config = org.config
        config = {
            Channel.CONFIG_APPLICATION_SID: new_app.sid,
            Channel.CONFIG_NUMBER_SID: number_sid,
            Channel.CONFIG_ACCOUNT_SID: org_config[ACCOUNT_SID],
            Channel.CONFIG_AUTH_TOKEN: org_config[ACCOUNT_TOKEN],
            Channel.CONFIG_CALLBACK_DOMAIN: callback_domain,
        }

        channel = Channel.create(
            org, user, country, "T", name=phone, address=phone_number, role=role, config=config, uuid=channel_uuid
        )

        analytics.track(user.username, "temba.channel_claim_twilio", properties=dict(number=phone_number))

        return channel
开发者ID:mxabierto,项目名称:rapidpro,代码行数:81,代码来源:views.py


示例15: claim_number

    def claim_number(self, user, phone_number, country, role):
        """
        Claim a Nexmo number (or short code) for the user's org as a channel.

        The number is looked up on the Nexmo account, first as given and then by
        its national number only (treated as a short code when that matches). If
        neither lookup finds it, ``buy_nexmo_number`` is called. The number's
        callback is then updated and an ``"NX"`` channel is created.

        The ``role`` argument is not used: the role is rebuilt from the features
        Nexmo reports for the number (SMS -> send/receive, VOICE -> answer/call).

        :raises Exception: when buying the number fails, or when updating a
            regular (non short code) number fails
        :return: the newly created channel
        """
        org = user.get_org()

        client = org.get_nexmo_client()
        org_config = org.config
        app_id = org_config.get(NEXMO_APP_ID)

        owned_numbers = client.get_numbers(phone_number)
        is_shortcode = False

        # try it with just the national code (for short codes)
        if not owned_numbers:
            national = str(phonenumbers.parse(phone_number, None).national_number)
            owned_numbers = client.get_numbers(national)

            if owned_numbers:
                is_shortcode, phone_number = True, national

        # buy the number if we have to
        if not owned_numbers:
            try:
                client.buy_nexmo_number(country, phone_number)
            except Exception as e:
                raise Exception(
                    _(
                        "There was a problem claiming that number, "
                        "please check the balance on your account. "
                        "Note that you can only claim numbers after "
                        "adding credit to your Nexmo account."
                    )
                    + "\n"
                    + str(e)
                )

        channel_uuid = generate_uuid()
        callback_domain = org.get_brand_domain()
        new_receive_url = "https://" + callback_domain + reverse("courier.nx", args=[channel_uuid, "receive"])

        # look the number up again and derive the role from its features
        owned_numbers = client.get_numbers(phone_number)
        features = [feature.upper() for feature in owned_numbers[0]["features"]]

        sms_roles = Channel.ROLE_SEND + Channel.ROLE_RECEIVE if "SMS" in features else ""
        voice_roles = Channel.ROLE_ANSWER + Channel.ROLE_CALL if "VOICE" in features else ""
        role = sms_roles + voice_roles

        # update the delivery URLs for it
        try:
            client.update_nexmo_number(country, phone_number, new_receive_url, app_id)

        except Exception as e:  # pragma: no cover
            # shortcodes don't seem to claim right on nexmo, move forward anyways
            if not is_shortcode:
                raise Exception(
                    _("There was a problem claiming that number, please check the balance on your account.")
                    + "\n"
                    + str(e)
                )

        if is_shortcode:
            phone = nexmo_phone_number = phone_number
        else:
            parsed = phonenumbers.parse(phone_number, None)
            phone = phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.INTERNATIONAL)

            # nexmo ships numbers around as E164 without the leading +
            e164 = phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
            nexmo_phone_number = e164.strip("+")

        config = {
            Channel.CONFIG_NEXMO_APP_ID: app_id,
            Channel.CONFIG_NEXMO_APP_PRIVATE_KEY: org_config[NEXMO_APP_PRIVATE_KEY],
            Channel.CONFIG_NEXMO_API_KEY: org_config[NEXMO_KEY],
            Channel.CONFIG_NEXMO_API_SECRET: org_config[NEXMO_SECRET],
            Channel.CONFIG_CALLBACK_DOMAIN: callback_domain,
        }

        channel = Channel.create(
            org,
            user,
            country,
            "NX",
            name=phone,
            address=phone_number,
            role=role,
            config=config,
            bod=nexmo_phone_number,
            uuid=channel_uuid,
            tps=1,
        )

        analytics.track(user.username, "temba.channel_claim_nexmo", dict(number=phone_number))

        return channel
开发者ID:teehamaral,项目名称:rapidpro,代码行数:98,代码来源:views.py



注:本文中的temba.utils.analytics.track函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python json.dumps函数代码示例发布时间:2022-05-27
下一篇:
Python models.Msg类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap