• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python http.json_resp函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中utils.http.json_resp函数的典型用法代码示例。如果您正苦于以下问题:Python json_resp函数的具体用法?Python json_resp怎么用?Python json_resp使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了json_resp函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: search_bangumi

    def search_bangumi(self, type, term, offset, count):
        """
        search bangumi from bangumi.tv, properly handling cookies is required for the bypass anti-bot mechanism
        :param term: a urlencoded word of the search term.
        :return: a json object
        """

        result = {"data": [], "total": 0}
        api_url = 'http://api.bgm.tv/search/subject/{0}?responseGroup=large&max_result={1}&start={2}&type={3}'.format(term.encode('utf-8'), count, offset, type)
        r = bangumi_request.get(api_url)

        if r.status_code > 399:
            r.raise_for_status()

        try:
            bgm_content = r.json()
        except Exception as error:
            logger.warn(error)
            result['message'] = 'fail to query bangumi'
            return json_resp(result, 500)

        if 'code' in bgm_content and bgm_content['code'] == 404:
            return json_resp(result, 200)

        bgm_list = bgm_content['list']
        total_count = bgm_content['results']
        if len(bgm_list) == 0:
            return json_resp(result)

        bgm_id_list = [bgm['id'] for bgm in bgm_list]
        bangumi_list = self.get_bangumi_from_bgm_id_list(bgm_id_list)

        for bgm in bgm_list:
            bgm['bgm_id'] = bgm.get('id')
            bgm['id'] = None
            # if bgm_id has found in database, give the database id to bgm.id
            # that's we know that this bangumi exists in our database
            for bangumi in bangumi_list:
                if bgm['bgm_id'] == bangumi.bgm_id:
                    bgm['id'] = bangumi.id
                    break
            bgm_images = bgm.get('images')
            if bgm_images:
                bgm['image'] = bgm_images.get('large')
            # remove useless keys
            bgm.pop('images', None)
            bgm.pop('collection', None)
            bgm.pop('url', None)
            bgm.pop('type', None)

        result['data'] = bgm_list
        result['total'] = total_count
        return json_resp(result)
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:53,代码来源:admin.py


示例2: add_web_hook_token

    def add_web_hook_token(self, token_id, web_hook_id, user):
        session = SessionManager.Session()
        try:
            web_hook = session.query(WebHook).filter(WebHook.id == web_hook_id).one()
            web_hook_token = WebHookToken(web_hook_id=web_hook_id,
                                          user_id=user.id,
                                          token_id=token_id)
            session.add(web_hook_token)
            session.commit()
            method_args = {
                'web_hook_id': web_hook_id,
                'token_id': token_id,
                'user_id': user.id,
                'email': None
            }
            if web_hook.has_permission(WebHook.PERMISSION_EMAIL) and user.email is not None and user.email_confirmed:
                method_args['email'] = user.email

            rpc_request.send('token_add', method_args)

            return json_resp({'message': 'ok'})
        except NoResultFound:
            raise ClientError('web hook not existed')
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:25,代码来源:web_hook.py


示例3: update_web_hook

    def update_web_hook(self, web_hook_id, web_hook_dict):
        """
        update a web hook
        :param web_hook_dict:
        :param web_hook_id:
        :return:
        """
        session = SessionManager.Session()
        try:
            web_hook = session.query(WebHook).\
                filter(WebHook.id == web_hook_id).\
                one()
            web_hook.name = web_hook_dict.get('name')
            web_hook.description = bleach.clean(web_hook_dict.get('description'), tags=self.ALLOWED_TAGS)
            web_hook.url = web_hook_dict.get('url')
            web_hook.status = web_hook_dict.get('status')
            web_hook.consecutive_failure_count = web_hook_dict.get('consecutive_failure_count')
            web_hook.permissions = web_hook_dict.get('permissions')
            if 'shared_secret' in web_hook_dict and web_hook_dict.get('shared_secret') is not None:
                web_hook.shared_secret = web_hook_dict.get('shared_secret')

            session.commit()

            return json_resp({'message': 'ok'})
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:26,代码来源:web_hook.py


示例4: get_bangumi

    def get_bangumi(self, id):
        try:
            session = SessionManager.Session()

            bangumi = session.query(Bangumi).options(joinedload(Bangumi.episodes)).filter(Bangumi.id == id).one()

            episodes = []

            for episode in bangumi.episodes:
                eps = row2dict(episode)
                eps['thumbnail'] = utils.generate_thumbnail_link(episode, bangumi)
                episodes.append(eps)

            bangumi_dict = row2dict(bangumi)

            bangumi_dict['episodes'] = episodes

            bangumi_dict['cover'] = utils.generate_cover_link(bangumi)

            return json_resp({'data': bangumi_dict})
        except NoResultFound:
            raise ClientError(ClientError.NOT_FOUND, 404)
        except Exception as exception:
            raise exception
        finally:
            SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:26,代码来源:admin.py


示例5: update_bangumi

    def update_bangumi(self, bangumi_id, bangumi_dict):
        try:
            session = SessionManager.Session()
            bangumi = session.query(Bangumi).filter(Bangumi.id == bangumi_id).one()

            bangumi.name = bangumi_dict['name']
            bangumi.name_cn = bangumi_dict['name_cn']
            bangumi.summary = bangumi_dict['summary']
            bangumi.eps = bangumi_dict['eps']
            bangumi.eps_regex = bangumi_dict['eps_regex']
            bangumi.image = bangumi_dict['image']
            bangumi.air_date = datetime.strptime(bangumi_dict['air_date'], '%Y-%m-%d')
            bangumi.air_weekday = bangumi_dict['air_weekday']
            bangumi.rss = bangumi_dict['rss']
            bangumi.update_time = datetime.now()

            session.commit()

            return json_resp({'msg': 'ok'})
        except NoResultFound:
            raise ClientError(ClientError.NOT_FOUND)
        except Exception as exception:
            raise exception
        finally:
            SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:25,代码来源:admin.py


示例6: register_web_hook

    def register_web_hook(self, web_hook_dict, add_by_uid):
        """
        register an web hook and send an initial keep alive event
        :param web_hook_dict:
        :param add_by_uid:
        :return:
        """
        session = SessionManager.Session()
        try:
            web_hook = WebHook(name=web_hook_dict.get('name'),
                               description=bleach.clean(web_hook_dict.get('description'), tags=self.ALLOWED_TAGS),
                               url=web_hook_dict.get('url'),
                               shared_secret=web_hook_dict.get('shared_secret'),
                               created_by_uid=add_by_uid,
                               permissions=web_hook_dict.get('permissions'))
            session.add(web_hook)
            session.commit()
            web_hook_id = str(web_hook.id)

            # send event via rpc
            rpc_request.send('initialize_web_hook', {
                'web_hook_id': web_hook_id,
                'web_hook_url': web_hook.url,
                'shared_secret': web_hook.shared_secret
            })

            return json_resp({'data': web_hook_id})
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:29,代码来源:web_hook.py


示例7: request_reset_pass

def request_reset_pass():
    data = json.loads(request.get_data(True, as_text=True))
    if 'email' in data:
        UserCredential.send_pass_reset_email(data['email'])
        return json_resp({'message': 'ok'})
    else:
        raise ClientError(ClientError.INVALID_REQUEST)
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:7,代码来源:user.py


示例8: register

def register():
    """
    register a new user using invite code, note that a newly registered user is not administrator, you need to
    use an admin user to promote it
    :return: response
    """
    content = request.get_data(True, as_text=True)
    register_data = json.loads(content)
    if ('name' in register_data) and ('password' in register_data) and ('password_repeat' in register_data) and ('invite_code' in register_data) and ('email' in register_data):
        name = register_data['name']
        password = register_data['password']
        password_repeat = register_data['password_repeat']
        email = register_data['email']
        invite_code = register_data['invite_code']
        if password != password_repeat:
            raise ClientError(ClientError.PASSWORD_MISMATCH)
        if UserCredential.register_user(name=name, password=password, email=email, invite_code=invite_code):
            # login automatically
            credential = UserCredential.login_user(name, password)
            login_user(credential, remember=False)
            # send email
            credential.send_confirm_email()
            return json_resp({'message': 'ok'}, 201)
    else:
        raise ClientError(ClientError.INVALID_REQUEST)
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:25,代码来源:user.py


示例9: update_episode

    def update_episode(self, episode_id, episode_dict):
        try:
            session = SessionManager.Session()
            episode = session.query(Episode).filter(Episode.id == episode_id).one()
            episode.episode_no = episode_dict.get('episode_no')
            episode.bgm_eps_id = episode_dict.get('bgm_eps_id')
            episode.name = episode_dict.get('name')
            episode.name_cn = episode_dict.get('name_cn')

            if 'airdate' in episode_dict:
                episode.airdate = datetime.strptime(episode_dict.get('airdate'), '%Y-%m-%d')

            episode.duration = episode_dict.get('duration')
            episode.update_time = datetime.utcnow()

            if 'status' in episode_dict:
                episode.status = episode_dict['status']

            session.commit()

            return json_resp({'msg': 'ok'})

        except NoResultFound:
            raise ClientError(ClientError.NOT_FOUND, 404)
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:26,代码来源:admin.py


示例10: list_episode

    def list_episode(self, page, count, sort_field, sort_order, status):
        try:

            session = SessionManager.Session()
            query_object = session.query(Episode).\
                filter(Episode.delete_mark == None)

            if status is not None:
                query_object = query_object.filter(Episode.status==status)
                # count total rows
                total = session.query(func.count(Episode.id)).filter(Episode.status==status).scalar()
            else:
                total = session.query(func.count(Episode.id)).scalar()

            offset = (page - 1) * count

            if sort_order == 'desc':
                episode_list = query_object.\
                    order_by(desc(getattr(Episode, sort_field))).\
                    offset(offset).\
                    limit(count).\
                    all()
            else:
                episode_list = query_object.\
                    order_by(asc(getattr(Episode, sort_field))).\
                    offset(offset).limit(count).\
                    all()

            episode_dict_list = [row2dict(episode, Episode) for episode in episode_list]

            return json_resp({'data': episode_dict_list, 'total': total})
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:33,代码来源:admin.py


示例11: on_air_bangumi

    def on_air_bangumi(self):
        session = SessionManager.Session()
        current_day = datetime.today()
        start_time = datetime(current_day.year, current_day.month, 1)
        if current_day.month == 12:
            next_year = current_day.year + 1
            next_month = 1
        else:
            next_year = current_day.year
            next_month = current_day.month + 1
        end_time = datetime(next_year, next_month, 1)

        try:
            result = session.query(distinct(Episode.bangumi_id), Bangumi).\
                join(Bangumi).\
                filter(Episode.airdate >= start_time).\
                filter(Episode.airdate <= end_time)

            bangumi_list = []
            for bangumi_id, bangumi in result:
                bangumi_dict = row2dict(bangumi)
                bangumi_dict['cover'] = utils.generate_cover_link(bangumi)
                bangumi_list.append(bangumi_dict)

            return json_resp({'data': bangumi_list})
        except Exception as error:
            raise error
        finally:
            SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:29,代码来源:bangumi.py


示例12: get_all_announce

    def get_all_announce(self, position, offset, count, content):
        session = SessionManager.Session()
        try:
            if content:
                announce_list = session.query(Announce).\
                    filter(Announce.content == content).\
                    all()
            else:
                announce_list = session.query(Announce).\
                    filter(Announce.position == position).\
                    offset(offset).\
                    limit(count).\
                    all()

            total = session.query(func.count(Announce.id)). \
                scalar()

            announce_dict_list = []

            for announce in announce_list:
                announce_dict = row2dict(announce, Announce)
                announce_dict_list.append(announce_dict)

            if position == Announce.POSITION_BANGUMI:
                self.__add_bangumi_info(session, announce_dict_list)

            return json_resp({'data': announce_dict_list, 'total': total})
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:29,代码来源:announce.py


示例13: recent_update

    def recent_update(self, days):

        current = datetime.now()

        # from one week ago
        start_time = current - timedelta(days=days)

        session = SessionManager.Session()
        try:
            result = session.query(Episode, Bangumi).\
                join(Bangumi).\
                filter(Episode.status == Episode.STATUS_DOWNLOADED).\
                filter(Episode.update_time >= start_time).\
                filter(Episode.update_time <= current).\
                order_by(desc(Episode.update_time))

            episode_list = []

            for eps, bgm in result:
                episode = row2dict(eps)
                episode['thumbnail'] = utils.generate_thumbnail_link(eps, bgm)
                episode['bangumi'] = row2dict(bgm)
                episode['bangumi']['cover'] = utils.generate_cover_link(bgm)
                episode_list.append(episode)

            return json_resp({'data': episode_list})
        except Exception as error:
            raise error
        finally:
            SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:30,代码来源:bangumi.py


示例14: logout

def logout():
    """
    logout a user
    :return: response
    """
    logout_user()
    return json_resp({'msg': 'ok'})
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:7,代码来源:user.py


示例15: on_air_bangumi

    def on_air_bangumi(self, user_id, type):
        session = SessionManager.Session()
        current_day = datetime.today()
        start_time = datetime(current_day.year, current_day.month, 1)
        if current_day.month == 12:
            next_year = current_day.year + 1
            next_month = 1
        else:
            next_year = current_day.year
            next_month = current_day.month + 1
        end_time = datetime(next_year, next_month, 1)

        try:
            result = session.query(distinct(Episode.bangumi_id), Bangumi).\
                join(Bangumi). \
                options(joinedload(Bangumi.cover_image)).\
                filter(Bangumi.delete_mark == None). \
                filter(Bangumi.type == type).\
                filter(Episode.airdate >= start_time).\
                filter(Episode.airdate <= end_time). \
                order_by(desc(getattr(Bangumi, 'air_date')))

            bangumi_list = []
            bangumi_id_list = [bangumi_id for bangumi_id, bangumi in result]

            if len(bangumi_id_list) == 0:
                return json_resp({'data': []})

            favorites = session.query(Favorites).\
                filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
                filter(Favorites.user_id == user_id).\
                all()

            for bangumi_id, bangumi in result:
                bangumi_dict = row2dict(bangumi, Bangumi)
                bangumi_dict['cover'] = utils.generate_cover_link(bangumi)
                utils.process_bangumi_dict(bangumi, bangumi_dict)
                for fav in favorites:
                    if fav.bangumi_id == bangumi_id:
                        bangumi_dict['favorite_status'] = fav.status
                        break
                bangumi_list.append(bangumi_dict)

            return json_resp({'data': bangumi_list})
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:46,代码来源:bangumi.py


示例16: list_task

 def list_task(self):
     try:
         session = SessionManager.Session()
         result = session.query(Task).all()
         task_list = [row2dict(task, Task) for task in result]
         return json_resp({'data': task_list})
     finally:
         SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:8,代码来源:task.py


示例17: get_user_info

def get_user_info():
    '''
    get current user name and level
    :return: response
    '''
    user_info = {}
    user_info['name'] = current_user.name
    user_info['level'] = current_user.level
    return json_resp({'data': user_info})
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:9,代码来源:user.py


示例18: list_bangumi

    def list_bangumi(self, page, count, sort_field, sort_order, name, user_id, bangumi_type):
        try:

            session = SessionManager.Session()
            query_object = session.query(Bangumi).\
                options(joinedload(Bangumi.cover_image)).\
                filter(Bangumi.delete_mark == None)

            if bangumi_type != -1:
                query_object = query_object.filter(Bangumi.type == bangumi_type)

            if name is not None:
                name_pattern = '%{0}%'.format(name.encode('utf-8'),)
                logger.debug(name_pattern)
                query_object = query_object.\
                    filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
                # count total rows
                total = session.query(func.count(Bangumi.id)).\
                    filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
                    scalar()
            else:
                total = session.query(func.count(Bangumi.id)).scalar()

            if sort_order == 'desc':
                query_object = query_object.\
                    order_by(desc(getattr(Bangumi, sort_field)))

            else:
                query_object = query_object.\
                    order_by(asc(getattr(Bangumi, sort_field)))

            if count == -1:
                bangumi_list = query_object.all()
            else:
                offset = (page - 1) * count
                bangumi_list = query_object.offset(offset).limit(count).all()

            bangumi_id_list = [bgm.id for bgm in bangumi_list]

            favorites = session.query(Favorites).\
                filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
                filter(Favorites.user_id == user_id).\
                all()

            bangumi_dict_list = []
            for bgm in bangumi_list:
                bangumi = row2dict(bgm, Bangumi)
                bangumi['cover'] = utils.generate_cover_link(bgm)
                utils.process_bangumi_dict(bgm, bangumi)
                for fav in favorites:
                    if fav.bangumi_id == bgm.id:
                        bangumi['favorite_status'] = fav.status
                bangumi_dict_list.append(bangumi)

            return json_resp({'data': bangumi_dict_list, 'total': total})
        finally:
            SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:57,代码来源:bangumi.py


示例19: query_one_bangumi

def query_one_bangumi(bgm_id):
    bangumi_tv_url_base = 'http://api.bgm.tv/subject/'
    bangumi_tv_url_param = '?responseGroup=large'
    if bgm_id is not None:
        bangumi_tv_url = bangumi_tv_url_base + bgm_id + bangumi_tv_url_param
        h = httplib2.Http('.cache')
        (resp, content) = h.request(bangumi_tv_url, 'GET')
        return content
    else:
        return json_resp({})
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:10,代码来源:admin.py


示例20: search_bangumi

def search_bangumi():
    bangumi_tv_url_base = 'http://api.bgm.tv/search/subject/'
    bangumi_tv_url_param = '?responseGroup=simple&max_result=10&start=0'
    name = request.args.get('name', None)
    result = {"data": []}
    if name is not None and len(name) > 0:
        bangumi_tv_url = bangumi_tv_url_base + name + bangumi_tv_url_param
        h = httplib2.Http('.cache')
        (resp, content) = h.request(bangumi_tv_url, 'GET')
        if resp.status == 200:
            try:
                bgm_content = json.loads(content)
            except ValueError as e:
                return json_resp(result)
            list = [bgm for bgm in bgm_content['list'] if bgm['type'] == 2]
            if len(list) == 0:
                return json_resp(result)

            bgm_id_list = [bgm['id'] for bgm in list]
            bangumi_list = admin_service.get_bangumi_from_bgm_id_list(bgm_id_list)

            for bgm in list:
                bgm['bgm_id'] = bgm['id']
                bgm['id'] = None
                # if bgm_id has found in database, give the database id to bgm.id
                # that's we know that this bangumi exists in our database
                for bangumi in bangumi_list:
                    if bgm['bgm_id'] == bangumi.bgm_id:
                        bgm['id'] = bangumi.id
                        break
                bgm['image'] = bgm['images']['large']
                # remove useless keys
                bgm.pop('images', None)
                bgm.pop('collection', None)
                bgm.pop('url', None)
                bgm.pop('type', None)

            result['data'] = list
        elif resp.status == 502:
            # when bangumi.tv is down
            result['msg'] = 'bangumi is down'

    return json_resp(result)
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:43,代码来源:admin.py



注:本文中的utils.http.json_resp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python imageIO.imread函数代码示例发布时间:2022-05-26
下一篇:
Python hostname.get_hostname函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap