This article collects typical usage examples of the Python class pyspark.ml.feature.VectorAssembler. If you have been wondering what VectorAssembler is for, or how to use it, the curated class examples here may help.
Below are 20 code examples of the VectorAssembler class, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Python code examples.
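Before diving into the collected examples, here is a minimal, self-contained sketch of the basic VectorAssembler pattern (the column names and values are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("vector-assembler-demo").getOrCreate()

# A toy DataFrame with two numeric feature columns (hypothetical data).
df = spark.createDataFrame([(1.0, 3.0), (2.0, 5.0)], ["x1", "x2"])

# Combine the listed input columns into a single vector column.
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
assembler.transform(df).show(truncate=False)
# +---+---+---------+
# |x1 |x2 |features |
# +---+---+---------+
# |1.0|3.0|[1.0,3.0]|
# |2.0|5.0|[2.0,5.0]|
# +---+---+---------+

Most of the examples below follow this same shape: pick the input columns, assemble them into a single features vector, then hand that column to an estimator.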
Example 1: seg_model_lr
def seg_model_lr(train_data, test_data, regType, num_iter):
    removelist_train = set(['stars', 'business_id', 'bus_id', 'b_id', 'review_id', 'user_id'])
    newlist_train = [v for i, v in enumerate(train_data.columns) if v not in removelist_train]
    # Putting data in vector assembler form
    assembler_train = VectorAssembler(inputCols=newlist_train, outputCol="features")
    transformed_train = assembler_train.transform(train_data.fillna(0))
    # Creating input dataset in the form of labeled points for training the model
    data_train = (transformed_train.select("features", "stars")).map(lambda row: LabeledPoint(row.stars, row.features))
    # Training the model using the logistic regression classifier
    model_train = LogisticRegressionWithLBFGS.train(sc.parallelize(data_train.collect(), 5),
                                                    regType=regType, iterations=num_iter, numClasses=5)
    # Creating a list of features to be used for predictions
    removelist_final = set(['business_id', 'bus_id', 'b_id', 'review_id', 'user_id'])
    newlist_final = [v for i, v in enumerate(test_data.columns) if v not in removelist_final]
    # Putting data in vector assembler form
    assembler_final = VectorAssembler(inputCols=newlist_final, outputCol="features")
    transformed_final = assembler_final.transform(test_data.fillna(0))
    # Creating input dataset to be used for predictions
    data_final = transformed_final.select("features", "review_id")
    # Predicting ratings using the trained model
    predictions = model_train.predict(data_final.map(lambda x: x.features))
    labelsAndPredictions = data_final.map(lambda data_final: data_final.review_id).zip(predictions)
    return labelsAndPredictions
Developer: USF-ML2, Project: Rectastic-, Lines: 32, Source: LR_models.py
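Example 1 mixes the DataFrame-based VectorAssembler with the older RDD-based MLlib API (LabeledPoint, LogisticRegressionWithLBFGS), which forces a collect()/parallelize() round trip through the driver. For comparison, here is a sketch of the same flow kept entirely in the DataFrame-based API — an alternative, not the author's code, assuming Spark 2.x and a label column already indexed to 0..numClasses-1:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

def seg_model_lr_df(train_data, test_data, reg_param, num_iter):
    # Hypothetical adaptation of seg_model_lr above.
    drop_cols = {'stars', 'business_id', 'bus_id', 'b_id', 'review_id', 'user_id'}
    feature_cols = [c for c in train_data.columns if c not in drop_cols]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    # Assumes 'stars' has been mapped to 0-based class indices upstream.
    train_df = assembler.transform(train_data.fillna(0)).withColumnRenamed("stars", "label")
    model = LogisticRegression(maxIter=num_iter, regParam=reg_param).fit(train_df)
    test_df = assembler.transform(test_data.fillna(0))
    return model.transform(test_df).select("review_id", "prediction")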
Example 2: writeLumbarReadings
def writeLumbarReadings(time, rdd):
    try:
        # Convert RDDs of the words DStream to DataFrame and run SQL query
        connectionProperties = MySQLConnection.getDBConnectionProps('/home/erik/mysql_credentials.txt')
        sqlContext = SQLContext(rdd.context)
        if not rdd.isEmpty():
            lumbarReadings = sqlContext.jsonRDD(rdd)
            lumbarReadingsIntermediate = lumbarReadings.selectExpr(
                "readingID", "readingTime", "deviceID", "metricTypeID", "uomID",
                "actual.y AS actualYaw", "actual.p AS actualPitch", "actual.r AS actualRoll",
                "setPoints.y AS setPointYaw", "setPoints.p AS setPointPitch", "setPoints.r AS setPointRoll")
            assembler = VectorAssembler(
                inputCols=["actualPitch"],  # Must be in the same order as what was used to train the model. Testing using only pitch since the model has a limited dataset.
                outputCol="features")
            lumbarReadingsIntermediate = assembler.transform(lumbarReadingsIntermediate)
            predictions = loadedModel.predict(lumbarReadingsIntermediate.map(lambda x: x.features))
            predictionsDF = lumbarReadingsIntermediate.map(lambda x: x.readingID).zip(predictions).toDF(["readingID", "positionID"])
            combinedDF = lumbarReadingsIntermediate.join(predictionsDF, lumbarReadingsIntermediate.readingID == predictionsDF.readingID).drop(predictionsDF.readingID)
            combinedDF = combinedDF.drop("features")
            combinedDF.show()
            combinedDF.write.jdbc("jdbc:mysql://localhost/biosensor", "SensorReadings", properties=connectionProperties)
    except:
        pass
Developer: kringen, Project: IOT-Back-Brace, Lines: 26, Source: ProcessSensorReadings.py
Example 3: seg_model_gb
def seg_model_gb(train_data, test_data, loss_type, num_iter, maxDepth):
    removelist_train = set(['stars', 'business_id', 'bus_id', 'b_id', 'review_id', 'user_id'])
    newlist_train = [v for i, v in enumerate(train_data.columns) if v not in removelist_train]
    # Putting data in vector assembler form
    assembler_train = VectorAssembler(inputCols=newlist_train, outputCol="features")
    transformed_train = assembler_train.transform(train_data.fillna(0))
    # Creating input dataset in the form of labeled points for training the model
    data_train = (transformed_train.select("features", "stars")).map(lambda row: LabeledPoint(row.stars, row.features))
    # Training the model using the Gradient-Boosted Trees regressor
    model_train = GradientBoostedTrees.trainRegressor(sc.parallelize(data_train.collect(), 5),
                                                      categoricalFeaturesInfo={}, loss=loss_type,
                                                      numIterations=num_iter, maxDepth=maxDepth)
    # Creating a list of features to be used for predictions
    removelist_final = set(['business_id', 'bus_id', 'b_id', 'review_id', 'user_id'])
    newlist_final = [v for i, v in enumerate(test_data.columns) if v not in removelist_final]
    # Putting data in vector assembler form
    assembler_final = VectorAssembler(inputCols=newlist_final, outputCol="features")
    transformed_final = assembler_final.transform(test_data.fillna(0))
    # Creating input dataset to be used for predictions
    data_final = transformed_final.select("features", "review_id")
    # Predicting ratings using the trained model
    predictions = model_train.predict(data_final.map(lambda x: x.features))
    labelsAndPredictions = data_final.map(lambda data_final: data_final.review_id).zip(predictions)
    return labelsAndPredictions
Developer: USF-ML2, Project: Rectastic-, Lines: 33, Source: GB_models.py
Example 4: _convertPythonXToJavaObject
def _convertPythonXToJavaObject(self, X):
    """
    Converts the input Python object X to a Java-side object (either MatrixBlock or Java DataFrame)

    Parameters
    ----------
    X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
    """
    if isinstance(X, SUPPORTED_TYPES) and self.transferUsingDF:
        pdfX = convertToPandasDF(X)
        df = assemble(self.sparkSession, pdfX, pdfX.columns,
                      self.features_col).select(self.features_col)
        return df._jdf
    elif isinstance(X, SUPPORTED_TYPES):
        return convertToMatrixBlock(self.sc, X)
    elif hasattr(X, '_jdf') and self.features_col in X.columns:
        # No need to assemble, as the input DF is likely coming via MLPipeline
        return X._jdf
    elif hasattr(X, '_jdf'):
        assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col)
        df = assembler.transform(X)
        return df._jdf
    else:
        raise Exception('Unsupported input type')
Developer: niketanpansare, Project: incubator-systemml, Lines: 29, Source: estimators.py
Example 5: text_features
def text_features(p_df):
    """
    Extracts features derived from the Quora question texts.
    :param p_df: A DataFrame.
    :return: A DataFrame.
    """
    diff_len = udf(lambda arr: arr[0] - arr[1], IntegerType())
    common_words = udf(lambda arr: len(set(arr[0]).intersection(set(arr[1]))), IntegerType())
    unique_chars = udf(lambda s: len(''.join(set(s.replace(' ', '')))), IntegerType())
    p_df = p_df.withColumn("len_q1", length("question1")).withColumn("len_q2", length("question2"))
    p_df = p_df.withColumn("diff_len", diff_len(array("len_q1", "len_q2")))
    p_df = p_df.withColumn("words_q1", size("question1_words")).withColumn("words_q2", size("question2_words"))
    p_df = p_df.withColumn("common_words", common_words(array("question1_words", "question2_words")))
    p_df = p_df.withColumn(
        "unique_chars_q1", unique_chars("question1")
    ).withColumn("unique_chars_q2", unique_chars("question2"))
    assembler = VectorAssembler(
        inputCols=["len_q1", "len_q2", "diff_len", "words_q1", "words_q2",
                   "common_words", "unique_chars_q1", "unique_chars_q2"],
        outputCol="text_features"
    )
    p_df = assembler.transform(p_df)
    return p_df
Developer: rhasan, Project: machine-learning, Lines: 25, Source: Quora.py
Example 6: predict
def predict(self, X):
    if isinstance(X, SUPPORTED_TYPES):
        if self.transferUsingDF:
            pdfX = convertToPandasDF(X)
            df = assemble(self.sqlCtx, pdfX, pdfX.columns, 'features').select('features')
            retjDF = self.model.transform(df._jdf)
            retDF = DataFrame(retjDF, self.sqlCtx)
            retPDF = retDF.sort('ID').select('prediction').toPandas()
            if isinstance(X, np.ndarray):
                return retPDF.as_matrix().flatten()
            else:
                return retPDF
        else:
            retNumPy = convertToNumpyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X)))
            if isinstance(X, np.ndarray):
                return retNumPy
            else:
                return retNumPy  # TODO: Convert to Pandas
    elif hasattr(X, '_jdf'):
        if 'features' in X.columns:
            # No need to assemble, as the input DF is likely coming via MLPipeline
            df = X
        else:
            assembler = VectorAssembler(inputCols=X.columns, outputCol='features')
            df = assembler.transform(X)
        retjDF = self.model.transform(df._jdf)
        retDF = DataFrame(retjDF, self.sqlCtx)
        # Return DF
        return retDF.sort('ID')
    else:
        raise Exception('Unsupported input type')
Developer: MechCoder, Project: incubator-systemml, Lines: 31, Source: SystemML.py
Example 7: scaleVecCol
def scaleVecCol(self, columns, nameOutputCol):
    """
    This function groups the specified columns into a list array in one column, then
    applies a scaling process. The scaling procedure is Spark's default scaling (see
    the example below).

    +---------+----------+
    |Price    |AreaLiving|
    +---------+----------+
    |1261706.9|16        |
    |1263607.9|16        |
    |1109960.0|19        |
    |978277.0 |19        |
    |885000.0 |19        |
    +---------+----------+
         |
         V
    +----------------------------------------+
    |['Price', 'AreaLiving']                 |
    +----------------------------------------+
    |[0.1673858972637624,0.5]                |
    |[0.08966137157852398,0.3611111111111111]|
    |[0.11587093205757598,0.3888888888888889]|
    |[0.1139820728616421,0.3888888888888889] |
    |[0.12260126542983639,0.4722222222222222]|
    +----------------------------------------+
    only showing top 5 rows
    """
    # Check that the columns argument is a string or a list:
    self.__assertTypeStrOrList(columns, "columns")
    # Check that the columns to be processed are in the dataframe:
    self.__assertColsInDF(columnsProvided=columns, columnsDF=self.__df.columns)
    # Check that the nameOutputCol argument is a string:
    self.__assertTypeStr(nameOutputCol, "nameOutputCol")
    # Model to use vectorAssembler:
    vecAssembler = VectorAssembler(inputCols=columns, outputCol="features_assembler")
    # Model for scaling the feature column:
    mmScaler = MinMaxScaler(inputCol="features_assembler", outputCol=nameOutputCol)
    # Dataframe with the features_assembler column
    tempDF = vecAssembler.transform(self.__df)
    # Fitting the scaler model with the transformed dataframe
    model = mmScaler.fit(tempDF)
    exprs = list(filter(lambda x: x not in columns, self.__df.columns))
    exprs.extend([nameOutputCol])
    self.__df = model.transform(tempDF).select(*exprs)
    self.__addTransformation()  # checkpoint in case
    return self
Developer: mood-agency, Project: optimus, Lines: 59, Source: DfTransf.py
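The assemble-then-scale pattern in Example 7 can equally be written as a pyspark.ml.Pipeline, which bundles the two stages for reuse. A minimal sketch, assuming a DataFrame df with the numeric columns from the docstring:

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

columns = ["Price", "AreaLiving"]  # the columns from the docstring example
assembler = VectorAssembler(inputCols=columns, outputCol="features_assembler")
scaler = MinMaxScaler(inputCol="features_assembler", outputCol="features_scaled")
pipeline = Pipeline(stages=[assembler, scaler])

# Fit and apply both stages in one shot, then drop the intermediate column.
scaled_df = pipeline.fit(df).transform(df).drop("features_assembler")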
Example 8: convert_to_flat_by_sparkpy
def convert_to_flat_by_sparkpy(df):
    subkeys = df.select("subkey").dropDuplicates().collect()
    subkeys = [s[0] for s in subkeys]
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))
    spark_df = spark_df.withColumnRenamed("parameter", "label")
    spark_df = spark_df.select("label", "features")
    return spark_df
Developer: constructor-igor, Project: TechSugar, Lines: 8, Source: df_flat.py
Example 9: sparking_your_interest
def sparking_your_interest():
    # Note: 'SQLContext' here presumably refers to an SQLContext instance in the original scope.
    df = SQLContext.read.json('speeches_dataset.json')
    df_fillna = df.fillna("")
    print(df_fillna.count())
    print(df_fillna.printSchema())
    df_utf = call_utf_encoder(df)
    df_cleaned = call_para_cleanup(df_utf)
    print(df_cleaned)
    df_with_bigrams = call_ngrams(df_cleaned, 2)
    df_with_trigrams = call_ngrams(df_with_bigrams, 3)
    df_with_4grams = call_ngrams(df_with_trigrams, 4)
    df_with_5grams = call_ngrams(df_with_4grams, 5)  # was 4 in the original; 5 matches the variable name
    df_with_6grams = call_ngrams(df_with_5grams, 6)  # was 4 in the original; 6 matches the variable name
    df_with_vocab_score = call_speech_vocab(df_with_6grams)
    df_with_2grams_idf_vectors = tf_feature_vectorizer(df_with_vocab_score, 100, '2grams')
    df_with_3grams_idf_vectors = tf_feature_vectorizer(df_with_2grams_idf_vectors, 100, '3grams')
    df_with_4grams_idf_vectors = tf_feature_vectorizer(df_with_3grams_idf_vectors, 100, '4grams')
    assembler = VectorAssembler(
        # The original listed "2gramsfeatures" three times, which looks like a
        # copy-paste slip; the 3- and 4-gram feature columns are used here instead.
        inputCols=["2gramsfeatures", "3gramsfeatures", "4gramsfeatures", "vocab_score"],
        outputCol="features")
    assembler_output = assembler.transform(df_with_4grams_idf_vectors)
    output = assembler_output.selectExpr('speaker', 'speech_id', 'para_cleaned_text', 'features')
    print(output.show())
    print(output.count())
    output_tordd = output.rdd
    train_rdd, test_rdd = output_tordd.randomSplit([0.8, 0.2], 123)
    train_df = train_rdd.toDF()
    test_df = test_rdd.toDF()
    print(train_df)
    print(test_df)
    print('Train DF - Count: ')
    print(train_df.count())
    print('Test DF - Count: ')
    print(test_df.count())
    print("Initializing RF Model")
    labelIndexer = StringIndexer(inputCol="speaker", outputCol="indexedLabel").fit(train_df)
    rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=1000,
                                featureSubsetStrategy="auto", impurity='gini', maxDepth=4, maxBins=32)
    pipeline = Pipeline(stages=[labelIndexer, rf])
    model = pipeline.fit(output)  # note: fits on the full data; fitting on train_df may have been intended
    print("Completed RF Model")
    predictions = model.transform(test_df)
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction",
                                                  metricName="precision")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g" % (1.0 - accuracy))
    rfModel = model.stages[1]
    print(rfModel)  # summary only
    print("Predictions: ")
    print(predictions.show())
Developer: vikaasa, Project: Spark_Workshop, Lines: 54, Source: sparking_your_interest.py
Example 10: _prepare_data_spark
def _prepare_data_spark(self, data):
    """ Prepare data in Spark format; the output will have the feature column and other useful information """
    keys = list(data.keys().difference({self.CHANGE_AMOUNT, self.CHANGE_DIRECTION, self.TARGET_PRICE,
                                        self.TODAY_PRICE}))
    df = self._spark.createDataFrame(data)
    ass = VectorAssembler(inputCols=keys, outputCol="features")
    output = ass.transform(df)
    # output.select('features', 'ChangeDirection', 'ChangeAmount').write.save('test.parquet')
    return output
Developer: WarnWang, Project: Dissertation, Lines: 11, Source: spark_train_system.py
Example 11: predictPopularity
def predictPopularity(features):
    print(features)
    features = tuple(features)
    feature_label = []
    for i in range(0, len(features)):
        feature_label.append('feature' + str(i))
    data_frame = spark.createDataFrame([features], feature_label)
    assembler = VectorAssembler(inputCols=feature_label, outputCol='features')
    data_frame = assembler.transform(data_frame)
    data_frame = data_frame.select('features')
    result = rfc_model.transform(data_frame)
    return result.select('prediction').head(1)[0][0]
Developer: Mohsin2018, Project: PredictTheShareOfArticle, Lines: 12, Source: Popularity.py
Example 12: commit
def commit(self):
    self.update_domain_role_hints()
    if self.in_df is not None:
        attributes = [att for att in self.used_attrs._list]
        class_var = [var for var in self.class_attrs._list]
        metas = [meta for meta in self.meta_attrs._list]
        VA = VectorAssembler(inputCols=attributes, outputCol='features')
        self.out_df = VA.transform(self.in_df)
        if len(class_var):
            self.out_df = self.out_df.withColumn('label', self.out_df[class_var[0]].cast('double'))
        self.send("DataFrame", self.out_df)
    else:
        self.send("DataFrame", None)
Developer: jamartinh, Project: Orange3-Spark, Lines: 14, Source: spark_ml_dataset.py
Example 13: test_train_data
def test_train_data(overall_segment):
    removelist_train = set(['stars', 'business_id', 'bus_id', 'b_id', 'review_id', 'user_id'])
    newlist_train = [v for i, v in enumerate(overall_segment.columns) if v not in removelist_train]
    # Putting data in vector assembler form
    assembler_train = VectorAssembler(inputCols=newlist_train, outputCol="features")
    transformed_train = assembler_train.transform(overall_segment.fillna(0))
    # Creating input dataset in the form of labeled points for training the model
    data_train = (transformed_train.select("features", "stars")).map(lambda row: LabeledPoint(row.stars, row.features))
    (trainingData, testData) = sc.parallelize(data_train.collect(), 5).randomSplit([0.7, 0.3])
    return (trainingData, testData)
Developer: USF-ML2, Project: Rectastic-, Lines: 14, Source: LR_models.py
Example 14: tf_idf_features_quora
def tf_idf_features_quora(p_df):
    """
    Extracts TF-IDF features from the Quora dataset.
    :param p_df: A DataFrame.
    :return: A DataFrame.
    """
    tf_df = extract_tf_features(p_df, "question1_meaningful_words", "tf1")
    tf_df = extract_tf_features(tf_df, "question2_meaningful_words", "tf2")
    tf_idf_df = extract_idf_features(tf_df, "tf1", "tf-idf1")
    tf_idf_df = extract_idf_features(tf_idf_df, "tf2", "tf-idf2")
    assembler = VectorAssembler(
        inputCols=["tf-idf1", "tf-idf2"],
        outputCol="tf_idf_features"
    )
    return assembler.transform(tf_idf_df)
Developer: rhasan, Project: machine-learning, Lines: 15, Source: Quora.py
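The helpers extract_tf_features and extract_idf_features are not shown on this page. As a rough guess at their shape — an assumption based on standard Spark ML components, not the project's actual code — they could be written with HashingTF and IDF like this:

from pyspark.ml.feature import HashingTF, IDF

def extract_tf_features(p_df, in_col, out_col):
    # Term-frequency vectors from a tokenized (array<string>) column.
    return HashingTF(inputCol=in_col, outputCol=out_col).transform(p_df)

def extract_idf_features(p_df, in_col, out_col):
    # IDF is an estimator: fit on the TF column, then rescale it.
    return IDF(inputCol=in_col, outputCol=out_col).fit(p_df).transform(p_df)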
Example 15: convert_to_flat_by_sparkpy
def convert_to_flat_by_sparkpy(df):
    subkeys = df.select("subkey").dropDuplicates().collect()
    subkeys = [s[0] for s in subkeys]
    n = len(df.select("reference").first()[0])
    # df = df.groupBy("key").agg(array(*[avg(col("reference")[i]) for i in range(n)]).alias("averages"))
    df = df.groupBy("key").agg(array(*[collect_list(col("reference")[i]) for i in range(n)]).alias("averages"))
    df.show()
    r = df.collect()
    # changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))
    spark_df = spark_df.withColumnRenamed("parameter", "label")
    spark_df = spark_df.select("label", "features")
    return spark_df
Developer: constructor-igor, Project: TechSugar, Lines: 16, Source: df_convert.py
Example 16: predict
def predict(self, X):
    """
    Invokes the transform method on the Estimator object on the JVM if X and y are one of the supported data types.

    Parameters
    ----------
    X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
    """
    try:
        if self.estimator is not None and self.model is not None:
            self.estimator.copyProperties(self.model)
    except AttributeError:
        pass
    if isinstance(X, SUPPORTED_TYPES):
        if self.transferUsingDF:
            pdfX = convertToPandasDF(X)
            df = assemble(self.sparkSession, pdfX, pdfX.columns, self.features_col).select(self.features_col)
            retjDF = self.model.transform(df._jdf)
            retDF = DataFrame(retjDF, self.sparkSession)
            retPDF = retDF.sort('__INDEX').select('prediction').toPandas()
            if isinstance(X, np.ndarray):
                return self.decode(retPDF.as_matrix().flatten())
            else:
                return self.decode(retPDF)
        else:
            try:
                retNumPy = self.decode(convertToNumPyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X))))
            except Py4JError:
                traceback.print_exc()
            if isinstance(X, np.ndarray):
                return retNumPy
            else:
                return retNumPy  # TODO: Convert to Pandas
    elif hasattr(X, '_jdf'):
        if self.features_col in X.columns:
            # No need to assemble, as the input DF is likely coming via MLPipeline
            df = X
        else:
            assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col)
            df = assembler.transform(X)
        retjDF = self.model.transform(df._jdf)
        retDF = DataFrame(retjDF, self.sparkSession)
        # Return DF
        return retDF.sort('__INDEX')
    else:
        raise Exception('Unsupported input type')
Developer: frreiss, Project: fred-systemml, Lines: 46, Source: estimators.py
Example 17: transform
def transform(self, df, featureCols, targetCol):
    """Keep the K most important features of the Spark DataFrame.

    Parameters
    ----------
    df : Spark DataFrame
    featureCols: array, names of the feature columns
        to consider in the feature selection algorithm
    targetCol: str, name of the target column, i.e., the column against which
        each feature is compared.

    Returns
    -------
    transformed_df : New Spark DataFrame with only the most important
        feature columns.
    """
    # build the features assembler
    assembler = VectorAssembler(
        inputCols=featureCols,
        outputCol='features')
    assembled_df = assembler.transform(df)
    # rename target column
    assembled_df = assembled_df.withColumnRenamed(targetCol, 'target')
    # extract features and target
    feats = assembled_df.select('features').rdd
    feats = feats.map(lambda x: x['features'])
    target = assembled_df.select('target').rdd
    target = target.map(lambda x: x['target'])
    # compute the per-column metric
    scores = []
    for i, feat in enumerate(featureCols):
        vector = feats.map(lambda x: x[i])
        scores.append(self.sfunc_(vector, target))
    self.scores_ = scores
    # sort scores
    idx = sorted(range(len(self.scores_)), reverse=True, key=self.scores_.__getitem__)
    # return a dataframe with the k best columns
    return df.select(*[featureCols[idd] for idd in idx[:self.k_]])
Developer: dasirra, Project: feat-sel-pyspark, Lines: 45, Source: univariate.py
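The transform method above is generic in self.sfunc_, which receives two RDDs: one feature column and the target. A hedged usage sketch, plugging in Pearson correlation from pyspark.mllib.stat.Statistics as the score function — the selector's class name and constructor are assumptions; only the two-RDD score-function contract comes from the code above:

from pyspark.mllib.stat import Statistics

# Hypothetical score function: absolute Pearson correlation between a
# feature-column RDD and the target RDD.
def abs_corr(feature_rdd, target_rdd):
    return abs(Statistics.corr(feature_rdd, target_rdd, method="pearson"))

selector = UnivariateSelector(sfunc=abs_corr, k=2)  # hypothetical constructor
best_cols_df = selector.transform(df, featureCols=["f1", "f2", "f3"], targetCol="y")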
Example 18: convertToLabeledDF
def convertToLabeledDF(sparkSession, X, y=None):
    from pyspark.ml.feature import VectorAssembler
    if y is not None:
        pd1 = pd.DataFrame(X)
        pd2 = pd.DataFrame(y, columns=['label'])
        pdf = pd.concat([pd1, pd2], axis=1)
        inputColumns = ['C' + str(i) for i in pd1.columns]
        outputColumns = inputColumns + ['label']
    else:
        pdf = pd.DataFrame(X)
        inputColumns = ['C' + str(i) for i in pdf.columns]
        outputColumns = inputColumns
    assembler = VectorAssembler(inputCols=inputColumns, outputCol='features')
    out = assembler.transform(sparkSession.createDataFrame(pdf, outputColumns))
    if y is not None:
        return out.select('features', 'label')
    else:
        return out.select('features')
Developer: frreiss, Project: fred-systemml, Lines: 18, Source: converters.py
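A small usage sketch for the converter above, assuming NumPy inputs and an active SparkSession:

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
X = np.array([[1.0, 2.0], [3.0, 4.0]])
y = np.array([0.0, 1.0])

# Yields a DataFrame with a 'features' vector column and a 'label' column.
labeled_df = convertToLabeledDF(spark, X, y)
labeled_df.show()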
Example 19: merge_features
def merge_features(ddfs, join_column, merge_column, output_column='features', drop_merged_columns=True):
    """
    Join (inner) several DataFrames on the same id and merge their merge_column columns
    into one column using pyspark.ml.feature.VectorAssembler.

    Example:
    ddf_merge = merge_features(ddfs=[ddf_pivot1, ddf_pivot2], join_column='customer_id', merge_column='features')

    :param ddfs:
    :param join_column: id column to join by (each ddf must have this column)
    :param merge_column: column to merge (each ddf must have this column)
    :param output_column:
    :param drop_merged_columns:
    :return:
    """
    from pyspark.ml.feature import VectorAssembler
    ddf_res = ddfs.pop(0)
    merge_column_renamed = merge_column + str(0)
    merge_columns = [merge_column_renamed]
    ddf_res = ddf_res.withColumnRenamed(merge_column, merge_column_renamed)
    for i, ddf in enumerate(ddfs):
        merge_column_renamed = merge_column + str(i + 1)
        merge_columns.append(merge_column_renamed)
        ddf_r = ddf.withColumnRenamed(merge_column, merge_column_renamed)
        ddf_res = ddf_res.join(ddf_r, on=join_column, how='inner')
    assembler = VectorAssembler(inputCols=merge_columns, outputCol=output_column)
    res = assembler.transform(ddf_res)
    if drop_merged_columns:
        res = drop_columns(res, columns=merge_columns)
    return res


# def pivot_aggregate(ddf, grpby_columns, pivot_column, aggs, pivot_filter_values=None, pivot_filter_support=None):
#     if pivot_filter_support and not pivot_filter_values:
#         frequent = ddf.freqItems([pivot_column], support=pivot_filter_support).first().asDict()[pivot_column+'_freqItems']
#         pivot_filter_values = map(str, frequent)
#
#     ddf_gr = ddf.groupBy(*grpby_columns)
#     ddf_pivot = ddf_gr.pivot(pivot_column, pivot_filter_values)
#     ddf_agg = ddf_pivot.agg(*aggs)
#     return ddf_agg
Developer: sashaostr, Project: datasu, Lines: 44, Source: spark.py
Example 20: preprocess
def preprocess(data):
    data = data.select('Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime',
                       'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime',
                       'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin',
                       'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled')
    data = data.na.fill('999999')
    for t in data.dtypes:
        if t[1] == 'string' and t[0] not in ['Origin', 'Dest', 'TailNum', 'UniqueCarrier', 'FlightNum']:
            data = data.withColumn(t[0], data[t[0]].cast('integer'))  # the original referenced an undefined name 'x' here
    data = data.na.fill(999999)
    data = data.withColumnRenamed('Cancelled', 'label')
    data = data.withColumn('label', data.label.cast('double'))
    assembler = VectorAssembler(
        inputCols=['Year', 'Month', 'DayofMonth', 'DayOfWeek',
                   'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
                   'ActualElapsedTime', 'CRSElapsedTime', 'AirTime',
                   'ArrDelay', 'DepDelay', 'Distance', 'TaxiIn', 'TaxiOut'],
        outputCol='features')
    data = assembler.transform(data)
    data = data.select('features', 'label')
    return data
Developer: robert501128, Project: BigDataHomework, Lines: 20, Source: train.py
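Since preprocess returns a ('features', 'label') DataFrame, its output plugs directly into any pyspark.ml estimator. A hedged follow-up sketch (the raw_data name, split ratios, and choice of classifier are illustrative, not part of the original project):

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train, test = preprocess(raw_data).randomSplit([0.8, 0.2], seed=42)
model = LogisticRegression(featuresCol="features", labelCol="label").fit(train)
predictions = model.transform(test)

# BinaryClassificationEvaluator reports areaUnderROC by default.
print(BinaryClassificationEvaluator(labelCol="label").evaluate(predictions))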
Note: the pyspark.ml.feature.VectorAssembler class examples in this article were compiled by 纯净天空 from GitHub/MSDocs and other source-code and documentation platforms. The code snippets were selected from open-source projects contributed by various developers; copyright of the source code belongs to the original authors, and distribution and use are subject to the corresponding project's license. Do not reproduce without permission.