PySpark + PyCall + UDF Integration - PyCall.jlwrap cannot be pickled

Hi Julia fellows!

I'm just trying to explore the potential of PySpark + Julia. I know we have Spark.jl, but it has some limitations for the experiment I ultimately want to run, so I decided to give this a try, and for basic cases it works pretty well!

Now I'm testing more advanced features. In this case, I'm using UDFs, with the goal of reusing most of a large piece of code that does a lot of data processing instead of rewriting it in another language.

However, I'm running into issues even with basic UDFs because of a serialization error between Julia and pickle. For example, in this simple case I want to apply a Julia UDF that adds 1 to a column:

using PyCall
pyspark = pyimport("pyspark")
SF = pyimport("pyspark.sql.functions")
ST = pyimport("pyspark.sql.types")

spark = pyspark.sql.SparkSession.builder.master("local[*]").getOrCreate()

function sumone(x::Int)::Int
    x + 1
end

sumone_udf = SF.udf(sumone, ST.IntegerType())

df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["X", "Y"])

df.withColumn("Z", sumone_udf("X")).show()

But I'm getting a serialization error on the last line, when the new UDF is actually used. Is it possible to extend the PyCall.jlwrap type with some custom code, or is there another workaround, so that it serializes correctly?

Error message:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 468, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/cloudpickle.py", line 1097, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/cloudpickle.py", line 357, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.8/pickle.py", line 485, in dump
    self.save(obj)
  File "/usr/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/usr/lib/python3.8/pickle.py", line 576, in save
    rv = reduce(self.proto)
TypeError: cannot pickle 'PyCall.jlwrap' object
ERROR: LoadError: PyError ($(Expr(:escape, :(ccall(#= /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:43 =# @pysym(:PyObject_Call), PyPtr, (PyPtr, PyPtr, PyPtr), o, pyargsptr, kw))))) <class '_pickle.PicklingError'>
PicklingError("Could not serialize object: TypeError: cannot pickle 'PyCall.jlwrap' object")
  File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 197, in wrapper
    return self(*args)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 175, in __call__
    judf = self._judf
  File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 159, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 168, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/sql/udf.py", line 34, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/rdd.py", line 2503, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/serializers.py", line 478, in dumps
    raise pickle.PicklingError(msg)

Stacktrace:
 [1] pyerr_check at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:60 [inlined]
 [2] pyerr_check at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:64 [inlined]
 [3] _handle_error(::String) at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:81
 [4] macro expansion at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:95 [inlined]
 [5] #110 at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:43 [inlined]
 [6] disable_sigint at ./c.jl:446 [inlined]
 [7] __pycall! at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:42 [inlined]
 [8] _pycall!(::PyObject, ::PyObject, ::Tuple{String}, ::Int64, ::Ptr{Nothing}) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:29
 [9] _pycall! at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:11 [inlined]
 [10] #_#117 at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:86 [inlined]
 [11] (::PyObject)(::String) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:86
 [12] top-level scope at /seek/ContentBased.jl/scripts/run_spark.jl:83
 [13] include(::String) at ./client.jl:439
 [14] top-level scope at REPL[1]:1
in expression starting at /seek/ContentBased.jl/scripts/run_spark.jl:83

I'd appreciate any help, or a discussion of what we can do about it. Thanks!


Reference for this topic:


Does PyCall allow functions defined in Julia to be called from Python in the first place?

Try calling sumone from Python first and see whether that works. If it doesn't, there is no reason it would work as a UDF in Spark.
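A minimal sketch of that check, assuming PyCall is installed: `call_from_python` is a hypothetical helper defined on the Python side with PyCall's `py"""..."""` string macro, and it simply calls whatever callable it receives. Passing `sumone` to it makes PyCall wrap the Julia function as a Python-callable `PyCall.jlwrap`.

```julia
using PyCall

sumone(x::Int) = x + 1

# Hypothetical Python-side helper: it just invokes the callable it is given.
py"""
def call_from_python(f, x):
    return f(x)
"""

# The `o` suffix returns the raw Python function object.
# Calling it sends `sumone` into Python (as a PyCall.jlwrap) and the
# Python int result is converted back to a Julia Int.
result = py"call_from_python"o(sumone, 1)
println(result)  # prints 2
```

If this prints 2, Julia-to-Python callbacks work in principle, and any Spark failure lies elsewhere (e.g. in serialization).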

Good question. I'm not very advanced with PyCall, but according to the package description, it allows this under some conditions:

Example 1:

Arbitrary Julia functions can be passed to Python routines taking function arguments. For example, to find the root of cos(x) - x, we could call the Newton solver in scipy.optimize via:

so = pyimport("scipy.optimize")
so.newton(x -> cos(x) - x, 1)

Example 2:

Calling Julia from Python
Julia functions get converted to callable Python objects, so you can easily call Julia from Python via callback function arguments. The pyjulia module allows you to call Julia directly from Python, and also uses PyCall to do its conversions.

Well, I'll try the approach from example 2. I really want to see whether we can make this work, because it would be awesome! If you have other suggestions, let me know, and I'll post here if I have an update.

After some research, I was able to call a Julia function from inside Python code without problems. The main issue is that a UDF requires the function to be serialized with cloudpickle along the way.

That is where it breaks: the PyCall.jlwrap object that wraps the Julia function cannot be pickled. A simple example:

using PyCall
pickle = pyimport("pickle")
pickle.dumps(x -> x + 1)

Error:

ERROR: PyError ($(Expr(:escape, :(ccall(#= /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:43 =# @pysym(:PyObject_Call), PyPtr, (PyPtr, PyPtr, PyPtr), o, pyargsptr, kw))))) <class 'TypeError'>
TypeError("cannot pickle 'PyCall.jlwrap' object")

Stacktrace:
 [1] pyerr_check at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:60 [inlined]
 [2] pyerr_check at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:64 [inlined]
 [3] _handle_error(::String) at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:81
 [4] macro expansion at /root/.julia/packages/PyCall/zqDXB/src/exception.jl:95 [inlined]
 [5] #110 at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:43 [inlined]
 [6] disable_sigint at ./c.jl:446 [inlined]
 [7] __pycall! at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:42 [inlined]
 [8] _pycall!(::PyObject, ::PyObject, ::Tuple{var"#3#4"}, ::Int64, ::Ptr{Nothing}) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:29
 [9] _pycall!(::PyObject, ::PyObject, ::Tuple{var"#3#4"}, ::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:11
 [10] (::PyObject)(::Function; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:86
 [11] (::PyObject)(::Function) at /root/.julia/packages/PyCall/zqDXB/src/pyfncall.jl:86
 [12] top-level scope at REPL[13]:1

Investigating whether there is a way to overcome this…
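One way to investigate without starting a Spark session each time is to reproduce only the step that fails in the traceback (`cloudpickle.dumps`). This sketch assumes PyCall and PySpark are installed and imports the cloudpickle copy bundled with PySpark as `pyspark.cloudpickle`, the module shown in the traceback; `ispicklable` is a hypothetical helper, not part of PyCall or PySpark.

```julia
using PyCall

# The cloudpickle copy that PySpark itself uses to ship UDFs to workers.
const cloudpickle = pyimport("pyspark.cloudpickle")

# Hypothetical helper: true if cloudpickle can serialize `obj`
# (after PyCall converts it to a Python object), false otherwise.
function ispicklable(obj)
    try
        cloudpickle.dumps(obj)
        return true
    catch err
        # Only swallow errors raised on the Python side (e.g. TypeError).
        err isa PyCall.PyError || rethrow()
        return false
    end
end

println(ispicklable((1, "A")))    # plain data becomes a Python tuple
println(ispicklable(x -> x + 1))  # a Julia function becomes a PyCall.jlwrap
```

Even if a custom reducer let a jlwrap pass this check, that alone would probably not make the UDF work: Spark evaluates Python UDFs in separate Python worker processes, while a jlwrap is a handle into the Julia runtime of the process that created it, so each worker would presumably also need its own Julia session able to rebuild the function.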

Hi, did you find a way to overcome this?

Have you tried PythonCall? I think it has better serialisation.