• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python test_utils.check_arrays函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中zipline.utils.test_utils.check_arrays函数的典型用法代码示例。如果您正苦于以下问题:Python check_arrays函数的具体用法?Python check_arrays怎么用?Python check_arrays使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了check_arrays函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_masked_rankdata_2d

    def test_masked_rankdata_2d(self,
                                seed_value,
                                method,
                                use_mask,
                                set_missing,
                                ascending):
        eyemask = ~eye(5, dtype=bool)
        nomask = ones((5, 5), dtype=bool)

        seed(seed_value)
        asfloat = (randn(5, 5) * seed_value)
        asdatetime = (asfloat).copy().view('datetime64[ns]')

        mask = eyemask if use_mask else nomask
        if set_missing:
            asfloat[:, 2] = nan
            asdatetime[:, 2] = np_NaT

        float_result = masked_rankdata_2d(
            data=asfloat,
            mask=mask,
            missing_value=nan,
            method=method,
            ascending=True,
        )
        datetime_result = masked_rankdata_2d(
            data=asdatetime,
            mask=mask,
            missing_value=np_NaT,
            method=method,
            ascending=True,
        )

        check_arrays(float_result, datetime_result)
开发者ID:oliverjo,项目名称:zipline,代码行数:34,代码来源:test_factor.py


示例2: test_engine_with_multicolumn_loader

    def test_engine_with_multicolumn_loader(self):
        open_, close = USEquityPricing.open, USEquityPricing.close

        # Test for thirty days up to the second to last day that we think all
        # the assets existed.  If we test the last day of our calendar, no
        # assets will be in our output, because their end dates are all
        dates_to_test = self.dates[-32:-2]

        loader = MultiColumnLoader({
            open_: ConstantLoader(dates=self.dates,
                                  assets=self.assets,
                                  constants={open_: 1}),
            close: ConstantLoader(dates=self.dates,
                                  assets=self.assets,
                                  constants={close: 2})
        })

        engine = SimpleFFCEngine(loader, self.dates, self.asset_finder)

        factor = RollingSumDifference()

        result = engine.factor_matrix({'f': factor},
                                      dates_to_test[0],
                                      dates_to_test[-1])
        self.assertIsNotNone(result)
        self.assertEqual({'f'}, set(result.columns))

        result_index = self.assets * len(dates_to_test)
        result_shape = (len(result_index),)
        check_arrays(
            result['f'],
            Series(index=result_index, data=full(result_shape, -3)),
        )
开发者ID:MonoCloud,项目名称:zipline,代码行数:33,代码来源:test_engine.py


示例3: test_engine_with_multicolumn_loader

    def test_engine_with_multicolumn_loader(self):
        open_, close = USEquityPricing.open, USEquityPricing.close

        loader = MultiColumnLoader({
            open_: ConstantLoader(dates=self.dates,
                                  assets=self.assets,
                                  constants={open_: 1}),
            close: ConstantLoader(dates=self.dates,
                                  assets=self.assets,
                                  constants={close: 2})
        })

        engine = SimpleFFCEngine(loader, self.dates, self.asset_finder)

        factor = RollingSumDifference()

        result = engine.factor_matrix({'f': factor},
                                      self.dates[2],
                                      self.dates[-1])
        self.assertIsNotNone(result)
        self.assertEqual({'f'}, set(result.columns))

        # (close - open) * window = (1 - 2) * 3 = -3
        # skipped 2 from the start, so that the window is full
        check_arrays(result['f'],
                     Series([-3] * len(self.assets) * (len(self.dates) - 2)))
开发者ID:michaeljohnbennett,项目名称:zipline,代码行数:26,代码来源:test_engine.py


示例4: test_single_factor

    def test_single_factor(self):
        loader = self.loader
        assets = self.assets
        engine = SimplePipelineEngine(
            lambda column: loader, self.dates, self.asset_finder,
        )
        result_shape = (num_dates, num_assets) = (5, len(assets))
        dates = self.dates[10:10 + num_dates]

        factor = RollingSumDifference()
        expected_result = -factor.window_length

        # Since every asset will pass the screen, these should be equivalent.
        pipelines = [
            Pipeline(columns={'f': factor}),
            Pipeline(
                columns={'f': factor},
                screen=factor.eq(expected_result),
            ),
        ]

        for p in pipelines:
            result = engine.run_pipeline(p, dates[0], dates[-1])
            self.assertEqual(set(result.columns), {'f'})
            assert_multi_index_is_product(
                self, result.index, dates, assets
            )

            check_arrays(
                result['f'].unstack().values,
                full(result_shape, expected_result, dtype=float),
            )
开发者ID:AlexanderAA,项目名称:zipline,代码行数:32,代码来源:test_engine.py


示例5: test_rolling_and_nonrolling

    def test_rolling_and_nonrolling(self):
        open_ = USEquityPricing.open
        close = USEquityPricing.close
        volume = USEquityPricing.volume

        # Test for thirty days up to the last day that we think all
        # the assets existed.
        dates_to_test = self.dates[-30:]

        constants = {open_: 1, close: 2, volume: 3}
        loader = ConstantLoader(constants=constants, dates=self.dates, assets=self.assets)
        engine = SimplePipelineEngine(lambda column: loader, self.dates, self.asset_finder)

        sumdiff = RollingSumDifference()

        result = engine.run_pipeline(
            Pipeline(
                columns={"sumdiff": sumdiff, "open": open_.latest, "close": close.latest, "volume": volume.latest}
            ),
            dates_to_test[0],
            dates_to_test[-1],
        )
        self.assertIsNotNone(result)
        self.assertEqual({"sumdiff", "open", "close", "volume"}, set(result.columns))

        result_index = self.assets * len(dates_to_test)
        result_shape = (len(result_index),)
        check_arrays(result["sumdiff"], Series(index=result_index, data=full(result_shape, -3)))

        for name, const in [("open", 1), ("close", 2), ("volume", 3)]:
            check_arrays(result[name], Series(index=result_index, data=full(result_shape, const)))
开发者ID:testmana2,项目名称:zipline,代码行数:31,代码来源:test_engine.py


示例6: test_engine_with_multicolumn_loader

    def test_engine_with_multicolumn_loader(self):
        open_ = USEquityPricing.open
        close = USEquityPricing.close
        volume = USEquityPricing.volume

        # Test for thirty days up to the second to last day that we think all
        # the assets existed.  If we test the last day of our calendar, no
        # assets will be in our output, because their end dates are all
        dates_to_test = self.dates[-32:-2]

        constants = {open_: 1, close: 2, volume: 3}
        loader = ConstantLoader(constants=constants, dates=self.dates, assets=self.assets)
        engine = SimplePipelineEngine(loader, self.dates, self.asset_finder)

        sumdiff = RollingSumDifference()

        result = engine.run_pipeline(
            Pipeline(
                columns={"sumdiff": sumdiff, "open": open_.latest, "close": close.latest, "volume": volume.latest}
            ),
            dates_to_test[0],
            dates_to_test[-1],
        )
        self.assertIsNotNone(result)
        self.assertEqual({"sumdiff", "open", "close", "volume"}, set(result.columns))

        result_index = self.assets * len(dates_to_test)
        result_shape = (len(result_index),)
        check_arrays(result["sumdiff"], Series(index=result_index, data=full(result_shape, -3)))

        for name, const in [("open", 1), ("close", 2), ("volume", 3)]:
            check_arrays(result[name], Series(index=result_index, data=full(result_shape, const)))
开发者ID:mirizzi,项目名称:zipline,代码行数:32,代码来源:test_engine.py


示例7: test_percentile_nasty_partitions

    def test_percentile_nasty_partitions(self):
        # Test percentile with nasty partitions: divide up 5 assets into
        # quartiles.
        # There isn't a nice mathematical definition of correct behavior here,
        # so for now we guarantee the behavior of numpy.nanpercentile.  This is
        # mostly for regression testing in case we write our own specialized
        # percentile calculation at some point in the future.

        data = arange(25, dtype=float).reshape(5, 5) % 4
        quartiles = range(4)
        filter_names = ['pct_' + str(q) for q in quartiles]

        graph = TermGraph(
            {
                name: self.f.percentile_between(q * 25.0, (q + 1) * 25.0)
                for name, q in zip(filter_names, quartiles)
            }
        )
        results = self.run_graph(
            graph,
            initial_workspace={self.f: data},
            mask=self.build_mask(ones((5, 5))),
        )

        for name, quartile in zip(filter_names, quartiles):
            result = results[name]
            lower = quartile * 25.0
            upper = (quartile + 1) * 25.0
            expected = and_(
                nanpercentile(data, lower, axis=1, keepdims=True) <= data,
                data <= nanpercentile(data, upper, axis=1, keepdims=True),
            )
            check_arrays(result, expected)
开发者ID:AlexanderAA,项目名称:zipline,代码行数:33,代码来源:test_filter.py


示例8: test_isnull_int_dtype

    def test_isnull_int_dtype(self, custom_missing_value):

        class CustomMissingValue(Factor):
            dtype = int64_dtype
            window_length = 0
            missing_value = custom_missing_value
            inputs = ()

        factor = CustomMissingValue()

        data = arange(25).reshape(5, 5)
        data[eye(5, dtype=bool)] = custom_missing_value

        graph = TermGraph(
            {
                'isnull': factor.isnull(),
                'notnull': factor.notnull(),
            }
        )

        results = self.run_graph(
            graph,
            initial_workspace={factor: data},
            mask=self.build_mask(ones((5, 5))),
        )
        check_arrays(results['isnull'], eye(5, dtype=bool))
        check_arrays(results['notnull'], ~eye(5, dtype=bool))
开发者ID:litespeeddi2,项目名称:zipline,代码行数:27,代码来源:test_factor.py


示例9: test_isnull_datetime_dtype

    def test_isnull_datetime_dtype(self):
        class DatetimeFactor(Factor):
            dtype = datetime64ns_dtype
            window_length = 0
            inputs = ()

        factor = DatetimeFactor()

        data = arange(25).reshape(5, 5).astype('datetime64[ns]')
        data[eye(5, dtype=bool)] = NaTns

        graph = TermGraph(
            {
                'isnull': factor.isnull(),
                'notnull': factor.notnull(),
            }
        )

        results = self.run_graph(
            graph,
            initial_workspace={factor: data},
            mask=self.build_mask(ones((5, 5))),
        )
        check_arrays(results['isnull'], eye(5, dtype=bool))
        check_arrays(results['notnull'], ~eye(5, dtype=bool))
开发者ID:litespeeddi2,项目名称:zipline,代码行数:25,代码来源:test_factor.py


示例10: test_rank_after_mask

    def test_rank_after_mask(self):
        # data = arange(25).reshape(5, 5).transpose() % 4
        data = array([[0, 1, 2, 3, 0], [1, 2, 3, 0, 1], [2, 3, 0, 1, 2], [3, 0, 1, 2, 3], [0, 1, 2, 3, 0]], dtype=float)
        mask_data = ~eye(5, dtype=bool)
        initial_workspace = {self.f: data, Mask(): mask_data}

        graph = TermGraph(
            {
                "ascending_nomask": self.f.rank(ascending=True),
                "ascending_mask": self.f.rank(ascending=True, mask=Mask()),
                "descending_nomask": self.f.rank(ascending=False),
                "descending_mask": self.f.rank(ascending=False, mask=Mask()),
            }
        )

        expected = {
            "ascending_nomask": array(
                [
                    [1.0, 3.0, 4.0, 5.0, 2.0],
                    [2.0, 4.0, 5.0, 1.0, 3.0],
                    [3.0, 5.0, 1.0, 2.0, 4.0],
                    [4.0, 1.0, 2.0, 3.0, 5.0],
                    [1.0, 3.0, 4.0, 5.0, 2.0],
                ]
            ),
            "descending_nomask": array(
                [
                    [4.0, 3.0, 2.0, 1.0, 5.0],
                    [3.0, 2.0, 1.0, 5.0, 4.0],
                    [2.0, 1.0, 5.0, 4.0, 3.0],
                    [1.0, 5.0, 4.0, 3.0, 2.0],
                    [4.0, 3.0, 2.0, 1.0, 5.0],
                ]
            ),
            # Diagonal should be all nans, and anything whose rank was less
            # than the diagonal in the unmasked calc should go down by 1.
            "ascending_mask": array(
                [
                    [nan, 2.0, 3.0, 4.0, 1.0],
                    [2.0, nan, 4.0, 1.0, 3.0],
                    [2.0, 4.0, nan, 1.0, 3.0],
                    [3.0, 1.0, 2.0, nan, 4.0],
                    [1.0, 2.0, 3.0, 4.0, nan],
                ]
            ),
            "descending_mask": array(
                [
                    [nan, 3.0, 2.0, 1.0, 4.0],
                    [2.0, nan, 1.0, 4.0, 3.0],
                    [2.0, 1.0, nan, 4.0, 3.0],
                    [1.0, 4.0, 3.0, nan, 2.0],
                    [4.0, 3.0, 2.0, 1.0, nan],
                ]
            ),
        }

        results = self.run_graph(graph, initial_workspace, mask=self.build_mask(ones((5, 5))))
        for method in results:
            check_arrays(expected[method], results[method])
开发者ID:testmana2,项目名称:zipline,代码行数:59,代码来源:test_factor.py


示例11: check

 def check(terms):
     results = self.run_terms(
         terms,
         initial_workspace={self.f: data},
         mask=self.build_mask(ones((5, 5))),
     )
     for method in terms:
         check_arrays(results[method], expected_ranks[method])
开发者ID:NJ32,项目名称:zipline,代码行数:8,代码来源:test_factor.py


示例12: check_output

 def check_output(self, expr, expected):
     result = expr._compute(
         [self.fake_raw_data[input_] for input_ in expr.inputs],
         self.mask.index,
         self.mask.columns,
         self.mask.values,
     )
     check_arrays(result, expected)
开发者ID:NJ32,项目名称:zipline,代码行数:8,代码来源:test_numerical_expression.py


示例13: test_rolling_and_nonrolling

    def test_rolling_and_nonrolling(self):
        open_ = USEquityPricing.open
        close = USEquityPricing.close
        volume = USEquityPricing.volume

        # Test for thirty days up to the last day that we think all
        # the assets existed.
        dates_to_test = self.dates[-30:]

        constants = {open_: 1, close: 2, volume: 3}
        loader = PrecomputedLoader(
            constants=constants,
            dates=self.dates,
            sids=self.asset_ids,
        )
        engine = SimplePipelineEngine(
            lambda column: loader, self.dates, self.asset_finder,
        )

        sumdiff = RollingSumDifference()

        result = engine.run_pipeline(
            Pipeline(
                columns={
                    'sumdiff': sumdiff,
                    'open': open_.latest,
                    'close': close.latest,
                    'volume': volume.latest,
                },
            ),
            dates_to_test[0],
            dates_to_test[-1]
        )
        self.assertIsNotNone(result)
        self.assertEqual(
            {'sumdiff', 'open', 'close', 'volume'},
            set(result.columns)
        )

        result_index = self.asset_ids * len(dates_to_test)
        result_shape = (len(result_index),)
        check_arrays(
            result['sumdiff'],
            Series(
                index=result_index,
                data=full(result_shape, -3, dtype=float),
            ),
        )

        for name, const in [('open', 1), ('close', 2), ('volume', 3)]:
            check_arrays(
                result[name],
                Series(
                    index=result_index,
                    data=full(result_shape, const, dtype=float),
                ),
            )
开发者ID:AlexanderAA,项目名称:zipline,代码行数:57,代码来源:test_engine.py


示例14: check

 def check(terms):
     graph = TermGraph(terms)
     results = self.run_graph(
         graph,
         initial_workspace={f: data},
         mask=self.build_mask(ones((5, 5))),
     )
     for method in terms:
         check_arrays(results[method], expected_ranks[method])
开发者ID:oliverjo,项目名称:zipline,代码行数:9,代码来源:test_factor.py


示例15: test_notnan

    def test_notnan(self):
        data = self.randn_data(seed=10)
        diag = eye(*data.shape, dtype=bool)
        data[diag] = nan

        results = self.run_graph(
            TermGraph({'notnan': self.f.notnan()}),
            initial_workspace={self.f: data},
        )
        check_arrays(results['notnan'], ~diag)
开发者ID:AlexanderAA,项目名称:zipline,代码行数:10,代码来源:test_filter.py


示例16: test_isfinite

    def test_isfinite(self):
        data = self.randn_data(seed=10)
        data[:, 0] = nan
        data[:, 2] = inf
        data[:, 4] = -inf

        results = self.run_graph(
            TermGraph({'isfinite': self.f.isfinite()}),
            initial_workspace={self.f: data},
        )
        check_arrays(results['isfinite'], isfinite(data))
开发者ID:AlexanderAA,项目名称:zipline,代码行数:11,代码来源:test_filter.py


示例17: test_top_and_bottom

    def test_top_and_bottom(self):
        data = self.randn_data(seed=5)  # Fix a seed for determinism.

        mask_data = ones_like(data, dtype=bool)
        mask_data[:, 0] = False

        nan_data = data.copy()
        nan_data[:, 0] = nan

        mask = Mask()
        workspace = {self.f: data, mask: mask_data}

        methods = ['top', 'bottom']
        counts = 2, 3, 10
        term_combos = list(product(methods, counts, [True, False]))

        def termname(method, count, masked):
            return '_'.join([method, str(count), 'mask' if masked else ''])

        # Add a term for each permutation of top/bottom, count, and
        # mask/no_mask.
        terms = {}
        for method, count, masked in term_combos:
            kwargs = {'N': count}
            if masked:
                kwargs['mask'] = mask
            term = getattr(self.f, method)(**kwargs)
            terms[termname(method, count, masked)] = term

        results = self.run_graph(TermGraph(terms), initial_workspace=workspace)

        def expected_result(method, count, masked):
            # Ranking with a mask is equivalent to ranking with nans applied on
            # the masked values.
            to_rank = nan_data if masked else data

            if method == 'top':
                return rowwise_rank(-to_rank) < count
            elif method == 'bottom':
                return rowwise_rank(to_rank) < count

        for method, count, masked in term_combos:
            result = results[termname(method, count, masked)]

            # Check that `min(c, num_assets)` assets passed each day.
            passed_per_day = result.sum(axis=1)
            check_arrays(
                passed_per_day,
                full_like(passed_per_day, min(count, data.shape[1])),
            )

            expected = expected_result(method, count, masked)
            check_arrays(result, expected)
开发者ID:AlexanderAA,项目名称:zipline,代码行数:53,代码来源:test_filter.py


示例18: test_sequenced_filter_order_independent

    def test_sequenced_filter_order_independent(self):
        data = self.arange_data() % 5
        results = self.run_terms(
            {
                # Sequencing is equivalent to &ing for commutative filters.
                'sequenced': (1.5 < self.f).then(self.f < 3.5),
                'anded': (1.5 < self.f) & (self.f < 3.5),
            },
            initial_workspace={self.f: data},
        )
        expected = (1.5 < data) & (data < 3.5)

        check_arrays(results['sequenced'], expected)
        check_arrays(results['anded'], expected)
开发者ID:michaeljohnbennett,项目名称:zipline,代码行数:14,代码来源:test_filter.py


示例19: test_masking

    def test_masking(self, dtype, missing_value, window_length):
        missing_value = coerce_to_dtype(dtype, missing_value)
        baseline_ints = arange(15).reshape(5, 3)
        baseline = baseline_ints.astype(dtype)
        mask = (baseline_ints % 2).astype(bool)
        masked_baseline = where(mask, baseline, missing_value)

        array = AdjustedArray(
            baseline,
            mask,
            adjustments={},
            missing_value=missing_value,
        )

        gen_expected = moving_window(masked_baseline, window_length)
        gen_actual = array.traverse(window_length)
        for expected, actual in zip(gen_expected, gen_actual):
            check_arrays(expected, actual)
开发者ID:litespeeddi2,项目名称:zipline,代码行数:18,代码来源:test_adjusted_array.py


示例20: test_bottom

    def test_bottom(self):
        counts = 2, 3, 10
        data = self.randn_data(seed=5)  # Arbitrary seed choice.
        results = self.run_terms(
            terms={'bottom_' + str(c): self.f.bottom(c) for c in counts},
            initial_workspace={self.f: data},
        )
        for c in counts:
            result = results['bottom_' + str(c)]

            # Check that `min(c, num_assets)` assets passed each day.
            passed_per_day = result.sum(axis=1)
            check_arrays(
                passed_per_day,
                full_like(passed_per_day, min(c, data.shape[1])),
            )

            # Check that the bottom `c` assets passed.
            expected = rowwise_rank(data) < c
            check_arrays(result, expected)
开发者ID:NJ32,项目名称:zipline,代码行数:20,代码来源:test_filter.py



注:本文中的zipline.utils.test_utils.check_arrays函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python test_utils.drain_zipline函数代码示例发布时间:2022-05-26
下一篇:
Python test_utils.assert_single_position函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap