def _create_generator(self, sim_params, source_filter=None):
    """
    Assemble the standard generator pipeline from this algorithm's sources.

    The method runs the deferred ``initialize`` call when it has not
    happened yet, creates a ``PerformanceTracker`` if none is attached,
    raises the portfolio/account/performance "needs update" flags, and
    then wires the data generator into a fresh ``AlgorithmSimulator``.

    ::sim_params:: is forwarded to the performance tracker, the data
    generator and the simulator.
    ::source_filter:: is a method that receives events in date
    sorted order, and returns True for those events that should be
    processed by the zipline, and False for those that should be
    skipped.

    Returns the result of ``self.trading_client.transform(self.data_gen)``.
    """
    if not self.initialized:
        self.initialize(*self.initialize_args, **self.initialize_kwargs)
        self.initialized = True

    if self.perf_tracker is None:
        # HACK: the `run` method resets perf_tracker to None so that a
        # fresh tracker gets built at this point.
        self.perf_tracker = PerformanceTracker(
            sim_params=sim_params,
            env=self.trading_environment,
        )

    # Targets are assigned left to right: portfolio, account, performance.
    self.portfolio_needs_update = \
        self.account_needs_update = \
        self.performance_needs_update = True

    self.data_gen = self._create_data_generator(source_filter, sim_params)
    self.trading_client = AlgorithmSimulator(self, sim_params)

    self.set_transact(transact_partial(self.slippage, self.commission))

    return self.trading_client.transform(self.data_gen)
# Developer ID: qnu, project: zipline, lines of code: 33, source: algorithm.py
# Example 2: _create_generator
def _create_generator(self, sim_params, source_filter=None):
    """
    Assemble the standard generator pipeline from the sources and
    transforms attached to this algorithm.

    ::sim_params:: has its ``data_frequency`` overwritten with this
    algorithm's ``data_frequency`` before it is used.
    ::source_filter:: is a method that receives events in date
    sorted order, and returns True for those events that should be
    processed by the zipline, and False for those that should be
    skipped.

    Returns the result of ``self.trading_client.transform(self.data_gen)``.
    """
    sim_params.data_frequency = self.data_frequency

    # The constructor only builds a perf_tracker when it was handed
    # sim_params; when it was not, the tracker is created here.
    if self.perf_tracker is None:
        self.perf_tracker = PerformanceTracker(sim_params)

    self.data_gen = self._create_data_generator(source_filter, sim_params)
    self.trading_client = AlgorithmSimulator(self, sim_params)

    self.set_transact(transact_partial(self.slippage, self.commission))

    return self.trading_client.transform(self.data_gen)
def __init__(self, algo, sim_params):
    """
    Prepare the simulator state for a single algorithm.

    ::algo:: is the algorithm object driven by this simulator; it is
    stored unchanged on ``self.algo``.
    ::sim_params:: is stored on ``self.sim_params``, used to build the
    ``PerformanceTracker``, and supplies ``first_open``.
    """
    # --- Simulation parameters ---
    self.sim_params = sim_params

    # --- Performance tracker ---
    self.perf_tracker = PerformanceTracker(self.sim_params)
    emission_rate = self.perf_tracker.emission_rate
    self.perf_key = self.EMISSION_TO_PERF_KEY_MAP[emission_rate]

    # --- Algorithm ---
    self.algo = algo
    # The start is first_open with its time-of-day fields zeroed.
    self.algo_start = self.sim_params.first_open.replace(
        hour=0,
        minute=0,
        second=0,
        microsecond=0,
    )

    # --- Snapshot ---
    # Holds the algorithm's data as of the most recent event. It should
    # hand back empty objects as defaults for missing keys.
    self.current_data = BarData()

    # No datetime is known for the current snapshot until the first
    # message arrives.
    self.simulation_dt = self.snapshot_dt = None

    # --- Logging ---
    def inject_algo_dt(record):
        """Stamp the snapshot datetime on records lacking 'algo_dt'."""
        if 'algo_dt' not in record.extra:
            record.extra['algo_dt'] = self.snapshot_dt

    # Processor that injects algo_dt into user prints/logs.
    self.processor = Processor(inject_algo_dt)
def _create_generator(self, sim_params, source_filter=None):
    """
    Assemble the standard generator pipeline from the sources and
    transforms attached to this algorithm.

    A new ``PerformanceTracker`` is always created here, replacing
    whatever was stored on ``self.perf_tracker`` before.

    ::source_filter:: is a method that receives events in date
    sorted order, and returns True for those events that should be
    processed by the zipline, and False for those that should be
    skipped.

    Returns the result of ``self.trading_client.transform(self.data_gen)``.
    """
    self.perf_tracker = PerformanceTracker(sim_params)
    self.portfolio_needs_update = True

    self.data_gen = self._create_data_generator(source_filter, sim_params)
    self.trading_client = AlgorithmSimulator(self, sim_params)

    self.set_transact(transact_partial(self.slippage, self.commission))

    return self.trading_client.transform(self.data_gen)
def __init__(self, *args, **kwargs):
"""Initialize sids and other state variables.
:Arguments:
:Optional:
initialize : function
Function that is called with a single
argument at the begninning of the simulation.
handle_data : function
Function that is called with 2 arguments
(context and data) on every bar.
script : str
Algoscript that contains initialize and
handle_data function definition.
data_frequency : {'daily', 'minute'}
The duration of the bars.
capital_base : float <default: 1.0e5>
How much capital to start with.
instant_fill : bool <default: False>
Whether to fill orders immediately or on next bar.
asset_finder : An AssetFinder object
A new AssetFinder object to be used in this TradingEnvironment
equities_metadata : can be either:
- dict
- pandas.DataFrame
- object with 'read' property
If dict is provided, it must have the following structure:
* keys are the identifiers
* values are dicts containing the metadata, with the metadata
field name as the key
If pandas.DataFrame is provided, it must have the
following structure:
* column names must be the metadata fields
* index must be the different asset identifiers
* array contents should be the metadata value
If an object with a 'read' property is provided, 'read' must
return rows containing at least one of 'sid' or 'symbol' along
with the other metadata fields.
identifiers : List
Any asset identifiers that are not provided in the
equities_metadata, but will be traded by this TradingAlgorithm
"""
self.sources = []
# List of trading controls to be used to validate orders.
self.trading_controls = []
# List of account controls to be checked on each bar.
self.account_controls = []
self._recorded_vars = {}
self.namespace = kwargs.pop('namespace', {})
self._platform = kwargs.pop('platform', 'zipline')
self.logger = None
self.benchmark_return_source = None
# default components for transact
self.slippage = VolumeShareSlippage()
self.commission = PerShare()
self.instant_fill = kwargs.pop('instant_fill', False)
# If an env has been provided, pop it
self.trading_environment = kwargs.pop('env', None)
if self.trading_environment is None:
self.trading_environment = TradingEnvironment()
# Update the TradingEnvironment with the provided asset metadata
self.trading_environment.write_data(
equities_data=kwargs.pop('equities_metadata', {}),
equities_identifiers=kwargs.pop('identifiers', []),
futures_data=kwargs.pop('futures_metadata', {}),
)
# set the capital base
self.capital_base = kwargs.pop('capital_base', DEFAULT_CAPITAL_BASE)
self.sim_params = kwargs.pop('sim_params', None)
if self.sim_params is None:
self.sim_params = create_simulation_parameters(
capital_base=self.capital_base,
start=kwargs.pop('start', None),
end=kwargs.pop('end', None),
env=self.trading_environment,
)
else:
self.sim_params.update_internal_from_env(self.trading_environment)
# Build a perf_tracker
self.perf_tracker = PerformanceTracker(sim_params=self.sim_params,
env=self.trading_environment)
# Pull in the environment's new AssetFinder for quick reference
self.asset_finder = self.trading_environment.asset_finder
self.init_engine(kwargs.pop('ffc_loader', None))
# Maps from name to Term
# ......... part of the code is omitted here .........
# Developer ID: qnu, project: zipline, lines of code: 101, source: algorithm.py
# Example 9: TradingAlgorithm
# ......... part of the code is omitted here .........
# default components for transact
self.slippage = VolumeShareSlippage()
self.commission = PerShare()
self.instant_fill = kwargs.pop('instant_fill', False)
# If an env has been provided, pop it
self.trading_environment = kwargs.pop('env', None)
if self.trading_environment is None:
self.trading_environment = TradingEnvironment()
# Update the TradingEnvironment with the provided asset metadata
self.trading_environment.write_data(
equities_data=kwargs.pop('equities_metadata', {}),
equities_identifiers=kwargs.pop('identifiers', []),
futures_data=kwargs.pop('futures_metadata', {}),
)
# set the capital base
self.capital_base = kwargs.pop('capital_base', DEFAULT_CAPITAL_BASE)
self.sim_params = kwargs.pop('sim_params', None)
if self.sim_params is None:
self.sim_params = create_simulation_parameters(
capital_base=self.capital_base,
start=kwargs.pop('start', None),
end=kwargs.pop('end', None),
env=self.trading_environment,
)
else:
self.sim_params.update_internal_from_env(self.trading_environment)
# Build a perf_tracker
self.perf_tracker = PerformanceTracker(sim_params=self.sim_params,
env=self.trading_environment)
# Pull in the environment's new AssetFinder for quick reference
self.asset_finder = self.trading_environment.asset_finder
self.init_engine(kwargs.pop('ffc_loader', None))
# Maps from name to Term
self._filters = {}
self._factors = {}
self._classifiers = {}
self.blotter = kwargs.pop('blotter', None)
if not self.blotter:
self.blotter = Blotter()
# Set the dt initally to the period start by forcing it to change
self.on_dt_changed(self.sim_params.period_start)
# The symbol lookup date specifies the date to use when resolving
# symbols to sids, and can be set using set_symbol_lookup_date()
self._symbol_lookup_date = None
self.portfolio_needs_update = True
self.account_needs_update = True
self.performance_needs_update = True
self._portfolio = None
self._account = None
self.history_container_class = kwargs.pop(
'history_container_class', HistoryContainer,
)
self.history_container = None
# Developer ID: qnu, project: zipline, lines of code: 67, source: algorithm.py
# Example 10: transaction_sim
def transaction_sim(self, **params):
""" This is a utility method that asserts expected
results for conversion of orders to transactions given a
trade history"""
trade_count = params['trade_count']
trade_interval = params['trade_interval']
order_count = params['order_count']
order_amount = params['order_amount']
order_interval = params['order_interval']
expected_txn_count = params['expected_txn_count']
expected_txn_volume = params['expected_txn_volume']
# optional parameters
# ---------------------
# if present, alternate between long and short sales
alternate = params.get('alternate')
# if present, expect transaction amounts to match orders exactly.
complete_fill = params.get('complete_fill')
sid = 1
sim_params = factory.create_simulation_parameters()
blotter = Blotter()
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = sim_params.first_open
generated_trades = factory.create_trade_history(
sid,
price,
volume,
trade_interval,
sim_params,
env=self.env,
)
if alternate:
alternator = -1
else:
alternator = 1
order_date = start_date
for i in range(order_count):
blotter.set_date(order_date)
blotter.order(sid, order_amount * alternator ** i, MarketOrder())
order_date = order_date + order_interval
# move after market orders to just after market next
# market open.
if order_date.hour >= 21:
if order_date.minute >= 00:
order_date = order_date + timedelta(days=1)
order_date = order_date.replace(hour=14, minute=30)
# there should now be one open order list stored under the sid
oo = blotter.open_orders
self.assertEqual(len(oo), 1)
self.assertTrue(sid in oo)
order_list = oo[sid][:] # make copy
self.assertEqual(order_count, len(order_list))
for i in range(order_count):
order = order_list[i]
self.assertEqual(order.sid, sid)
self.assertEqual(order.amount, order_amount * alternator ** i)
tracker = PerformanceTracker(sim_params, env=self.env)
benchmark_returns = [
Event({'dt': dt,
'returns': ret,
'type':
zipline.protocol.DATASOURCE_TYPE.BENCHMARK,
'source_id': 'benchmarks'})
for dt, ret in self.env.benchmark_returns.iteritems()
if dt.date() >= sim_params.period_start.date() and
dt.date() <= sim_params.period_end.date()
]
generated_events = date_sorted_sources(generated_trades,
benchmark_returns)
# this approximates the loop inside TradingSimulationClient
transactions = []
for dt, events in itertools.groupby(generated_events,
operator.attrgetter('dt')):
for event in events:
if event.type == DATASOURCE_TYPE.TRADE:
for txn, order in blotter.process_trade(event):
transactions.append(txn)
tracker.process_transaction(txn)
elif event.type == DATASOURCE_TYPE.BENCHMARK:
tracker.process_benchmark(event)
elif event.type == DATASOURCE_TYPE.TRADE:
tracker.process_trade(event)
if complete_fill:
self.assertEqual(len(transactions), len(order_list))
# ......... part of the code is omitted here .........
class TradingAlgorithm(object):
"""
Base class for trading algorithms. Inherit and overload
initialize() and handle_data(data).
A new algorithm could look like this:
```
from zipline.api import order
def initialize(context):
context.sid = 'AAPL'
context.amount = 100
def handle_data(self, data):
sid = context.sid
amount = context.amount
order(sid, amount)
```
To then to run this algorithm pass these functions to
TradingAlgorithm:
my_algo = TradingAlgorithm(initialize, handle_data)
stats = my_algo.run(data)
"""
# If this is set to false then it is the responsibility
# of the overriding subclass to set initialized = true
AUTO_INITIALIZE = True
def __init__(self, *args, **kwargs):
"""Initialize sids and other state variables.
:Arguments:
:Optional:
initialize : function
Function that is called with a single
argument at the begninning of the simulation.
handle_data : function
Function that is called with 2 arguments
(context and data) on every bar.
script : str
Algoscript that contains initialize and
handle_data function definition.
data_frequency : str (daily, hourly or minutely)
The duration of the bars.
capital_base : float <default: 1.0e5>
How much capital to start with.
instant_fill : bool <default: False>
Whether to fill orders immediately or on next bar.
environment : str <default: 'zipline'>
The environment that this algorithm is running in.
"""
self.datetime = None
self.registered_transforms = {}
self.transforms = []
self.sources = []
# List of trading controls to be used to validate orders.
self.trading_controls = []
self._recorded_vars = {}
self.namespace = kwargs.get('namespace', {})
self._environment = kwargs.pop('environment', 'zipline')
self.logger = None
self.benchmark_return_source = None
# default components for transact
self.slippage = VolumeShareSlippage()
self.commission = PerShare()
self.instant_fill = kwargs.pop('instant_fill', False)
# set the capital base
self.capital_base = kwargs.pop('capital_base', DEFAULT_CAPITAL_BASE)
self.sim_params = kwargs.pop('sim_params', None)
if self.sim_params is None:
self.sim_params = create_simulation_parameters(
capital_base=self.capital_base
)
self.perf_tracker = PerformanceTracker(self.sim_params)
self.blotter = kwargs.pop('blotter', None)
if not self.blotter:
self.blotter = Blotter()
self.portfolio_needs_update = True
self.account_needs_update = True
self.performance_needs_update = True
self._portfolio = None
self._account = None
self.history_container = None
self.history_specs = {}
# ......... part of the code is omitted here .........
def __init__(self, *args, **kwargs):
"""Initialize sids and other state variables.
:Arguments:
:Optional:
initialize : function
Function that is called with a single
argument at the begninning of the simulation.
handle_data : function
Function that is called with 2 arguments
(context and data) on every bar.
script : str
Algoscript that contains initialize and
handle_data function definition.
data_frequency : str (daily, hourly or minutely)
The duration of the bars.
capital_base : float <default: 1.0e5>
How much capital to start with.
instant_fill : bool <default: False>
Whether to fill orders immediately or on next bar.
environment : str <default: 'zipline'>
The environment that this algorithm is running in.
"""
self.datetime = None
self.registered_transforms = {}
self.transforms = []
self.sources = []
# List of trading controls to be used to validate orders.
self.trading_controls = []
self._recorded_vars = {}
self.namespace = kwargs.get('namespace', {})
self._environment = kwargs.pop('environment', 'zipline')
self.logger = None
self.benchmark_return_source = None
# default components for transact
self.slippage = VolumeShareSlippage()
self.commission = PerShare()
self.instant_fill = kwargs.pop('instant_fill', False)
# set the capital base
self.capital_base = kwargs.pop('capital_base', DEFAULT_CAPITAL_BASE)
self.sim_params = kwargs.pop('sim_params', None)
if self.sim_params is None:
self.sim_params = create_simulation_parameters(
capital_base=self.capital_base
)
self.perf_tracker = PerformanceTracker(self.sim_params)
self.blotter = kwargs.pop('blotter', None)
if not self.blotter:
self.blotter = Blotter()
self.portfolio_needs_update = True
self.account_needs_update = True
self.performance_needs_update = True
self._portfolio = None
self._account = None
self.history_container = None
self.history_specs = {}
# If string is passed in, execute and get reference to
# functions.
self.algoscript = kwargs.pop('script', None)
self._initialize = None
self._before_trading_start = None
self._analyze = None
self.event_manager = EventManager()
if self.algoscript is not None:
exec_(self.algoscript, self.namespace)
self._initialize = self.namespace.get('initialize')
if 'handle_data' not in self.namespace:
raise ValueError('You must define a handle_data function.')
else:
self._handle_data = self.namespace['handle_data']
self._before_trading_start = \
self.namespace.get('before_trading_start')
# Optional analyze function, gets called after run
self._analyze = self.namespace.get('analyze')
elif kwargs.get('initialize') and kwargs.get('handle_data'):
if self.algoscript is not None:
raise ValueError('You can not set script and \
initialize/handle_data.')
self._initialize = kwargs.pop('initialize')
self._handle_data = kwargs.pop('handle_data')
self._before_trading_start = kwargs.pop('before_trading_start',
# ......... part of the code is omitted here .........
def __init__(self, *args, **kwargs):
    """Initialize sids and other state variables.

    :Arguments:
    :Optional:
        initialize : function
            Function that is called with a single
            argument at the beginning of the simulation.
        handle_data : function
            Function that is called with 2 arguments
            (context and data) on every bar.
        script : str
            Algoscript that contains initialize and
            handle_data function definition.
        data_frequency : str (daily, hourly or minutely)
            The duration of the bars.
        annualizer : int <optional>
            Which constant to use for annualizing risk metrics.
            If not provided, will extract from data_frequency.
        capital_base : float <default: 1.0e5>
            How much capital to start with.
        instant_fill : bool <default: False>
            Whether to fill orders immediately or on next bar.
        sim_params : simulation parameters <optional>
            When given, a PerformanceTracker is built from them here.
        blotter : <optional>
            Blotter to use; a new ``Blotter()`` is created when the
            value is missing or falsy.

    :Raises:
        ValueError
            If ``script`` is passed together with both ``initialize``
            and ``handle_data``, or if the script does not define both
            an ``initialize`` and a ``handle_data`` function.

    Any positional arguments and any keyword arguments not consumed
    here are forwarded to ``self.initialize``.
    """
    self.datetime = None

    self.registered_transforms = {}
    self.transforms = []
    self.sources = []

    self._recorded_vars = {}

    self.logger = None

    self.benchmark_return_source = None
    self.perf_tracker = None

    # default components for transact
    self.slippage = VolumeShareSlippage()
    self.commission = PerShare()

    if 'data_frequency' in kwargs:
        self.set_data_frequency(kwargs.pop('data_frequency'))
    else:
        self.data_frequency = None

    self.instant_fill = kwargs.pop('instant_fill', False)

    # Override annualizer if set.
    # NOTE: unlike the other options this key is read, not popped, so it
    # is still present in the kwargs forwarded to self.initialize below.
    if 'annualizer' in kwargs:
        self.annualizer = kwargs['annualizer']

    # set the capital base
    self.capital_base = kwargs.pop('capital_base', DEFAULT_CAPITAL_BASE)

    self.sim_params = kwargs.pop('sim_params', None)
    if self.sim_params:
        # An explicitly configured data_frequency wins over the one
        # carried by sim_params; otherwise adopt the sim_params value.
        if self.data_frequency is None:
            self.data_frequency = self.sim_params.data_frequency
        else:
            self.sim_params.data_frequency = self.data_frequency

        self.perf_tracker = PerformanceTracker(self.sim_params)

    self.blotter = kwargs.pop('blotter', None)
    if not self.blotter:
        self.blotter = Blotter()

    self.portfolio_needs_update = True
    self._portfolio = None

    # If string is passed in, execute and get reference to
    # functions.
    self.algoscript = kwargs.pop('script', None)

    # True when both callables were supplied as keyword arguments.
    functions_given = bool(
        kwargs.get('initialize', False) and kwargs.get('handle_data')
    )

    if self.algoscript is not None:
        # This conflict check used to sit in the branch that only runs
        # when no script was given, so it could never fire.
        if functions_given:
            raise ValueError('You can not set script and '
                             'initialize/handle_data.')

        # NOTE(security): the script is executed as-is; only pass
        # trusted code.
        self.ns = {}
        exec_(self.algoscript, self.ns)
        if 'initialize' not in self.ns:
            raise ValueError('You must define an initialize function.')
        if 'handle_data' not in self.ns:
            raise ValueError('You must define a handle_data function.')
        self._initialize = self.ns['initialize']
        self._handle_data = self.ns['handle_data']

    # If two functions are passed in assume initialize and
    # handle_data are passed in.
    elif functions_given:
        self._initialize = kwargs.pop('initialize')
        self._handle_data = kwargs.pop('handle_data')

    # an algorithm subclass needs to set initialized to True when
    # it is fully initialized.
    self.initialized = False
    self.initialize(*args, **kwargs)
def transaction_sim(self, **params):
""" This is a utility method that asserts expected
results for conversion of orders to transactions given a
trade history"""
trade_count = params['trade_count']
trade_interval = params['trade_interval']
trade_delay = params.get('trade_delay')
order_count = params['order_count']
order_amount = params['order_amount']
order_interval = params['order_interval']
expected_txn_count = params['expected_txn_count']
expected_txn_volume = params['expected_txn_volume']
# optional parameters
# ---------------------
# if present, alternate between long and short sales
alternate = params.get('alternate')
# if present, expect transaction amounts to match orders exactly.
complete_fill = params.get('complete_fill')
sid = 1
sim_params = factory.create_simulation_parameters()
trade_sim = TransactionSimulator()
price = [10.1] * trade_count
volume = [100] * trade_count
start_date = sim_params.first_open
generated_trades = factory.create_trade_history(
sid,
price,
volume,
trade_interval,
sim_params
)
if alternate:
alternator = -1
else:
alternator = 1
order_date = start_date
for i in xrange(order_count):
order = ndict({
'sid': sid,
'amount': order_amount * alternator ** i,
'dt': order_date
})
trade_sim.place_order(order)
order_date = order_date + order_interval
# move after market orders to just after market next
# market open.
if order_date.hour >= 21:
if order_date.minute >= 00:
order_date = order_date + timedelta(days=1)
order_date = order_date.replace(hour=14, minute=30)
# there should now be one open order list stored under the sid
oo = trade_sim.open_orders
self.assertEqual(len(oo), 1)
self.assertTrue(sid in oo)
order_list = oo[sid]
self.assertEqual(order_count, len(order_list))
for i in xrange(order_count):
order = order_list[i]
self.assertEqual(order.sid, sid)
self.assertEqual(order.amount, order_amount * alternator ** i)
tracker = PerformanceTracker(sim_params)
# this approximates the loop inside TradingSimulationClient
transactions = []
for trade in generated_trades:
if trade_delay:
trade.dt = trade.dt + trade_delay
trade_sim.update(trade)
if trade.TRANSACTION:
transactions.append(trade.TRANSACTION)
tracker.process_event(trade)
if complete_fill:
self.assertEqual(len(transactions), len(order_list))
total_volume = 0
for i in xrange(len(transactions)):
txn = transactions[i]
total_volume += txn.amount
if complete_fill:
order = order_list[i]
self.assertEqual(order.amount, txn.amount)
self.assertEqual(total_volume, expected_txn_volume)
self.assertEqual(len(transactions), expected_txn_count)
cumulative_pos = tracker.cumulative_performance.positions[sid]
self.assertEqual(total_volume, cumulative_pos.amount)
# ......... part of the code is omitted here .........
# ......... part of the code is omitted here .........
"close": [10.1] * len(days),
"volume": [100] * len(days),
"day": [day.value for day in days],
},
index=days,
)
}
path = os.path.join(tempdir.path, "testdata.bcolz")
BcolzDailyBarWriter(path, days).write(assets.items())
equity_daily_reader = BcolzDailyBarReader(path)
data_portal = DataPortal(
env,
first_trading_day=equity_daily_reader.first_trading_day,
equity_daily_reader=equity_daily_reader,
)
if "default_slippage" not in params or not params["default_slippage"]:
slippage_func = FixedSlippage()
else:
slippage_func = None
blotter = Blotter(sim_params.data_frequency, self.env.asset_finder, slippage_func)
start_date = sim_params.first_open
if alternate:
alternator = -1
else:
alternator = 1
tracker = PerformanceTracker(sim_params, self.env)
# replicate what tradesim does by going through every minute or day
# of the simulation and processing open orders each time
if sim_params.data_frequency == "minute":
ticks = minutes
else:
ticks = days
transactions = []
order_list = []
order_date = start_date
for tick in ticks:
blotter.current_dt = tick
if tick >= order_date and len(order_list) < order_count:
# place an order
direction = alternator ** len(order_list)
order_id = blotter.order(
blotter.asset_finder.retrieve_asset(sid), order_amount * direction, MarketOrder()
)
order_list.append(blotter.orders[order_id])
order_date = order_date + order_interval
# move after market orders to just after market next
# market open.
if order_date.hour >= 21:
if order_date.minute >= 00:
order_date = order_date + timedelta(days=1)
order_date = order_date.replace(hour=14, minute=30)
else:
bar_data = BarData(data_portal, lambda: tick, sim_params.data_frequency)
txns, _, closed_orders = blotter.get_transactions(bar_data)
for txn in txns:
# ......... part of the code is omitted here .........
else:
benchmark_return_source = self.benchmark_return_source
date_sorted = date_sorted_sources(*self.sources)
if source_filter:
date_sorted = ifilter(source_filter, date_sorted)
with_tnfms = sequential_transforms(date_sorted,
*self.transforms)
with_alias_dt = alias_dt(with_tnfms)
with_benchmarks = date_sorted_sources(benchmark_return_source,
with_alias_dt)
# Group together events with the same dt field. This depends on the
# events already being sorted.
return groupby(with_benchmarks, attrgetter('dt'))
def _create_generator(self, sim_params, source_filter=None):
    """
    Assemble the standard generator pipeline from the sources and
    transforms attached to this algorithm.

    ::sim_params:: has its ``data_frequency`` overwritten with this
    algorithm's ``data_frequency`` before it is used.
    ::source_filter:: is a method that receives events in date
    sorted order, and returns True for those events that should be
    processed by the zipline, and False for those that should be
    skipped.

    Besides the data generator, performance tracker and simulator, this
    variant also installs a leverage callable on the blotter.

    Returns the result of ``self.trading_client.transform(self.data_gen)``.
    """
    sim_params.data_frequency = self.data_frequency

    self.data_gen = self._create_data_generator(source_filter, sim_params)
    self.perf_tracker = PerformanceTracker(sim_params)
    self.trading_client = AlgorithmSimulator(self, sim_params)

    self.set_transact(transact_partial(self.slippage, self.commission))

    # The leverage partial is bound to the portfolio reported by the
    # tracker that was just created.
    self.blotter.leverage = leverage_partial(
        self.leverage,
        self.perf_tracker.get_portfolio(),
    )

    return self.trading_client.transform(self.data_gen)
def get_generator(self):
    """
    Return the generator for this algorithm.

    This is the hook to override when the generator needs extra
    construction logic; an override can still call ``_create_generator``
    to obtain the standard construction.
    """
    standard_generator = self._create_generator(self.sim_params)
    return standard_generator
def initialize(self, *args, **kwargs):
    """
    Default no-op hook: accepts any arguments, ignores them and
    returns None.
    """
    return None
# TODO: make a new subclass, e.g. BatchAlgorithm, and move
# the run method to the subclass, and refactor to put the
# generator creation logic into get_generator.
def run(self, source, sim_params=None, benchmark_return_source=None):
"""Run the algorithm.
:Arguments:
source : can be either:
- pandas.DataFrame
- zipline source
- list of zipline sources
class AlgorithmSimulator(object):
EMISSION_TO_PERF_KEY_MAP = {
'minute': 'intraday_perf',
'daily': 'daily_perf'
}
def get_hash(self):
    """
    Return the class name followed by the result of ``hash_args()``.

    There should only ever be one TSC in the system, so no arguments
    are passed into the hash.
    """
    class_name = self.__class__.__name__
    args_hash = hash_args()
    return class_name + args_hash
def __init__(self, algo, sim_params):
    """
    Build the simulator around one algorithm and its parameters.

    ::algo:: is kept as-is on ``self.algo``.
    ::sim_params:: is kept on ``self.sim_params``; it feeds the
    ``PerformanceTracker`` and provides ``first_open``.
    """
    # Simulation parameter setup
    self.sim_params = sim_params

    # Perf tracker setup: the tracker's emission rate selects which
    # perf key from EMISSION_TO_PERF_KEY_MAP is used.
    self.perf_tracker = PerformanceTracker(self.sim_params)
    rate = self.perf_tracker.emission_rate
    self.perf_key = self.EMISSION_TO_PERF_KEY_MAP[rate]

    # Algo setup: the start is first_open with hour, minute, second
    # and microsecond reset to zero.
    self.algo = algo
    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    self.algo_start = self.sim_params.first_open.replace(**midnight)

    # Snapshot setup: the algorithm's data as of the most recent event.
    # The object should yield empty objects as defaults on missing keys.
    self.current_data = BarData()

    # There is no datetime for the current snapshot until a message
    # has been received.
    self.simulation_dt = None
    self.snapshot_dt = None

    # Logging setup: processor function that injects algo_dt into
    # user prints/logs.
    def inject_algo_dt(record):
        """Add the snapshot datetime unless 'algo_dt' is already set."""
        if 'algo_dt' in record.extra:
            return
        record.extra['algo_dt'] = self.snapshot_dt

    self.processor = Processor(inject_algo_dt)
def transform(self, stream_in):
"""
Main generator work loop.
"""
# Set the simulation date to be the first event we see.
peek_date, peek_snapshot = next(stream_in)
self.simulation_dt = peek_date
# Stitch back together the generator by placing the peeked
# event back in front
stream = itertools.chain([(peek_date, peek_snapshot)],
stream_in)
# inject the current algo
# snapshot time to any log record generated.
with self.processor.threadbound():
updated = False
bm_updated = False
for date, snapshot in stream:
self.perf_tracker.set_date(date)
self.algo.blotter.set_date(date)
# If we're still in the warmup period. Use the event to
# update our universe, but don't yield any perf messages,
# and don't send a snapshot to handle_data.
if date < self.algo_start:
for event in snapshot:
if event.type in (DATASOURCE_TYPE.TRADE,
DATASOURCE_TYPE.CUSTOM):
self.update_universe(event)
self.perf_tracker.process_event(event)
else:
for event in snapshot:
# ......... part of the code is omitted here .........
# Please leave a comment