Cython(为 pandas 编写 C 扩展)#
对于许多用例,用纯 Python 和 NumPy 编写 pandas 代码就足够了。然而,在一些计算密集型应用中,通过将工作卸载到 Cython,可以实现显著的加速。
本教程假设您已尽可能地在 Python 中重构代码,例如尝试删除 for 循环并利用 NumPy 向量化。始终值得先在 Python 中进行优化。
本教程将介绍一个“典型”的 Cython 化慢速计算过程。我们使用来自 Cython 文档的示例,但将其置于 pandas 的上下文中。我们最终的 Cython 化解决方案比纯 Python 解决方案快约 100 倍。
纯 Python#
我们有一个 DataFrame,我们希望对其按行应用一个函数。
In [1]: df = pd.DataFrame(
...: {
...: "a": np.random.randn(1000),
...: "b": np.random.randn(1000),
...: "N": np.random.randint(100, 1000, (1000)),
...: "x": "x",
...: }
...: )
...:
In [2]: df
Out[2]:
a b N x
0 0.469112 -0.218470 585 x
1 -0.282863 -0.061645 841 x
2 -1.509059 -0.723780 251 x
3 -1.135632 0.551225 972 x
4 1.212112 -0.497767 181 x
.. ... ... ... ..
995 -1.512743 0.874737 374 x
996 0.933753 1.120790 246 x
997 -0.308013 0.198768 157 x
998 -0.079915 1.757555 977 x
999 -1.010589 -1.115680 770 x
[1000 rows x 4 columns]
这是纯 Python 中的函数
In [3]: def f(x):
...: return x * (x - 1)
...:
In [4]: def integrate_f(a, b, N):
...: s = 0
...: dx = (b - a) / N
...: for i in range(N):
...: s += f(a + i * dx)
...: return s * dx
...:
我们通过使用 DataFrame.apply()(按行)来获得结果
In [5]: %timeit df.apply(lambda x: integrate_f(x["a"], x["b"], x["N"]), axis=1)
80.3 ms +- 1.18 ms per loop (mean +- std. dev. of 7 runs, 10 loops each)
让我们使用 prun ipython magic 函数来看看此操作中时间花在了哪里
# most time consuming 4 calls
In [6]: %prun -l 4 df.apply(lambda x: integrate_f(x["a"], x["b"], x["N"]), axis=1) # noqa E999
605956 function calls (605938 primitive calls) in 0.173 seconds
Ordered by: internal time
List reduced from 163 to 4 due to restriction <4>
ncalls tottime percall cumtime percall filename:lineno(function)
1000 0.101 0.000 0.154 0.000
552423 0.053 0.000 0.053 0.000
3000 0.003 0.000 0.013 0.000 series.py:1104(__getitem__)
3000 0.002 0.000 0.006 0.000 series.py:1229(_get_value)
绝大部分时间都花在 integrate_f 或 f 内部,因此我们将集中精力对这两个函数进行 Cython 化。
普通 Cython#
首先,我们需要将 Cython magic 函数导入 IPython
In [7]: %load_ext Cython
现在,让我们简单地将函数复制到 Cython
In [8]: %%cython
...: def f_plain(x):
...: return x * (x - 1)
...: def integrate_f_plain(a, b, N):
...: s = 0
...: dx = (b - a) / N
...: for i in range(N):
...: s += f_plain(a + i * dx)
...: return s * dx
...:
In [9]: %timeit df.apply(lambda x: integrate_f_plain(x["a"], x["b"], x["N"]), axis=1)
48.7 ms +- 490 us per loop (mean +- std. dev. of 7 runs, 10 loops each)
这将性能比纯 Python 方法提高了三分之一。
声明 C 类型#
我们可以标注函数变量和返回类型,并使用 cdef 和 cpdef 来提升性能
In [10]: %%cython
....: cdef double f_typed(double x) except? -2:
....: return x * (x - 1)
....: cpdef double integrate_f_typed(double a, double b, int N):
....: cdef int i
....: cdef double s, dx
....: s = 0
....: dx = (b - a) / N
....: for i in range(N):
....: s += f_typed(a + i * dx)
....: return s * dx
....:
In [11]: %timeit df.apply(lambda x: integrate_f_typed(x["a"], x["b"], x["N"]), axis=1)
7.5 ms +- 29.6 us per loop (mean +- std. dev. of 7 runs, 100 loops each)
使用 C 类型标注函数可以使性能比原始 Python 实现提高十倍以上。
使用 ndarray#
重新分析性能时,时间花费在从每一行创建 Series,以及从索引和 Series 调用 __getitem__(每行三次)上。这些 Python 函数调用开销很大,可以通过传递一个 np.ndarray 来改进。
In [12]: %prun -l 4 df.apply(lambda x: integrate_f_typed(x["a"], x["b"], x["N"]), axis=1)
52533 function calls (52515 primitive calls) in 0.019 seconds
Ordered by: internal time
List reduced from 161 to 4 due to restriction <4>
ncalls tottime percall cumtime percall filename:lineno(function)
3000 0.003 0.000 0.012 0.000 series.py:1104(__getitem__)
3000 0.002 0.000 0.005 0.000 series.py:1229(_get_value)
3000 0.002 0.000 0.003 0.000 indexing.py:2765(check_dict_or_set_indexers)
3000 0.002 0.000 0.002 0.000 base.py:3784(get_loc)
In [13]: %%cython
....: cimport numpy as np
....: import numpy as np
....: cdef double f_typed(double x) except? -2:
....: return x * (x - 1)
....: cpdef double integrate_f_typed(double a, double b, int N):
....: cdef int i
....: cdef double s, dx
....: s = 0
....: dx = (b - a) / N
....: for i in range(N):
....: s += f_typed(a + i * dx)
....: return s * dx
....: cpdef np.ndarray[double] apply_integrate_f(np.ndarray col_a, np.ndarray col_b,
....: np.ndarray col_N):
....: assert (col_a.dtype == np.float64
....: and col_b.dtype == np.float64 and col_N.dtype == np.dtype(int))
....: cdef Py_ssize_t i, n = len(col_N)
....: assert (len(col_a) == len(col_b) == n)
....: cdef np.ndarray[double] res = np.empty(n)
....: for i in range(len(col_a)):
....: res[i] = integrate_f_typed(col_a[i], col_b[i], col_N[i])
....: return res
....:
Content of stderr:
In file included from /home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/ndarraytypes.h:1929,
from /home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/ndarrayobject.h:12,
from /home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/arrayobject.h:5,
from /home/runner/.cache/ipython/cython/_cython_magic_1f8c1b875aeb076a8ef75ac5199664d0fea77dfb626f30a4e36b3263c3db7ec2.c:1138:
/home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/npy_1_7_deprecated_api.h:17:2: warning: #warning "Using deprecated NumPy API, disable it with " "#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION" [-Wcpp]
17 | #warning "Using deprecated NumPy API, disable it with " \
| ^~~~~~~
此实现创建一个零数组,并插入将 integrate_f_typed 应用于每一行的结果。在 Cython 中,遍历 ndarray 比遍历 Series 对象更快。
由于 apply_integrate_f 被类型化为接受 np.ndarray,因此需要调用 Series.to_numpy() 来使用此函数。
In [14]: %timeit apply_integrate_f(df["a"].to_numpy(), df["b"].to_numpy(), df["N"].to_numpy())
830 us +- 945 ns per loop (mean +- std. dev. of 7 runs, 1,000 loops each)
性能比之前的实现提高了近十倍。
禁用编译器指令#
现在大部分时间都花在 apply_integrate_f 中。禁用 Cython 的 boundscheck 和 wraparound 检查可以带来更高的性能。
In [15]: %prun -l 4 apply_integrate_f(df["a"].to_numpy(), df["b"].to_numpy(), df["N"].to_numpy())
78 function calls in 0.001 seconds
Ordered by: internal time
List reduced from 21 to 4 due to restriction <4>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.001 0.001 0.001 0.001
1 0.000 0.000 0.001 0.001 {built-in method builtins.exec}
3 0.000 0.000 0.000 0.000 frame.py:4067(__getitem__)
3 0.000 0.000 0.000 0.000 base.py:545(to_numpy)
In [16]: %%cython
....: cimport cython
....: cimport numpy as np
....: import numpy as np
....: cdef np.float64_t f_typed(np.float64_t x) except? -2:
....: return x * (x - 1)
....: cpdef np.float64_t integrate_f_typed(np.float64_t a, np.float64_t b, np.int64_t N):
....: cdef np.int64_t i
....: cdef np.float64_t s = 0.0, dx
....: dx = (b - a) / N
....: for i in range(N):
....: s += f_typed(a + i * dx)
....: return s * dx
....: @cython.boundscheck(False)
....: @cython.wraparound(False)
....: cpdef np.ndarray[np.float64_t] apply_integrate_f_wrap(
....: np.ndarray[np.float64_t] col_a,
....: np.ndarray[np.float64_t] col_b,
....: np.ndarray[np.int64_t] col_N
....: ):
....: cdef np.int64_t i, n = len(col_N)
....: assert len(col_a) == len(col_b) == n
....: cdef np.ndarray[np.float64_t] res = np.empty(n, dtype=np.float64)
....: for i in range(n):
....: res[i] = integrate_f_typed(col_a[i], col_b[i], col_N[i])
....: return res
....:
Content of stderr:
In file included from /home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/ndarraytypes.h:1929,
from /home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/ndarrayobject.h:12,
from /home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/arrayobject.h:5,
from /home/runner/.cache/ipython/cython/_cython_magic_344a9e4468707236d239faf5bdfacf0d14a35efa7e89d2a7b09ae36b339492db.c:1139:
/home/runner/micromamba/envs/test/lib/python3.10/site-packages/numpy/core/include/numpy/npy_1_7_deprecated_api.h:17:2: warning: #warning "Using deprecated NumPy API, disable it with " "#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION" [-Wcpp]
17 | #warning "Using deprecated NumPy API, disable it with " \
| ^~~~~~~
In [17]: %timeit apply_integrate_f_wrap(df["a"].to_numpy(), df["b"].to_numpy(), df["N"].to_numpy())
624 us +- 2.43 us per loop (mean +- std. dev. of 7 runs, 1,000 loops each)
然而,如果循环索引器 i 访问数组中的无效位置,则会导致段错误,因为内存访问未被检查。有关 boundscheck 和 wraparound 的更多信息,请参阅 Cython 文档中关于编译器指令的部分。