本文整理汇总了Python中zipline.pipeline.engine.SimplePipelineEngine类的典型用法代码示例。如果您正苦于以下问题:Python SimplePipelineEngine类的具体用法?Python SimplePipelineEngine怎么用?Python SimplePipelineEngine使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SimplePipelineEngine类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_engine_with_multicolumn_loader
def test_engine_with_multicolumn_loader(self):
    """
    Run a rolling factor next to three ``.latest`` columns against a
    ConstantLoader and check that each output column is the expected constant.
    """
    open_ = USEquityPricing.open
    close = USEquityPricing.close
    volume = USEquityPricing.volume

    # Thirty sessions ending two sessions before the end of the calendar.
    # The original note explains that testing the very last calendar day
    # would leave no assets in the output because of the assets' end dates
    # (NOTE(review): that note was cut off mid-sentence in the source).
    dates_to_test = self.dates[-32:-2]

    loader = ConstantLoader(
        constants={open_: 1, close: 2, volume: 3},
        dates=self.dates,
        assets=self.assets,
    )
    # This variant hands the loader object itself to the engine.
    engine = SimplePipelineEngine(loader, self.dates, self.asset_finder)

    pipeline = Pipeline(
        columns={
            "sumdiff": RollingSumDifference(),
            "open": open_.latest,
            "close": close.latest,
            "volume": volume.latest,
        }
    )
    result = engine.run_pipeline(pipeline, dates_to_test[0], dates_to_test[-1])

    self.assertIsNotNone(result)
    self.assertEqual({"sumdiff", "open", "close", "volume"}, set(result.columns))

    # One entry per (date, asset) pair.
    result_index = self.assets * len(dates_to_test)
    result_shape = (len(result_index),)

    # "sumdiff" is expected to be -3 everywhere; the other columns should
    # echo the constants fed to the loader.
    for name, value in (("sumdiff", -3), ("open", 1), ("close", 2), ("volume", 3)):
        check_arrays(result[name], Series(index=result_index, data=full(result_shape, value)))
开发者ID:mirizzi,项目名称:zipline,代码行数:32,代码来源:test_engine.py
示例2: test_rolling_and_nonrolling
def test_rolling_and_nonrolling(self):
    """
    Combine a rolling factor with ``.latest`` columns backed by a
    PrecomputedLoader and verify every column comes back as a float constant.
    """
    open_, close, volume = (
        USEquityPricing.open,
        USEquityPricing.close,
        USEquityPricing.volume,
    )

    # The final thirty sessions of the calendar; per the original note, all
    # assets are believed to exist through the last of them.
    dates_to_test = self.dates[-30:]

    loader = PrecomputedLoader(
        constants={open_: 1, close: 2, volume: 3},
        dates=self.dates,
        sids=self.asset_ids,
    )

    def get_loader(column):
        # Every column is served by the same loader.
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)
    pipeline = Pipeline(
        columns={
            "sumdiff": RollingSumDifference(),
            "open": open_.latest,
            "close": close.latest,
            "volume": volume.latest,
        }
    )
    result = engine.run_pipeline(pipeline, dates_to_test[0], dates_to_test[-1])

    self.assertIsNotNone(result)
    self.assertEqual({"sumdiff", "open", "close", "volume"}, set(result.columns))

    # One entry per (date, sid) pair.
    result_index = self.asset_ids * len(dates_to_test)
    result_shape = (len(result_index),)

    for name, value in (("sumdiff", -3), ("open", 1), ("close", 2), ("volume", 3)):
        check_arrays(
            result[name],
            Series(index=result_index, data=full(result_shape, value, dtype=float)),
        )
开发者ID:RoyHsiao,项目名称:zipline,代码行数:31,代码来源:test_engine.py
示例3: test_custom_factor_outputs_parameter
def test_custom_factor_outputs_parameter(self):
    """
    Check a two-output CustomFactor (``sum_`` and ``diff``) for window
    lengths 1 and 2 against values derived from the fixture constants.
    """
    dates = self.dates[5:10]
    assets = self.assets
    constants = self.constants
    frame_shape = (len(dates), len(assets))

    engine = SimplePipelineEngine(lambda column: self.loader, self.dates, self.asset_finder)

    def create_expected_results(expected_value):
        # A dates x assets frame filled with a single float64 value.
        return DataFrame(
            full(frame_shape, expected_value, float64),
            index=dates,
            columns=assets,
        )

    for window_length in range(1, 3):
        sum_, diff = OpenCloseSumAndDiff(outputs=["sum_", "diff"], window_length=window_length)
        results = engine.run_pipeline(
            Pipeline(columns={"sum_": sum_, "diff": diff}),
            dates[0],
            dates[-1],
        )

        for colname, op in (("sum_", add), ("diff", sub)):
            output_results = results[colname].unstack()
            # Expected value: op(open * window_length, close * window_length).
            scaled_open = constants[USEquityPricing.open] * window_length
            scaled_close = constants[USEquityPricing.close] * window_length
            output_expected = create_expected_results(op(scaled_open, scaled_close))
            assert_frame_equal(output_results, output_expected)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:25,代码来源:test_engine.py
示例4: test_factor_with_single_output
def test_factor_with_single_output(self):
    """
    Test passing an `outputs` parameter of length 1 to a CustomFactor.

    Both the factor instance itself and its single named output attribute
    are added as pipeline columns and compared against the fixture's
    constant for the ``open`` column.
    """
    dates = self.dates[5:10]
    assets = self.assets
    num_dates = len(dates)

    # Named ``open_`` (not ``open``) so the builtin is not shadowed.
    open_ = USEquityPricing.open
    open_values = [self.constants[open_]] * num_dates
    open_values_as_tuple = [(self.constants[open_],)] * num_dates

    engine = SimplePipelineEngine(lambda column: self.loader, self.dates, self.asset_finder)

    single_output = OpenPrice(outputs=["open"])
    pipeline = Pipeline(
        columns={
            "open_instance": single_output,
            "open_attribute": single_output.open,
        }
    )
    results = engine.run_pipeline(pipeline, dates[0], dates[-1])

    # The instance `single_output` itself will compute a numpy.recarray
    # when added as a column to our pipeline, so we expect its output
    # values to be 1-tuples.
    open_instance_expected = {asset: open_values_as_tuple for asset in assets}
    open_attribute_expected = {asset: open_values for asset in assets}

    for colname, expected_values in (
        ("open_instance", open_instance_expected),
        ("open_attribute", open_attribute_expected),
    ):
        column_results = results[colname].unstack()
        expected_results = DataFrame(expected_values, index=dates, columns=assets, dtype=float64)
        assert_frame_equal(column_results, expected_results)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:29,代码来源:test_engine.py
示例5: test_drawdown
def test_drawdown(self):
    """
    Run MaxDrawdown over a 5-session window of close prices and compare it
    with an all-zero frame that has NaNs written in by ``self.write_nans``.
    """
    # The monotonically-increasing data produced by SyntheticDailyBarWriter
    # exercises two pathological cases for MaxDrawdown. The actual computed
    # results are pretty much useless (everything is either NaN or zero),
    # but verifying we correctly handle those corner cases is valuable.
    calendar = self.trading_calendar
    engine = SimplePipelineEngine(
        lambda column: self.pipeline_loader,
        calendar.all_sessions,
        self.asset_finder,
    )

    window_length = 5
    asset_ids = self.all_asset_ids

    one_session = calendar.day
    dates = date_range(
        self.first_asset_start + one_session,
        self.last_asset_end,
        freq=one_session,
    )
    # Skip the leading sessions that are only needed to fill the window.
    dates_to_test = dates[window_length:]

    drawdown = MaxDrawdown(inputs=(USEquityPricing.close,), window_length=window_length)
    pipeline = Pipeline(columns={"drawdown": drawdown})
    results = engine.run_pipeline(pipeline, dates_to_test[0], dates_to_test[-1])

    # We expect NaNs when the asset was undefined, otherwise 0 everywhere,
    # since the input is always increasing.
    expected = DataFrame(
        data=zeros((len(dates_to_test), len(asset_ids)), dtype=float),
        index=dates_to_test,
        columns=self.asset_finder.retrieve_all(asset_ids),
    )
    self.write_nans(expected)

    result = results["drawdown"].unstack()
    assert_frame_equal(expected, result)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:31,代码来源:test_engine.py
示例6: test_SMA
def test_SMA(self):
    """
    Compare SimpleMovingAverage over close prices (window of 5) with a
    rolling mean computed directly from the expected bar values.
    """
    calendar = self.trading_calendar
    engine = SimplePipelineEngine(
        lambda column: self.pipeline_loader,
        calendar.all_sessions,
        self.asset_finder,
    )

    window_length = 5
    asset_ids = self.all_asset_ids

    one_session = calendar.day
    dates = date_range(
        self.first_asset_start + one_session,
        self.last_asset_end,
        freq=one_session,
    )
    dates_to_test = dates[window_length:]

    SMA = SimpleMovingAverage(inputs=(USEquityPricing.close,), window_length=window_length)
    pipeline = Pipeline(columns={"sma": SMA})
    results = engine.run_pipeline(pipeline, dates_to_test[0], dates_to_test[-1])

    # Shift back the raw inputs by a trading day because we expect our
    # computed results to be computed using values anchored on the
    # **previous** day's data.
    shifted_closes = expected_bar_values_2d(
        dates - calendar.day,
        self.equity_info,
        "close",
    )
    expected_raw = rolling_mean(shifted_closes, window_length, min_periods=1)

    expected = DataFrame(
        # Drop the leading rows that only exist to warm up the window;
        # what remains lines up with dates_to_test (= dates[window_length:]).
        expected_raw[window_length:],
        index=dates_to_test,
        columns=self.asset_finder.retrieve_all(asset_ids),
    )
    self.write_nans(expected)

    result = results["sma"].unstack()
    assert_frame_equal(result, expected)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:33,代码来源:test_engine.py
示例7: test_instance_of_factor_with_multiple_outputs
def test_instance_of_factor_with_multiple_outputs(self):
    """
    Add a multi-output CustomFactor instance directly as a pipeline column.

    Each computed value is expected to be a tuple holding the value of every
    output of the factor (here: the open and close constants).
    """
    dates = self.dates[5:10]
    assets = self.assets
    constants = self.constants

    engine = SimplePipelineEngine(
        lambda column: self.loader,
        self.dates,
        self.asset_finder,
    )

    # One (open, close) pair per asset, the same row repeated for each date.
    open_close_pair = (
        constants[USEquityPricing.open],
        constants[USEquityPricing.close],
    )
    one_row = [open_close_pair] * len(assets)
    expected_values = [one_row] * len(dates)
    expected_results = DataFrame(
        expected_values,
        index=dates,
        columns=assets,
        dtype=float64,
    )

    pipeline = Pipeline(columns={'instance': MultipleOutputs()})
    results = engine.run_pipeline(pipeline, dates[0], dates[-1])

    instance_results = results['instance'].unstack()
    assert_frame_equal(instance_results, expected_results)
开发者ID:AtwooTM,项目名称:zipline,代码行数:27,代码来源:test_engine.py
示例8: test_single_factor
def test_single_factor(self):
    """
    Run a single RollingSumDifference column, with and without a screen
    that every asset passes, and check both runs give the same output.
    """
    loader = self.loader
    assets = self.assets
    engine = SimplePipelineEngine(
        lambda column: loader,
        self.dates,
        self.asset_finder,
    )

    num_dates = 5
    result_shape = (num_dates, len(assets))
    dates = self.dates[10:10 + num_dates]

    factor = RollingSumDifference()
    expected_result = -factor.window_length

    # Since every asset will pass the screen, these should be equivalent.
    unscreened = Pipeline(columns={'f': factor})
    screened = Pipeline(
        columns={'f': factor},
        screen=factor.eq(expected_result),
    )

    for p in (unscreened, screened):
        result = engine.run_pipeline(p, dates[0], dates[-1])
        self.assertEqual(set(result.columns), {'f'})
        assert_multi_index_is_product(self, result.index, dates, assets)
        check_arrays(
            result['f'].unstack().values,
            full(result_shape, expected_result, dtype=float),
        )
开发者ID:AdaoSmith,项目名称:zipline,代码行数:32,代码来源:test_engine.py
示例9: test_numeric_factor
def test_numeric_factor(self):
    """
    Check two RollingSumDifference factors and their arithmetic average.

    Each factor is expected to equal 3.0 times the difference of its two
    input constants, and ``avg`` to equal the mean of the two factors.
    """
    constants = self.constants
    loader = self.loader
    engine = SimplePipelineEngine(lambda column: loader, self.dates, self.asset_finder)

    num_dates = 5
    dates = self.dates[10 : 10 + num_dates]

    high, low = USEquityPricing.high, USEquityPricing.low
    # Named ``open_`` (not ``open``) so the builtin is not shadowed.
    open_, close = USEquityPricing.open, USEquityPricing.close

    high_minus_low = RollingSumDifference(inputs=[high, low])
    open_minus_close = RollingSumDifference(inputs=[open_, close])
    avg = (high_minus_low + open_minus_close) / 2

    results = engine.run_pipeline(
        Pipeline(columns={"high_low": high_minus_low, "open_close": open_minus_close, "avg": avg}),
        dates[0],
        dates[-1],
    )

    expected_high_low = 3.0 * (constants[high] - constants[low])
    expected_open_close = 3.0 * (constants[open_] - constants[close])
    expected_avg = (expected_high_low + expected_open_close) / 2.0

    # Every column is a constant frame over (dates x assets).
    for colname, expected_value in (
        ("high_low", expected_high_low),
        ("open_close", expected_open_close),
        ("avg", expected_avg),
    ):
        assert_frame_equal(
            results[colname].unstack(),
            DataFrame(expected_value, index=dates, columns=self.assets),
        )
开发者ID:RoyHsiao,项目名称:zipline,代码行数:30,代码来源:test_engine.py
示例10: run_graph
def run_graph(self, graph, initial_workspace, mask=None):
    """
    Compute the given TermGraph, seeding the workspace of our engine with
    `initial_workspace`.

    Parameters
    ----------
    graph : zipline.pipeline.graph.TermGraph
        Graph to run.
    initial_workspace : dict
        Initial workspace to forward to SimplePipelineEngine.compute_chunk.
        An entry for ``AssetExists()`` is added in place if it is missing.
    mask : DataFrame, optional
        Value stored in `initial_workspace` as the mask for
        ``AssetExists()``. Defaults to ``self.__mask``.

    Returns
    -------
    results : dict
        Mapping from termname -> computed result.
    """
    def get_loader(column):
        # The engine is handed an ExplodingObject for any column it asks a
        # loader for.
        return ExplodingObject()

    engine = SimplePipelineEngine(get_loader, self.__calendar, self.__finder)

    effective_mask = self.__mask if mask is None else mask
    dates, assets, mask_values = explode(effective_mask)

    # Do not overwrite a mask the caller already supplied in the workspace.
    initial_workspace.setdefault(AssetExists(), mask_values)
    return engine.compute_chunk(graph, dates, assets, initial_workspace)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:28,代码来源:base.py
示例11: test_bad_dates
def test_bad_dates(self):
    """
    Running a pipeline with a start date after its end date must raise a
    ValueError with a descriptive message.
    """
    loader = self.loader

    def get_loader(column):
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)
    empty_pipeline = Pipeline()
    expected_message = "start_date must be before or equal to end_date .*"

    # dates[2] comes after dates[1], so the range is inverted on purpose.
    with self.assertRaisesRegexp(ValueError, expected_message):
        engine.run_pipeline(empty_pipeline, self.dates[2], self.dates[1])
开发者ID:RoyHsiao,项目名称:zipline,代码行数:9,代码来源:test_engine.py
示例12: test_factor_with_multiple_outputs
def test_factor_with_multiple_outputs(self):
    """
    Check a two-output CustomFactor under three masking scenarios.

    The factor is run with a cascading mask, an alternating mask and no
    mask at all. For each scenario both outputs must equal the matching
    fixture constant where the mask is True and NaN elsewhere.
    """
    dates = self.dates[5:10]
    assets = self.assets
    asset_ids = self.asset_ids
    constants = self.constants
    num_dates = len(dates)
    num_assets = len(assets)
    # Named ``open_`` (not ``open``) so the builtin is not shadowed.
    open_ = USEquityPricing.open
    close = USEquityPricing.close

    engine = SimplePipelineEngine(
        lambda column: self.loader, self.dates, self.asset_finder,
    )

    def create_expected_results(expected_value, mask):
        # Constant where the mask is True, NaN where it is False.
        expected_values = where(mask, expected_value, nan)
        return DataFrame(expected_values, index=dates, columns=assets)

    cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day)
    expected_cascading_mask_result = make_cascading_boolean_array(
        shape=(num_dates, num_assets),
    )

    alternating_mask = (AssetIDPlusDay() % 2).eq(0)
    expected_alternating_mask_result = make_alternating_boolean_array(
        shape=(num_dates, num_assets), first_value=False,
    )

    expected_no_mask_result = full(
        shape=(num_dates, num_assets), fill_value=True, dtype=bool_dtype,
    )

    masks = cascading_mask, alternating_mask, NotSpecified
    expected_mask_results = (
        expected_cascading_mask_result,
        expected_alternating_mask_result,
        expected_no_mask_result,
    )

    for mask, expected_mask in zip(masks, expected_mask_results):
        open_price, close_price = MultipleOutputs(mask=mask)
        pipeline = Pipeline(
            columns={'open_price': open_price, 'close_price': close_price},
        )
        has_mask = mask is not NotSpecified
        if has_mask:
            pipeline.add(mask, 'mask')

        results = engine.run_pipeline(pipeline, dates[0], dates[-1])

        # The mask column does not depend on which output is inspected, so
        # validate it once per scenario rather than once per output column.
        if has_mask:
            mask_results = results['mask'].unstack()
            check_arrays(mask_results.values, expected_mask)

        for colname, case_column in (('open_price', open_),
                                     ('close_price', close)):
            output_results = results[colname].unstack()
            output_expected = create_expected_results(
                constants[case_column], expected_mask,
            )
            assert_frame_equal(output_results, output_expected)
开发者ID:AtwooTM,项目名称:zipline,代码行数:56,代码来源:test_engine.py
示例13: test_same_day_pipeline
def test_same_day_pipeline(self):
    """
    Run a pipeline whose start and end dates are the same session and check
    the first value of the resulting column.
    """
    loader = self.loader

    def get_loader(column):
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)

    factor = AssetID()
    first_sid = self.asset_ids[0]
    p = Pipeline(columns={"f": factor}, screen=factor <= first_sid)

    # The crux of this is that when we run the pipeline for a single day
    # (i.e. start and end dates are the same) we should accurately get
    # data for the day prior.
    single_day = self.dates[1]
    result = engine.run_pipeline(p, single_day, single_day)

    self.assertEqual(result["f"][0], 1.0)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:12,代码来源:test_engine.py
示例14: test_rolling_and_nonrolling
def test_rolling_and_nonrolling(self):
    """
    Combine a rolling factor with ``.latest`` columns backed by a
    ConstantLoader and verify every column comes back as the expected
    constant.
    """
    open_, close, volume = (
        USEquityPricing.open,
        USEquityPricing.close,
        USEquityPricing.volume,
    )

    # The final thirty sessions of the calendar; per the original note, all
    # assets are believed to exist through the last of them.
    dates_to_test = self.dates[-30:]

    loader = ConstantLoader(
        constants={open_: 1, close: 2, volume: 3},
        dates=self.dates,
        assets=self.assets,
    )

    def get_loader(column):
        # Every column is served by the same loader.
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)

    pipeline = Pipeline(
        columns={
            'sumdiff': RollingSumDifference(),
            'open': open_.latest,
            'close': close.latest,
            'volume': volume.latest,
        },
    )
    result = engine.run_pipeline(pipeline, dates_to_test[0], dates_to_test[-1])

    self.assertIsNotNone(result)
    self.assertEqual(
        {'sumdiff', 'open', 'close', 'volume'},
        set(result.columns),
    )

    # One entry per (date, asset) pair.
    result_index = self.assets * len(dates_to_test)
    result_shape = (len(result_index),)

    for name, value in (('sumdiff', -3), ('open', 1), ('close', 2), ('volume', 3)):
        check_arrays(
            result[name],
            Series(index=result_index, data=full(result_shape, value)),
        )
开发者ID:xiaojinyue,项目名称:zipline,代码行数:51,代码来源:test_engine.py
示例15: test_multiple_rolling_factors
def test_multiple_rolling_factors(self):
    """
    Run three RollingSumDifference factors with different windows/inputs in
    one pipeline and check each column against its expected constant.
    """
    loader = self.loader
    finder = self.asset_finder
    assets = self.assets
    engine = SimplePipelineEngine(
        lambda column: loader,
        self.dates,
        self.asset_finder,
    )

    num_dates = 5
    shape = (num_dates, len(assets))
    dates = self.dates[10:10 + num_dates]

    short_factor = RollingSumDifference(window_length=3)
    long_factor = RollingSumDifference(window_length=5)
    high_factor = RollingSumDifference(
        window_length=3,
        inputs=[USEquityPricing.open, USEquityPricing.high],
    )

    factors_by_name = {
        'short': short_factor,
        'long': long_factor,
        'high': high_factor,
    }
    results = engine.run_pipeline(Pipeline(columns=factors_by_name), dates[0], dates[-1])

    self.assertEqual(set(results.columns), {'short', 'high', 'long'})
    assert_multi_index_is_product(
        self, results.index, dates, finder.retrieve_all(assets)
    )

    expected_by_name = (
        # row-wise sum over an array whose values are all (1 - 2)
        ('short', -short_factor.window_length),
        ('long', -long_factor.window_length),
        # row-wise sum over an array whose values are all (1 - 3)
        ('high', -2 * high_factor.window_length),
    )
    for name, expected_value in expected_by_name:
        check_arrays(
            results[name].unstack().values,
            full(shape, expected_value),
        )
开发者ID:xiaojinyue,项目名称:zipline,代码行数:46,代码来源:test_engine.py
示例16: run_graph
def run_graph(self, graph, initial_workspace, mask=None):
    """
    Compute the given execution plan, seeding the workspace of our engine
    with `initial_workspace`.

    Parameters
    ----------
    graph : zipline.pipeline.graph.ExecutionPlan
        Graph to run.
    initial_workspace : dict
        Initial workspace to forward to SimplePipelineEngine.compute_chunk.
        Entries for ``AssetExists()`` and ``InputDates()`` are added in
        place when they are missing.
    mask : DataFrame, optional
        Value stored in `initial_workspace` as the mask for
        ``AssetExists()``. Defaults to ``self.default_asset_exists_mask``.

    Returns
    -------
    results : dict
        Mapping from termname -> computed result.
    """
    def get_loader(c):
        # Everything needed is expected to be in the workspace already, so
        # any loader lookup is treated as a failure.
        raise AssertionError("run_graph() should not require any loaders!")

    engine = SimplePipelineEngine(
        get_loader,
        self.asset_finder,
        default_domain=US_EQUITIES,
    )

    effective_mask = self.default_asset_exists_mask if mask is None else mask
    dates, sids, mask_values = explode(effective_mask)

    # Only fill in what the caller did not provide.
    initial_workspace.setdefault(AssetExists(), mask_values)
    initial_workspace.setdefault(InputDates(), dates)

    return engine.compute_chunk(
        graph=graph,
        dates=dates,
        sids=sids,
        initial_workspace=initial_workspace,
    )
开发者ID:barrygolden,项目名称:zipline,代码行数:43,代码来源:base.py
示例17: test_fail_usefully_on_insufficient_data
def test_fail_usefully_on_insufficient_data(self):
    """
    A factor with a 10-session window must run on ``self.dates[9]`` but
    raise NoFurtherDataError on ``self.dates[8]``.
    """
    loader = self.loader

    def get_loader(column):
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)

    class SomeFactor(CustomFactor):
        inputs = [USEquityPricing.close]
        window_length = 10

        def compute(self, today, assets, out, closes):
            # Only the window length matters for this test.
            pass

    p = Pipeline(columns={'t': SomeFactor()})

    # self.dates[9] is the earliest date we should be able to compute.
    earliest_ok = self.dates[9]
    engine.run_pipeline(p, earliest_ok, earliest_ok)

    # We shouldn't be able to compute dates[8], since we only know about 8
    # prior dates, and we need a window length of 10.
    too_early = self.dates[8]
    with self.assertRaises(NoFurtherDataError):
        engine.run_pipeline(p, too_early, too_early)
开发者ID:AtwooTM,项目名称:zipline,代码行数:22,代码来源:test_engine.py
示例18: test_screen
def test_screen(self):
    """
    For every asset id, screen on ``AssetID() <= asset_id`` and check that
    only the assets passing the screen appear in the output.
    """
    loader = self.loader
    finder = self.asset_finder
    asset_ids = array(self.asset_ids)

    def get_loader(column):
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)

    num_dates = 5
    dates = self.dates[10 : 10 + num_dates]
    factor = AssetID()

    for cutoff in asset_ids:
        p = Pipeline(columns={"f": factor}, screen=factor <= cutoff)
        result = engine.run_pipeline(p, dates[0], dates[-1])

        # Sids that should survive the screen, and their asset objects.
        expected_sids = asset_ids[asset_ids <= cutoff]
        expected_assets = finder.retrieve_all(expected_sids)

        # The surviving sids (as floats) repeated once per date.
        expected_index = MultiIndex.from_product([dates, expected_assets])
        expected_data = tile(expected_sids.astype(float), [len(dates)])
        expected_result = DataFrame(
            index=expected_index,
            data=expected_data,
            columns=["f"],
        )

        assert_frame_equal(result, expected_result)
开发者ID:RoyHsiao,项目名称:zipline,代码行数:22,代码来源:test_engine.py
示例19: test_input_dates_provided_by_default
def test_input_dates_provided_by_default(self):
    """
    A factor that lists ``InputDates()`` among its inputs should receive a
    window of dates without the test seeding the workspace itself.
    """
    loader = self.loader

    def get_loader(column):
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)

    class TestFactor(CustomFactor):
        inputs = [InputDates(), USEquityPricing.close]
        window_length = 10
        dtype = datetime64ns_dtype

        def compute(self, today, assets, out, dates, closes):
            # First and last rows of the first column of the date window.
            window_start, window_end = dates[[0, -1], 0]
            assert window_end == today.asm8
            assert len(dates) == len(closes) == self.window_length
            # Emit the first date of the window for every asset.
            out[:] = window_start

    p = Pipeline(columns={'t': TestFactor()})
    results = engine.run_pipeline(p, self.dates[9], self.dates[10])

    # All results are the same, so just grab one column.
    unstacked = results.unstack()
    column = unstacked.iloc[:, 0].values
    check_arrays(column, self.dates[:2].values)
开发者ID:AtwooTM,项目名称:zipline,代码行数:23,代码来源:test_engine.py
示例20: test_multiple_rolling_factors
def test_multiple_rolling_factors(self):
    """
    Run three RollingSumDifference factors with different windows/inputs in
    one pipeline and check each column against its expected float constant.
    """
    loader = self.loader
    assets = self.assets

    def get_loader(column):
        return loader

    engine = SimplePipelineEngine(get_loader, self.dates, self.asset_finder)

    num_dates = 5
    shape = (num_dates, len(assets))
    dates = self.dates[10 : 10 + num_dates]

    short_factor = RollingSumDifference(window_length=3)
    long_factor = RollingSumDifference(window_length=5)
    high_factor = RollingSumDifference(
        window_length=3,
        inputs=[USEquityPricing.open, USEquityPricing.high],
    )

    pipeline = Pipeline(
        columns={
            "short": short_factor,
            "long": long_factor,
            "high": high_factor,
        }
    )
    results = engine.run_pipeline(pipeline, dates[0], dates[-1])

    self.assertEqual(set(results.columns), {"short", "high", "long"})
    assert_multi_index_is_product(self, results.index, dates, assets)

    expected_by_name = (
        # row-wise sum over an array whose values are all (1 - 2)
        ("short", -short_factor.window_length),
        ("long", -long_factor.window_length),
        # row-wise sum over an array whose values are all (1 - 3)
        ("high", -2 * high_factor.window_length),
    )
    for name, expected_value in expected_by_name:
        check_arrays(
            results[name].unstack().values,
            full(shape, expected_value, dtype=float),
        )
开发者ID:RoyHsiao,项目名称:zipline,代码行数:23,代码来源:test_engine.py
注:本文中的zipline.pipeline.engine.SimplePipelineEngine类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论