• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python swapper.load_model函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swapper.load_model函数的典型用法代码示例。如果您正苦于以下问题:Python load_model函数的具体用法?Python load_model怎么用?Python load_model使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了load_model函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setup_imagestore_permissions

def setup_imagestore_permissions(instance, created, **kwargs):
    """Grant ``instance`` the add/change/delete permissions for images and albums.

    Nothing happens unless ``created`` is truthy. The swappable ``Album`` and
    ``Image`` models are resolved through ``swapper``; their content types and
    the six matching ``Permission`` rows are looked up first and only then
    attached to ``instance.user_permissions``.

    NOTE(review): the ``(instance, created, **kwargs)`` signature looks like a
    ``post_save`` receiver -- confirm against the signal wiring.

    Args:
        instance: Object exposing a ``user_permissions`` relation.
        created: Truthy when ``instance`` has just been created.
        **kwargs: Accepted and ignored.
    """
    if not created:
        return
    try:
        album_model = swapper.load_model('imagestore', 'Album')
        image_model = swapper.load_model('imagestore', 'Image')

        # (lowercased class name, content type) pairs: Image first, then Album.
        targets = []
        for model in (image_model, album_model):
            class_name = model.__name__.lower()
            ctype = ContentType.objects.get(app_label=model._meta.app_label, model=class_name)
            targets.append((class_name, ctype))

        # One (image permission, album permission) pair per action. Every
        # lookup happens before the first add(), so a missing row grants nothing.
        grants = []
        for codename_template in ('add_%s', 'change_%s', 'delete_%s'):
            pair = [
                Permission.objects.get(codename=codename_template % class_name, content_type=ctype)
                for class_name, ctype in targets
            ]
            grants.append(pair)

        for image_permission, album_permission in grants:
            instance.user_permissions.add(image_permission, album_permission)
    except ObjectDoesNotExist:
        # Permissions or content types are not installed yet (probably the
        # very first run), so there is nothing to grant.
        pass
开发者ID:SaadBinShahid,项目名称:basic-django-imagestore,代码行数:26,代码来源:image.py


示例2: process_zipfile

def process_zipfile(uploaded_album):
    """Import every valid image of an uploaded .zip archive into an album.

    If the uploaded file no longer exists in ``default_storage`` nothing is
    done. Otherwise the archive is integrity-checked, the target album is
    created from ``uploaded_album.new_album_name`` when ``uploaded_album.album``
    is not set, each non-empty member that PIL can verify is stored as an
    ``Image`` of that album, and finally ``uploaded_album`` is deleted.

    Members whose name starts with ``__`` are skipped, as are members that
    fail verification or cannot be saved (the error is logged).

    Args:
        uploaded_album: Object providing ``zip_file``, ``album``,
            ``new_album_name`` and ``delete()``.

    Raises:
        Exception: If ``ZipFile.testzip()`` reports a corrupt member.
    """
    Image = swapper.load_model('imagestore', 'Image')
    Album = swapper.load_model('imagestore', 'Album')

    if not default_storage.exists(uploaded_album.zip_file.name):
        return

    # The context manager closes the archive even when the corrupt-archive
    # exception below (or any unexpected error) propagates.
    with zipfile.ZipFile(uploaded_album.zip_file) as archive:
        bad_file = archive.testzip()
        if bad_file:
            raise Exception('"%s" in the .zip archive is corrupt.' % bad_file)

        if not uploaded_album.album:
            uploaded_album.album = \
                Album.objects.create(name=uploaded_album.new_album_name)

        for filename in sorted(archive.namelist()):
            # Only byte-string names need decoding. Text names (always the
            # case on Python 3) are used as they are instead of being passed
            # to chardet, which only accepts bytes.
            unicode_filename = filename
            if isinstance(filename, bytes):
                try:
                    encoding = chardet.detect(filename)['encoding']
                    # chardet reports None when it cannot guess an encoding.
                    if encoding:
                        unicode_filename = filename.decode(encoding)
                except ValueError:  # includes UnicodeDecodeError
                    unicode_filename = filename

            # do not process meta files
            if unicode_filename.startswith('__'):
                continue

            logger.info('Processing file %s.', unicode_filename)

            data = archive.read(filename)
            if not len(data):
                continue

            try:
                # the following is taken from django.forms.fields.ImageField:
                # load() could spot a truncated JPEG, but it loads the entire
                # image in memory, which is a DoS vector. See #3848 and #18520.
                # verify() must be called immediately after the constructor.
                PILImage.open(BytesIO(data)).verify()
            except Exception as e:
                # if a "bad" file is found we just skip it.
                logger.info('Error verify image: %s' % e)
                continue

            # ``data`` is a bytes object (ZipFile.read), so there is no
            # stream position to rewind before handing it to ContentFile.
            try:
                img = Image(album=uploaded_album.album)
                img.image.save(filename, ContentFile(data))
                img.save()
            except Exception as e:
                logger.info('Error create Image: %s' % e)
                continue

    uploaded_album.delete()
开发者ID:holms,项目名称:imagestore,代码行数:55,代码来源:upload.py


示例3: test_create

    def test_create(self):
        """Creating an Item that points at a new Type stores exactly one Item."""
        type_model = swapper.load_model('default_app', 'Type')
        item_model = swapper.load_model('default_app', 'Item')

        new_type = type_model.objects.create(name="Type 1")
        item_model.objects.create(type=new_type, name="Item 1")

        self.assertEqual(item_model.objects.count(), 1)

        # The stored item must reference the type created above.
        stored = item_model.objects.all()[0]
        self.assertEqual(stored.type.name, "Type 1")
开发者ID:wq,项目名称:django-swappable-models,代码行数:13,代码来源:test_swapper.py


示例4: set_for_events

    def set_for_events(self, delete=True, **event_filter):
        """
        Rebuild the cached rows for the events matching ``event_filter``.

        The events are described by query keyword arguments rather than a
        queryset so that a JOIN can be used instead of fetching the results
        of each event separately.

        When ``delete`` is truthy, rows matching the filter (keys prefixed
        with ``event_``) are deleted first. Then one ``self.model`` instance
        is built and denormalized per valid result (keys prefixed with
        ``report__event__``) and all of them are inserted with a single
        ``bulk_create`` call.
        """

        # Drop the existing rows, addressed through the denormalized fields.
        if delete:
            stale_lookup = {}
            for name, value in event_filter.items():
                stale_lookup['event_' + name] = value
            self.filter(**stale_lookup).delete()

        # Select the results by joining through Report to Event.
        Result = swapper.load_model('vera', 'Result')
        joined_lookup = {}
        for name, value in event_filter.items():
            joined_lookup['report__event__' + name] = value
        valid = Result.objects.valid_results(**joined_lookup)

        def _build(result):
            # One denormalized cache row per valid result.
            row = self.model(event=result.report.event, result=result)
            row.denormalize()
            return row

        self.bulk_create(
            [_build(result) for result in valid.select_related('report__event')]
        )
开发者ID:TipOfit,项目名称:vera,代码行数:32,代码来源:models.py


示例5: test_swap_fields

 def test_swap_fields(self):
     """The model loaded for ``default_app.Type`` has a ``code`` field."""
     type_model = swapper.load_model('default_app', 'Type')
     fields_by_name = {
         model_field.name: model_field
         for model_field in type_model._meta.fields
     }
     self.assertIn('code', fields_by_name)
开发者ID:wq,项目名称:django-swappable-models,代码行数:7,代码来源:test_swapper.py


示例6: test_fields

 def test_fields(self):
     """The model loaded for ``default_app.Type`` has a ``name`` field."""
     type_model = swapper.load_model('default_app', 'Type')
     fields_by_name = dict(
         (model_field.name, model_field)
         for model_field in type_model._meta.fields
     )
     self.assertIn('name', fields_by_name)
开发者ID:ebugfix,项目名称:django-swappable-models,代码行数:7,代码来源:test_swapper.py


示例7: test_page_fails_with_nonexistant_template

def test_page_fails_with_nonexistant_template(client):
    """A page using a template that cannot be found fails twice.

    ``full_clean()`` must raise ``ValidationError``; after saving the page
    anyway, requesting its URL must raise ``TemplateDoesNotExist``.
    """
    page_model = swapper.load_model('varlet', 'Page')
    homepage = page_model(template='home.html', url='')

    with pytest.raises(ValidationError):
        homepage.full_clean()

    # Persist the page despite the failed validation so the view is exercised.
    homepage.save()

    with pytest.raises(TemplateDoesNotExist):
        client.get(homepage.get_absolute_url())
开发者ID:kezabelle,项目名称:django-varlet,代码行数:8,代码来源:test_views.py


示例8: process_tweet_queue

    def process_tweet_queue(self):
        """
        Flush every queued tweet to the database or to the output file.

        It is ok for this to be called on a thread other than the streaming thread.

        When ``self.to_file`` is set, the statuses are appended to that file
        as JSON lines; otherwise ``Tweet`` objects are bulk-inserted. With
        ``settings.CAPTURE_EMBEDDED`` enabled, an embedded
        ``retweeted_status`` is stored as a record of its own, ahead of the
        status that contains it.

        Returns 0 when the queue yields nothing, otherwise the number of
        stored records divided by the seconds elapsed since the previous call.
        """

        # Seconds since the previous call; the basis of the tps figure.
        current = time.time()
        elapsed = current - self.time
        self.time = current

        try:
            pending = self.queue.get_all_nowait()
        except Queue.Empty:
            return 0

        if not len(pending):
            return 0

        Tweet = load_model("twitter_stream", "Tweet")

        records = []
        for status in pending:
            has_embedded = settings.CAPTURE_EMBEDDED and 'retweeted_status' in status

            if self.to_file:
                if has_embedded:
                    records.append(json.dumps(status['retweeted_status']))
                # The outer status is always dumped without the embedded one.
                if 'retweeted_status' in status:
                    del status['retweeted_status']
                records.append(json.dumps(status))
            else:
                if has_embedded:
                    records.append(Tweet.create_from_json(status['retweeted_status']))
                records.append(Tweet.create_from_json(status))

        if not records:
            logger.info("Saved 0 tweets")
        elif self.to_file:
            # (Re)open the output file lazily, in append mode.
            if not self._output_file or self._output_file.closed:
                self._output_file = open(self.to_file, 'ab')
            self._output_file.write("\n".join(records) + "\n")
            self._output_file.flush()
            logger.info("Dumped %s tweets at %s tps to %s" % (len(records), len(records) / elapsed, self.to_file))
        else:
            Tweet.objects.bulk_create(records, settings.INSERT_BATCH_SIZE)
            logger.info("Inserted %s tweets at %s tps" % (len(records), len(records) / elapsed))

        if settings.DEBUG:
            # Prevent apparent memory leaks
            # https://docs.djangoproject.com/en/dev/faq/models/#why-is-django-leaking-memory
            from django import db
            db.reset_queries()

        return len(records) / elapsed
开发者ID:acquayefrank,项目名称:django-twitter-stream,代码行数:58,代码来源:streaming.py


示例9: test_page_renders_html_ok

def test_page_renders_html_ok(client):
    """A valid page answers with HTTP 200 and the test template's markup."""
    page_model = swapper.load_model('varlet', 'Page')
    homepage = page_model(
        url='',
        template='varlet/pages/layouts/test_template.html',
    )
    homepage.full_clean()
    homepage.save()

    page_url = homepage.get_absolute_url()
    response = client.get(page_url)
    assert response.status_code == 200

    # render() is called explicitly before the content is inspected.
    response.render()
    body = force_text(response.content)
    assert '<b>Test template</b>' in body
开发者ID:kezabelle,项目名称:django-varlet,代码行数:9,代码来源:test_views.py


示例10: test_page_renders_json_detailview_ok

def test_page_renders_json_detailview_ok(client):
    """Requesting a page with ``Accept: application/json`` returns its JSON form."""
    page_model = swapper.load_model('varlet', 'Page')
    page = page_model(
        url='/test/',
        template='varlet/pages/layouts/test_template.html',
    )
    page.full_clean()
    page.save()

    page_url = page.get_absolute_url()
    response = client.get(page_url, HTTP_ACCEPT="application/json")
    assert response.status_code == 200

    payload = json.loads(force_text(response.content))
    assert payload['get_absolute_url'] == '/test/'
开发者ID:kezabelle,项目名称:django-varlet,代码行数:9,代码来源:test_views.py


示例11: test_rendering_sitemaps

def test_rendering_sitemaps(rf):
    """The rendered sitemap contains ``<loc>`` entries for the created pages."""
    page_model = swapper.load_model('varlet', 'Page')
    # Create four pages whose URLs are "1" .. "4".
    for number in range(1, 5):
        page_model.objects.create(url=str(number), template="varlet/pages/layouts/test_template.html")

    request = rf.get('/')
    response = sitemap(request, sitemaps={'pages': PageSitemap()})
    response.render()

    xml = force_text(response.content)
    assert '<loc>http://example.com/3/</loc>' in xml
    assert '<loc>http://example.com/2/</loc>' in xml
    assert '<loc>http://example.com/1/</loc>' in xml
开发者ID:kezabelle,项目名称:django-varlet,代码行数:9,代码来源:test_sitemaps.py


示例12: get_admin_url

 def get_admin_url(self):
     """Return the admin ``autocomplete`` URL of the swappable Page model.

     ``None`` is returned when the model is not registered with
     ``admin.site`` or when the URL name cannot be reversed.
     """
     page_model = swapper.load_model('varlet', 'Page')
     if admin.site.is_registered(page_model):
         meta = page_model._meta
         url_name = 'admin:{}_{}_autocomplete'.format(meta.app_label, meta.model_name)
         try:
             return reverse(url_name)
         except NoReverseMatch:
             pass
     return None
开发者ID:kezabelle,项目名称:django-varlet,代码行数:10,代码来源:admin.py


示例13: get_page

def get_page(path):
    """Return the Page whose ``url`` equals the cleaned ``path``, or ``None``.

    ``path`` is first passed through the ``url`` model field's ``to_python``.
    ``None`` is returned when that raises ``ValidationError``, when no page
    matches, or when more than one page matches.
    """
    page_model = swapper.load_model('varlet', 'Page')
    url_field = page_model._meta.get_field('url')

    try:
        normalized = url_field.to_python(path)
    except ValidationError:
        return None

    try:
        found = page_model.objects.get(url=normalized)
    except (page_model.DoesNotExist, MultipleObjectsReturned):
        found = None
    return found
开发者ID:kezabelle,项目名称:django-varlet,代码行数:11,代码来源:page_tags.py


示例14: process_zipfile

def process_zipfile(uploaded_album):
    """Import every valid image of an uploaded .zip archive into an album.

    If the uploaded file no longer exists in ``default_storage`` nothing is
    done. Otherwise the archive is integrity-checked, the target album is
    created from ``uploaded_album.new_album_name`` when ``uploaded_album.album``
    is not set, each non-empty member that PIL can verify is stored as an
    ``Image`` of that album, and finally ``uploaded_album`` is deleted.

    Members whose name starts with ``__`` are skipped, as are members that
    fail verification or cannot be saved (the error is printed).

    Args:
        uploaded_album: Object providing ``zip_file``, ``album``,
            ``new_album_name`` and ``delete()``.

    Raises:
        Exception: If ``ZipFile.testzip()`` reports a corrupt member.
    """
    # Imported locally so the block does not depend on a file-level import.
    # ZipFile.read() returns bytes, so the in-memory buffer must be binary.
    from io import BytesIO

    Image = swapper.load_model('imagestore', 'Image')
    Album = swapper.load_model('imagestore', 'Album')

    if not default_storage.exists(uploaded_album.zip_file.name):
        return

    # The context manager closes the archive even when the corrupt-archive
    # exception below (or any unexpected error) propagates.
    with zipfile.ZipFile(uploaded_album.zip_file) as archive:
        bad_file = archive.testzip()
        if bad_file:
            raise Exception('"%s" in the .zip archive is corrupt.' % bad_file)

        if not uploaded_album.album:
            uploaded_album.album = Album.objects.create(name=uploaded_album.new_album_name)

        for filename in sorted(archive.namelist()):
            if filename.startswith('__'):  # do not process meta files
                continue
            print(filename.encode('ascii', errors='replace'))
            data = archive.read(filename)
            if not len(data):
                continue

            try:
                # the following is taken from django.forms.fields.ImageField:
                # load() could spot a truncated JPEG, but it loads the entire
                # image in memory, which is a DoS vector. See #3848 and #18520.
                # verify() must be called immediately after the constructor.
                PILImage.open(BytesIO(data)).verify()
            except Exception as ex:
                # if a "bad" file is found we just skip it.
                # Format the exception itself: ``ex.message`` does not exist
                # on Python 3 and would raise inside this handler.
                print('Error verify image: %s' % ex)
                continue

            # ``data`` is a bytes object, so there is no stream position to
            # rewind before handing it to ContentFile.
            try:
                img = Image(album=uploaded_album.album)
                img.image.save(filename, ContentFile(data))
                img.save()
            except Exception as ex:
                print('error create Image: %s' % ex)

    uploaded_album.delete()
开发者ID:SaadBinShahid,项目名称:basic-django-imagestore,代码行数:41,代码来源:upload.py


示例15: setup_imagestore_permissions

def setup_imagestore_permissions(instance, created, **kwargs):
    """Grant ``instance`` the add/change/delete permissions for albums and images.

    Nothing happens unless ``created`` is truthy. All six permissions are
    looked up by natural key first and then attached to
    ``instance.user_permissions`` in a single call.

    NOTE(review): the ``(instance, created, **kwargs)`` signature looks like a
    ``post_save`` receiver -- confirm against the signal wiring.

    Args:
        instance: Object exposing a ``user_permissions`` relation.
        created: Truthy when ``instance`` has just been created.
        **kwargs: Accepted and ignored.
    """
    if not created:
        return
    try:
        album_model = swapper.load_model('imagestore', 'Album')
        image_model = swapper.load_model('imagestore', 'Image')

        granted = []
        for model in (album_model, image_model):
            app_label = model._meta.app_label
            model_name = model.__name__.lower()
            granted.extend(
                Permission.objects.get_by_natural_key('%s_%s' % (action, model_name), app_label, model_name)
                for action in ('add', 'change', 'delete')
            )

        instance.user_permissions.add(*granted)

    except ObjectDoesNotExist:
        # Permissions or content types are not installed yet (probably the
        # very first run), so there is nothing to grant.
        pass
开发者ID:grengojbo,项目名称:imagestore,代码行数:21,代码来源:image.py


示例16: stream_status

def stream_status():
    """Collect a status summary of the tweet stream.

    Returns:
        dict with the keys
        ``running``     -- True if any current stream process has status
                           ``StreamProcess.STREAM_STATUS_RUNNING``;
        ``terms``       -- the ``term`` of every enabled ``FilterTerm``;
        ``processes``   -- the current stream processes;
        ``tweet_count`` -- ``Tweet.count_approx()``;
        ``earliest`` / ``latest`` -- earliest and latest ``created_at``;
        ``avg_rate``    -- tweets per second between those two times, or
                           ``None`` when either time is missing or both
                           are equal;
        ``timeline``    -- per-``time`` tweet counts (``time`` as an ISO
                           string) for the 20 minutes before the latest
                           tweet's minute.
    """
    terms = FilterTerm.objects.filter(enabled=True)
    processes = StreamProcess.get_current_stream_processes()
    running = any(
        p.status == StreamProcess.STREAM_STATUS_RUNNING for p in processes
    )

    Tweet = load_model("twitter_stream", "Tweet")
    tweet_count = Tweet.count_approx()
    earliest_time = Tweet.get_earliest_created_at()
    latest_time = Tweet.get_latest_created_at()

    avg_rate = None
    if earliest_time is not None and latest_time is not None:
        span_seconds = (latest_time - earliest_time).total_seconds()
        # A single stored tweet (or identical timestamps) gives a zero span;
        # leave the rate undefined instead of dividing by zero.
        if span_seconds:
            avg_rate = float(tweet_count) / span_seconds

    # Get the tweets / minute over the 20 minutes before the latest tweet
    tweet_counts = []
    if latest_time is not None:
        latest_time_minute = latest_time.replace(second=0, microsecond=0)

        # SQL expression that truncates created_at to the minute; backends
        # other than MySQL / PostgreSQL fall back to the raw timestamp.
        if settings.DATABASES['default']['ENGINE'].endswith('mysql'):
            drop_seconds = "created_at - INTERVAL SECOND(created_at) SECOND"
        elif settings.DATABASES['default']['ENGINE'].endswith('postgresql_psycopg2'):
            drop_seconds = "date_trunc('minute', created_at)"
        else:
            drop_seconds = "created_at"

        tweet_counts = Tweet.objects.extra(select={
            'time': drop_seconds
        }) \
            .filter(created_at__gt=latest_time_minute - timedelta(minutes=20)) \
            .values('time') \
            .order_by('time') \
            .annotate(tweets=models.Count('id'))

        tweet_counts = list(tweet_counts)

    # Replace the time values with ISO strings.
    for row in tweet_counts:
        row['time'] = row['time'].isoformat()

    return {
        'running': running,
        'terms': [t.term for t in terms],
        'processes': processes,
        'tweet_count': tweet_count,
        'earliest': earliest_time,
        'latest': latest_time,
        'avg_rate': avg_rate,
        'timeline': tweet_counts
    }
开发者ID:michaelbrooks,项目名称:django-twitter-stream,代码行数:53,代码来源:views.py


示例17: test_page_renders_json_listview_ok

def test_page_renders_json_listview_ok(client):
    """The homepage requested as JSON lists itself and the two other pages."""
    page_model = swapper.load_model('varlet', 'Page')
    layout = 'varlet/pages/layouts/test_template.html'

    homepage = page_model(url='', template=layout)
    homepage.full_clean()
    homepage.save()

    # Two more pages, with the URLs "1" and "2".
    for number in range(1, 3):
        page_model.objects.create(url=str(number), template=layout)

    response = client.get(homepage.get_absolute_url(), HTTP_ACCEPT="application/json")
    assert response.status_code == 200

    entries = json.loads(force_text(response.content))
    assert len(entries) == 3
    listed_urls = {entry['get_absolute_url'] for entry in entries}
    assert listed_urls == {"/", "/1/", "/2/"}
开发者ID:kezabelle,项目名称:django-varlet,代码行数:12,代码来源:test_views.py


示例18: dictionary_get

def dictionary_get(request):
    """Return a JSON object mapping primary key to ``name`` for one model.

    The model is loaded from the ``chemical`` app using the ``model_name``
    GET parameter. For any request method other than GET the function
    returns ``False``.
    """
    if request.method == "GET":
        requested_model = swapper.load_model("chemical", request.GET["model_name"])
        names_by_pk = {row.pk: row.name for row in requested_model.objects.all()}
        payload = json.dumps(names_by_pk)
        return HttpResponse(payload, "application/json")
    return False
开发者ID:KateGL,项目名称:chemcloudsite,代码行数:16,代码来源:views.py


示例19: vals

    def vals(self, vals):
        """Store ``vals`` as result values, one result per resolved parameter.

        Each key of ``vals`` is wrapped in a 1-tuple and resolved through
        ``Parameter.objects.resolve_keys``. For every resolved parameter the
        matching result is fetched or created on ``self.results``, given the
        corresponding value and saved.

        Raises:
            TypeError: If ``resolve_keys`` reports failure; the message
                lists the keys that resolved to ``None``.
        """
        Parameter = swapper.load_model('vera', 'Parameter')
        lookup_keys = [(name,) for name in vals.keys()]
        resolved, ok = Parameter.objects.resolve_keys(lookup_keys)

        if not ok:
            unresolved = [
                key for key, match in resolved.items() if match is None
            ]
            raise TypeError(
                "Could not identify one or more parameters: %s!"
                % unresolved
            )

        for key, parameter in resolved.items():
            # get_or_create returns (object, created); only the object is used.
            result = self.results.get_or_create(type=parameter)[0]
            result.value = vals[key[0]]
            result.save()
开发者ID:TipOfit,项目名称:vera,代码行数:17,代码来源:models.py


示例20: valid_results

    def valid_results(self, **filter):
        """Return the filtered results, restricted to valid reports and non-empty rows.

        ``report__status__is_valid=True`` and ``empty=False`` are added to
        the given filter. The queryset is made DISTINCT ON event, then
        parameter (collapsing results from different reports into one), is
        ordered by those same fields followed by the valid report order, and
        selects the related ``report``.
        """
        Parameter = swapper.load_model('vera', 'Parameter')

        filter.update({
            'report__status__is_valid': True,
            'empty': False,
        })

        # DISTINCT ON event, then the parameter ordering, then the type id.
        type_ordering = nest_ordering('type', Parameter._meta.ordering)
        distinct_fields = ['report__event__id'] + type_ordering + ['type__id']

        # ORDER BY the distinct fields first, then the valid report order.
        report_ordering = nest_ordering('report', VALID_REPORT_ORDER)
        ordering = distinct_fields + report_ordering

        queryset = self.filter(**filter)
        queryset = queryset.order_by(*ordering)
        queryset = queryset.distinct(*distinct_fields)
        return queryset.select_related('report')
开发者ID:TipOfit,项目名称:vera,代码行数:18,代码来源:models.py



注:本文中的swapper.load_model函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python swift._函数代码示例发布时间:2022-05-27
下一篇:
Python common.get_user_login_object函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap