本文整理汇总了Python中upsg.pipeline.Pipeline类的典型用法代码示例。如果您正苦于以下问题:Python Pipeline类的具体用法?Python Pipeline怎么用?Python Pipeline使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Pipeline类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_merge
def test_merge(self):
    """The Merge stage's join matches a direct pandas DataFrame merge."""
    employees = np.array(
        [(0, 'Lisa', 2),
         (1, 'Bill', 1),
         (2, 'Fred', 2),
         (3, 'Samantha', 2),
         (4, 'Augustine', 1),
         (5, 'William', 0)],
        dtype=[('id', int), ('name', 'S64'), ('dept_id', int)])
    depts = np.array(
        [(0, 'accts receivable'),
         (1, 'accts payable'),
         (2, 'shipping')],
        dtype=[('id', int), ('name', 'S64')])
    merge_kwargs = {}

    pipeline = Pipeline()
    left_node = pipeline.add(NumpyRead(employees))
    right_node = pipeline.add(NumpyRead(depts))
    merge_node = pipeline.add(Merge('dept_id', 'id', **merge_kwargs))
    sink = pipeline.add(NumpyWrite())
    # Callable-node syntax: connect merge's inputs, then feed the sink.
    sink(merge_node(left_node, right_node))
    self.run_pipeline(pipeline)

    result = sink.get_stage().result
    # Control: the same join performed directly with pandas.
    ctrl = obj_to_str(
        pd.DataFrame(employees).merge(
            pd.DataFrame(depts),
            left_on='dept_id',
            right_on='id').to_records(index=False))
    assert(np.array_equal(result, ctrl))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:33,代码来源:test_transform.py
示例2: test_DAG
def test_DAG(self):
    """Exercise a non-trivial pipeline DAG with fan-in and fan-out.

    Graph shape ('fx' is each stage's single output key):
        s0, s1, s2 are constant sources;
        s3 = f(s0, s1) and s4 = f(s1, s2) are intermediates;
        s5 = f(s3, s4) and s6 = f(s4) are terminals writing to in-memory
        files so the fully propagated strings can be asserted on.
    """
    p = Pipeline()
    # Source stages: each emits a constant string.
    s0 = OneCellLambdaStage(lambda: 'S0')
    s1 = OneCellLambdaStage(lambda: 'S1')
    s2 = OneCellLambdaStage(lambda: 'S2')
    # Intermediate stages: combine two inputs into a tagged string.
    s3 = OneCellLambdaStage(lambda x, y: '({},{})->I{}'.format(x, y, '3'))
    s4 = OneCellLambdaStage(lambda x, y: '({},{})->I{}'.format(x, y, '4'))
    # Terminal stages write their result to a StringIO for inspection.
    s5out = StringIO()
    s6out = StringIO()
    s5 = OneCellLambdaStage(lambda x, y: '({},{})->T{}'.format(x, y, '5'),
                            fout=s5out)
    s6 = OneCellLambdaStage(lambda x: '({})->T{}'.format(x, '6'),
                            fout=s6out)
    nodes = [p.add(s) for s in (s0, s1, s2, s3, s4, s5, s6)]
    # The overloaded '>' operator connects an output key to an input key.
    nodes[0]['fx'] > nodes[3]['x']
    nodes[1]['fx'] > nodes[3]['y']
    nodes[1]['fx'] > nodes[4]['x']
    nodes[2]['fx'] > nodes[4]['y']
    nodes[3]['fx'] > nodes[5]['x']
    nodes[4]['fx'] > nodes[5]['y']
    nodes[4]['fx'] > nodes[6]['x']
    self.run_pipeline(p)
    self.assertEqual(s5out.getvalue(),
                     "((S0,S1)->I3,(S1,S2)->I4)->T5")
    self.assertEqual(s6out.getvalue(),
                     "((S1,S2)->I4)->T6")
开发者ID:macressler,项目名称:UPSG,代码行数:30,代码来源:test_pipeline.py
示例3: test_query_complex
def test_query_complex(self):
    """Query stage with a compound boolean expression and its complement."""
    pipe = Pipeline()
    reader = pipe.add(CSVRead(path_of_data('query.csv')))
    query_node = pipe.add(Query("((id == value) and not (use_this_col == 'no'))"
                                "or name == 'fish'"))
    selected_out = pipe.add(CSVWrite(self._tmp_files('out.csv')))
    complement_out = pipe.add(CSVWrite(self._tmp_files('out_comp.csv')))
    reader['output'] > query_node['input']
    query_node['output'] > selected_out['input']
    query_node['complement'] > complement_out['input']
    self.run_pipeline(pipe)
    # Rows matching the query.
    self.assertTrue(np.array_equal(
        self._tmp_files.csv_read('out.csv'),
        csv_read(path_of_data('query_ctrl.csv'))))
    # Rows NOT matching the query (the complement output).
    self.assertTrue(np.array_equal(
        self._tmp_files.csv_read('out_comp.csv'),
        csv_read(path_of_data('query_ctrl_comp.csv'))))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:25,代码来源:test_transform.py
示例4: test_multiclassify
def test_multiclassify(self):
    """Multiclassify runs end-to-end and emits its HTML report file."""
    n_samples = 150
    n_features = 3
    n_folds = 2
    X = np.random.random((n_samples, n_features))
    y = np.random.randint(0, 2, (n_samples))
    pipe = Pipeline()
    X_node = pipe.add(NumpyRead(X))
    y_node = pipe.add(NumpyRead(y))
    splitter = pipe.add(SplitTrainTest(2))
    X_node['output'] > splitter['input0']
    y_node['output'] > splitter['input1']
    classify_node = pipe.add(Multiclassify(
        'score',
        self._tmp_files('report.html'),
        None,
        n_folds))
    splitter['train0'] > classify_node['X_train']
    splitter['test0'] > classify_node['X_test']
    splitter['train1'] > classify_node['y_train']
    splitter['test1'] > classify_node['y_test']
    self.run_pipeline(pipe)
    # Success criterion: the report file was produced.
    self.assertTrue(os.path.isfile(self._tmp_files('report.html')))
示例5: test_cross_validation_score
def test_cross_validation_score(self):
    """CrossValidationScore agrees with sklearn's mean cross_val_score."""
    n_rows = 100
    n_folds = 10
    X = np.random.random((n_rows, 10))
    y = np.random.randint(0, 2, (n_rows))
    pipe = Pipeline()
    X_node = pipe.add(NumpyRead(X))
    y_node = pipe.add(NumpyRead(y))
    cv_node = pipe.add(CrossValidationScore(wrap(SVC), 'score', {}, n_folds,
                                            random_state=0))
    X_node['output'] > cv_node['X_train']
    y_node['output'] > cv_node['y_train']
    writer = pipe.add(CSVWrite(self._tmp_files('out.csv')))
    cv_node['score'] > writer['input']
    self.run_pipeline(pipe)
    result = self._tmp_files.csv_read('out.csv')['f0']
    # Control: sklearn KFold with the same seed and fold count.
    ctrl_kf = SKKFold(n_rows, n_folds, random_state=0)
    ctrl = np.mean(cross_val_score(SVC(), X, y, cv=ctrl_kf))
    self.assertTrue(np.allclose(ctrl, result))
示例6: test_3_stage
def test_3_stage(self):
    """Three-stage pipeline: CSVRead -> wrapped sklearn Imputer -> CSVWrite.

    Verifies that the wrapped Imputer produces the same values as calling
    Imputer().fit_transform directly on the same data.
    """
    from sklearn.preprocessing import Imputer
    infile_name = path_of_data('missing_vals.csv')
    p = Pipeline()
    csv_read_node = p.add(CSVRead(infile_name))
    csv_write_node = p.add(CSVWrite(self._tmp_files.get('out.csv')))
    # wrap_and_make_instance adapts the raw sklearn class into a UPSG stage.
    impute_node = p.add(wrap_and_make_instance(Imputer))
    csv_read_node['output'] > impute_node['X_train']
    impute_node['X_new'] > csv_write_node['input']
    self.run_pipeline(p)
    # Control path: run the Imputer directly on the parsed CSV.
    ctrl_imputer = Imputer()
    ctrl_X_sa = np.genfromtxt(infile_name, dtype=None, delimiter=",",
                              names=True)
    # NOTE(review): num_type is never used below — presumably leftover code.
    num_type = ctrl_X_sa[0][0].dtype
    ctrl_X_nd, ctrl_X_sa_type = np_sa_to_nd(ctrl_X_sa)
    ctrl_X_new_nd = ctrl_imputer.fit_transform(ctrl_X_nd)
    control = ctrl_X_new_nd
    result = self._tmp_files.csv_read('out.csv', True)
    self.assertTrue(np.allclose(result, control))
开发者ID:macressler,项目名称:UPSG,代码行数:27,代码来源:test_pipeline.py
示例7: __simple_pipeline
def __simple_pipeline(self, sk_cls, sk_method_name, upsg_out_key,
                      init_kwargs=None, in_data=None):
    """Run ``sk_cls`` through a minimal UPSG pipeline and compare to sklearn.

    Parameters
    ----------
    sk_cls : sklearn estimator class to wrap and test
    sk_method_name : 'predict' triggers a train/test split with a
        fit/predict control; any other value compares against
        ``fit_transform`` on the full data
    upsg_out_key : output key of the wrapped stage whose result is checked
    init_kwargs : dict or None, kwargs for the estimator constructor
    in_data : raw input handed to ``self.__process_in_data``
    """
    # Fix: the original signature used a mutable default (init_kwargs={})
    # and then assigned into it below, leaking 'random_state' between
    # calls that share the default dict. Copy so neither the default nor
    # a caller-supplied dict is ever mutated.
    init_kwargs = {} if init_kwargs is None else dict(init_kwargs)
    X_in, y_in = self.__process_in_data(in_data)
    ctrl_sk_inst = sk_cls(**init_kwargs)
    est_params = ctrl_sk_inst.get_params()
    try:
        random_state = est_params['random_state']
        if random_state is None:
            # Randomness must be pinned for the comparison to be meaningful.
            init_kwargs['random_state'] = 0
            ctrl_sk_inst = sk_cls(**init_kwargs)
    except KeyError:
        # Estimator takes no random_state; nothing to pin.
        pass
    p = Pipeline()
    sk_stage = p.add(wrap_and_make_instance(
        sk_cls,
        **init_kwargs))
    X_in_stage = p.add(NumpyRead(X_in))
    y_in_stage = p.add(NumpyRead(y_in))
    if sk_method_name == 'predict':
        # Classifier/regressor path: fit on a split, predict on held-out data.
        train_test = p.add(SplitTrainTest(2, random_state=0))
        X_in_stage['output'] > train_test['input0']
        y_in_stage['output'] > train_test['input1']
        input_keys = sk_stage.get_stage().input_keys
        # Only connect the inputs this particular stage actually declares.
        if 'X_train' in input_keys:
            train_test['train0'] > sk_stage['X_train']
        if 'X_test' in input_keys:
            train_test['test0'] > sk_stage['X_test']
        if 'y_train' in input_keys:
            train_test['train1'] > sk_stage['y_train']
    else:
        # Transformer path: feed the full data set directly.
        X_in_stage['output'] > sk_stage['X_train']
        y_in_stage['output'] > sk_stage['y_train']
    csv_out = p.add(CSVWrite(self._tmp_files.get('out.csv')))
    sk_stage[upsg_out_key] > csv_out['input']
    self.run_pipeline(p)
    if sk_method_name == 'predict':
        # Control mirrors the pipeline: same split seed, then fit/predict.
        ctrl_X_train, ctrl_X_test, ctrl_y_train, ctrl_y_test = (
            train_test_split(X_in, y_in, random_state=0))
        ctrl_sk_inst.fit(ctrl_X_train, ctrl_y_train)
        control = ctrl_sk_inst.predict(ctrl_X_test)
    else:
        control = ctrl_sk_inst.fit_transform(X_in, y_in)
    result = self._tmp_files.csv_read('out.csv', as_nd=True)
    # CSVWrite flattens single-column output; restore the 2-d shape.
    if result.ndim != control.ndim and result.ndim == 1:
        result = result.reshape(result.size, 1)
    self.assertTrue(result.shape == control.shape and
                    np.allclose(result, control))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:60,代码来源:test_wrap.py
示例8: test_moving_params
def test_moving_params(self):
    """Fitted parameters can be piped from one classifier stage to another.

    clf1 is trained normally; its 'params_out' feeds clf2's 'params_in',
    so clf2 must ignore its own (deliberately different) constructor
    arguments and reproduce clf1's parameters and predictions exactly.
    """
    digits = datasets.load_digits()
    digits_data = digits.data
    digits_target = digits.target
    p = Pipeline()
    node_data = p.add(NumpyRead(digits_data))
    node_target = p.add(NumpyRead(digits_target))
    node_split = p.add(SplitTrainTest(2, random_state=0))
    # parameters from
    # http://scikit-learn.org/stable/auto_examples/plot_classifier_comparison.html
    node_clf1 = p.add(
        wrap_and_make_instance(
            RandomForestClassifier,
            max_depth=5,
            n_estimators=10,
            max_features=1,
            random_state=0))
    # Intentionally different hyperparameters — they should be overridden
    # by the params piped in from node_clf1 below.
    node_clf2 = p.add(wrap_and_make_instance(RandomForestClassifier, max_depth=12,
                                             n_estimators=100, max_features=1000))
    node_params_out_1 = p.add(CSVWrite(self._tmp_files.get(
        'out_params_1.csv')))
    node_params_out_2 = p.add(CSVWrite(self._tmp_files.get(
        'out_params_2.csv')))
    node_pred_out_1 = p.add(CSVWrite(self._tmp_files.get(
        'out_pred_1.csv')))
    node_pred_out_2 = p.add(CSVWrite(self._tmp_files.get(
        'out_pred_2.csv')))
    node_data['output'] > node_split['input0']
    node_target['output'] > node_split['input1']
    node_split['train0'] > node_clf1['X_train']
    node_split['train1'] > node_clf1['y_train']
    node_split['test0'] > node_clf1['X_test']
    node_split['train0'] > node_clf2['X_train']
    node_split['train1'] > node_clf2['y_train']
    node_split['test0'] > node_clf2['X_test']
    # The crucial connection: clf1's fitted params become clf2's params.
    node_clf1['params_out'] > node_clf2['params_in']
    node_clf1['params_out'] > node_params_out_1['input']
    node_clf2['params_out'] > node_params_out_2['input']
    node_clf1['y_pred'] > node_pred_out_1['input']
    node_clf2['y_pred'] > node_pred_out_2['input']
    self.run_pipeline(p)
    # Both classifiers must report identical params and predictions.
    params_1 = self._tmp_files.csv_read('out_params_1.csv')
    params_2 = self._tmp_files.csv_read('out_params_2.csv')
    self.assertTrue(np.array_equal(params_1, params_2))
    y_pred_1 = self._tmp_files.csv_read('out_pred_1.csv')
    y_pred_2 = self._tmp_files.csv_read('out_pred_2.csv')
    self.assertTrue(np.array_equal(y_pred_1, y_pred_2))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:58,代码来源:test_wrap.py
示例9: test_numpy_write
def test_numpy_write(self):
    """Round-trip a random array through NumpyRead -> NumpyWrite."""
    in_data = np.random.rand(10,10)
    pipe = Pipeline()
    source = pipe.add(NumpyRead(in_data))
    sink = pipe.add(NumpyWrite())
    source['output'] > sink['input']
    self.run_pipeline(pipe)
    # NumpyWrite stores a structured array; convert back before comparing.
    round_tripped = np_sa_to_nd(sink.get_stage().result)[0]
    self.assertTrue(np.allclose(in_data, round_tripped))
开发者ID:macressler,项目名称:UPSG,代码行数:10,代码来源:test_export.py
示例10: test_sql
def test_sql(self):
    """Chain RunSQL stages that create, join, and export sqlite tables.

    Temporary table names flow between stages through the RunSQL
    input/output keys, so each stage operates on its predecessor's
    tables. The joined table is exported to CSV and compared against a
    pre-computed control after normalizing datetimes and string widths.
    """
    # Make sure we don't accidentally corrupt our test database
    db_path, db_file_name = self._tmp_files.tmp_copy(path_of_data(
        'small.db'))
    db_url = 'sqlite:///{}'.format(db_path)
    q_sel_employees = 'CREATE TABLE {tmp_emp} AS SELECT * FROM employees;'
    # We have to be careful about the datetime type in sqlite3. It will
    # forget if we don't keep reminding it, and if it forgets sqlalchemy
    # will be unhappy. Hence, we can't use CREATE TABLE AS if our table
    # has a DATETIME
    q_sel_hours = ('CREATE TABLE {tmp_hrs} '
                   '(id INT, employee_id INT, time DATETIME, '
                   ' event_type TEXT); '
                   'INSERT INTO {tmp_hrs} SELECT * FROM hours;')
    q_join = ('CREATE TABLE {joined} '
              '(id INT, last_name TEXT, salary REAL, time DATETIME, '
              ' event_type TEXT); '
              'INSERT INTO {joined} '
              'SELECT {tmp_emp}.id, last_name, salary, time, event_type '
              'FROM {tmp_emp} JOIN {tmp_hrs} ON '
              '{tmp_emp}.id = {tmp_hrs}.employee_id;')
    p = Pipeline()
    get_emp = p.add(RunSQL(db_url, q_sel_employees, [], ['tmp_emp'], {}))
    get_hrs = p.add(RunSQL(db_url, q_sel_hours, [], ['tmp_hrs'], {}))
    join = p.add(RunSQL(db_url, q_join, ['tmp_emp', 'tmp_hrs'], ['joined'],
                        {}))
    csv_out = p.add(CSVWrite(self._tmp_files('out.csv')))
    # Pass the generated table names from the producer stages to the join.
    get_emp['tmp_emp'] > join['tmp_emp']
    get_hrs['tmp_hrs'] > join['tmp_hrs']
    join['joined'] > csv_out['input']
    self.run_pipeline(p)
    ctrl = csv_read(path_of_data('test_transform_test_sql_ctrl.csv'))
    result = self._tmp_files.csv_read('out.csv')
    # Because Numpy insists on printing times with local offsets, but
    # not every computer has the same offset, we have to force it back
    # into UTC
    for i, dt in enumerate(result['time']):
        # .item() makes a datetime, which we can format correctly later
        # http://stackoverflow.com/questions/25134639/how-to-force-python-print-numpy-datetime64-with-specified-timezone
        result['time'][i] = np.datetime64(dt).item().strftime(
            '%Y-%m-%dT%H:%M:%S')
    # Then we have to make the string field smaller
    new_cols = []
    for col in result.dtype.names:
        new_cols.append(result[col].astype(ctrl.dtype[col]))
    result = merge_arrays(new_cols, flatten=True)
    result.dtype.names = ctrl.dtype.names
    self.assertTrue(np.array_equal(result, ctrl))
示例11: test_label_encode
def test_label_encode(self):
    """LabelEncode stage output matches the pre-computed control CSV."""
    pipe = Pipeline()
    source = pipe.add(CSVRead(path_of_data('categories.csv')))
    encoder = pipe.add(LabelEncode())
    sink = pipe.add(CSVWrite(self._tmp_files('out.csv')))
    source['output'] > encoder['input']
    encoder['output'] > sink['input']
    self.run_pipeline(pipe)
    produced = self._tmp_files.csv_read('out.csv')
    expected = csv_read(path_of_data('test_transform_test_label_encode_ctrl.csv'))
    self.assertTrue(np.array_equal(produced, expected))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:17,代码来源:test_transform.py
示例12: test_fill_na
def test_fill_na(self):
    """FillNA(-1) replaces missing values; output matches the control CSV."""
    pipe = Pipeline()
    source = pipe.add(CSVRead(path_of_data('missing_vals_mixed.csv')))
    filler = pipe.add(FillNA(-1))
    sink = pipe.add(CSVWrite(self._tmp_files('out.csv')))
    source['output'] > filler['input']
    filler['output'] > sink['input']
    self.run_pipeline(pipe)
    produced = self._tmp_files.csv_read('out.csv')
    expected = csv_read(path_of_data('test_transform_test_fill_na_ctrl.csv'))
    self.assertTrue(np.array_equal(produced, expected))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:17,代码来源:test_transform.py
示例13: test_rw
def test_rw(self):
    """CSVRead -> CSVWrite round-trip preserves the data exactly."""
    infile_name = path_of_data('mixed_csv.csv')
    pipe = Pipeline()
    reader = pipe.add(CSVRead(infile_name))
    writer = pipe.add(CSVWrite(self._tmp_files.get('out.csv')))
    reader['output'] > writer['input']
    self.run_pipeline(pipe)
    # Control: parse the original file directly with numpy.
    expected = np.genfromtxt(infile_name, dtype=None, delimiter=",",
                             names=True)
    produced = self._tmp_files.csv_read('out.csv')
    self.assertTrue(np.array_equal(produced, expected))
开发者ID:macressler,项目名称:UPSG,代码行数:17,代码来源:test_pipeline.py
示例14: test_hstack
def test_hstack(self):
    """HStack(2) concatenates the columns of two structured arrays."""
    left = np.array(
        [(0.0, 0.1), (1.0, 1.1), (2.0, 2.1)],
        dtype=[('f0', float), ('f1', float)])
    right = np.array(
        [(0.2, 0.3), (1.2, 1.3), (2.2, 2.3)],
        dtype=[('f2', float), ('f3', float)])
    expected = np.array(
        [(0.0, 0.1, 0.2, 0.3), (1.0, 1.1, 1.2, 1.3),
         (2.0, 2.1, 2.2, 2.3)],
        dtype=[('f0', float), ('f1', float), ('f2', float),
               ('f3', float)])
    pipe = Pipeline()
    left_node = pipe.add(NumpyRead(left))
    right_node = pipe.add(NumpyRead(right))
    stacker = pipe.add(HStack(2))
    # Callable-node syntax wires both sources into the stacker.
    stacker(left_node, right_node)
    sink = pipe.add(NumpyWrite())
    sink(stacker)
    pipe.run()
    self.assertTrue(np.array_equal(expected, sink.get_stage().result))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:29,代码来源:test_transform.py
示例15: test_query_dates
def test_query_dates(self):
    """Query stage comparing a datetime64 column via the DT(...) literal.

    Verifies all four Query outputs: the matching rows, the complement
    rows, and the integer indices of each.
    """
    p = Pipeline()
    dates = np.array([(np.datetime64('2012-01-01')),
                      (np.datetime64('2013-04-05')),
                      (np.datetime64('2014-03-11')),
                      (np.datetime64('2015-01-01'))], dtype=[('dt', 'M8[D]')])
    # Structured array of expected row indices (single int column 'f0').
    inds = np.array([(i,) for i in xrange(dates.size)], dtype=[('f0', int)])
    np_in = p.add(NumpyRead(dates))
    # DT(...) is the Query expression language's datetime literal.
    q2_node = p.add(Query("dt <= DT('2014-01-01')"))
    np_in['output'] > q2_node['input']
    np_out = p.add(NumpyWrite())
    q2_node['output'] > np_out['input']
    np_complement = p.add(NumpyWrite())
    q2_node['complement'] > np_complement['input']
    np_out_inds = p.add(NumpyWrite())
    q2_node['output_inds'] > np_out_inds['input']
    np_complement_inds = p.add(NumpyWrite())
    q2_node['complement_inds'] > np_complement_inds['input']
    self.run_pipeline(p)
    # The first two dates satisfy the predicate; the last two do not.
    self.assertTrue(np.array_equal(np_out.get_stage().result, dates[:2]))
    self.assertTrue(np.array_equal(np_complement.get_stage().result, dates[2:]))
    self.assertTrue(np.array_equal(np_out_inds.get_stage().result, inds[:2]))
    self.assertTrue(np.array_equal(np_complement_inds.get_stage().result, inds[2:]))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:34,代码来源:test_transform.py
示例16: test_identity
def test_identity(self):
    """Identity stage passes data through under several key-naming schemes.

    Each trial is (input_keys, output_keys, constructor_arg, specify_input).
    The constructor argument may be a dict or a tuple and may describe
    either the input side or the output side; specify_input selects which
    constructor parameter receives it. In every case the data must arrive
    at the outputs unchanged.
    """
    trials = [(('input0', 'input1'), ('output0', 'output1'),
               {'input0': 'output0', 'input1': 'output1'},
               True),
              (('input0', 'input1', 'input2'),
               ('input0_out', 'input1_out', 'input2_out'),
               ('input0', 'input1', 'input2'),
               True),
              (('input0', 'input1'), ('output0', 'output1'),
               {'output0': 'input0', 'output1': 'input1'},
               False),
              (('output0_in', 'output1_in', 'output2_in'),
               ('output0', 'output1', 'output2'),
               ('output0', 'output1', 'output2'),
               False)]
    for input_keys, output_keys, arg, specify_input in trials:
        in_data_arrays = []
        out_nodes = []
        p = Pipeline()
        # Construct Identity from whichever side the trial specifies.
        if specify_input:
            node_id = p.add(Identity(arg))
        else:
            node_id = p.add(Identity(output_keys=arg))
        for input_key, output_key, in zip(input_keys, output_keys):
            in_data = np_nd_to_sa(np.random.random((100, 10)))
            node_in = p.add(NumpyRead(in_data))
            node_in['output'] > node_id[input_key]
            node_out = p.add(NumpyWrite())
            node_id[output_key] > node_out['input']
            in_data_arrays.append(in_data)
            out_nodes.append(node_out)
        self.run_pipeline(p)
        # Every output must equal the array fed to the matching input.
        for in_data, out_node in zip(in_data_arrays, out_nodes):
            self.assertTrue(np.array_equal(in_data,
                                           out_node.get_stage().result))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:45,代码来源:test_transform.py
示例17: test_rename_cols
def test_rename_cols(self):
    """RenameCols renames the mapped columns and leaves the rest alone."""
    infile_name = path_of_data('mixed_csv.csv')
    rename_dict = {'name': 'designation', 'height': 'tallness'}
    pipe = Pipeline()
    reader = pipe.add(CSVRead(infile_name))
    renamer = pipe.add(RenameCols(rename_dict))
    writer = pipe.add(CSVWrite(self._tmp_files('out.csv')))
    reader['output'] > renamer['input']
    renamer['output'] > writer['input']
    self.run_pipeline(pipe)
    expected = {'id', 'designation', 'tallness'}
    produced = set(self._tmp_files.csv_read('out.csv').dtype.names)
    # NOTE(review): np.array_equal on sets mirrors the original's check;
    # assertEqual would be the more direct comparison — confirm intent.
    self.assertTrue(np.array_equal(produced, expected))
开发者ID:Najah-lshanableh,项目名称:UPSG,代码行数:19,代码来源:test_transform.py
示例18: __metric_pipeline
def __metric_pipeline(self, metric, params={}, in_data=None):
    """Run an sklearn metric as a UPSG stage and compare to calling it directly.

    Trains an SVC on a fixed split, extracts the positive-class
    probability column, connects whichever of y_true / y_score /
    probas_pred the wrapped metric declares, and checks every output key
    against the value returned by calling the metric directly.

    NOTE(review): params={} is a mutable default; it is only read here
    (spread via **params), but a None-sentinel would be safer.
    """
    X_in, y_in = self.__process_in_data(in_data)
    metric_stage = wrap_and_make_instance(metric, **params)
    in_keys = metric_stage.input_keys
    out_keys = metric_stage.output_keys
    p = Pipeline()
    node_X_in = p.add(NumpyRead(X_in))
    node_y_in = p.add(NumpyRead(y_in))
    node_split = p.add(SplitTrainTest(2, random_state=0))
    node_X_in["output"] > node_split["input0"]
    node_y_in["output"] > node_split["input1"]
    # Control split must use the same random_state as SplitTrainTest.
    ctrl_X_train, ctrl_X_test, ctrl_y_train, ctrl_y_test = train_test_split(X_in, y_in, random_state=0)
    node_clf = p.add(wrap_and_make_instance(SVC, random_state=0))
    node_split["train0"] > node_clf["X_train"]
    node_split["train1"] > node_clf["y_train"]
    node_split["test0"] > node_clf["X_test"]
    ctrl_clf = SVC(random_state=0, probability=True)
    ctrl_clf.fit(ctrl_X_train, ctrl_y_train)
    # SplitY(1) selects column 1 of pred_proba: the positive-class score.
    node_proba_1 = p.add(SplitY(1))
    node_clf["pred_proba"] > node_proba_1["input"]
    ctrl_y_score = ctrl_clf.predict_proba(ctrl_X_test)[:, 1]
    node_metric = p.add(metric_stage)
    # Connect only the inputs this particular metric actually declares,
    # and build the same arguments for the direct (control) call.
    ctrl_metric_args = {}
    if "y_true" in in_keys:
        node_split["test1"] > node_metric["y_true"]
        ctrl_metric_args["y_true"] = ctrl_y_test
    if "y_score" in in_keys:
        node_proba_1["y"] > node_metric["y_score"]
        ctrl_metric_args["y_score"] = ctrl_y_score
    if "probas_pred" in in_keys:
        node_proba_1["y"] > node_metric["probas_pred"]
        ctrl_metric_args["probas_pred"] = ctrl_y_score
    # One CSVWrite sink per metric output key.
    out_nodes = [p.add(CSVWrite(self._tmp_files("out_{}.csv".format(out_key)))) for out_key in out_keys]
    [node_metric[out_key] > out_nodes[i]["input"] for i, out_key in enumerate(out_keys)]
    self.run_pipeline(p)
    ctrl_returns = metric(**ctrl_metric_args)
    # Single-output metrics return a bare value; normalize to a tuple.
    if len(out_keys) == 1:
        ctrl_returns = (ctrl_returns,)
    for i, out_key in enumerate(out_keys):
        control = ctrl_returns[i]
        result = self._tmp_files.csv_read("out_{}.csv".format(out_key), as_nd=True)
        self.assertTrue(result.shape == control.shape and np.allclose(result, control))
开发者ID:macressler,项目名称:UPSG,代码行数:58,代码来源:test_wrap.py
示例19: test_cross_validation_score
def test_cross_validation_score(self):
    """CrossValidationScore with pluggable partition iterators.

    Runs the stage once with KFold and once with StratifiedKFold and
    checks the resulting score against sklearn's cross_val_score using
    an identically-configured partitioner.
    """
    rows = 100
    folds = 10
    X = np.random.random((rows, 10))
    y = np.random.randint(0, 2, (rows))
    # Each trial: (partition iterator class, kwargs given to the UPSG
    # stage, kwargs used to build the sklearn control iterator).
    trials = ((SKKFold,
               {'random_state': 0, 'n_folds': folds},
               {'n': rows, 'n_folds': folds, 'random_state': 0}),
              (StratifiedKFold,
               {'random_state': 0, 'n_folds': folds},
               {'y': y, 'n_folds': folds, 'random_state': 0}))
    for PartIter, res_kwargs, ctrl_kwargs in trials:
        p = Pipeline()
        np_in_X = p.add(NumpyRead(X))
        np_in_y = p.add(NumpyRead(y))
        cv_score = p.add(CrossValidationScore(
            wrap(SVC),
            {},
            'score',
            wrap(PartIter),
            res_kwargs))
        np_in_X['output'] > cv_score['X_train']
        np_in_y['output'] > cv_score['y_train']
        score_out = p.add(CSVWrite(self._tmp_files('out.csv')))
        cv_score['score'] > score_out['input']
        self.run_pipeline(p)
        result = self._tmp_files.csv_read('out.csv')['f0']
        # Control: mean cross-validation score with the same partitioner.
        ctrl_kf = PartIter(**ctrl_kwargs)
        ctrl = np.mean(cross_val_score(SVC(), X, y, cv=ctrl_kf))
        self.assertTrue(np.allclose(ctrl, result))
开发者ID:macressler,项目名称:UPSG,代码行数:42,代码来源:test_model.py
示例20: test_grid_search
def test_grid_search(self):
    """
    Simulates behavior of example in:
    http://scikit-learn.org/stable/modules/generated/sklearn.grid_search.GridSearchCV.html#sklearn.grid_search.GridSearchCV
    """
    folds = 2
    # Parameter grid searched by both UPSG and the sklearn control.
    parameters = {
        'kernel': (
            'rbf',
            'linear'),
        'C': [
            1,
            10,
            100],
        'random_state': [0]}
    iris = datasets.load_iris()
    iris_data = iris.data
    iris_target = iris.target
    p = Pipeline()
    node_data = p.add(NumpyRead(iris_data))
    node_target = p.add(NumpyRead(iris_target))
    node_split = p.add(SplitTrainTest(2, random_state=1))
    node_search = p.add(GridSearch(
        wrap(SVC),
        parameters,
        'score',
        cv_stage_kwargs={'n_folds': folds}))
    node_params_out = p.add(CSVWrite(self._tmp_files.get('out.csv')))
    node_data['output'] > node_split['input0']
    node_target['output'] > node_split['input1']
    node_split['train0'] > node_search['X_train']
    node_split['train1'] > node_search['y_train']
    node_split['test0'] > node_search['X_test']
    node_split['test1'] > node_search['y_test']
    node_search['params_out'] > node_params_out['input']
    self.run_pipeline(p)
    result = self._tmp_files.csv_read('out.csv')
    # Control: replicate the search with sklearn on the same split/seed.
    ctrl_X_train, _, ctrl_y_train, _ = train_test_split(
        iris_data, iris_target, random_state=1)
    ctrl_cv = SKKFold(ctrl_y_train.size, folds)
    ctrl_search = grid_search.GridSearchCV(SVC(), parameters, cv=ctrl_cv)
    ctrl_search.fit(ctrl_X_train, ctrl_y_train)
    control = ctrl_search.best_params_
    # TODO a number of configurations tie here, and sklearn picks a different
    # best configuration than upsg does (although they have the same score)
    # ideally, we want to find some parameters where there is a clear
    # winner
    # NOTE(review): the sklearn result above is deliberately discarded and
    # replaced with hard-coded expected params until the tie is resolved.
    control = {'C': 10, 'kernel': 'linear', 'random_state': 0}
    self.assertEqual(np_sa_to_dict(np.array([result])), control)
开发者ID:macressler,项目名称:UPSG,代码行数:60,代码来源:test_model.py
注:本文中的upsg.pipeline.Pipeline类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论