本文整理汇总了Python中pyspark.sql.functions.pandas_udf函数的典型用法代码示例。如果您正苦于以下问题:Python pandas_udf函数的具体用法?Python pandas_udf怎么用?Python pandas_udf使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了pandas_udf函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_vectorized_udf_timestamps_respect_session_timezone
def test_vectorized_udf_timestamps_respect_session_timezone(self):
    """Compare scalar pandas UDF timestamp results with and without
    ``spark.sql.execution.pandas.respectSessionTimeZone``.

    Two UDFs (an identity copy of the timestamp column and one exposing each
    timestamp's ``.value``) are evaluated under both settings while the session
    time zone is America/New_York. The two runs must differ, and the run that
    respects the session time zone must equal the other run once its internal
    values are shifted by three hours.
    """
    schema = StructType([
        StructField("idx", LongType(), True),
        StructField("timestamp", TimestampType(), True),
    ])
    rows = [
        (1, datetime(1969, 1, 1, 1, 1, 1)),
        (2, datetime(2012, 2, 2, 2, 2, 2)),
        (3, None),
        (4, datetime(2100, 3, 3, 3, 3, 3)),
    ]
    df = self.spark.createDataFrame(rows, schema=schema)

    f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
    # Expose the ``.value`` of every timestamp; NaT entries become None.
    internal_value = pandas_udf(
        lambda series: series.apply(lambda t: t.value if t is not pd.NaT else None),
        LongType())

    respect_key = "spark.sql.execution.pandas.respectSessionTimeZone"
    tz_key = "spark.sql.session.timeZone"
    session_tz = "America/New_York"

    with self.sql_conf({respect_key: False, tz_key: session_tz}):
        df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp")))
        df_la = df_la.withColumn("internal_value", internal_value(col("timestamp")))
        result_la = df_la.select(col("idx"), col("internal_value")).collect()
        # Correct result_la by adjusting 3 hours difference between Los Angeles and New York.
        # NOTE(review): the 1000 ** 3 factor suggests ``.value`` is in nanoseconds - confirm.
        three_hours = 3 * 60 * 60 * 1000 * 1000 * 1000
        shifted_value = col("internal_value") + three_hours
        result_la_corrected = df_la.select(col("idx"), col("tscopy"), shifted_value).collect()

    with self.sql_conf({respect_key: True, tz_key: session_tz}):
        df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp")))
        df_ny = df_ny.withColumn("internal_value", internal_value(col("timestamp")))
        result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect()

        self.assertNotEqual(result_ny, result_la)
        self.assertEqual(result_ny, result_la_corrected)
开发者ID:q977734161,项目名称:spark,代码行数:35,代码来源:test_pandas_udf_scalar.py
示例2: test_vectorized_udf_wrong_return_type
def test_vectorized_udf_wrong_return_type(self):
    """Declaring a scalar pandas UDF with a MapType return type must raise
    NotImplementedError mentioning the invalid return type."""
    from pyspark.sql.functions import pandas_udf

    expected_msg = 'Invalid returnType.*scalar Pandas UDF.*MapType'
    with QuietTest(self.sc):
        with self.assertRaisesRegexp(NotImplementedError, expected_msg):
            # The error is expected at declaration time; the UDF is never executed.
            pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
开发者ID:JingchengDu,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例3: test_udf_wrong_arg
def test_udf_wrong_arg(self):
    """Check the errors raised by ``pandas_udf`` for malformed declarations.

    Every case is expected to fail while the UDF is being declared; none of
    the decorated functions is ever called.
    """
    with QuietTest(self.sc):
        # Return type string that cannot be parsed.
        with self.assertRaises(ParseException):
            @pandas_udf('blah')
            def unparsable_type(x):
                return x

        # Function type given but no return type.
        with self.assertRaisesRegexp(ValueError, 'Invalid returnType.*None'):
            @pandas_udf(functionType=PandasUDFType.SCALAR)
            def missing_return_type(x):
                return x

        # 100 is not an accepted function type.
        with self.assertRaisesRegexp(ValueError, 'Invalid functionType'):
            @pandas_udf('double', 100)
            def unknown_function_type(x):
                return x

        # Zero-argument UDFs are rejected, both as a direct call and as a decorator.
        zero_arg_msg = '0-arg pandas_udfs.*not.*supported'
        with self.assertRaisesRegexp(ValueError, zero_arg_msg):
            pandas_udf(lambda: 1, LongType(), PandasUDFType.SCALAR)
        with self.assertRaisesRegexp(ValueError, zero_arg_msg):
            @pandas_udf(LongType(), PandasUDFType.SCALAR)
            def zero_with_type():
                return 1

        # A function type passed where the return type belongs.
        with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
            @pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
            def function_type_as_return_type(df):
                return df

        # 'double' return type combined with GROUPED_MAP.
        with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
            @pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
            def scalar_type_for_grouped_map(df):
                return df

        # GROUPED_MAP function declared with three parameters.
        with self.assertRaisesRegexp(ValueError, 'Invalid function'):
            @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUPED_MAP)
            def three_parameters(k, v, w):
                return k
开发者ID:Lewuathe,项目名称:spark,代码行数:34,代码来源:test_pandas_udf.py
示例4: test_vectorized_udf_struct_type
def test_vectorized_udf_struct_type(self):
    """Exercise scalar pandas UDFs that produce or pass through struct columns.

    The struct return type is supplied once as a ``StructType`` and once as a
    DDL string. A final identity UDF over a struct input is expected to work
    only when pyarrow is at least 0.10.0; older versions must fail with a
    conversion error.
    """
    df = self.spark.range(10)
    return_type = StructType([
        StructField('id', LongType()),
        StructField('str', StringType()),
    ])

    def to_id_and_str(ids):
        # Builds a frame with the ids and their string form as columns.
        return pd.DataFrame({'id': ids, 'str': ids.apply(unicode)})

    def id_str_struct():
        # Same struct built with native column expressions.
        return struct(col('id'), col('id').cast('string').alias('str'))

    expected = df.select(id_str_struct().alias('struct')).collect()

    # Return type given as a StructType instance.
    f = pandas_udf(to_id_and_str, returnType=return_type)
    actual = df.select(f(col('id')).alias('struct')).collect()
    self.assertEqual(expected, actual)

    # Return type given as a DDL-formatted string.
    g = pandas_udf(to_id_and_str, 'id: long, str: string')
    actual = df.select(g(col('id')).alias('struct')).collect()
    self.assertEqual(expected, actual)

    # Identity UDF that takes a struct column as its input.
    struct_f = pandas_udf(lambda x: x, return_type)
    actual = df.select(struct_f(id_str_struct()))
    if LooseVersion(pa.__version__) >= LooseVersion("0.10.0"):
        self.assertEqual(expected, actual.collect())
    else:
        with QuietTest(self.sc):
            from py4j.protocol import Py4JJavaError
            with self.assertRaisesRegexp(
                    Py4JJavaError,
                    'Unsupported type in conversion from Arrow'):
                # collect() is expected to raise before the comparison happens.
                self.assertEqual(expected, actual.collect())
开发者ID:q977734161,项目名称:spark,代码行数:32,代码来源:test_pandas_udf_scalar.py
示例5: test_vectorized_udf_unsupported_types
def test_vectorized_udf_unsupported_types(self):
    """A scalar pandas UDF returning a string-to-int MapType must be rejected
    with NotImplementedError when it is declared."""
    from pyspark.sql.functions import pandas_udf

    expected_msg = 'Invalid returnType.*scalar Pandas UDF.*MapType'
    with QuietTest(self.sc):
        with self.assertRaisesRegexp(NotImplementedError, expected_msg):
            pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
开发者ID:JingchengDu,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例6: test_vectorized_udf_chained
def test_vectorized_udf_chained(self):
    """Chaining two scalar pandas UDFs (+1 then -1) must return the input unchanged."""
    from pyspark.sql.functions import pandas_udf, col

    df = self.spark.range(10)
    plus_one = pandas_udf(lambda x: x + 1, LongType())
    minus_one = pandas_udf(lambda x: x - 1, LongType())
    res = df.select(minus_one(plus_one(col('id'))))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(df.collect(), res.collect())
开发者ID:JingchengDu,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例7: test_wrong_return_type
def test_wrong_return_type(self):
    """A grouped map pandas UDF whose return schema contains a map column
    must be rejected with NotImplementedError when it is declared."""
    expected_msg = 'Invalid returnType.*grouped map Pandas UDF.*MapType'
    schema_with_map = 'id long, v map<int, int>'
    with QuietTest(self.sc):
        with self.assertRaisesRegexp(NotImplementedError, expected_msg):
            pandas_udf(lambda pdf: pdf, schema_with_map, PandasUDFType.GROUPED_MAP)
开发者ID:CodingCat,项目名称:spark,代码行数:9,代码来源:test_pandas_udf_grouped_map.py
示例8: test_vectorized_udf_unsupported_types
def test_vectorized_udf_unsupported_types(self):
    """Scalar pandas UDFs must reject a MapType return type and an array of
    structs return type with NotImplementedError when they are declared."""
    # (expected message pattern, rejected return type)
    rejected = [
        ('Invalid returnType.*scalar Pandas UDF.*MapType',
         MapType(StringType(), IntegerType())),
        ('Invalid returnType.*scalar Pandas UDF.*ArrayType.StructType',
         ArrayType(StructType([StructField('a', IntegerType())]))),
    ]
    with QuietTest(self.sc):
        for pattern, return_type in rejected:
            with self.assertRaisesRegexp(NotImplementedError, pattern):
                pandas_udf(lambda x: x, return_type)
开发者ID:q977734161,项目名称:spark,代码行数:10,代码来源:test_pandas_udf_scalar.py
示例9: test_mixed_scalar_udfs_followed_by_grouby_apply
def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
    """Combine a regular UDF and a scalar pandas UDF, then sum every column
    with a grouped map pandas UDF applied to the whole frame."""
    df = self.spark.range(0, 10).toDF('v1')
    df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
        .withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1']))

    # Sum over all columns and all rows of the single (ungrouped) group.
    sum_all = pandas_udf(lambda x: pd.DataFrame([x.sum().sum()]),
                         'sum int',
                         PandasUDFType.GROUPED_MAP)
    result = df.groupby().apply(sum_all)

    # v1 = 0..9 sums to 45, v2 = v1 + 1 to 55, v3 = v1 + 2 to 65: 165 in total.
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(result.collect()[0]['sum'], 165)
开发者ID:q977734161,项目名称:spark,代码行数:11,代码来源:test_pandas_udf_grouped_map.py
示例10: test_stopiteration_in_udf
def test_stopiteration_in_udf(self):
    """A StopIteration raised inside user code must fail the job.

    Each kind of UDF below wraps a function that raises StopIteration;
    collecting the resulting frame must raise Py4JJavaError carrying the
    expected message.
    """
    from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
    from py4j.protocol import Py4JJavaError

    def foo(x):
        raise StopIteration()

    def foofoo(x, y):
        raise StopIteration()

    exc_message = "Caught StopIteration thrown from user's code; failing the task"
    df = self.spark.range(0, 100)

    failing_frames = [
        # plain udf (test for SPARK-23754)
        df.withColumn('v', udf(foo)('id')),
        # pandas scalar udf
        df.withColumn('v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')),
        # pandas grouped map, one-argument and two-argument functions
        df.groupBy('id').apply(pandas_udf(foo, df.schema, PandasUDFType.GROUPED_MAP)),
        df.groupBy('id').apply(pandas_udf(foofoo, df.schema, PandasUDFType.GROUPED_MAP)),
        # pandas grouped agg
        df.groupBy('id').agg(pandas_udf(foo, 'double', PandasUDFType.GROUPED_AGG)('id')),
    ]
    for frame in failing_frames:
        # The bound method is passed uncalled so that collect() runs inside the assertion.
        self.assertRaisesRegexp(Py4JJavaError, exc_message, frame.collect)
开发者ID:JingchengDu,项目名称:spark,代码行数:54,代码来源:test_pandas_udf.py
示例11: test_vectorized_udf_complex
def test_vectorized_udf_complex(self):
    """Scalar pandas UDFs for add, power-of-two and multiply must agree with
    the equivalent SQL expressions."""
    df = self.spark.range(10).select(
        col('id').cast('int').alias('a'),
        col('id').cast('int').alias('b'),
        col('id').cast('double').alias('c'))

    add = pandas_udf(lambda x, y: x + y, IntegerType())
    power2 = pandas_udf(lambda x: 2 ** x, IntegerType())
    mul = pandas_udf(lambda x, y: x * y, DoubleType())

    res = df.select(add(col('a'), col('b')), power2(col('a')), mul(col('b'), col('c')))
    expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * c'))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(expected.collect(), res.collect())
开发者ID:q977734161,项目名称:spark,代码行数:11,代码来源:test_pandas_udf_scalar.py
示例12: test_vectorized_udf_nested_struct
def test_vectorized_udf_nested_struct(self):
    """Declaring a scalar pandas UDF whose return type is a struct containing
    another struct must raise an 'Invalid returnType' error."""
    inner_type = StructType([
        StructField('foo', StringType()),
        StructField('bar', FloatType()),
    ])
    nested_type = StructType([
        StructField('id', IntegerType()),
        StructField('nested', inner_type),
    ])
    expected_msg = 'Invalid returnType with scalar Pandas UDFs'

    with QuietTest(self.sc):
        with self.assertRaisesRegexp(Exception, expected_msg):
            pandas_udf(lambda x: x, returnType=nested_type)
开发者ID:q977734161,项目名称:spark,代码行数:14,代码来源:test_pandas_udf_scalar.py
示例13: test_unsupported_types
def test_unsupported_types(self):
    """Grouped map pandas UDFs must reject each unsupported field type below.

    Every field is paired with a plain long ``id`` column to form the return
    schema; declaring the UDF must raise NotImplementedError each time.
    """
    common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
    id_field = StructField('id', LongType(), True)
    rejected_fields = [
        StructField('map', MapType(StringType(), IntegerType())),
        StructField('arr_ts', ArrayType(TimestampType())),
        StructField('null', NullType()),
        StructField('struct', StructType([StructField('l', LongType())])),
    ]

    for rejected_field in rejected_fields:
        schema = StructType([id_field, rejected_field])
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(NotImplementedError, common_err_msg):
                pandas_udf(lambda x: x, schema, PandasUDFType.GROUPED_MAP)
开发者ID:apache,项目名称:spark,代码行数:14,代码来源:test_pandas_udf_grouped_map.py
示例14: test_vectorized_udf_basic
def test_vectorized_udf_basic(self):
    """Identity scalar pandas UDFs must round-trip one column of each basic
    type (string, int, long, float, double, decimal, boolean, array<long>)."""
    from pyspark.sql.functions import pandas_udf, col, array

    df = self.spark.range(10).select(
        col('id').cast('string').alias('str'),
        col('id').cast('int').alias('int'),
        col('id').alias('long'),
        col('id').cast('float').alias('float'),
        col('id').cast('double').alias('double'),
        col('id').cast('decimal').alias('decimal'),
        col('id').cast('boolean').alias('bool'),
        array(col('id')).alias('array_long'))

    def identity(x):
        # A named def instead of a lambda bound to a variable (PEP 8, E731).
        return x

    str_f = pandas_udf(identity, StringType())
    int_f = pandas_udf(identity, IntegerType())
    long_f = pandas_udf(identity, LongType())
    float_f = pandas_udf(identity, FloatType())
    double_f = pandas_udf(identity, DoubleType())
    decimal_f = pandas_udf(identity, DecimalType())
    bool_f = pandas_udf(identity, BooleanType())
    array_long_f = pandas_udf(identity, ArrayType(LongType()))

    # The decimal and array UDFs are called with column names rather than Column objects.
    res = df.select(str_f(col('str')), int_f(col('int')),
                    long_f(col('long')), float_f(col('float')),
                    double_f(col('double')), decimal_f('decimal'),
                    bool_f(col('bool')), array_long_f('array_long'))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(df.collect(), res.collect())
开发者ID:JingchengDu,项目名称:spark,代码行数:25,代码来源:test_pandas_udf_scalar.py
示例15: test_vectorized_udf_null_binary
def test_vectorized_udf_null_binary(self):
    """Binary columns containing nulls must round-trip through an identity
    scalar pandas UDF when pyarrow is at least 0.10.0; with older pyarrow,
    declaring a BinaryType UDF must raise NotImplementedError."""
    if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
        with QuietTest(self.sc):
            with self.assertRaisesRegexp(
                    NotImplementedError,
                    'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
                pandas_udf(lambda x: x, BinaryType())
    else:
        data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
        schema = StructType().add("binary", BinaryType())
        df = self.spark.createDataFrame(data, schema)
        # Named binary_f (the original local was called str_f although it is a binary UDF).
        binary_f = pandas_udf(lambda x: x, BinaryType())
        res = df.select(binary_f(col('binary')))
        # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
        self.assertEqual(df.collect(), res.collect())
开发者ID:q977734161,项目名称:spark,代码行数:14,代码来源:test_pandas_udf_scalar.py
示例16: test_vectorized_udf_null_int
def test_vectorized_udf_null_int(self):
    """An integer column containing a null must round-trip through an
    identity scalar pandas UDF."""
    data = [(None,), (2,), (3,), (4,)]
    schema = StructType().add("int", IntegerType())
    df = self.spark.createDataFrame(data, schema)
    int_f = pandas_udf(lambda x: x, IntegerType())
    res = df.select(int_f(col('int')))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(df.collect(), res.collect())
开发者ID:q977734161,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例17: test_vectorized_udf_null_array
def test_vectorized_udf_null_array(self):
    """An array<int> column containing null rows must round-trip through an
    identity scalar pandas UDF."""
    data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
    array_schema = StructType([StructField("array", ArrayType(IntegerType()))])
    df = self.spark.createDataFrame(data, schema=array_schema)
    array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()))
    result = df.select(array_f(col('array')))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(df.collect(), result.collect())
开发者ID:q977734161,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例18: test_vectorized_udf_null_string
def test_vectorized_udf_null_string(self):
    """A string column containing a null must round-trip through an identity
    scalar pandas UDF."""
    data = [("foo",), (None,), ("bar",), ("bar",)]
    schema = StructType().add("str", StringType())
    df = self.spark.createDataFrame(data, schema)
    str_f = pandas_udf(lambda x: x, StringType())
    res = df.select(str_f(col('str')))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(df.collect(), res.collect())
开发者ID:q977734161,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例19: test_vectorized_udf_string_in_udf
def test_vectorized_udf_string_in_udf(self):
    """Converting ids to strings inside a scalar pandas UDF must match
    casting the column to string."""
    import pandas as pd

    df = self.spark.range(10)
    str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
    actual = df.select(str_f(col('id')))
    expected = df.select(col('id').cast('string'))
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(expected.collect(), actual.collect())
开发者ID:Brett-A,项目名称:spark,代码行数:7,代码来源:test_pandas_udf_scalar.py
示例20: test_manual
def test_manual(self):
    """Check grouped aggregate pandas UDFs against hand-computed expectations.

    For each id 0..9 the sum of ``v``, the mean of ``v`` and the mean of
    ``array(v)`` (returned as a one-element array) are compared with fixed
    values.
    """
    df = self.data
    sum_udf = self.pandas_agg_sum_udf
    mean_udf = self.pandas_agg_mean_udf
    # Same function and eval type as the mean UDF, with the return type wrapped in an array.
    mean_arr_udf = pandas_udf(
        self.pandas_agg_mean_udf.func,
        ArrayType(self.pandas_agg_mean_udf.returnType),
        self.pandas_agg_mean_udf.evalType)

    aggregated = df.groupby('id').agg(
        sum_udf(df.v),
        mean_udf(df.v),
        mean_arr_udf(array(df.v)))
    result1 = aggregated.sort('id')

    # Row i is [i, 245.0 + 10 * i, 24.5 + i, [24.5 + i]]: sums run from 245.0
    # to 335.0 and means from 24.5 to 33.5.
    expected_rows = [[i, 245.0 + 10 * i, 24.5 + i, [24.5 + i]] for i in range(10)]
    expected1 = self.spark.createDataFrame(
        expected_rows,
        ['id', 'sum(v)', 'avg(v)', 'avg(array(v))'])

    self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
开发者ID:Brett-A,项目名称:spark,代码行数:27,代码来源:test_pandas_udf_grouped_agg.py
注:本文中的pyspark.sql.functions.pandas_udf函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论