本文整理汇总了Python中zipline.testing.check_arrays函数的典型用法代码示例。如果您正苦于以下问题:Python check_arrays函数的具体用法?Python check_arrays怎么用?Python check_arrays使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了check_arrays函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_make_cascading_boolean_array
def test_make_cascading_boolean_array(self):
    """
    Compare ``make_cascading_boolean_array`` with hand-written arrays.

    Covers a 3x3 shape with the default first value and with
    ``first_value=False``, a single row, a single column, and a shape
    with zero columns.
    """
    # 3x3 with the default first value.
    expected_default = array(
        [[True, True, False],
         [True, False, False],
         [False, False, False]]
    )
    check_arrays(make_cascading_boolean_array((3, 3)), expected_default)

    # 3x3 with first_value=False: the elementwise negation of the above.
    expected_negated = array(
        [[False, False, True],
         [False, True, True],
         [True, True, True]]
    )
    check_arrays(
        make_cascading_boolean_array((3, 3), first_value=False),
        expected_negated,
    )

    # A single row.
    expected_row = array([[True, True, False]])
    check_arrays(make_cascading_boolean_array((1, 3)), expected_row)

    # A single column.
    expected_column = array([[False], [False], [False]])
    check_arrays(make_cascading_boolean_array((3, 1)), expected_column)

    # No columns at all: an empty boolean array of the same shape.
    expected_empty = empty((3, 0), dtype=bool_dtype)
    check_arrays(make_cascading_boolean_array((3, 0)), expected_empty)
开发者ID:4ever911,项目名称:zipline,代码行数:29,代码来源:test_testing.py
示例2: test_setitem_array
def test_setitem_array(self):
    """
    Assign a row, a column, and finally the whole of a LabelArray.

    Before each partial write the test first asserts that source and
    destination differ, so the write is actually observable.
    """
    labels = LabelArray(self.strs, missing_value=None)
    snapshot = labels.copy()

    # Row assignment: copy row 1 over row 0.
    rows_already_equal = (labels[0] == labels[1]).all()
    self.assertFalse(
        rows_already_equal,
        "This test doesn't test anything because rows 0"
        " and 1 are already equal!"
    )
    labels[0] = labels[1]
    num_columns = labels.shape[1]
    for col in range(num_columns):
        self.assertEqual(labels[0, col], labels[1, col])

    # Column assignment: copy column 1 over column 0.
    columns_already_equal = (labels[:, 0] == labels[:, 1]).all()
    self.assertFalse(
        columns_already_equal,
        "This test doesn't test anything because columns 0"
        " and 1 are already equal!"
    )
    labels[:, 0] = labels[:, 1]
    num_rows = labels.shape[0]
    for row in range(num_rows):
        self.assertEqual(labels[row, 0], labels[row, 1])

    # Whole-array assignment: restore the copy taken at the start.
    labels[:] = snapshot
    check_arrays(labels, snapshot)
开发者ID:FranSal,项目名称:zipline,代码行数:27,代码来源:test_labelarray.py
示例3: test_masked_rankdata_2d
def test_masked_rankdata_2d(self,
                            seed_value,
                            method,
                            use_mask,
                            set_missing,
                            ascending):
    """
    Rank random float data and the same bytes viewed as datetime64[ns];
    both rankings must be identical.

    Parameters
    ----------
    seed_value
        Seeds the random generator and scales the random 5x5 input.
    method
        Ranking method forwarded to ``masked_rankdata_2d``.
    use_mask
        If true, the diagonal is masked out; otherwise nothing is masked.
    set_missing
        If true, column 2 of each input is overwritten with that input's
        missing value (``nan`` / ``NaTns``).
    ascending
        Sort direction forwarded to ``masked_rankdata_2d``.
    """
    eyemask = ~eye(5, dtype=bool)
    nomask = ones((5, 5), dtype=bool)

    seed(seed_value)
    asfloat = (randn(5, 5) * seed_value)
    # Reinterpret a copy of the float bytes as datetimes (no conversion).
    asdatetime = (asfloat).copy().view('datetime64[ns]')

    mask = eyemask if use_mask else nomask
    if set_missing:
        asfloat[:, 2] = nan
        asdatetime[:, 2] = NaTns

    # Forward ``ascending`` instead of hard-coding True; previously the
    # parameter was accepted but ignored, so the descending case was
    # never exercised.
    float_result = masked_rankdata_2d(
        data=asfloat,
        mask=mask,
        missing_value=nan,
        method=method,
        ascending=ascending,
    )
    datetime_result = masked_rankdata_2d(
        data=asdatetime,
        mask=mask,
        missing_value=NaTns,
        method=method,
        ascending=ascending,
    )

    check_arrays(float_result, datetime_result)
开发者ID:SJCosgrove,项目名称:quantopianresearch,代码行数:34,代码来源:test_factor.py
示例4: test_rolling_and_nonrolling
def test_rolling_and_nonrolling(self):
    """
    Run one pipeline that mixes ``RollingSumDifference`` with ``.latest``
    columns, all fed from a loader that serves constant values.

    Each output column is then compared with a constant-valued Series.
    """
    open_ = USEquityPricing.open
    close = USEquityPricing.close
    volume = USEquityPricing.volume

    # Test for thirty days up to the last day that we think all
    # the assets existed.
    dates_to_test = self.dates[-30:]

    loader = PrecomputedLoader(
        constants={open_: 1, close: 2, volume: 3},
        dates=self.dates,
        sids=self.asset_ids,
    )
    engine = SimplePipelineEngine(
        lambda column: loader, self.dates, self.asset_finder
    )

    pipeline_columns = {
        "sumdiff": RollingSumDifference(),
        "open": open_.latest,
        "close": close.latest,
        "volume": volume.latest,
    }
    result = engine.run_pipeline(
        Pipeline(columns=pipeline_columns),
        dates_to_test[0],
        dates_to_test[-1],
    )

    self.assertIsNotNone(result)
    self.assertEqual(
        {"sumdiff", "open", "close", "volume"},
        set(result.columns),
    )

    result_index = self.asset_ids * len(dates_to_test)
    result_shape = (len(result_index),)

    # Column name -> the constant every entry of that column should hold.
    expected_constants = [
        ("sumdiff", -3),
        ("open", 1),
        ("close", 2),
        ("volume", 3),
    ]
    for name, const in expected_constants:
        expected_series = Series(
            index=result_index,
            data=full(result_shape, const, dtype=float),
        )
        check_arrays(result[name], expected_series)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:31,代码来源:test_engine.py
示例5: test_isnull_datetime_dtype
def test_isnull_datetime_dtype(self):
    """
    ``isnull`` / ``notnull`` on a datetime64[ns] factor.

    ``NaTns`` is written on the diagonal of the input, so ``isnull`` is
    expected to be True exactly on the diagonal and ``notnull`` exactly
    off it.
    """
    class DatetimeFactor(Factor):
        dtype = datetime64ns_dtype
        window_length = 0
        inputs = ()

    factor = DatetimeFactor()

    diagonal = eye(5, dtype=bool)
    data = arange(25).reshape(5, 5).astype('datetime64[ns]')
    data[diagonal] = NaTns

    terms = {
        'isnull': factor.isnull(),
        'notnull': factor.notnull(),
    }
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={factor: data},
        mask=self.build_mask(ones((5, 5))),
    )

    check_arrays(results['isnull'], diagonal)
    check_arrays(results['notnull'], ~diagonal)
开发者ID:4ever911,项目名称:zipline,代码行数:25,代码来源:test_factor.py
示例6: test_percentile_nasty_partitions
def test_percentile_nasty_partitions(self):
    """
    Percentile filters with awkward partitions: 5 assets split into
    quartiles.
    """
    # There is no clean mathematical definition of "correct" here, so the
    # test pins the behavior to numpy.nanpercentile. It mainly serves as a
    # regression test in case a specialized percentile calculation is
    # written later.
    data = arange(25, dtype=float).reshape(5, 5) % 4

    # (filter name, lower percentile, upper percentile) for each quartile.
    bounds = [
        ('pct_' + str(q), q * 25.0, (q + 1) * 25.0)
        for q in range(4)
    ]

    terms = {
        name: self.f.percentile_between(lower, upper)
        for name, lower, upper in bounds
    }
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={self.f: data},
        mask=self.build_mask(ones((5, 5))),
    )

    for name, lower, upper in bounds:
        # Row-wise percentile cutoffs, kept 2-D so they broadcast over data.
        low_cutoff = nanpercentile(data, lower, axis=1, keepdims=True)
        high_cutoff = nanpercentile(data, upper, axis=1, keepdims=True)
        expected = and_(low_cutoff <= data, data <= high_cutoff)
        check_arrays(results[name], expected)
开发者ID:Weylew,项目名称:zipline,代码行数:33,代码来源:test_filter.py
示例7: test_isnull_int_dtype
def test_isnull_int_dtype(self, custom_missing_value):
    """
    ``isnull`` / ``notnull`` on an int64 factor with a custom missing value.

    ``custom_missing_value`` is written on the diagonal of the input, so
    ``isnull`` is expected to be True exactly on the diagonal and
    ``notnull`` exactly off it.
    """
    class CustomMissingValue(Factor):
        dtype = int64_dtype
        window_length = 0
        missing_value = custom_missing_value
        inputs = ()

    factor = CustomMissingValue()

    diagonal = eye(5, dtype=bool)
    data = arange(25).reshape(5, 5)
    data[diagonal] = custom_missing_value

    terms = {
        'isnull': factor.isnull(),
        'notnull': factor.notnull(),
    }
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={factor: data},
        mask=self.build_mask(ones((5, 5))),
    )

    check_arrays(results['isnull'], diagonal)
    check_arrays(results['notnull'], ~diagonal)
开发者ID:4ever911,项目名称:zipline,代码行数:27,代码来源:test_factor.py
示例8: test_single_factor
def test_single_factor(self):
    """
    Run ``RollingSumDifference`` alone, with and without a screen, and
    check that every output equals ``-factor.window_length``.
    """
    loader = self.loader
    assets = self.assets
    engine = SimplePipelineEngine(
        lambda column: loader, self.dates, self.asset_finder,
    )

    num_dates = 5
    num_assets = len(assets)
    result_shape = (num_dates, num_assets)
    dates = self.dates[10:10 + num_dates]

    factor = RollingSumDifference()
    expected_result = -factor.window_length

    # The screen keeps every asset, so both pipelines should produce the
    # same output.
    unscreened = Pipeline(columns={'f': factor})
    screened = Pipeline(
        columns={'f': factor},
        screen=factor.eq(expected_result),
    )

    for pipeline in (unscreened, screened):
        result = engine.run_pipeline(pipeline, dates[0], dates[-1])
        self.assertEqual(set(result.columns), {'f'})
        assert_multi_index_is_product(
            self, result.index, dates, assets
        )

        actual_values = result['f'].unstack().values
        expected_values = full(result_shape, expected_result, dtype=float)
        check_arrays(actual_values, expected_values)
开发者ID:AdaoSmith,项目名称:zipline,代码行数:32,代码来源:test_engine.py
示例9: compute
def compute(self, today, assets, out, returns, returns_slice):
    """
    Validate the sliced input against the full ``returns`` input.

    The slice must have exactly one column, and that column must hold the
    same values as the corresponding column of ``returns``.
    ``sids`` and ``my_asset_column`` come from an enclosing scope that is
    not part of this excerpt.
    """
    window = self.window_length
    assert returns_slice.shape == (window, 1)
    assert returns.shape == (window, len(sids))

    sliced_column = returns_slice[:, 0]
    original_column = returns[:, my_asset_column]
    check_arrays(sliced_column, original_column)
开发者ID:4ever911,项目名称:zipline,代码行数:7,代码来源:test_slice.py
示例10: test_window_safe
def test_window_safe(self, factor_len):
    """
    Feed a Filter into a windowed CustomFactor.

    The input is all True and the factor sums it column-wise, so every
    cell of the expected output is ``factor_len``.
    """
    # All-True input laid out as (days, securities).
    data = full(self.default_shape, True, dtype=bool)

    class InputFilter(Filter):
        inputs = ()
        window_length = 0

    class TestFactor(CustomFactor):
        dtype = float64_dtype
        inputs = (InputFilter(), )
        window_length = factor_len

        def compute(self, today, assets, out, filter_):
            # Column-wise sum of the input.
            out[:] = np_sum(filter_, axis=0)

    terms = {'windowsafe': TestFactor()}
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={InputFilter(): data},
    )

    # Number of days and of securities in default_shape.
    num_days = self.default_shape[0]
    num_securities = self.default_shape[1]

    # Shape of the output array.
    output_shape = ((num_days - factor_len + 1), num_securities)
    expected = full(output_shape, factor_len, dtype=float64)
    check_arrays(results['windowsafe'], expected)
开发者ID:Weylew,项目名称:zipline,代码行数:31,代码来源:test_filter.py
示例11: check_output
def check_output(self, expr, expected):
    """
    Evaluate ``expr`` directly via ``_compute`` on the fake raw data and
    compare the result with ``expected``.
    """
    # One raw array per input of the expression, in input order.
    raw_inputs = [self.fake_raw_data[term] for term in expr.inputs]
    mask = self.mask
    result = expr._compute(
        raw_inputs,
        mask.index,
        mask.columns,
        mask.values,
    )
    check_arrays(result, expected)
开发者ID:Priestfu,项目名称:zipline,代码行数:8,代码来源:test_numerical_expression.py
示例12: test_any
def test_any(self):
    """
    ``Any`` over windows of length 3 and 4 on a shifted-diagonal input.
    """
    # NOTE: per the original author's remark, the inputs and outputs here
    # are the exact negation of those used by the companion ``test_all``
    # test (not visible in this excerpt). That follows from De Morgan's
    # laws::
    #
    #     ~(a & b) == (~a | ~b)
    #
    # and, negating both sides::
    #
    #     (a & b) == ~(~a | ~b)
    #
    # all(a, b) behaves like (a & b) and any(a, b) like (a | b), so::
    #
    #     all(a, b) == ~(any(~a, ~b))
    #
    data = array([[0, 0, 0, 0, 0, 0],
                  [1, 0, 0, 0, 0, 0],
                  [0, 1, 0, 0, 0, 0],
                  [0, 0, 1, 0, 0, 0],
                  [0, 0, 0, 1, 0, 0],
                  [0, 0, 0, 0, 1, 0],
                  [0, 0, 0, 0, 0, 1]], dtype=bool)

    # With a window_length of N, a 1 in the base data stays "sticky" for
    # the following (N - 1) days.
    #
    # Because of how ``self.run_graph`` works, the same number of output
    # rows is computed for every term, so expected_3 holds only the last
    # 4 rows even though there is enough input data to compute 5.
    expected_3 = array([[1, 1, 1, 0, 0, 0],
                        [0, 1, 1, 1, 0, 0],
                        [0, 0, 1, 1, 1, 0],
                        [0, 0, 0, 1, 1, 1]], dtype=bool)
    expected_4 = array([[1, 1, 1, 0, 0, 0],
                        [1, 1, 1, 1, 0, 0],
                        [0, 1, 1, 1, 1, 0],
                        [0, 0, 1, 1, 1, 1]], dtype=bool)

    class Input(Filter):
        inputs = ()
        window_length = 0

    terms = {
        '3': Any(inputs=[Input()], window_length=3),
        '4': Any(inputs=[Input()], window_length=4),
    }
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={Input(): data},
        mask=self.build_mask(ones(shape=data.shape)),
    )

    for key, expected in (('3', expected_3), ('4', expected_4)):
        check_arrays(results[key], expected)
开发者ID:4ever911,项目名称:zipline,代码行数:58,代码来源:test_filter.py
示例13: check
def check(terms):
    """
    Compute ``terms`` in a single graph and compare each result with the
    entry of ``expected_ranks`` stored under the same key.

    ``self``, ``f``, ``data`` and ``expected_ranks`` come from the
    enclosing scope, which is not part of this excerpt.
    """
    all_assets_mask = self.build_mask(ones((5, 5)))
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={f: data},
        mask=all_assets_mask,
    )
    for method in terms:
        actual = results[method]
        check_arrays(actual, expected_ranks[method])
开发者ID:4ever911,项目名称:zipline,代码行数:9,代码来源:test_factor.py
示例14: check_terms
def check_terms(self, terms, expected, initial_workspace, mask):
    """
    Build a TermGraph from ``terms``, run it with ``initial_workspace``
    and ``mask``, and check every result against the matching entry of
    ``expected``.
    """
    results = self.run_graph(TermGraph(terms), initial_workspace, mask)
    # dzip_exact pairs the two mappings key by key as (result, expected).
    paired = dzip_exact(results, expected)
    for _key, (actual, wanted) in paired.items():
        check_arrays(actual, wanted)
开发者ID:AdaoSmith,项目名称:zipline,代码行数:9,代码来源:base.py
示例15: test_factor_with_multiple_outputs
def test_factor_with_multiple_outputs(self):
    """
    Run ``MultipleOutputs`` under a cascading mask, an alternating mask,
    and no mask at all.

    For each case both outputs are compared with a frame holding the
    column's constant where the expected mask is True and nan elsewhere.
    """
    dates = self.dates[5:10]
    assets = self.assets
    asset_ids = self.asset_ids
    constants = self.constants
    num_dates = len(dates)
    num_assets = len(assets)
    shape = (num_dates, num_assets)

    open_column = USEquityPricing.open
    close_column = USEquityPricing.close
    engine = SimplePipelineEngine(
        lambda column: self.loader, self.dates, self.asset_finder,
    )

    def create_expected_results(expected_value, mask):
        # expected_value where the mask is True, nan everywhere else.
        expected_values = where(mask, expected_value, nan)
        return DataFrame(expected_values, index=dates, columns=assets)

    cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day)
    expected_cascading_mask_result = make_cascading_boolean_array(
        shape=shape,
    )

    alternating_mask = (AssetIDPlusDay() % 2).eq(0)
    expected_alternating_mask_result = make_alternating_boolean_array(
        shape=shape, first_value=False,
    )

    expected_no_mask_result = full(
        shape=shape, fill_value=True, dtype=bool_dtype,
    )

    # Each mask paired with the boolean array it is expected to produce.
    cases = zip(
        (cascading_mask, alternating_mask, NotSpecified),
        (
            expected_cascading_mask_result,
            expected_alternating_mask_result,
            expected_no_mask_result,
        ),
    )
    output_columns = (
        ('open_price', open_column),
        ('close_price', close_column),
    )

    for mask, expected_mask in cases:
        has_mask = mask is not NotSpecified

        open_price, close_price = MultipleOutputs(mask=mask)
        pipeline = Pipeline(
            columns={'open_price': open_price, 'close_price': close_price},
        )
        if has_mask:
            pipeline.add(mask, 'mask')

        results = engine.run_pipeline(pipeline, dates[0], dates[-1])

        for colname, case_column in output_columns:
            if has_mask:
                mask_results = results['mask'].unstack()
                check_arrays(mask_results.values, expected_mask)

            output_results = results[colname].unstack()
            output_expected = create_expected_results(
                constants[case_column], expected_mask,
            )
            assert_frame_equal(output_results, output_expected)
开发者ID:AtwooTM,项目名称:zipline,代码行数:56,代码来源:test_engine.py
示例16: test_isfinite
def test_isfinite(self):
    """
    ``isfinite()`` on a factor must agree with ``isfinite`` applied to the
    raw data, including columns of nan, inf and -inf.
    """
    data = self.randn_data(seed=10)

    # Overwrite three columns with the non-finite values under test.
    for column, value in ((0, nan), (2, inf), (4, -inf)):
        data[:, column] = value

    terms = {'isfinite': self.f.isfinite()}
    results = self.run_graph(
        TermGraph(terms),
        initial_workspace={self.f: data},
    )
    check_arrays(results['isfinite'], isfinite(data))
开发者ID:Weylew,项目名称:zipline,代码行数:11,代码来源:test_filter.py
示例17: test_top_and_bottom
def test_top_and_bottom(self):
    """
    ``top(N)`` / ``bottom(N)`` for several counts, with and without a
    mask that hides column 0.
    """
    data = self.randn_data(seed=5)  # Fixed seed keeps the test deterministic.

    mask_data = ones_like(data, dtype=bool)
    mask_data[:, 0] = False

    nan_data = data.copy()
    nan_data[:, 0] = nan

    mask = Mask()
    workspace = {self.f: data, mask: mask_data}

    methods = ['top', 'bottom']
    counts = 2, 3, 10
    term_combos = list(product(methods, counts, [True, False]))

    def termname(method, count, masked):
        suffix = 'mask' if masked else ''
        return '_'.join((method, str(count), suffix))

    # One term per combination of top/bottom, count, and mask/no_mask.
    terms = {}
    for method, count, masked in term_combos:
        mask_kwargs = {'mask': mask} if masked else {}
        build_filter = getattr(self.f, method)
        terms[termname(method, count, masked)] = build_filter(
            N=count, **mask_kwargs
        )

    results = self.run_graph(TermGraph(terms), initial_workspace=workspace)

    def expected_result(method, count, masked):
        # Ranking under a mask is equivalent to ranking data whose masked
        # entries have been replaced by nan.
        to_rank = nan_data if masked else data
        if method == 'top':
            return rowwise_rank(-to_rank) < count
        if method == 'bottom':
            return rowwise_rank(to_rank) < count

    num_columns = data.shape[1]
    for method, count, masked in term_combos:
        result = results[termname(method, count, masked)]

        # Check that `min(c, num_assets)` assets passed each day.
        passed_per_day = result.sum(axis=1)
        check_arrays(
            passed_per_day,
            full_like(passed_per_day, min(count, num_columns)),
        )

        check_arrays(result, expected_result(method, count, masked))
开发者ID:Weylew,项目名称:zipline,代码行数:53,代码来源:test_filter.py
示例18: test_rank_after_mask
def test_rank_after_mask(self, name, factor_dtype):
    """
    Rank a factor ascending and descending, with and without a mask that
    hides the diagonal, and compare with hand-computed ranks.
    """
    f = F(dtype=factor_dtype)

    # Same values as arange(25).reshape(5, 5).transpose() % 4, written out
    # explicitly.
    data = array([[0, 1, 2, 3, 0],
                  [1, 2, 3, 0, 1],
                  [2, 3, 0, 1, 2],
                  [3, 0, 1, 2, 3],
                  [0, 1, 2, 3, 0]], dtype=factor_dtype)
    mask_data = ~eye(5, dtype=bool)
    initial_workspace = {f: data, Mask(): mask_data}

    terms = {
        "ascending_nomask": f.rank(ascending=True),
        "ascending_mask": f.rank(ascending=True, mask=Mask()),
        "descending_nomask": f.rank(ascending=False),
        "descending_mask": f.rank(ascending=False, mask=Mask()),
    }
    graph = TermGraph(terms)

    expected = {
        "ascending_nomask": array([[1., 3., 4., 5., 2.],
                                   [2., 4., 5., 1., 3.],
                                   [3., 5., 1., 2., 4.],
                                   [4., 1., 2., 3., 5.],
                                   [1., 3., 4., 5., 2.]]),
        "descending_nomask": array([[4., 3., 2., 1., 5.],
                                    [3., 2., 1., 5., 4.],
                                    [2., 1., 5., 4., 3.],
                                    [1., 5., 4., 3., 2.],
                                    [4., 3., 2., 1., 5.]]),
        # The diagonal should be all nans, and any entry that ranked above
        # the diagonal entry in the unmasked calculation drops by 1.
        "ascending_mask": array([[nan, 2., 3., 4., 1.],
                                 [2., nan, 4., 1., 3.],
                                 [2., 4., nan, 1., 3.],
                                 [3., 1., 2., nan, 4.],
                                 [1., 2., 3., 4., nan]]),
        "descending_mask": array([[nan, 3., 2., 1., 4.],
                                  [2., nan, 1., 4., 3.],
                                  [2., 1., nan, 4., 3.],
                                  [1., 4., 3., nan, 2.],
                                  [4., 3., 2., 1., nan]]),
    }

    results = self.run_graph(
        graph,
        initial_workspace,
        mask=self.build_mask(ones((5, 5))),
    )
    for method in results:
        wanted = expected[method]
        check_arrays(wanted, results[method])
开发者ID:4ever911,项目名称:zipline,代码行数:53,代码来源:test_factor.py
示例19: test_overwrite_adjustment_cases
def test_overwrite_adjustment_cases(
        self, name, data, lookback, adjustments, missing_value, expected):
    """
    Traverse an AdjustedArray and compare every yielded window with the
    matching entry of ``expected``.
    """
    adjusted = AdjustedArray(data, NOMASK, adjustments, missing_value)

    # Two passes, to make sure an adjusted array can be traversed again.
    for _ in range(2):
        windows = adjusted.traverse(lookback)
        # zip_longest pads the shorter side with None, so a difference in
        # the number of windows reaches check_arrays as well.
        for yielded, expected_yield in zip_longest(windows, expected):
            check_arrays(yielded, expected_yield)
开发者ID:280185386,项目名称:zipline,代码行数:12,代码来源:test_adjusted_array.py
示例20: test_no_adjustments
def test_no_adjustments(self,
                        name,
                        data,
                        lookback,
                        adjustments,
                        missing_value,
                        expected_output):
    """
    Traverse an AdjustedArray and compare every yielded window with the
    matching entry of ``expected_output``.

    The yielded windows and ``expected_output`` must also have the same
    length: a missing or surplus window is paired with ``None`` and handed
    to ``check_arrays`` instead of being silently dropped.
    """
    array = AdjustedArray(data, NOMASK, adjustments, missing_value)
    for _ in range(2):  # Iterate 2x ensure adjusted_arrays are re-usable.
        # zip_longest rather than zip: plain zip stops at the shorter
        # input, which would let a traversal with too few or too many
        # windows pass unnoticed. This relies on ``zip_longest`` already
        # being in scope, as the neighbouring overwrite-adjustment test
        # in the same file uses it.
        in_out = zip_longest(array.traverse(lookback), expected_output)
        for yielded, expected_yield in in_out:
            check_arrays(yielded, expected_yield)
开发者ID:280185386,项目名称:zipline,代码行数:13,代码来源:test_adjusted_array.py
注:本文中的zipline.testing.check_arrays函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论