Hi Julia fellows!
I just trying to see the potential for Pyspark + Julia. I know that we have Spark.jl but there are some limitations for my end goal experiment. Then, decided to give a try and for basic cases is working pretty fine!
Now I’m testing more advanced features. In this case, I`m using UDF functions with the objective of reuse mostly of a large piece of code that do a lot of data processement and avoid rewriting in another language.
However, I`m facing some issues with basic UDF conversions due to error between Julia and Pickle Serialization… For example, I want to apply a UDF in Julia to sum 1 in this simple example:
using PyCall
pyspark = pyimport("pyspark")
SF = pyimport("pyspark.sql.functions")
ST = pyimport("pyspark.sql.types")
spark = pyspark.sql.SparkSession.builder.master("local[*]").getOrCreate()
function sumone(x::Int)::Int
x + 1
end
sumone_udf = SF.udf(sumone, ST.IntegerType())
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["X", "Y"])
df.withColumn("Z", sumone_udf("X")).show()
But I`m getting serialization error in last line when trying to use the new UDF. Is it possible to extend the PyCall.jlwrap struct with some custom code or another workaround to it serialize correctly?
Error message:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 468, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/usr/local/lib/python3.8/dist-packages/pyspark/cloudpickle.py", line 1097, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.8/dist-packages/pyspark/cloudpickle.py", line 357, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.8/pickle.py", line 485, in dump
self.save(obj)
File "/usr/lib/python3.8/pickle.py", line 558, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.8/pickle.py", line 884, in save_tuple
save(element)
File "/usr/lib/python3.8/pickle.py", line 576, in save
rv = reduce(self.proto)
TypeError: cannot pickle 'PyCall.jlwrap' object
ERROR: LoadError: PyError ($(Expr(:escape, :(ccall(#= /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:43 =# @pysym(:PyObject_Call), PyPtr, (PyPtr, PyPtr, PyPtr), o, pyargsptr, kw))))) <class '_pickle.PicklingError'>
PicklingError("Could not serialize object: TypeError: cannot pickle 'PyCall.jlwrap' object")
File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 197, in wrapper
return self(*args)
File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 175, in __call__
judf = self._judf
File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 159, in _judf
self._judf_placeholder = self._create_judf()
File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 168, in _create_judf
wrapped_func = _wrap_function(sc, self.func, self.returnType)
File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 34, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/usr/local/lib/python3.8/dist-packages/pyspark/rdd.py", line 2503, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 478, in dumps
raise pickle.PicklingError(msg)
Stacktrace:
[1] pyerr_check at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:60 [inlined]
[2] pyerr_check at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:64 [inlined]
[3] _handle_error(::String) at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:81
[4] macro expansion at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:95 [inlined]
[5] #110 at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:43 [inlined]
[6] disable_sigint at ./c.jl:446 [inlined]
[7] __pycall! at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:42 [inlined]
[8] _pycall!(::PyObject, ::PyObject, ::Tuple{String}, ::Int64, ::Ptr{Nothing}) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:29
[9] _pycall! at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:11 [inlined]
[10] #_#117 at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:86 [inlined]
[11] (::PyObject)(::String) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:86
[12] top-level scope at /seek/ContentBased.jl/scripts/run_spark.jl:83
[13] include(::String) at ./client.jl:439
[14] top-level scope at REPL[1]:1
in expression starting at /seek/ContentBased.jl/scripts/run_spark.jl:83
I appreciate any help or start a discussion what we can do about it. Thanks!
Reference for this topic: