使用 DuckDB UDF 加速 Pandas 数据处理

最近在看UDF,碰巧看到知乎上一篇实战文章,经作者同意,转摘如下。

另外有兴趣的可以试试使用PyArrow 类型 UDF改写下下面的函数,是不是可以得到更多惊喜。

对DuckDB UDF感兴趣的,别错过另外几篇文章,


原文:https://zhuanlan.zhihu.com/p/646788236

最近需要对一批商品数据进行分析,加上前后各种数据清洗,初版程序使用 Pandas 运行耗时几个小时,中间耗时最长的是各种 groupby 后再计算。经过逐步优化,耗时减少到不到半个小时,随时记录下中间的一些优化过程。

其中有一个 groupby 是计算每个商品的回归系数,Mock 数据如下:

import numpy as np
import pandas as pd
import statsmodels.api as sm

def mock(i: int):
    nobs = 10
    X = np.random.random((nobs, 2))
    beta = [1.1.5]
    e = np.random.random(nobs)
    y = np.dot(sm.add_constant(X), beta) + e
    return pd.DataFrame(X, columns=["x1""x2"]).assign(y=y, key=f"c{i:0>4}").filter(["key""x1""x2""y"])


df = pd.concat([mock(i) for i in range(10000)])

商品 c1, c2, ...,自变量 x1 、x2 ,因变量 y,分别计算每个商品的回归系数:

key        x1        x2         y
0  c0000  0.980457  0.229095  1.757049
1  c0000  0.113148  0.252636  1.193941
... ...
k  c1234  0.836241  0.550951  2.348318
... ...
n  c9999  0.866876  0.130058  1.765817

Pandas 实现

版本一

最直接的写法是 groupby.apply

def ols1(d):
    X = sm.add_constant(d[["x1""x2"]])
    y = d["y"]
    res = sm.OLS(y, X).fit()
    return res.params

%timeit df.groupby(["key"]).apply(ols1)

统计耗时 14.8s 左右:

14.8 s ± 249 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

版本二

考虑到上面数据通过 apply 返回合并为 DataFrame 会比较慢,做一下改版:

def ols2(d):
    X = sm.add_constant(d[["x1""x2"]].to_numpy())
    y = d["y"].to_numpy()
    res = sm.OLS(y, X).fit()
    return res.params

%timeit df.groupby(["key"]).apply(ols2)

耗时统计 5.6s 左右,优化效果很明显:

5.6 s ± 230 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

版本三

众所周知,groupby.apply 并没有并行执行,再写一下并行版本进一步进行优化:

from joblib import Parallel, delayed

def ols3(key, d):
    X = sm.add_constant(d[["x1""x2"]].to_numpy())
    y = d["y"].to_numpy()
    res = sm.OLS(y, X).fit()
    return np.append([key], res.params)
%%timeit
grouped = df.groupby(["key"])
results = Parallel(n_jobs=-1)(delayed(ols3)(key, group) for key, group in grouped)
pd.DataFrame(results, columns=["key""const""x1""x2"])

耗时统计 2.1s 左右,又优化一大截:

2.1 s ± 33.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

如果不将结果转换为 DataFrame 是 `1.81s` :

1.81 s ± 108 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

DuckDB 实现

最近使用 DuckDB 处理数据比较多,与 Python 语言交互时,我们可以从 Python 函数中创建一个 DuckDB 用户定义函数(UDF)[1],这样它就可以在 SQL 查询中使用。这样定义的函数,由数据库调度运行,看下是否能据此优化我们的代码。

版本四

首先定义一个回归函数,然后注册给 DuckDB

import duckdb
from contextlib import suppress

def ols4(x: list, y: list) -> list[float]:
    X = sm.add_constant(np.array([[r["x1"], r["x2"]] for r in x]))
    res = sm.OLS(y, X).fit()
    return res.params

with suppress(Exception):
    duckdb.remove_function("ols4")

duckdb.create_function("ols4", ols4)

这样我们就可以在 SQL 中直接调用,执行测试:

sql = """
with tmp as (
    select key, ols4(list((x1, x2)), list(y)) as coef
    from df
    group by all
)
select key, coef[1] as const, coef[2] as x1, coef[3] as x2
from tmp
order by all
"""

%timeit duckdb.sql(sql).df()

耗时为 2.9s,看上去还没有上面 Python 并行化版本效率高:

2.9 s ± 26.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

版本五

猜测 *.df() 转 DataFrame 格式耗时比较久,如果不进行格式转换,直接运行:

%timeit duckdb.sql(sql)

耗时仅有 825 µs,非常 amazing!

825 µs ± 5.98 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

版本六

注意到上面 SQL 中为 select * from df,我们查询的是 DataFrame。如果我们直接查询的是 DuckDB 的表,应该还会进一步减少耗时。例如将 df 存为表:

sql = """
create or replace table example as
select * from df
"""

duckdb.sql(sql)

然后,重新执行计算:

sql = """
with tmp as (
    select key, ols4(list((x1, x2)), list(y)) as coef
    from example
    group by all
)
select key, coef[1] as const, coef[2] as x1, coef[3] as x2
from tmp
order by all
"""

%timeit duckdb.sql(sql)

统计耗时仅有 131 µs

131 µs ± 1.46 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

总结

回顾下一路下来的优化:

版本 说明 耗时
版本一 Pandas 直接 apply 14.8s
版本二 numpy 优化 5.6s
版本三 joblib 并行 2.1s
版本四 DuckDB UDF 处理和输出 DataFrame 2.9s
版本五 DuckDB UDF 处理 DataFrame 825µs
版本六 DuckDB UDF 直接处理表 131µs

其实上述比较并不严谨,它们输出的结果格式并不统一(Numpy/DataFrame/DuckDB 等),而不同技术栈有不同契合的输入/输出上下文。可以看到版本四相比版本三其实还要慢一点,测试用例只是保证了其在本技术栈上代码和逻辑上都尽可能简洁直观。

对于版本六,如果将结果输出为 DuckDB 的表,加上 IO,也需要花费 2s 多,如果使用 CTE,直接使用计算结果,进一步计算诸如最大系数之类的,耗时会更少:

with tmp as (
    select key, ols4(list((x1, x2)), list(y)) as coef
    from example
    group by all
)
select max(coef[1]) from tmp

因此,我们也不用标题党的说,通过 DuckDB 将 Pandas 代码优化了多少多少倍。

实践上,如果我们上下游数据处理都是通过 DuckDB,对于复杂的运算在 SQL 中不好实现,我们可以通过 Python 来实现,这样我们就可以利用 DuckDB 高性能的同时,也能使用 Python 丰富的生态,达到两者兼顾的目的,大大提升我们分析数据的效率。即使你日常中更多的是使用 Pandas/Polars 等,上例中看到 DuckDB 和 DataFrame 交互是非常方便的,也可以借用 DuckDB 来优化我们的代码,算是一种不错的选择。

引用链接

[1] DuckDB 用户定义函数(UDF): https://link.zhihu.com/?target=https%3A//duckdb.org/docs/api/python/function





原文始发于微信公众号(alitrack):使用 DuckDB UDF 加速 Pandas 数据处理

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/222827.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!