本文整理汇总了Python中zipline.sources.SpecificEquityTrades类的典型用法代码示例。如果您正苦于以下问题:Python SpecificEquityTrades类的具体用法?Python SpecificEquityTrades怎么用?Python SpecificEquityTrades使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SpecificEquityTrades类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_single_source
def test_single_source(self):
    """Run one default-constructed source through ``self.run_date_sort``.

    The events the source yields on a first pass are the expected output;
    the source is then rewound and replayed with a done message appended.
    """
    # No arguments: rely on the built-in defaults (see zipline.sources.py).
    trades = SpecificEquityTrades()
    expected_events = [event for event in trades]

    # Building the expected list consumed the source, so start it over.
    trades.rewind()

    # The raw source does not emit a done message itself; the sort needs
    # one to work properly, so tack it onto the end of the stream.
    terminator = done_message(trades.get_hash())
    stream = chain(trades, [terminator])

    self.run_date_sort(stream, expected_events, [trades.get_hash()])
开发者ID:Jeff-Lewis,项目名称:zipline,代码行数:11,代码来源:test_sorting.py
示例2: TestAccountControls
class TestAccountControls(TestCase):
    """Tests that run algorithms with account controls over a fixed source."""

    def setUp(self):
        """Create simulation parameters, a trade history and its source."""
        self.sim_params = factory.create_simulation_parameters(num_days=4)
        self.sid = 133

        prices = [10.0, 10.0, 11.0, 11.0]
        volumes = [100, 100, 100, 300]
        self.trade_history = factory.create_trade_history(
            self.sid,
            prices,
            volumes,
            timedelta(days=1),
            self.sim_params,
        )
        self.source = SpecificEquityTrades(event_list=self.trade_history)

    def _check_algo(self, algo, handle_data, expected_exc):
        """Run ``algo`` over ``self.source`` with ``handle_data`` installed.

        When ``expected_exc`` is truthy the run must raise it; otherwise
        the run happens inside ``nullctx()``. The source is rewound
        afterwards so it can be replayed by the next check.
        """
        algo._handle_data = handle_data

        if expected_exc:
            guard = self.assertRaises(expected_exc)
        else:
            guard = nullctx()

        with guard:
            algo.run(self.source)

        self.source.rewind()

    def check_algo_succeeds(self, algo, handle_data):
        """Run ``algo`` and expect no exception."""
        self._check_algo(algo=algo,
                         handle_data=handle_data,
                         expected_exc=None)

    def check_algo_fails(self, algo, handle_data):
        """Run ``algo`` and expect an ``AccountControlViolation``."""
        self._check_algo(algo=algo,
                         handle_data=handle_data,
                         expected_exc=AccountControlViolation)

    def test_set_max_leverage(self):
        """Order one share per bar under max-leverage settings of 0 and 1."""

        def buy_one_share(algo, data):
            algo.order(self.sid, 1)

        # With max leverage set to 0, buying one share is expected to fail.
        zero_leverage = SetMaxLeverageAlgorithm(0)
        self.check_algo_fails(zero_leverage, buy_one_share)

        # With max leverage set to 1, buying one share is expected to pass.
        unit_leverage = SetMaxLeverageAlgorithm(1)
        self.check_algo_succeeds(unit_leverage, buy_one_share)
开发者ID:acycliq,项目名称:zipline,代码行数:49,代码来源:test_algorithm.py
示例3: test_set_max_order_count
def test_set_max_order_count(self):
    """Place five orders per bar under max-order-count limits of 3, 9, 10."""
    # Replace the source built by setUp with one whose events are six
    # hours apart rather than a full day, so that trading-session
    # rollover logic gets exercised.
    six_hourly_history = factory.create_trade_history(
        self.sid,
        [10.0, 10.0, 11.0, 11.0],
        [100, 100, 100, 300],
        timedelta(hours=6),
        self.sim_params,
    )
    self.source = SpecificEquityTrades(event_list=six_hourly_history)

    def handle_data(algo, data):
        # Five single-share orders per call, each one tallied on the algo.
        for _ in range(5):
            algo.order(self.sid, 1)
            algo.order_count += 1

    # Limit of 3: the run must fail with three orders counted.
    three_max = SetMaxOrderCountAlgorithm(3)
    self.check_algo_fails(three_max, handle_data, 3)

    # Limit of 9: the second handle_data call falls on the same day as
    # the first, so the final order of that second call should fail.
    nine_max = SetMaxOrderCountAlgorithm(9)
    self.check_algo_fails(nine_max, handle_data, 9)

    # Limit of 10: only ten orders are placed per day, so the whole run
    # should pass with 20 orders counted.
    # NOTE(review): the original comment said "more than 20 orders" are
    # placed in total, while the asserted count is exactly 20 -- confirm.
    ten_max = SetMaxOrderCountAlgorithm(10)
    self.check_algo_succeeds(ten_max, handle_data, order_count=20)
开发者ID:erain,项目名称:zipline,代码行数:30,代码来源:test_algorithm.py
示例4: setUp
def setUp(self):
    """Create simulation parameters, a trade history and its source."""
    self.sim_params = factory.create_simulation_parameters(num_days=4)
    self.sid = 133

    prices = [10.0, 10.0, 11.0, 11.0]
    volumes = [100, 100, 100, 300]
    spacing = timedelta(days=1)
    self.trade_history = factory.create_trade_history(
        self.sid, prices, volumes, spacing, self.sim_params
    )

    self.source = SpecificEquityTrades(event_list=self.trade_history)
开发者ID:erain,项目名称:zipline,代码行数:12,代码来源:test_algorithm.py
示例5: test_sort_composite
def test_sort_composite(self):
    """Merge four sources with ``date_sorted_sources`` and check ordering.

    Three sources emit sids that pass the filter and one does not. The
    merged stream must hold 300 events, none attributed to the filtered
    out source, in the order given by the module's ``comp`` comparison.
    """
    # Local import: ``sorted`` only accepts a comparison function through
    # ``key=cmp_to_key(...)`` on Python 3; passing it positionally (as the
    # old code did) raises TypeError there. cmp_to_key also exists on 2.7.
    from functools import cmp_to_key

    sid_filter = [1, 2]

    def make_source(count, sids, delta):
        # Every source shares the start time and filter; only the event
        # count, the sids and the spacing between events vary.
        return SpecificEquityTrades(
            count=count,
            sids=sids,
            start=datetime(2012, 6, 6, 0),
            delta=delta,
            filter=sid_filter,
        )

    # Source a: one hour between events.
    source_a = make_source(100, [1], timedelta(hours=1))
    # Source b: one day between events.
    source_b = make_source(50, [2], timedelta(days=1))
    # Source c: one minute between events.
    source_c = make_source(150, [1, 2], timedelta(minutes=1))
    # Source d: should produce no events because its only sid (3) does
    # not match the filter.
    source_d = make_source(50, [3], timedelta(minutes=1))

    sources = [source_a, source_b, source_c, source_d]
    hashes = [source.get_hash() for source in sources]

    sort_out = date_sorted_sources(*sources)

    # Read all the values from the sort and assert that they arrive in
    # the correct order with the expected hash values.
    to_list = list(sort_out)

    # We should have 300 events (100 from a, 50 from b, 150 from c).
    assert len(to_list) == 300

    for e in to_list:
        # All events should match one of our expected source_ids.
        assert e.source_id in hashes
        # But none of them should match source_d.
        assert e.source_id != source_d.get_hash()

    # The events should be sorted by dt, with source_id as tiebreaker.
    # ``sorted`` returns a new list, so no defensive copy is needed.
    expected = sorted(to_list, key=cmp_to_key(comp))
    assert to_list == expected
开发者ID:Jeff-Lewis,项目名称:zipline,代码行数:70,代码来源:test_sorting.py
示例6: test_multi_source
def test_multi_source(self):
    """Date-sort two overlapping sources, interleaved and sequentially.

    The expected output is every event from both sources ordered by the
    module's ``comp`` comparison; ``self.run_date_sort`` is exercised once
    with messages alternating between the sources and once with all of
    source a's messages ahead of source b's.
    """
    # Local import: ``sorted`` only accepts a comparison function through
    # ``key=cmp_to_key(...)`` on Python 3; passing it positionally (as the
    # old code did) raises TypeError there. cmp_to_key also exists on 2.7.
    from functools import cmp_to_key

    sid_filter = [2, 3]

    source_a = SpecificEquityTrades(
        count=100,
        sids=[1, 2, 3],
        start=datetime(2012, 1, 3, 15, tzinfo=pytz.utc),
        delta=timedelta(minutes=6),
        filter=sid_filter,
    )
    source_b = SpecificEquityTrades(
        count=100,
        sids=[2, 3, 4],
        start=datetime(2012, 1, 3, 15, tzinfo=pytz.utc),
        delta=timedelta(minutes=5),
        filter=sid_filter,
    )

    all_events = list(chain(source_a, source_b))

    # The expected output is all events, sorted by dt with
    # source_id as a tiebreaker.
    expected = sorted(all_events, key=cmp_to_key(comp))

    source_ids = [source_a.get_hash(), source_b.get_hash()]

    def rewound_with_done():
        # Reading the sources consumes them, so rewind both first and
        # then give each one a fresh stream ending in its done message.
        for source in (source_a, source_b):
            source.rewind()
        return [
            chain(source, [done_message(source.get_hash())])
            for source in (source_a, source_b)
        ]

    # Test sort with alternating messages from source_a and source_b.
    interleaved = alternate(*rewound_with_done())
    self.run_date_sort(interleaved, expected, source_ids)

    # Test sort with all messages from a, followed by all messages
    # from b.
    sequential = chain(*rewound_with_done())
    self.run_date_sort(sequential, expected, source_ids)
开发者ID:Jeff-Lewis,项目名称:zipline,代码行数:56,代码来源:test_sorting.py
示例7: TestTradingControls
class TestTradingControls(TestCase):
def setUp(self):
    """Create simulation parameters, a trade history and its source."""
    self.sim_params = factory.create_simulation_parameters(num_days=4)
    self.sid = 133

    prices = [10.0, 10.0, 11.0, 11.0]
    volumes = [100, 100, 100, 300]
    spacing = timedelta(days=1)
    self.trade_history = factory.create_trade_history(
        self.sid, prices, volumes, spacing, self.sim_params
    )

    self.source = SpecificEquityTrades(event_list=self.trade_history)
def _check_algo(self,
                algo,
                handle_data,
                expected_order_count,
                expected_exc):
    """Run ``algo`` over ``self.source`` and verify its order count.

    ``handle_data`` is installed on the algorithm before the run. When
    ``expected_exc`` is truthy the run must raise it; otherwise the run
    happens inside ``nullctx()``. Afterwards ``algo.order_count`` must
    equal ``expected_order_count``, and the source is rewound so it can
    be replayed by the next check.
    """
    algo._handle_data = handle_data

    if expected_exc:
        guard = self.assertRaises(expected_exc)
    else:
        guard = nullctx()

    with guard:
        algo.run(self.source)

    self.assertEqual(algo.order_count, expected_order_count)
    self.source.rewind()
def check_algo_succeeds(self, algo, handle_data, order_count=4):
    """Run ``algo`` expecting no exception and ``order_count`` orders.

    The default ``order_count`` assumes one order per handle_data call.
    """
    self._check_algo(algo=algo,
                     handle_data=handle_data,
                     expected_order_count=order_count,
                     expected_exc=None)
def check_algo_fails(self, algo, handle_data, order_count):
    """Run ``algo`` expecting a ``TradingControlViolation``.

    ``order_count`` is the number of orders the algorithm should have
    tallied by the time the run ends.
    """
    self._check_algo(algo=algo,
                     handle_data=handle_data,
                     expected_order_count=order_count,
                     expected_exc=TradingControlViolation)
def test_set_max_position_size(self):
    """Run fixed-size orders against several max-position-size settings."""

    def order_each_bar(shares):
        # Build a handle_data that orders ``shares`` of self.sid on every
        # call and tallies the order on the algorithm.
        def handle_data(algo, data):
            algo.order(self.sid, shares)
            algo.order_count += 1
        return handle_data

    # Buy one share four times. Should be fine.
    algo = SetMaxPositionSizeAlgorithm(sid=self.sid,
                                       max_shares=10,
                                       max_notional=500.0)
    self.check_algo_succeeds(algo, order_each_bar(1))

    # Buy three shares four times. Should bail on the fourth before it's
    # placed.
    algo = SetMaxPositionSizeAlgorithm(sid=self.sid,
                                       max_shares=10,
                                       max_notional=500.0)
    self.check_algo_fails(algo, order_each_bar(3), 3)

    # Buy three shares per call with a much lower max_notional. Should
    # bail due to max_notional on the third attempt.
    # NOTE(review): the original comment said "two shares" here, but the
    # code has always ordered three -- confirm which was intended.
    algo = SetMaxPositionSizeAlgorithm(sid=self.sid,
                                       max_shares=10,
                                       max_notional=61.0)
    self.check_algo_fails(algo, order_each_bar(3), 2)

    # Point the trading control at a different sid, then place huge
    # orders. Should continue normally.
    algo = SetMaxPositionSizeAlgorithm(sid=self.sid + 1,
                                       max_shares=10,
                                       max_notional=61.0)
    self.check_algo_succeeds(algo, order_each_bar(10000))

    # Leave the trading control's sid unset, then place huge orders.
    # Should fail because setting sid to None makes the control apply to
    # all sids.
    algo = SetMaxPositionSizeAlgorithm(max_shares=10, max_notional=61.0)
    self.check_algo_fails(algo, order_each_bar(10000), 0)
def test_set_max_order_size(self):
# Buy one share.
def handle_data(algo, data):
algo.order(self.sid, 1)
algo.order_count += 1
algo = SetMaxOrderSizeAlgorithm(sid=self.sid,
max_shares=10,
max_notional=500.0)
self.check_algo_succeeds(algo, handle_data)
# Buy 1, then 2, then 3, then 4 shares. Bail on the last attempt
#.........这里部分代码省略.........
开发者ID:erain,项目名称:zipline,代码行数:101,代码来源:test_algorithm.py
注:本文中的zipline.sources.SpecificEquityTrades类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论