• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python datetimeutil.date_to_string函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中socorro.lib.datetimeutil.date_to_string函数的典型用法代码示例。如果您正苦于以下问题：Python date_to_string函数的具体用法？Python date_to_string怎么用？Python date_to_string使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了date_to_string函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _action

    def _action(self, raw_crash, raw_dumps, processed_crash, processor_meta):
        crash_id = raw_crash.uuid
        old_processed_crash = self.crashstore.get_unredacted_processed(crash_id)

        for key, value in old_processed_crash.iteritems():
            if 'date_processed' in key:
                processed_crash[key] = date_to_string(
                    string_to_datetime(value) - self.config.time_delta
                )
                print processed_crash.uuid, value, processed_crash[key]
            else:
                if key != 'uptime' and key != 'crash_time' and (
                   'time' in key or "date" in key or 'Date' in key
                ):
                    value = date_to_string(string_to_datetime(value))
                processed_crash[key] = value
        processor_meta.processor_notes.append(
            'DateProcessedTimeMachine has pushed date_processed into the past'
            ' by "%s" (D HH:MM:SS)' %  to_str(self.config.time_delta)
        )
        processor_meta.processor_notes.append(
            'Original processor_notes: %s'
            % old_processed_crash['processor_notes']
        )
        return True
开发者ID:FrostburnStudios,项目名称:socorro,代码行数:25,代码来源:timemachine.py


示例2: test_get_exploitibility

    def test_get_exploitibility(self):
        """Check the exploitability report returned by ``Crashes``.

        Two hits are expected, one dated today and one dated yesterday
        (both relative to ``self.now``), for a total of 2.
        """
        api = Crashes(config=self.config)
        one_day = datetime.timedelta(days=1)
        today_str = datetimeutil.date_to_string(self.now.date())
        yesterday_str = datetimeutil.date_to_string(
            (self.now - one_day).date()
        )

        hit_today = dict(
            signature="canIhaveYourSignature()",
            report_date=today_str,
            null_count=0,
            none_count=1,
            low_count=2,
            medium_count=3,
            high_count=4,
        )
        hit_yesterday = dict(
            signature="ofCourseYouCan()",
            report_date=yesterday_str,
            null_count=4,
            none_count=3,
            low_count=2,
            medium_count=1,
            high_count=0,
        )
        expected = dict(hits=[hit_today, hit_yesterday], total=2)

        actual = api.get_exploitability()
        self.assertEqual(actual, expected)
开发者ID:kidundead,项目名称:socorro,代码行数:32,代码来源:test_crashes.py


示例3: test_get_parameters_date_defaults

    def test_get_parameters_date_defaults(self):
        """Check how ``get_parameters`` completes a partial ``date`` filter.

        Scenarios covered: no date at all, upper bound only, lower bound
        only, and both bounds.  Each time the result must hold exactly two
        ``date`` entries, upper bound first.
        """
        with _get_config_manager().context() as config:
            search = SearchBase(
                fields=SUPERSEARCH_FIELDS_MOCKED_RESULTS,
                config=config,
            )

        now = datetimeutil.utc_now()
        today = now.date()

        # No date passed: a two-sided range is still produced.
        params = search.get_parameters()
        ok_('date' in params)
        eq_(len(params['date']), 2)

        # Upper bound only: the lower bound is expected a week earlier.
        upper_only = '<%s' % datetimeutil.date_to_string(now)
        params = search.get_parameters(date=upper_only)
        ok_('date' in params)
        eq_(len(params['date']), 2)
        high = params['date'][0]
        low = params['date'][1]
        eq_(high.operator, '<')
        eq_(low.operator, '>=')
        eq_(high.value.date(), today)
        eq_(low.value.date(), today - datetime.timedelta(days=7))

        # Lower bound only: the upper bound is expected to be "now".
        pasttime = now - datetime.timedelta(days=10)
        lower_only = '>=%s' % datetimeutil.date_to_string(pasttime)
        params = search.get_parameters(date=lower_only)
        ok_('date' in params)
        eq_(len(params['date']), 2)
        high = params['date'][0]
        low = params['date'][1]
        eq_(high.operator, '<=')
        eq_(low.operator, '>=')
        eq_(high.value.date(), today)
        eq_(low.value.date(), pasttime.date())

        # Both bounds: operators and values come back as given.
        both_bounds = [
            '<%s' % datetimeutil.date_to_string(now),
            '>%s' % datetimeutil.date_to_string(pasttime),
        ]
        params = search.get_parameters(date=both_bounds)
        ok_('date' in params)
        eq_(len(params['date']), 2)
        high = params['date'][0]
        low = params['date'][1]
        eq_(high.operator, '<')
        eq_(low.operator, '>')
        eq_(high.value.date(), today)
        eq_(low.value.date(), pasttime.date())
开发者ID:Tchanders,项目名称:socorro,代码行数:57,代码来源:test_search_common.py


示例4: get

    def get(self, **kwargs):
        """Return nightly-build crash trends for one product version.

        Recognized keyword arguments are ``start_date``, ``end_date``,
        ``product`` and ``version``; they are parsed by
        ``external_common.parse_arguments`` and bound into the SQL query.

        Returns a dict ``{'crashtrends': [...]}`` with one dict per result
        row, keyed by ``product_name``, ``version_string``,
        ``product_version_id``, ``report_date``, ``build_date``,
        ``days_out`` and ``report_count``.  ``report_date`` and
        ``build_date`` are converted to strings.  When PostgreSQL reports an
        error it is logged and the list is left empty.
        """
        # NOTE(review): each filter tuple looks like (name, default, type) --
        # confirm against external_common.parse_arguments.
        filters = [
            ("start_date", None, "datetime"),
            ("end_date", None, "datetime"),
            ("product", None, "str"),
            ("version", None, "str"),
            ]

        params = external_common.parse_arguments(filters, kwargs)
        results = []  # So we have something to return.

        query_string = """SELECT product_name,
                    version_string,
                    product_version_id,
                    report_date,
                    nightly_builds.build_date,
                    days_out,
                    sum(report_count) as report_count
                FROM nightly_builds
                JOIN product_versions USING ( product_version_id )
                WHERE report_date <= %(end_date)s
                AND report_date >= %(start_date)s
                AND product_name = %(product)s
                AND version_string = %(version)s
                GROUP BY product_name,
                         version_string,
                         product_version_id,
                         report_date,
                         nightly_builds.build_date,
                         days_out"""

        columns = (
            "product_name",
            "version_string",
            "product_version_id",
            "report_date",
            "build_date",
            "days_out",
            "report_count",
        )

        # Bind the name before the try block: if opening the connection
        # itself fails, the ``finally`` clause must not trip over an unbound
        # local and hide the real error.
        connection = None
        try:
            connection = self.database.connection()
            cursor = connection.cursor()
            sql_results = db.execute(cursor, query_string, params)
        except psycopg2.Error:
            logger.error("Failed retrieving crashtrends data from PostgreSQL",
                         exc_info=True)
        else:
            for trend in sql_results:
                row = dict(zip(columns, trend))
                row['report_date'] = datetimeutil.date_to_string(row['report_date'])
                row['build_date'] = datetimeutil.date_to_string(row['build_date'])
                results.append(row)
        finally:
            if connection:
                connection.close()
        return {'crashtrends': results}
开发者ID:Meghashyamt,项目名称:socorro,代码行数:55,代码来源:crash_trends.py


示例5: get

    def get(self, **kwargs):
        """Return nightly-build crash trends for one product version.

        ``start_date``, ``end_date``, ``product`` and ``version`` are parsed
        from ``kwargs`` and bound into the query run through ``self.query``.
        The result is ``{'crashtrends': [...]}`` with one dict per row;
        ``report_date`` and ``build_date`` are converted to strings.
        """
        filters = [
            ("start_date", None, "datetime"),
            ("end_date", None, "datetime"),
            ("product", None, "str"),
            ("version", None, "str"),
        ]
        params = external_common.parse_arguments(filters, kwargs)

        sql = """
        /* socorro.external.postgresql.crash_trends.CrashTrends.get */
        SELECT product_name,
               version_string,
               product_version_id,
               report_date,
               nightly_builds.build_date,
               days_out,
               sum(report_count) as report_count
        FROM nightly_builds
            JOIN product_versions USING ( product_version_id )
        WHERE report_date <= %(end_date)s
        AND report_date >= %(start_date)s
        AND product_name = %(product)s
        AND version_string = %(version)s
        GROUP BY product_name,
                 version_string,
                 product_version_id,
                 report_date,
                 nightly_builds.build_date,
                 days_out
        """

        rows = self.query(
            sql,
            params,
            error_message="Failed to retrieve crash trends data from PostgreSQL",
        )

        # Column names in the same order as the SELECT list above.
        columns = (
            "product_name",
            "version_string",
            "product_version_id",
            "report_date",
            "build_date",
            "days_out",
            "report_count",
        )

        trends = []
        for record in rows:
            entry = dict(zip(columns, record))
            for date_column in ('report_date', 'build_date'):
                entry[date_column] = datetimeutil.date_to_string(
                    entry[date_column])
            trends.append(entry)

        return {'crashtrends': trends}
开发者ID:Earth4,项目名称:socorro,代码行数:54,代码来源:crash_trends.py


示例6: test_get_parameters_date_defaults

    def test_get_parameters_date_defaults(self):
        """Check how ``get_parameters`` completes a partial ``date`` filter.

        Scenarios covered: no date at all, upper bound only, lower bound
        only, and both bounds.  Each time the result must hold exactly two
        ``date`` entries, upper bound first.
        """
        with _get_config_manager().context() as config:
            search = SearchBaseWithFields(config=config)

        now = datetimeutil.utc_now()
        today = now.date()
        a_week = datetime.timedelta(days=7)

        # No date passed: a two-sided range is still produced.
        params = search.get_parameters()
        assert 'date' in params
        assert len(params['date']) == 2

        # Upper bound only: the lower bound is expected a week earlier.
        params = search.get_parameters(
            date='<%s' % datetimeutil.date_to_string(now)
        )
        assert 'date' in params
        assert len(params['date']) == 2
        upper = params['date'][0]
        lower = params['date'][1]
        assert upper.operator == '<'
        assert lower.operator == '>='
        assert upper.value.date() == today
        assert lower.value.date() == today - a_week

        # Lower bound only: the upper bound is expected to be "now".
        pasttime = now - datetime.timedelta(days=10)
        params = search.get_parameters(
            date='>=%s' % datetimeutil.date_to_string(pasttime)
        )
        assert 'date' in params
        assert len(params['date']) == 2
        upper = params['date'][0]
        lower = params['date'][1]
        assert upper.operator == '<='
        assert lower.operator == '>='
        assert upper.value.date() == today
        assert lower.value.date() == pasttime.date()

        # Both bounds: operators and values come back as given.
        params = search.get_parameters(
            date=[
                '<%s' % datetimeutil.date_to_string(now),
                '>%s' % datetimeutil.date_to_string(pasttime),
            ]
        )
        assert 'date' in params
        assert len(params['date']) == 2
        upper = params['date'][0]
        lower = params['date'][1]
        assert upper.operator == '<'
        assert lower.operator == '>'
        assert upper.value.date() == today
        assert lower.value.date() == pasttime.date()
开发者ID:m8ttyB,项目名称:socorro,代码行数:53,代码来源:test_search_common.py


示例7: twoPeriodTopCrasherComparison

def twoPeriodTopCrasherComparison(
            databaseConnection, context,
            closestEntryFunction=latestEntryBeforeOrEqualTo,
            listOfTopCrashersFunction=getListOfTopCrashersBySignature):
    """Compare top crashers over two consecutive periods ending at to_date.

    ``context`` must contain ``to_date``, ``duration``, ``product`` and
    ``version`` and is read both by key and by attribute.  It is updated in
    place: a silent logger is installed when none is present,
    ``numberOfComparisonPoints`` is set to 2, a falsy ``limit`` becomes 100,
    ``to_date`` is replaced by the value ``closestEntryFunction`` returns,
    and ``startDate`` is set two durations before it.

    Returns a dict with the ranked crashers of the first list produced by
    ``listOfListsWithChangeInRank``, the start/end dates as strings, the
    total crash count and the summed ``percentOfTotal``.
    """
    try:
        context['logger'].debug('entered twoPeriodTopCrasherComparison')
    except KeyError:
        context['logger'] = util.SilentFakeLogger()

    for required in ('to_date', 'duration', 'product', 'version'):
        assert required in context, (
            "%s is missing from the configuration" % required)

    context['numberOfComparisonPoints'] = 2
    if not context['limit']:
        context['limit'] = 100

    context['to_date'] = closestEntryFunction(
        databaseConnection,
        context['to_date'],
        context['product'],
        context['version'],
    )
    context['logger'].debug('New to_date: %s' % context['to_date'])
    context['startDate'] = context.to_date - (
        context.duration * context.numberOfComparisonPoints
    )

    queries = rangeOfQueriesGenerator(
        databaseConnection, context, listOfTopCrashersFunction)
    listOfTopCrashers = listOfListsWithChangeInRank(queries)[0]

    totalNumberOfCrashes = 0
    totalPercentOfTotal = 0
    for crasher in listOfTopCrashers:
        # The overall total travels inside the crasher entries; lift it out
        # so it is reported once at the top level of the result.
        if 'total_crashes' in crasher:
            totalNumberOfCrashes = crasher['total_crashes']
            del crasher['total_crashes']
        totalPercentOfTotal += crasher.get('percentOfTotal', 0)

    start_date = datetimeutil.date_to_string(
        context.to_date - context.duration)
    end_date = datetimeutil.date_to_string(context.to_date)
    return {
        'crashes': listOfTopCrashers,
        'start_date': start_date,
        'end_date': end_date,
        'totalNumberOfCrashes': totalNumberOfCrashes,
        'totalPercentage': totalPercentOfTotal,
    }
开发者ID:Earth4,项目名称:socorro,代码行数:52,代码来源:tcbs.py


示例8: test_search_combined_filters

    def test_search_combined_filters(self, mock_psql_util):
        """Search with every filter combined, then with a date window only.

        The first query must return the single default crash report; the
        second, limited to between five and three weeks ago, must return the
        older ``my_little_signature`` report.  ``mock_psql_util`` is not used
        in the body.
        """
        with self.get_config_manager().context() as config:
            api = Search(config=config)

            # All filters at once should single out the default crash report.
            all_filters = dict(
                terms='js::break_your_browser',
                search_mode='is_exactly',
                products='WaterWolf',
                versions='WaterWolf:1.0',
                release_channels='release',
                os='Linux',
                build_ids='1234567890',
                reasons='MOZALLOC_WENT_WRONG',
                report_type='crash',
                report_process='browser',
            )
            res = api.get(**all_filters)

            self.assertEqual(res['total'], 1)
            top_hit = res['hits'][0]
            self.assertEqual(top_hit['signature'], 'js::break_your_browser')
            self.assertEqual(top_hit['is_linux'], 1)
            self.assertEqual(top_hit['is_windows'], 0)
            self.assertEqual(top_hit['is_mac'], 0)

            # A window from five to three weeks ago selects the older report.
            now = datetimeutil.utc_now()
            window_end = datetimeutil.date_to_string(
                now - datetime.timedelta(weeks=3)
            )
            window_start = datetimeutil.date_to_string(
                now - datetime.timedelta(weeks=5)
            )
            res = api.get(from_date=window_start, to_date=window_end)

            self.assertEqual(res['total'], 1)
            top_hit = res['hits'][0]
            self.assertEqual(top_hit['signature'], 'my_little_signature')
            self.assertEqual(top_hit['is_linux'], 1)
            self.assertEqual(top_hit['is_windows'], 0)
            self.assertEqual(top_hit['is_mac'], 0)
开发者ID:erikrose,项目名称:socorro,代码行数:51,代码来源:test_search.py


示例9: post

    def post(self, **kwargs):
        """Return first report date and first build for the given signatures.

        Raises ``MissingArgumentError`` when the parsed ``signatures``
        argument is empty.  Otherwise returns ``{'hits': [...], 'total': n}``
        where each hit is a result row whose ``first_date`` has been
        converted to a string.
        """
        params = external_common.parse_arguments(self.filters, kwargs)
        requested = params['signatures']
        if not requested:
            raise MissingArgumentError('signatures')

        sql = """
            SELECT
                signature,
                first_report AS first_date,
                first_build
            FROM signatures
            WHERE signature IN %s
        """
        # A single positional parameter: the tuple fills the IN clause.
        rows = self.query(
            sql,
            [tuple(requested)],
            error_message='Failed to retrieve signatures from PostgreSQL',
        )

        hits = []
        for record in rows.zipped():
            record.first_date = datetimeutil.date_to_string(record.first_date)
            hits.append(record)

        return {'hits': hits, 'total': len(hits)}
开发者ID:snorp,项目名称:socorro,代码行数:28,代码来源:signature_first_date.py


示例10: get_comments

    def get_comments(self, **kwargs):
        """Return the user comments left on crash reports.

        The reports are filtered by signatures and other fields; see
        socorro.lib.search_common.get_parameters() for all filters.

        Returns ``{"hits": [...], "total": n}`` where each hit holds
        ``date_processed`` (as a string), ``user_comments``, ``uuid`` and
        ``email``.
        """
        params = self.prepare_search_params(**kwargs)

        # WARNING: sensitive data is returned here (email). Once an
        # authentication mechanism exists, a check should be done here.
        sql_select = """
            SELECT
                r.date_processed,
                r.user_comments,
                r.uuid,
                CASE
                    WHEN r.email = '' THEN null
                    WHEN r.email IS NULL THEN null
                    ELSE r.email
                END
        """

        sql_from = self.build_reports_sql_from(params)
        # The WHERE builder receives an empty parameter dict and hands back
        # the dict to use for the query.
        sql_where, sql_params = self.build_reports_sql_where(
            params, {}, self.context)
        sql_where = "%s AND r.user_comments IS NOT NULL" % sql_where
        sql_order = "ORDER BY email ASC, r.date_processed ASC"

        sql_query = " ".join([
            "/* external.postgresql.crashes.Crashes.get_comments */",
            sql_select,
            sql_from,
            sql_where,
            sql_order,
        ])

        rows = self.query(
            sql_query,
            sql_params,
            error_message="Failed to retrieve comments from PostgreSQL",
        )

        # Column names in the same order as the SELECT list above.
        columns = ("date_processed", "user_comments", "uuid", "email")
        comments = []
        for record in rows:
            entry = dict(zip(columns, record))
            entry["date_processed"] = datetimeutil.date_to_string(
                entry["date_processed"])
            comments.append(entry)

        return {"hits": comments, "total": len(comments)}
开发者ID:ajsb85,项目名称:socorro,代码行数:60,代码来源:crashes.py


示例11: twoPeriodTopCrasherComparison

def twoPeriodTopCrasherComparison(
    databaseConnection,
    context,
    closestEntryFunction=latestEntryBeforeOrEqualTo,
    listOfTopCrashersFunction=getListOfTopCrashersBySignature,
):
    """Compare top crashers over two consecutive periods ending at to_date.

    ``context`` must contain ``to_date``, ``duration``, ``product`` and
    ``version`` and is read both by key and by attribute.  It is updated in
    place: a silent logger is installed when none is present,
    ``numberOfComparisonPoints`` is set to 2, a falsy ``limit`` becomes 100,
    ``to_date`` is replaced by the value ``closestEntryFunction`` returns,
    and ``startDate`` is set two durations before it.

    Returns a dict with the ranked crashers of the first list produced by
    ``listOfListsWithChangeInRank``, the start/end dates as strings, the
    total crash count and the summed ``percentOfTotal``.
    """
    try:
        context["logger"].debug("entered twoPeriodTopCrasherComparison")
    except KeyError:
        context["logger"] = util.SilentFakeLogger()

    for required in ("to_date", "duration", "product", "version"):
        assert required in context, "%s is missing from the configuration" % required

    context["numberOfComparisonPoints"] = 2
    if not context["limit"]:
        context["limit"] = 100

    closest = closestEntryFunction(
        databaseConnection, context["to_date"], context["product"], context["version"]
    )
    context["to_date"] = closest
    context["logger"].debug("New to_date: %s" % context["to_date"])
    context["startDate"] = context.to_date - (
        context.duration * context.numberOfComparisonPoints
    )

    ranked_lists = listOfListsWithChangeInRank(
        rangeOfQueriesGenerator(databaseConnection, context, listOfTopCrashersFunction)
    )
    listOfTopCrashers = ranked_lists[0]

    totalNumberOfCrashes = 0
    totalPercentOfTotal = 0
    for entry in listOfTopCrashers:
        # The overall total travels inside the crasher entries; lift it out
        # so it is reported once at the top level of the result.
        if "total_crashes" in entry:
            totalNumberOfCrashes = entry["total_crashes"]
            del entry["total_crashes"]
        totalPercentOfTotal += entry.get("percentOfTotal", 0)

    period_start = datetimeutil.date_to_string(context.to_date - context.duration)
    period_end = datetimeutil.date_to_string(context.to_date)
    return {
        "crashes": listOfTopCrashers,
        "start_date": period_start,
        "end_date": period_end,
        "totalNumberOfCrashes": totalNumberOfCrashes,
        "totalPercentage": totalPercentOfTotal,
    }
开发者ID:andreja-cliqz,项目名称:socorro,代码行数:47,代码来源:tcbs.py


示例12: test_listOfListsWithChangeInRank

    def test_listOfListsWithChangeInRank(self):

        lastweek = self.now - datetime.timedelta(days=7)
        lastweek_str = datetimeutil.date_to_string(lastweek.date())

        params = self.params
        params.startDate = self.now.date() - datetime.timedelta(days=14)

        query_list = tcbs.getListOfTopCrashersBySignature
        query_range = tcbs.rangeOfQueriesGenerator(
            self.connection,
            self.params,
            query_list
        )
        res = tcbs.listOfListsWithChangeInRank(query_range)

        res_expected = [[{
            'count': 5L,
            'mac_count': 0L,
            'content_count': 0,
            'first_report': lastweek_str,
            'previousRank': 0,
            'currentRank': 0,
            'startup_percent': None,
            'versions': 'plugin1, plugin2',
            'first_report_exact': lastweek_str + ' 00:00:00',
            'percentOfTotal': 0.625,
            'changeInRank': 0,
            'is_gc_count': 10L,
            'win_count': 0L,
            'changeInPercentOfTotal': 0.041666666666666963,
            'linux_count': 5L,
            'hang_count': 5L,
            'signature': 'Fake Signature #1',
            'versions_count': 2,
            'previousPercentOfTotal': 0.58333333333333304,
            'plugin_count': 0
        }, {
            'count': 3L,
            'mac_count': 1L,
            'content_count': 0,
            'first_report': lastweek_str,
            'previousRank': 1,
            'currentRank': 1,
            'startup_percent': None,
            'versions': 'plugin1, plugin2, plugin3, plugin4, plugin5, plugin6',
            'first_report_exact': lastweek_str + ' 00:00:00',
            'percentOfTotal': 0.375,
            'changeInRank': 0,
            'is_gc_count': 1L,
            'win_count': 1L,
            'changeInPercentOfTotal': -0.041666666666667018,
            'linux_count': 1L,
            'hang_count': 0L,
            'signature': 'Fake Signature #2',
            'versions_count': 6,
            'previousPercentOfTotal': 0.41666666666666702,
            'plugin_count': 0
        }]]
开发者ID:FishingCactus,项目名称:socorro,代码行数:59,代码来源:test_tcbs.py


示例13: get

    def get(self, **kwargs):
        """Return the job queue entries stored under a given uuid.

        Raises ``MissingOrBadArgumentError`` when ``uuid`` is missing or
        empty.  Returns ``{"total": n, "hits": [...]}`` with one dict per
        job row; ``datetime`` values are converted to strings.  When
        PostgreSQL reports an error it is logged and the empty result is
        returned.
        """
        params = external_common.parse_arguments(
            [("uuid", None, "str")], kwargs)

        if not params.uuid:
            raise MissingOrBadArgumentError(
                        "Mandatory parameter 'uuid' is missing or empty")

        fields = [
            "id",
            "pathname",
            "uuid",
            "owner",
            "priority",
            "queueddatetime",
            "starteddatetime",
            "completeddatetime",
            "success",
            "message"
        ]
        column_list = ", ".join(fields)
        # Only the column list is interpolated here; ``%%(uuid)s`` survives
        # as a named placeholder bound from ``params`` at execution time.
        sql = """
            /* socorro.external.postgresql.job.Job.get */
            SELECT %s FROM jobs WHERE uuid=%%(uuid)s
        """ % column_list

        json_result = {"total": 0, "hits": []}
        hits = json_result["hits"]

        connection = None
        try:
            connection = self.database.connection()
            cursor = connection.cursor()
            results = db.execute(cursor, sql, params)
        except psycopg2.Error:
            logger.error("Failed retrieving jobs data from PostgreSQL",
                         exc_info=True)
        else:
            for job in results:
                row = dict(zip(fields, job))

                # Make sure all dates are turned into strings
                for column in fields:
                    if column in row and isinstance(row[column],
                                                    datetime.datetime):
                        row[column] = datetimeutil.date_to_string(row[column])

                hits.append(row)
            json_result["total"] = len(hits)
        finally:
            # The connection may never have been opened.
            if connection:
                connection.close()

        return json_result
开发者ID:mrmiller,项目名称:socorro,代码行数:58,代码来源:job.py


示例14: test_date_to_string

def test_date_to_string():
    """Check ``date_to_string`` on aware datetimes, naive datetimes and dates."""
    cases = (
        # Datetime with timezone
        (
            datetime.datetime(2012, 1, 3, 12, 23, 34, tzinfo=UTC),
            '2012-01-03T12:23:34+00:00',
        ),
        # Datetime without timezone
        (
            datetime.datetime(2012, 1, 3, 12, 23, 34),
            '2012-01-03T12:23:34',
        ),
        # Date (no time, no timezone)
        (
            datetime.date(2012, 1, 3),
            '2012-01-03',
        ),
    )
    for value, expected in cases:
        assert datetimeutil.date_to_string(value) == expected
开发者ID:m8ttyB,项目名称:socorro,代码行数:18,代码来源:test_datetimeutil.py


示例15: test_get

    def test_get(self):
        """Check ``Extensions.get`` for a crash with and without extensions."""
        extensions = Extensions(config=self.config)
        current = datetimeutil.utc_now()
        # Truncate to midnight, keeping the timezone.
        midnight = datetime.datetime(
            current.year, current.month, current.day, tzinfo=current.tzinfo
        )
        # Leaves a "%s" slot for the uuid prefix, followed by the date suffix.
        uuid_template = "%%s-%s" % midnight.strftime("%y%m%d")
        now_str = datetimeutil.date_to_string(midnight)

        #......................................................................
        # Test 1: a valid crash with duplicates
        res = extensions.get(uuid=uuid_template % "a1", date=now_str)
        expected_extensions = (
            (10, 'id1', 'version1'),
            (11, 'id2', 'version2'),
            (12, 'id3', 'version3'),
        )
        res_expected = {
            "hits": [
                {
                    "report_id": 1,
                    "date_processed": now_str,
                    "extension_key": key,
                    "extension_id": ext_id,
                    "extension_version": ext_version,
                }
                for key, ext_id, ext_version in expected_extensions
            ],
            "total": 3
        }
        self.assertEqual(res, res_expected)

        #......................................................................
        # Test 2: a crash without extensions
        res = extensions.get(uuid=uuid_template % "a2", date=now_str)
        self.assertEqual(res, {"hits": [], "total": 0})
开发者ID:ajsb85,项目名称:socorro,代码行数:57,代码来源:test_extensions.py


示例16: test_get

    def test_get(self):
        """Check ``SignatureURLs.get``: one match, bad arguments, no match."""
        signature_urls = SignatureURLs(config=self.config)
        current = datetimeutil.utc_now()
        # Truncate to a naive midnight datetime.
        midnight = datetime.datetime(current.year, current.month, current.day)
        now_str = datetimeutil.date_to_string(midnight)
        corrupt_dump = "EMPTY: no crashing thread identified; corrupt dump"

        #......................................................................
        # Test 1: find one exact match for products and versions passed
        res = signature_urls.get(
            signature=corrupt_dump,
            start_date=now_str,
            end_date=now_str,
            products=['Firefox'],
            versions=["Firefox:10.0", "Firefox:11.0"]
        )
        one_match = {
            "hits": [
                {
                    "url": "http://deusex.wikia.com/wiki/Praxis_kit",
                    "crash_count": 1
                }
            ],
            "total": 1
        }
        self.assertEqual(res, one_match)

        #......................................................................
        # Test 2: Raise error if parameter is not passed
        self.assertRaises(
            MissingOrBadArgumentException,
            signature_urls.get,
            signature="",
            start_date="",
            end_date=now_str,
            products=['Firefox'],
            versions=["Firefox:10.0", "Firefox:11.0"]
        )

        #......................................................................
        # Test 3: Query returning no results
        res = signature_urls.get(
            signature=corrupt_dump,
            start_date=now_str,
            end_date=now_str,
            products=['Fennec'],
            versions=["Fennec:10.0", "Fennec:11.0"]
        )
        self.assertEqual(res, {"hits": [], "total": 0})
开发者ID:phb,项目名称:socorro,代码行数:57,代码来源:test_signature_urls.py


示例17: setUp

    def setUp(self):
        """Prepare storage, the query API and four crash reports.

        The supersearch fields are indexed first, then a default crash
        report dated yesterday is saved together with three variants that
        differ in product, version and release channel.
        """
        super(IntegrationTestQuery, self).setUp()

        config = self.get_config_context()
        self.storage = crashstorage.ElasticSearchCrashStorage(config)
        self.api = Query(config=config)

        # clear the indices cache so the index is created on every test
        self.storage.indices_cache = set()

        # Create the supersearch fields.
        self.storage.es.bulk_index(
            index=config.webapi.elasticsearch_default_index,
            doc_type='supersearch_fields',
            docs=SUPERSEARCH_FIELDS.values(),
            id_field='name',
            refresh=True,
        )

        self.now = datetimeutil.utc_now()
        yesterday = datetimeutil.date_to_string(
            self.now - datetime.timedelta(days=1)
        )

        # insert data into elasticsearch
        default_crash_report = dict(
            uuid=100,
            signature='js::break_your_browser',
            date_processed=yesterday,
            product='WaterWolf',
            version='1.0',
            release_channel='release',
            os_name='Linux',
            build='1234567890',
            reason='MOZALLOC_WENT_WRONG',
            hangid=None,
            process_type=None,
        )
        self.storage.save_processed(default_crash_report)

        # Each variant is the default report with a few fields overridden.
        variants = (
            dict(uuid=1, product='EarthRaccoon'),
            dict(uuid=2, version='2.0'),
            dict(uuid=3, release_channel='aurora'),
        )
        for overrides in variants:
            self.storage.save_processed(
                dict(default_crash_report, **overrides)
            )

        # As indexing is asynchronous, we need to force elasticsearch to
        # make the newly created content searchable before we run the tests
        self.storage.es.refresh()
开发者ID:FrostburnStudios,项目名称:socorro,代码行数:56,代码来源:test_query.py


示例18: test_twoPeriodTopCrasherComparisonLimited

    def test_twoPeriodTopCrasherComparisonLimited(self):
        """Check twoPeriodTopCrasherComparison with the result limit set to 1.

        ``self.params.limit`` is set to 1 before the call. The expected
        result holds a single crash entry ('Fake Signature #1'), with
        ``start_date`` 14 days before ``self.now`` and ``end_date`` 7 days
        before it.
        """
        lastweek = self.now - datetime.timedelta(days=7)
        lastweek_str = datetimeutil.date_to_string(lastweek.date())
        two_weeks = datetimeutil.date_to_string(self.now.date() -
                                                datetime.timedelta(days=14))

        self.params.limit = 1
        res = tcbs.twoPeriodTopCrasherComparison(
            self.connection,
            self.params
        )

        # Counts are plain int literals rather than Python 2 ``L`` longs:
        # an int compares equal to a long of the same value, so the
        # expectation is unchanged on Python 2, and the literals no longer
        # fail to parse on Python 3.
        res_expected = {
            'totalPercentage': 0.58333333333333304,
            'end_date': lastweek_str,
            'start_date': two_weeks,
            'crashes': [{
                'count': 14,
                'mac_count': 1,
                'content_count': 0,
                'first_report': lastweek_str,
                'previousRank': 'null',
                'currentRank': 0,
                'startup_percent': None,
                'versions': 'plugin1, plugin2',
                'first_report_exact': lastweek_str + ' 00:00:00',
                'percentOfTotal': 0.58333333333333304,
                'changeInRank': 'new',
                'is_gc_count': 1,
                'win_count': 12,
                'changeInPercentOfTotal': 'new',
                'linux_count': 1,
                'hang_count': 0,
                'signature': 'Fake Signature #1',
                'versions_count': 2,
                'previousPercentOfTotal': 'null',
                'plugin_count': 0
            }],
            'totalNumberOfCrashes': 24
        }

        eq_(res, res_expected)
开发者ID:Tchanders,项目名称:socorro,代码行数:43,代码来源:test_tcbs.py


示例19: test_get_parameters_date_defaults

    def test_get_parameters_date_defaults(self):
        """Verify how SearchBase.get_parameters completes the "date" filter.

        Four calls are made: no arguments, only an upper bound, only a
        lower bound, and both bounds. Each must yield exactly two "date"
        entries; for the last three the operators and the calendar dates
        of both entries are checked as well.
        """
        with _get_config_manager().context() as config:
            search = SearchBase(config=config)

        now = datetimeutil.utc_now()
        today = now.date()
        pasttime = now - datetime.timedelta(days=10)
        now_str = datetimeutil.date_to_string(now)
        pasttime_str = datetimeutil.date_to_string(pasttime)

        def check_date_bounds(params, operators=(), days=()):
            # Presence and count first, then operators, then dates.
            ok_("date" in params)
            eq_(len(params["date"]), 2)
            for index, operator in enumerate(operators):
                eq_(params["date"][index].operator, operator)
            for index, day in enumerate(days):
                eq_(params["date"][index].value.date(), day)

        # Test default values when nothing is passed
        check_date_bounds(search.get_parameters())

        # Pass only the high value
        check_date_bounds(
            search.get_parameters(date="<%s" % now_str),
            operators=("<", ">="),
            days=(today, today - datetime.timedelta(days=7)),
        )

        # Pass only the low value
        check_date_bounds(
            search.get_parameters(date=">=%s" % pasttime_str),
            operators=("<=", ">="),
            days=(today, pasttime.date()),
        )

        # Pass the two values
        check_date_bounds(
            search.get_parameters(
                date=["<%s" % now_str, ">%s" % pasttime_str]
            ),
            operators=("<", ">"),
            days=(today, pasttime.date()),
        )
开发者ID:JisJis,项目名称:socorro,代码行数:42,代码来源:test_search_common.py


示例20: test_get_signature_history

    def test_get_signature_history(self):
        api = Crashes(config=self.config)
        now = self.now
        lastweek = now - datetime.timedelta(days=7)

        params = {
            "product": "Firefox",
            "version": "8.0",
            "signature": "signature1",
            "start_date": lastweek,
            "end_date": now,
        }
        res = api.get_signature_history(**params)

        self.assertEqual(len(res["hits"]), 2)
        self.assertEqual(len(res["hits"]), res["total"])

        date = datetimeutil.date_to_string(now.date())
        self.assertEqual(res["hits"][0]["date"], date)
        self.assertEqual(res["hits"][1]["date"], date)

        self.assertEqual(res[&q 

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python datetimeutil.string_to_datetime函数代码示例发布时间:2022-05-27
下一篇:
Python lib.ConfigurationManager类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap