本文整理汇总了Python中zipline.pipeline.Pipeline类的典型用法代码示例。如果您正苦于以下问题:Python Pipeline类的具体用法?Python Pipeline怎么用?Python Pipeline使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Pipeline类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _run_pipeline
def _run_pipeline(
self, expr, deltas, expected_views, expected_output, finder, calendar, start, end, window_length, compute_fn
):
loader = BlazeLoader()
ds = from_blaze(
expr, deltas, loader=loader, no_deltas_rule=no_deltas_rules.raise_, missing_values=self.missing_values
)
p = Pipeline()
# prevent unbound locals issue in the inner class
window_length_ = window_length
class TestFactor(CustomFactor):
inputs = (ds.value,)
window_length = window_length_
def compute(self, today, assets, out, data):
assert_array_almost_equal(data, expected_views[today])
out[:] = compute_fn(data)
p.add(TestFactor(), "value")
result = SimplePipelineEngine(loader, calendar, finder).run_pipeline(p, start, end)
assert_frame_equal(result, _utc_localize_index_level_0(expected_output), check_dtype=False)
开发者ID:easysg,项目名称:zipline,代码行数:25,代码来源:test_blaze.py
示例2: test_id
def test_id(self):
expr = bz.Data(self.df, name='expr', dshape=self.dshape)
loader = BlazeLoader()
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
dates = self.dates
with tmp_asset_finder() as finder:
result = SimplePipelineEngine(
loader,
dates,
finder,
).run_pipeline(p, dates[0], dates[-1])
expected = self.df.drop('asof_date', axis=1).set_index(
['timestamp', 'sid'],
)
expected.index = pd.MultiIndex.from_product((
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
assert_frame_equal(result, expected, check_dtype=False)
开发者ID:larssonandreas,项目名称:zipline,代码行数:27,代码来源:test_blaze.py
示例3: test_custom_query_time_tz
def test_custom_query_time_tz(self):
df = self.df.copy()
df["timestamp"] = (
(pd.DatetimeIndex(df["timestamp"], tz="EST") + timedelta(hours=8, minutes=44))
.tz_convert("utc")
.tz_localize(None)
)
df.ix[3:5, "timestamp"] = pd.Timestamp("2014-01-01 13:45")
expr = bz.Data(df, name="expr", dshape=self.dshape)
loader = BlazeLoader(data_query_time=time(8, 45), data_query_tz="EST")
ds = from_blaze(expr, loader=loader, no_deltas_rule=no_deltas_rules.ignore, missing_values=self.missing_values)
p = Pipeline()
p.add(ds.value.latest, "value")
p.add(ds.int_value.latest, "int_value")
dates = self.dates
with tmp_asset_finder() as finder:
result = SimplePipelineEngine(loader, dates, finder).run_pipeline(p, dates[0], dates[-1])
expected = df.drop("asof_date", axis=1)
expected["timestamp"] = expected["timestamp"].dt.normalize().astype("datetime64[ns]").dt.tz_localize("utc")
expected.ix[3:5, "timestamp"] += timedelta(days=1)
expected.set_index(["timestamp", "sid"], inplace=True)
expected.index = pd.MultiIndex.from_product(
(expected.index.levels[0], finder.retrieve_all(expected.index.levels[1]))
)
assert_frame_equal(result, expected, check_dtype=False)
开发者ID:easysg,项目名称:zipline,代码行数:27,代码来源:test_blaze.py
示例4: test_id_macro_dataset
def test_id_macro_dataset(self):
expr = bz.Data(self.macro_df, name='expr', dshape=self.macro_dshape)
loader = BlazeLoader()
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
dates = self.dates
asset_info = asset_infos[0][0]
with tmp_asset_finder(equities=asset_info) as finder:
result = SimplePipelineEngine(
loader,
dates,
finder,
).run_pipeline(p, dates[0], dates[-1])
nassets = len(asset_info)
expected = pd.DataFrame(
list(concatv([0] * nassets, [1] * nassets, [2] * nassets)),
index=pd.MultiIndex.from_product((
self.macro_df.timestamp,
finder.retrieve_all(asset_info.index),
)),
columns=('value',),
)
assert_frame_equal(result, expected, check_dtype=False)
开发者ID:larssonandreas,项目名称:zipline,代码行数:30,代码来源:test_blaze.py
示例5: _test_id
def _test_id(self, df, dshape, expected, finder, add):
expr = bz.data(df, name='expr', dshape=dshape)
loader = BlazeLoader()
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
p = Pipeline()
for a in add:
p.add(getattr(ds, a).latest, a)
dates = self.dates
with tmp_asset_finder() as finder:
result = SimplePipelineEngine(
loader,
dates,
finder,
).run_pipeline(p, dates[0], dates[-1])
assert_frame_equal(
result,
_utc_localize_index_level_0(expected),
check_dtype=False,
)
开发者ID:280185386,项目名称:zipline,代码行数:26,代码来源:test_blaze.py
示例6: test_id_macro_dataset_multiple_columns
def test_id_macro_dataset_multiple_columns(self):
"""
input (df):
asof_date timestamp other value
0 2014-01-01 2014-01-01 1 0
3 2014-01-02 2014-01-02 2 1
6 2014-01-03 2014-01-03 3 2
output (expected):
other value
2014-01-01 Equity(65 [A]) 1 0
Equity(66 [B]) 1 0
Equity(67 [C]) 1 0
2014-01-02 Equity(65 [A]) 2 1
Equity(66 [B]) 2 1
Equity(67 [C]) 2 1
2014-01-03 Equity(65 [A]) 3 2
Equity(66 [B]) 3 2
Equity(67 [C]) 3 2
"""
df = self.macro_df.copy()
df['other'] = df.value + 1
fields = OrderedDict(self.macro_dshape.measure.fields)
fields['other'] = fields['value']
expr = bz.Data(df, name='expr', dshape=var * Record(fields))
loader = BlazeLoader()
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
p.add(ds.other.latest, 'other')
dates = self.dates
asset_info = asset_infos[0][0]
with tmp_asset_finder(equities=asset_info) as finder:
result = SimplePipelineEngine(
loader,
dates,
finder,
).run_pipeline(p, dates[0], dates[-1])
expected = pd.DataFrame(
np.array([[0, 1],
[1, 2],
[2, 3]]).repeat(3, axis=0),
index=pd.MultiIndex.from_product((
df.timestamp,
finder.retrieve_all(asset_info.index),
)),
columns=('value', 'other'),
).sort_index(axis=1)
assert_frame_equal(
result,
expected.sort_index(axis=1),
check_dtype=False,
)
开发者ID:Jicheng-Yan,项目名称:zipline,代码行数:59,代码来源:test_blaze.py
示例7: test_factor_with_multiple_outputs
def test_factor_with_multiple_outputs(self):
dates = self.dates[5:10]
assets = self.assets
asset_ids = self.asset_ids
constants = self.constants
num_dates = len(dates)
num_assets = len(assets)
open = USEquityPricing.open
close = USEquityPricing.close
engine = SimplePipelineEngine(
lambda column: self.loader, self.dates, self.asset_finder,
)
def create_expected_results(expected_value, mask):
expected_values = where(mask, expected_value, nan)
return DataFrame(expected_values, index=dates, columns=assets)
cascading_mask = AssetIDPlusDay() < (asset_ids[-1] + dates[0].day)
expected_cascading_mask_result = make_cascading_boolean_array(
shape=(num_dates, num_assets),
)
alternating_mask = (AssetIDPlusDay() % 2).eq(0)
expected_alternating_mask_result = make_alternating_boolean_array(
shape=(num_dates, num_assets), first_value=False,
)
expected_no_mask_result = full(
shape=(num_dates, num_assets), fill_value=True, dtype=bool_dtype,
)
masks = cascading_mask, alternating_mask, NotSpecified
expected_mask_results = (
expected_cascading_mask_result,
expected_alternating_mask_result,
expected_no_mask_result,
)
for mask, expected_mask in zip(masks, expected_mask_results):
open_price, close_price = MultipleOutputs(mask=mask)
pipeline = Pipeline(
columns={'open_price': open_price, 'close_price': close_price},
)
if mask is not NotSpecified:
pipeline.add(mask, 'mask')
results = engine.run_pipeline(pipeline, dates[0], dates[-1])
for colname, case_column in (('open_price', open),
('close_price', close)):
if mask is not NotSpecified:
mask_results = results['mask'].unstack()
check_arrays(mask_results.values, expected_mask)
output_results = results[colname].unstack()
output_expected = create_expected_results(
constants[case_column], expected_mask,
)
assert_frame_equal(output_results, output_expected)
开发者ID:AtwooTM,项目名称:zipline,代码行数:56,代码来源:test_engine.py
示例8: initialize
def initialize(context):
# Create, register and name a pipeline in initialize.
pipe = Pipeline()
attach_pipeline(pipe, 'example')
# Construct a simple moving average factor and add it to the pipeline.
sma_short = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
pipe.add(sma_short, 'sma_short')
# Set a screen on the pipelines to filter out securities.
pipe.set_screen(sma_short > 1.0)
开发者ID:mequanta,项目名称:z-runner,代码行数:12,代码来源:using_pipelines.py
示例9: test_conflict_between_outputs
def test_conflict_between_outputs(self):
class D(DataSet):
c = Column(float)
D_US = D.specialize(US_EQUITIES)
D_CA = D.specialize(CA_EQUITIES)
pipe = Pipeline({"f": D_US.c.latest, "g": D_CA.c.latest})
with self.assertRaises(AmbiguousDomain) as e:
pipe.domain(default=GENERIC)
self.assertEqual(e.exception.domains, [CA_EQUITIES, US_EQUITIES])
开发者ID:barrygolden,项目名称:zipline,代码行数:12,代码来源:test_pipeline.py
示例10: test_adding_slice_column
def test_adding_slice_column(self):
"""
Test that slices cannot be added as a pipeline column.
"""
my_asset = self.asset_finder.retrieve_asset(self.sids[0])
open_slice = OpenPrice()[my_asset]
with self.assertRaises(UnsupportedPipelineOutput):
Pipeline(columns={'open_slice': open_slice})
pipe = Pipeline(columns={})
with self.assertRaises(UnsupportedPipelineOutput):
pipe.add(open_slice, 'open_slice')
开发者ID:4ever911,项目名称:zipline,代码行数:13,代码来源:test_slice.py
示例11: test_mean_reversion_5day_sector_neutral_smoothed
def test_mean_reversion_5day_sector_neutral_smoothed(fn):
column_name = 'Mean_Reversion_5Day_Sector_Neutral_Smoothed'
start_date_str = '2015-01-05'
end_date_str = '2015-01-07'
# Build engine
trading_calendar = get_calendar('NYSE')
bundle_data = bundles.load(project_helper.EOD_BUNDLE_NAME)
engine = project_helper.build_pipeline_engine(bundle_data, trading_calendar)
# Build pipeline
universe_window_length = 2
universe_asset_count = 4
universe = AverageDollarVolume(window_length=universe_window_length).top(universe_asset_count)
pipeline = Pipeline(screen=universe)
run_pipeline_args = {
'pipeline': pipeline,
'start_date': pd.Timestamp(start_date_str, tz='utc'),
'end_date': pd.Timestamp(end_date_str, tz='utc')}
fn_inputs = {
'window_length': 3,
'universe': universe,
'sector': project_helper.Sector()}
fn_correct_outputs = OrderedDict([
(
'pipline_out', pd.DataFrame(
[0.44721360, 1.34164079, -1.34164079, -0.44721360,
1.34164079, 0.44721360, -1.34164079, -0.44721360,
0.44721360, 1.34164079, -1.34164079, -0.44721360],
engine.run_pipeline(**run_pipeline_args).index,
[column_name]))])
print('Running Integration Test on pipeline:')
print('> start_dat = pd.Timestamp(\'{}\', tz=\'utc\')'.format(start_date_str))
print('> end_date = pd.Timestamp(\'{}\', tz=\'utc\')'.format(end_date_str))
print('> universe = AverageDollarVolume(window_length={}).top({})'.format(
universe_window_length, universe_asset_count))
print('> factor = {}('.format(fn.__name__))
print(' window_length={},'.format(fn_inputs['window_length']))
print(' universe=universe,')
print(' sector=project_helper.Sector())')
print('> pipeline.add(factor, \'{}\')'.format(column_name))
print('> engine.run_pipeline(pipeline, start_dat, end_date)')
print('')
pipeline.add(fn(**fn_inputs), column_name)
assert_output(engine.run_pipeline, run_pipeline_args, fn_correct_outputs, check_parameter_changes=False)
开发者ID:lRuix,项目名称:ai_for_trading_nanodegree_alpha_research_multi_factor_modeling_project,代码行数:48,代码来源:project_tests.py
示例12: initialize
def initialize(context):
# Create, register and name a pipeline in initialize.
pipe = Pipeline()
context.attach_pipeline(pipe, 'AAPL')
# Construct a simple moving average factor and add it to the pipeline.
USEquityPricing需要本地自定义
if True:
sma_short = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
else:#mid added
data = Column(float64)
dataset = DataSet()
close = data.bind(dataset, 'aapl')
sma_short = SimpleMovingAverage(inputs=[close], window_length=10)
pipe.add(sma_short, 'sma_short')
开发者ID:UpSea,项目名称:ZipLineMid,代码行数:17,代码来源:03_UsingPipelines.py
示例13: initialize
def initialize(context):
pipe = Pipeline()
attach_pipeline(pipe, 'example')
# Note that we don't call add_factor on these Factors.
# We don't need to store intermediate values if we're not going to use them
sma_short = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=30)
sma_long = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=100)
sma_val = sma_short/sma_long
# Construct the custom factor
mkt_cap = MarketCap()
# Create and apply a filter representing the top 500 equities by MarketCap
# every day.
mkt_cap_top_500 = mkt_cap.top(500)
remove_penny_stocks = sma_short > 1.0
pipe.add(sma_val, 'sma_val')
pipe.add(mkt_cap, 'mkt_cap')
# Use mkt_cap_top_500 as a mask on rank
pipe.add(sma_val.rank(mask=mkt_cap_top_500), 'sma_rank')
# Use multiple screens to narrow the universe
pipe.set_screen(mkt_cap.top(500) & remove_penny_stocks)
开发者ID:mequanta,项目名称:z-runner,代码行数:28,代码来源:creating_custom_factors.py
示例14: test_add
def test_add(self):
p = Pipeline()
f = SomeFactor()
p.add(f, 'f')
self.assertEqual(p.columns, {'f': f})
p.add(f > 5, 'g')
self.assertEqual(p.columns, {'f': f, 'g': f > 5})
with self.assertRaises(TypeError):
p.add(f, 1)
with self.assertRaises(TypeError):
p.add(USEquityPricing.open, 'open')
开发者ID:AtwooTM,项目名称:zipline,代码行数:15,代码来源:test_pipeline.py
示例15: test_add
def test_add(self):
p = Pipeline('test')
f = SomeFactor()
p.add(f, 'f')
self.assertEqual(p.columns, {'f': f})
p.add(f > 5, 'g')
self.assertEqual(p.columns, {'f': f, 'g': f > 5})
with self.assertRaises(TypeError):
p.add(f, 1)
开发者ID:icecube11,项目名称:zipline,代码行数:12,代码来源:test_pipeline.py
示例16: test_add
def test_add(self):
p = Pipeline()
f = SomeFactor()
p.add(f, "f")
self.assertEqual(p.columns, {"f": f})
p.add(f > 5, "g")
self.assertEqual(p.columns, {"f": f, "g": f > 5})
with self.assertRaises(TypeError):
p.add(f, 1)
开发者ID:testmana2,项目名称:zipline,代码行数:12,代码来源:test_pipeline.py
示例17: test_custom_query_time_tz
def test_custom_query_time_tz(self):
df = self.df.copy()
df['timestamp'] = (
pd.DatetimeIndex(df['timestamp'], tz='EST') +
timedelta(hours=8, minutes=44)
).tz_convert('utc').tz_localize(None)
df.ix[3:5, 'timestamp'] = pd.Timestamp('2014-01-01 13:45')
expr = bz.data(df, name='expr', dshape=self.dshape)
loader = BlazeLoader(data_query_time=time(8, 45), data_query_tz='EST')
ds = from_blaze(
expr,
loader=loader,
no_deltas_rule=no_deltas_rules.ignore,
missing_values=self.missing_values,
)
p = Pipeline()
p.add(ds.value.latest, 'value')
p.add(ds.int_value.latest, 'int_value')
dates = self.dates
with tmp_asset_finder() as finder:
result = SimplePipelineEngine(
loader,
dates,
finder,
).run_pipeline(p, dates[0], dates[-1])
expected = df.drop('asof_date', axis=1)
expected['timestamp'] = expected['timestamp'].dt.normalize().astype(
'datetime64[ns]',
).dt.tz_localize('utc')
expected.ix[3:5, 'timestamp'] += timedelta(days=1)
expected.set_index(['timestamp', 'sid'], inplace=True)
expected.index = pd.MultiIndex.from_product((
expected.index.levels[0],
finder.retrieve_all(expected.index.levels[1]),
))
assert_frame_equal(result, expected, check_dtype=False)
开发者ID:280185386,项目名称:zipline,代码行数:38,代码来源:test_blaze.py
示例18: test_set_screen
def test_set_screen(self):
f, g = SomeFilter(), SomeOtherFilter()
p = Pipeline()
self.assertEqual(p.screen, None)
p.set_screen(f)
self.assertEqual(p.screen, f)
with self.assertRaises(ValueError):
p.set_screen(f)
p.set_screen(g, overwrite=True)
self.assertEqual(p.screen, g)
with self.assertRaises(TypeError) as e:
p.set_screen(f, g)
message = e.exception.args[0]
self.assertIn("expected a value of type bool or int for argument 'overwrite'", message)
开发者ID:testmana2,项目名称:zipline,代码行数:20,代码来源:test_pipeline.py
示例19: test_set_screen
def test_set_screen(self):
f, g = SomeFilter(), SomeOtherFilter()
p = Pipeline()
self.assertEqual(p.screen, None)
p.set_screen(f)
self.assertEqual(p.screen, f)
with self.assertRaises(ValueError):
p.set_screen(f)
p.set_screen(g, overwrite=True)
self.assertEqual(p.screen, g)
开发者ID:kczxl,项目名称:zipline,代码行数:14,代码来源:test_pipeline.py
示例20: initialize
def initialize(context):
pipeline = Pipeline()
context.vwaps = []
for length in vwaps:
name = vwap_key(length)
factor = VWAP(window_length=length)
context.vwaps.append(factor)
pipeline.add(factor, name=name)
filter_ = USEquityPricing.close.latest > 300
pipeline.add(filter_, "filter")
if set_screen:
pipeline.set_screen(filter_)
attach_pipeline(pipeline, "test")
开发者ID:RoyHsiao,项目名称:zipline,代码行数:15,代码来源:test_pipeline_algo.py
注:本文中的zipline.pipeline.Pipeline类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论