
Python feature.VectorAssembler Class Code Examples


This article collects and summarizes typical usage examples of the pyspark.ml.feature.VectorAssembler class in Python. If you have been wondering exactly what the VectorAssembler class does, how to use it, or what real-world usage looks like, the curated class code examples below should help.



The following presents 20 code examples of the VectorAssembler class, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code examples.
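Before the collected examples, here is a minimal, self-contained sketch of the typical VectorAssembler workflow: assemble several numeric input columns into a single vector column that downstream pyspark.ml estimators consume. The SparkSession, DataFrame, and column names below are hypothetical and chosen only for illustration.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Hypothetical session and toy data: two numeric feature columns plus a label
spark = SparkSession.builder.appName("vector-assembler-demo").getOrCreate()
df = spark.createDataFrame(
    [(1.0, 0.5, 1), (2.0, 1.5, 0), (3.0, 2.5, 1)],
    ["x1", "x2", "label"])

# Combine the numeric input columns into a single vector column named "features"
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
assembled = assembler.transform(df)

assembled.select("features", "label").show(truncate=False)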

Example 1: seg_model_lr

def seg_model_lr(train_data, test_data, regType, num_iter):
    removelist_train= set(['stars', 'business_id', 'bus_id', 'b_id','review_id', 'user_id'])
    newlist_train = [v for i, v in enumerate(train_data.columns) if v not in removelist_train]

    # Putting data in vector assembler form
    assembler_train = VectorAssembler(inputCols=newlist_train, outputCol="features")

    transformed_train = assembler_train.transform(train_data.fillna(0))

    # Creating input dataset in the form of labeled point for training the model
    data_train= (transformed_train.select("features", "stars")).map(lambda row: LabeledPoint(row.stars, row.features))

    # Training the model using Logistic regression Classifier
    model_train = LogisticRegressionWithLBFGS.train(sc.parallelize(data_train.collect(),5),
                                                    regType =regType, iterations=num_iter, numClasses=5)

    # Creating a list of features to be used for predictions
    removelist_final = set(['business_id', 'bus_id', 'b_id','review_id', 'user_id'])
    newlist_final = [v for i, v in enumerate(test_data.columns) if v not in removelist_final]

    # Putting data in vector assembler form
    assembler_final = VectorAssembler(inputCols=newlist_final,outputCol="features")

    transformed_final= assembler_final.transform(test_data.fillna(0))

    # Creating input dataset to be used for predictions
    data_final = transformed_final.select("features", "review_id")

    # Predicting ratings using the developed model
    predictions = model_train.predict(data_final.map(lambda x: x.features))
    labelsAndPredictions = data_final.map(lambda data_final: data_final.review_id).zip(predictions)
    return labelsAndPredictions
Developer: USF-ML2, Project: Rectastic-, Lines: 32, Source: LR_models.py


Example 2: writeLumbarReadings

def writeLumbarReadings(time, rdd):
	try:
		# Convert RDDs of the words DStream to DataFrame and run SQL query
		connectionProperties = MySQLConnection.getDBConnectionProps('/home/erik/mysql_credentials.txt')
		sqlContext = SQLContext(rdd.context)
		if rdd.isEmpty() == False:
			lumbarReadings = sqlContext.jsonRDD(rdd)
			lumbarReadingsIntermediate = lumbarReadings.selectExpr("readingID","readingTime","deviceID","metricTypeID","uomID","actual.y AS actualYaw","actual.p AS actualPitch","actual.r AS actualRoll","setPoints.y AS setPointYaw","setPoints.p AS setPointPitch","setPoints.r AS setPointRoll")
			assembler = VectorAssembler(
						inputCols=["actualPitch"], # Must be in same order as what was used to train the model.  Testing using only pitch since model has limited dataset.
						outputCol="features")
			lumbarReadingsIntermediate = assembler.transform(lumbarReadingsIntermediate)

			
			predictions = loadedModel.predict(lumbarReadingsIntermediate.map(lambda x: x.features))
			predictionsDF = lumbarReadingsIntermediate.map(lambda x: x.readingID).zip(predictions).toDF(["readingID","positionID"])
			combinedDF = lumbarReadingsIntermediate.join(predictionsDF, lumbarReadingsIntermediate.readingID == predictionsDF.readingID).drop(predictionsDF.readingID)
			
			combinedDF = combinedDF.drop("features")
			
			combinedDF.show()


			combinedDF.write.jdbc("jdbc:mysql://localhost/biosensor", "SensorReadings", properties=connectionProperties)
	except:
		pass
Developer: kringen, Project: IOT-Back-Brace, Lines: 26, Source: ProcessSensorReadings.py


Example 3: seg_model_gb

def seg_model_gb(train_data, test_data, loss_type, num_iter, maxDepth):
    removelist_train= set(['stars', 'business_id', 'bus_id', 'b_id','review_id', 'user_id'])
    newlist_train = [v for i, v in enumerate(train_data.columns) if v not in removelist_train]

    # Putting data in vector assembler form
    assembler_train = VectorAssembler(inputCols=newlist_train, outputCol="features")

    transformed_train = assembler_train.transform(train_data.fillna(0))

    # Creating input dataset in the form of labeled point for training the model
    data_train= (transformed_train.select("features", "stars")).map(lambda row: LabeledPoint(row.stars, row.features))

    # Training the model using Gradient Boosted Trees regressor
    model_train = GradientBoostedTrees.trainRegressor(sc.parallelize(data_train.collect(),5), categoricalFeaturesInfo={},
                                                      loss=loss_type,
                                                      numIterations=num_iter, maxDepth=maxDepth)

    # Creating a list of features to be used for predictions
    removelist_final = set(['business_id', 'bus_id', 'b_id','review_id', 'user_id'])
    newlist_final = [v for i, v in enumerate(test_data.columns) if v not in removelist_final]

    # Putting data in vector assembler form
    assembler_final = VectorAssembler(inputCols=newlist_final,outputCol="features")

    transformed_final= assembler_final.transform(test_data.fillna(0))

    # Creating input dataset to be used for predictions
    data_final = transformed_final.select("features", "review_id")

    # Predicting ratings using the developed model
    predictions = model_train.predict(data_final.map(lambda x: x.features))
    labelsAndPredictions = data_final.map(lambda data_final: data_final.review_id).zip(predictions)
    return labelsAndPredictions
Developer: USF-ML2, Project: Rectastic-, Lines: 33, Source: GB_models.py


Example 4: _convertPythonXToJavaObject

    def _convertPythonXToJavaObject(self, X):
        """
        Converts the input python object X to a java-side object (either MatrixBlock or Java DataFrame)

        Parameters
        ----------
        X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
        """
        if isinstance(X, SUPPORTED_TYPES) and self.transferUsingDF:
            pdfX = convertToPandasDF(X)
            df = assemble(
                self.sparkSession,
                pdfX,
                pdfX.columns,
                self.features_col).select(
                self.features_col)
            return df._jdf
        elif isinstance(X, SUPPORTED_TYPES):
            return convertToMatrixBlock(self.sc, X)
        elif hasattr(X, '_jdf') and self.features_col in X.columns:
            # No need to assemble as input DF is likely coming via MLPipeline
            return X._jdf
        elif hasattr(X, '_jdf'):
            assembler = VectorAssembler(
                inputCols=X.columns, outputCol=self.features_col)
            df = assembler.transform(X)
            return df._jdf
        else:
            raise Exception('Unsupported input type')
Developer: niketanpansare, Project: incubator-systemml, Lines: 29, Source: estimators.py


Example 5: text_features

def text_features(p_df):
    """
    Extracts features derived from the quora question texts.
    :param p_df: A DataFrame.
    :return: A DataFrame.  
    """
    diff_len = udf(lambda arr: arr[0] - arr[1], IntegerType())
    common_words = udf(lambda arr: len(set(arr[0]).intersection(set(arr[1]))), IntegerType())
    unique_chars = udf(lambda s: len(''.join(set(s.replace(' ', '')))), IntegerType())


    p_df = p_df.withColumn("len_q1", length("question1")).withColumn("len_q2", length("question2"))
    p_df = p_df.withColumn("diff_len", diff_len(array("len_q1", "len_q2")))
    p_df = p_df.withColumn("words_q1", size("question1_words")).withColumn("words_q2", size("question2_words"))
    p_df = p_df.withColumn("common_words", common_words(array("question1_words", "question2_words")))
    p_df = p_df.withColumn(
        "unique_chars_q1", unique_chars("question1")
    ).withColumn("unique_chars_q2", unique_chars("question2"))

    assembler = VectorAssembler(
        inputCols=["len_q1", "len_q2", "diff_len", "words_q1", "words_q2", "common_words", "unique_chars_q1", "unique_chars_q2"],
        outputCol="text_features"
    )
    p_df = assembler.transform(p_df)    
    return p_df
Developer: rhasan, Project: machine-learning, Lines: 25, Source: Quora.py


Example 6: predict

 def predict(self, X):
     if isinstance(X, SUPPORTED_TYPES):
         if self.transferUsingDF:
             pdfX = convertToPandasDF(X)
             df = assemble(self.sqlCtx, pdfX, pdfX.columns, 'features').select('features')
             retjDF = self.model.transform(df._jdf)
             retDF = DataFrame(retjDF, self.sqlCtx)
             retPDF = retDF.sort('ID').select('prediction').toPandas()
             if isinstance(X, np.ndarray):
                 return retPDF.as_matrix().flatten()
             else:
                 return retPDF
         else:
             retNumPy = convertToNumpyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X)))
             if isinstance(X, np.ndarray):
                 return retNumPy
             else:
                 return retNumPy # TODO: Convert to Pandas
     elif hasattr(X, '_jdf'):
         if 'features' in X.columns:
             # No need to assemble as input DF is likely coming via MLPipeline
             df = X
         else:
             assembler = VectorAssembler(inputCols=X.columns, outputCol='features')
             df = assembler.transform(X)
         retjDF = self.model.transform(df._jdf)
         retDF = DataFrame(retjDF, self.sqlCtx)
         # Return DF
         return retDF.sort('ID')
     else:
         raise Exception('Unsupported input type')
Developer: MechCoder, Project: incubator-systemml, Lines: 31, Source: SystemML.py


Example 7: scaleVecCol

    def scaleVecCol(self, columns, nameOutputCol):
        """
        This function groups the specified columns into a single vector column and then applies a
        scaling step. The scaling procedure is Spark's default MinMaxScaler (see the example
        below).

        +---------+----------+
        |Price    |AreaLiving|
        +---------+----------+
        |1261706.9|16        |
        |1263607.9|16        |
        |1109960.0|19        |
        |978277.0 |19        |
        |885000.0 |19        |
        +---------+----------+

                    |
                    |
                    |
                    V
        +----------------------------------------+
        |['Price', 'AreaLiving']                 |
        +----------------------------------------+
        |[0.1673858972637624,0.5]                |
        |[0.08966137157852398,0.3611111111111111]|
        |[0.11587093205757598,0.3888888888888889]|
        |[0.1139820728616421,0.3888888888888889] |
        |[0.12260126542983639,0.4722222222222222]|
        +----------------------------------------+
        only showing top 5 rows

        """

        # Check that the columns argument is a string or a list:
        self.__assertTypeStrOrList(columns, "columns")

        # Check that the columns to be processed exist in the dataframe
        self.__assertColsInDF(columnsProvided=columns, columnsDF=self.__df.columns)

        # Check that the nameOutputCol argument is a string:
        self.__assertTypeStr(nameOutputCol, "nameOutputCol")

        # Assemble the selected columns with a VectorAssembler:
        vecAssembler = VectorAssembler(inputCols=columns, outputCol="features_assembler")
        # Model for scaling feature column:
        mmScaler = MinMaxScaler(inputCol="features_assembler", outputCol=nameOutputCol)
        # Dataframe with feature_assembler column
        tempDF = vecAssembler.transform(self.__df)
        # Fitting scaler model with transformed dataframe
        model = mmScaler.fit(tempDF)

        exprs = list(filter(lambda x: x not in columns, self.__df.columns))

        exprs.extend([nameOutputCol])

        self.__df = model.transform(tempDF).select(*exprs)
        self.__addTransformation()  # checkpoint in case

        return self
Developer: mood-agency, Project: optimus, Lines: 59, Source: DfTransf.py


Example 8: convert_to_flat_by_sparkpy

def convert_to_flat_by_sparkpy(df):
    subkeys = df.select("subkey").dropDuplicates().collect()
    subkeys = [s[0] for s in subkeys]
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))
    spark_df = spark_df.withColumnRenamed("parameter", "label")
    spark_df = spark_df.select("label", "features")
    return spark_df
Developer: constructor-igor, Project: TechSugar, Lines: 8, Source: df_flat.py


Example 9: sparking_your_interest

def sparking_your_interest():
	df = sqlContext.read.json('speeches_dataset.json')  # assumes an existing SQLContext instance named sqlContext
	df_fillna=df.fillna("")
	print(df_fillna.count())
	print(df_fillna.printSchema())

	df_utf=call_utf_encoder(df)
	df_cleaned=call_para_cleanup(df_utf)
	print(df_cleaned)
	df_with_bigrams = call_ngrams(df_cleaned, 2)
	df_with_trigrams = call_ngrams(df_with_bigrams, 3)
	df_with_4grams = call_ngrams(df_with_trigrams, 4)
	df_with_5grams = call_ngrams(df_with_4grams, 5)
	df_with_6grams = call_ngrams(df_with_5grams, 6)
	df_with_vocab_score = call_speech_vocab(df_with_6grams)

	df_with_2grams_idf_vectors = tf_feature_vectorizer(df_with_vocab_score,100,'2grams')
	df_with_3grams_idf_vectors = tf_feature_vectorizer(df_with_2grams_idf_vectors,100,'3grams')
	df_with_4grams_idf_vectors = tf_feature_vectorizer(df_with_3grams_idf_vectors,100,'4grams')
	assembler = VectorAssembler(
	    inputCols=["2gramsfeatures", "2gramsfeatures", "2gramsfeatures", "vocab_score"],
	    outputCol="features")
	assembler_output = assembler.transform(df_with_4grams_idf_vectors)
	output = assembler_output.selectExpr('speaker','speech_id','para_cleaned_text','features')
	print(output.show())
	print(output.count())

	output_tordd = output.rdd
	train_rdd,test_rdd = output_tordd.randomSplit([0.8, 0.2], 123)
	train_df = train_rdd.toDF()
	test_df = test_rdd.toDF()
	print(train_df)
	print(test_df)

	print('Train DF - Count: ')
	print(train_df.count())
	print('Test DF - Count: ')
	print(test_df.count())

	print("Initializing RF Model")
	labelIndexer = StringIndexer(inputCol="speaker", outputCol="indexedLabel").fit(train_df)       
	rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features",numTrees=1000, featureSubsetStrategy="auto", impurity='gini', maxDepth=4, maxBins=32)
	pipeline = Pipeline(stages=[labelIndexer,rf])
	model = pipeline.fit(train_df)  # fit on the training split rather than the full dataset
	print("Completed RF Model")

	predictions = model.transform(test_df)
	evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
	accuracy = evaluator.evaluate(predictions)
	print("Test Error = %g" % (1.0 - accuracy))
	rfModel = model.stages[1]
	print(rfModel)  # summary only
	print("Predictions: ")
	print(predictions.show())
Developer: vikaasa, Project: Spark_Workshop, Lines: 54, Source: sparking_your_interest.py


Example 10: _prepare_data_spark

    def _prepare_data_spark(self, data):
        """ Prepare data for spark format, output data will have the feature format and other useful information """

        keys = list(data.keys().difference({self.CHANGE_AMOUNT, self.CHANGE_DIRECTION, self.TARGET_PRICE,
                                            self.TODAY_PRICE}))

        df = self._spark.createDataFrame(data)
        ass = VectorAssembler(inputCols=keys, outputCol="features")
        output = ass.transform(df)
        # output.select('features', 'ChangeDirection', 'ChangeAmount').write.save('test.parquet')
        return output
Developer: WarnWang, Project: Dissertation, Lines: 11, Source: spark_train_system.py


Example 11: predictPopularity

def predictPopularity(features):
    print(features)
    features = tuple(features)
    feature_label = []    
    for i in range(0, len(features)):
        feature_label.append('feature' +str(i))
    data_frame = spark.createDataFrame([features], feature_label)
    assembler = VectorAssembler(inputCols= feature_label, outputCol = 'features')
    data_frame = assembler.transform(data_frame)
    data_frame = data_frame.select('features')
    result = rfc_model.transform(data_frame)
    return result.select('prediction').head(1)[0][0]
Developer: Mohsin2018, Project: PredictTheShareOfArticle, Lines: 12, Source: Popularity.py


Example 12: commit

    def commit(self):
        self.update_domain_role_hints()
        if self.in_df is not None:
            attributes = [att for att in self.used_attrs._list]
            class_var = [var for var in self.class_attrs._list]
            metas = [meta for meta in self.meta_attrs._list]
            VA = VectorAssembler(inputCols = attributes, outputCol = 'features')
            self.out_df = VA.transform(self.in_df)
            if len(class_var):
                self.out_df = self.out_df.withColumn('label', self.out_df[class_var[0]].cast('double'))

            self.send("DataFrame", self.out_df)
        else:
            self.send("DataFrame", None)
Developer: jamartinh, Project: Orange3-Spark, Lines: 14, Source: spark_ml_dataset.py


Example 13: test_train_data

def test_train_data(overall_segment):
    removelist_train= set(['stars', 'business_id', 'bus_id', 'b_id','review_id', 'user_id'])
    newlist_train = [v for i, v in enumerate(overall_segment.columns) if v not in removelist_train]

    # Putting data in vector assembler form
    assembler_train = VectorAssembler(inputCols=newlist_train, outputCol="features")

    transformed_train = assembler_train.transform(overall_segment.fillna(0))

    # Creating input dataset in the form of labeled point for training the model
    data_train= (transformed_train.select("features", "stars")).map(lambda row: LabeledPoint(row.stars, row.features))

    (trainingData, testData) = sc.parallelize(data_train.collect(),5).randomSplit([0.7, 0.3])
    return (trainingData, testData)
Developer: USF-ML2, Project: Rectastic-, Lines: 14, Source: LR_models.py


Example 14: tf_idf_features_quora

def tf_idf_features_quora(p_df):
    """
    Extracts TF-IDF features from quora dataset.
    :param p_df: A DataFrame.
    :return: A DataFrame.    
    """     
    tf_df = extract_tf_features(p_df, "question1_meaningful_words", "tf1")
    tf_df = extract_tf_features(tf_df, "question2_meaningful_words", "tf2")
    tf_idf_df = extract_idf_features(tf_df, "tf1", "tf-idf1")
    tf_idf_df = extract_idf_features(tf_idf_df, "tf2", "tf-idf2")
    assembler = VectorAssembler(
        inputCols=["tf-idf1", "tf-idf2"],
        outputCol="tf_idf_features"
    )
    return assembler.transform(tf_idf_df)
Developer: rhasan, Project: machine-learning, Lines: 15, Source: Quora.py


Example 15: convert_to_flat_by_sparkpy

def convert_to_flat_by_sparkpy(df):
    subkeys = df.select("subkey").dropDuplicates().collect()
    subkeys = [s[0] for s in subkeys]

    n = len(df.select("reference").first()[0])
    # df = df.groupBy("key").agg(array(*[avg(col("reference")[i]) for i in range(n)]).alias("averages"))
    df = df.groupBy("key").agg(array(*[collect_list(col("reference")[i]) for i in range(n)]).alias("averages"))
    df.show()
    r = df.collect()

    # changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))
    assembler = VectorAssembler().setInputCols(subkeys).setOutputCol("features")
    spark_df = assembler.transform(df.groupBy("key", "parameter").pivot("subkey").agg(first(col("reference"))))
    spark_df = spark_df.withColumnRenamed("parameter", "label")
    spark_df = spark_df.select("label", "features")
    return spark_df
Developer: constructor-igor, Project: TechSugar, Lines: 16, Source: df_convert.py


Example 16: predict

    def predict(self, X):
        """
        Invokes the transform method on Estimator object on JVM if X and y are on of the supported data types

        Parameters
        ----------
        X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
        """
        try:
            if self.estimator is not None and self.model is not None:
                self.estimator.copyProperties(self.model)
        except AttributeError:
            pass
        if isinstance(X, SUPPORTED_TYPES):
            if self.transferUsingDF:
                pdfX = convertToPandasDF(X)
                df = assemble(self.sparkSession, pdfX, pdfX.columns, self.features_col).select(self.features_col)
                retjDF = self.model.transform(df._jdf)
                retDF = DataFrame(retjDF, self.sparkSession)
                retPDF = retDF.sort('__INDEX').select('prediction').toPandas()
                if isinstance(X, np.ndarray):
                    return self.decode(retPDF.as_matrix().flatten())
                else:
                    return self.decode(retPDF)
            else:
                try:
                    retNumPy = self.decode(convertToNumPyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X))))
                except Py4JError:
                    traceback.print_exc()
                if isinstance(X, np.ndarray):
                    return retNumPy
                else:
                    return retNumPy # TODO: Convert to Pandas
        elif hasattr(X, '_jdf'):
            if self.features_col in X.columns:
                # No need to assemble as input DF is likely coming via MLPipeline
                df = X
            else:
                assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col)
                df = assembler.transform(X)
            retjDF = self.model.transform(df._jdf)
            retDF = DataFrame(retjDF, self.sparkSession)
            # Return DF
            return retDF.sort('__INDEX')
        else:
            raise Exception('Unsupported input type')
Developer: frreiss, Project: fred-systemml, Lines: 46, Source: estimators.py


Example 17: transform

    def transform(self, df, featureCols, targetCol):
        """Keep the K most important features of the Spark DataFrame

        Parameters
        ----------
        df : Spark DataFrame
        featureCols: array, names of feature columns
            to consider in the feature selection algorithm
        targetCol: str, name of the target column, i.e., the column against
            which each feature is compared.

        Returns
        -------
        transformed_df : New Spark DataFrame with only the most important
            feature columns.

        """

        # build features assemble
        assembler = VectorAssembler(
            inputCols = featureCols,
            outputCol = 'features')
        assembled_df = assembler.transform(df)

        # rename target column
        assembled_df = assembled_df.withColumnRenamed(targetCol,'target')

        # extract features and target
        feats = assembled_df.select('features').rdd
        feats = feats.map(lambda x: x['features'])
        target = assembled_df.select('target').rdd
        target = target.map(lambda x: x['target'])

        # compute per-column metric
        scores = []
        for i,feat in enumerate(featureCols):
            vector = feats.map(lambda x: x[i])
            scores.append(self.sfunc_(vector,target))
        self.scores_ = scores
        
        # sort scores
        idx = sorted(range(len(self.scores_)),reverse=True,key=self.scores_.__getitem__)
        
        # return dataframe with k-best columns 
        return df.select(*[featureCols[idd] for idd in idx[:self.k_]])
Developer: dasirra, Project: feat-sel-pyspark, Lines: 45, Source: univariate.py


Example 18: convertToLabeledDF

def convertToLabeledDF(sparkSession, X, y=None):
    import pandas as pd
    from pyspark.ml.feature import VectorAssembler
    if y is not None:
        pd1 = pd.DataFrame(X)
        pd2 = pd.DataFrame(y, columns=['label'])
        pdf = pd.concat([pd1, pd2], axis=1)
        inputColumns = ['C' + str(i) for i in pd1.columns]
        outputColumns = inputColumns + ['label']
    else:
        pdf = pd.DataFrame(X)
        inputColumns = ['C' + str(i) for i in pdf.columns]
        outputColumns = inputColumns
    assembler = VectorAssembler(inputCols=inputColumns, outputCol='features')
    out = assembler.transform(sparkSession.createDataFrame(pdf, outputColumns))
    if y is not None:
        return out.select('features', 'label')
    else:
        return out.select('features')
Developer: frreiss, Project: fred-systemml, Lines: 18, Source: converters.py


Example 19: merge_features

def merge_features(ddfs, join_column, merge_column, output_column='features', drop_merged_columns=True):
    """
    Inner-join several DataFrames on the same id column and merge their merge_column columns into one column using pyspark.ml.feature.VectorAssembler

    Example:
        ddf_merge = merge_features(ddfs=[ddf_pivot1,ddf_pivot2], join_column='customer_id', merge_column='features')
    :param ddfs:
    :param join_column: id column to join by (each ddf must have this column)
    :param merge_column: column to merge (each ddf must have this column)
    :param output_column:
    :param drop_merged_columns:
    :return:
    """
    from pyspark.ml.feature import VectorAssembler

    ddf_res = ddfs.pop(0)
    merge_column_renamed = merge_column + str(0)
    merge_columns = [merge_column_renamed]
    ddf_res = ddf_res.withColumnRenamed(merge_column, merge_column_renamed)

    for i,ddf in enumerate(ddfs):
        merge_column_renamed = merge_column + str(i+1)
        merge_columns.append(merge_column_renamed)
        ddf_r = ddf.withColumnRenamed(merge_column, merge_column_renamed)
        ddf_res = ddf_res.join(ddf_r, on=join_column, how='inner')

    assembler = VectorAssembler(inputCols=merge_columns, outputCol=output_column)
    res = assembler.transform(ddf_res)

    if drop_merged_columns:
        res = drop_columns(res, columns=merge_columns)

    return res


# def pivot_aggregate(ddf, grpby_columns, pivot_column, aggs, pivot_filter_values=None, pivot_filter_support=None):
#     if pivot_filter_support and not pivot_filter_values:
#         frequent = ddf.freqItems([pivot_column], support=pivot_filter_support).first().asDict()[pivot_column+'_freqItems']
#         pivot_filter_values = map(str,frequent)
#
#     ddf_gr = ddf.groupBy(*grpby_columns)
#     ddf_pivot = ddf_gr.pivot(pivot_column, pivot_filter_values)
#     ddf_agg = ddf_pivot.agg(*aggs)
#     return ddf_agg
Developer: sashaostr, Project: datasu, Lines: 44, Source: spark.py


Example 20: preprocess

def preprocess(data):
  data = data.select('Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime','ArrTime','CRSArrTime','UniqueCarrier'\
                            ,'FlightNum','TailNum','ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay','DepDelay', 'Origin'\
                            ,'Dest','Distance','TaxiIn','TaxiOut','Cancelled')
  data = data.na.fill('999999')
  for t in data.dtypes:
   if t[1]=='string' and t[0] not in ['Origin','Dest','TailNum','UniqueCarrier','FlightNum']:
     data = data.withColumn(t[0], data[t[0]].cast('integer'))
  data = data.na.fill(999999)
  data = data.withColumnRenamed('Cancelled','label')
  data = data.withColumn('label',data.label.cast('double'))
  assembler = VectorAssembler(
	    inputCols=['Year','Month','DayofMonth','DayOfWeek'
		,'DepTime','CRSDepTime','ArrTime','CRSArrTime',
		'ActualElapsedTime','CRSElapsedTime','AirTime',
		'ArrDelay','DepDelay','Distance','TaxiIn','TaxiOut'],
	    outputCol='features')
  data = assembler.transform(data)
  data = data.select('features','label')
  return data
Developer: robert501128, Project: BigDataHomework, Lines: 20, Source: train.py



Note: The pyspark.ml.feature.VectorAssembler class examples in this article were compiled by 纯净天空 from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets are selected from open-source projects contributed by various developers, and copyright remains with the original authors; for distribution and use, please refer to the corresponding project's license. Do not reproduce without permission.

