ConnectorX 旨在通过为开发人员提供高效、轻量级和易于使用的工具来加快从数据库加载数据的过程。在本文中,我们将通过回答以下问题为您简要介绍 ConnectorX:
-
什么是 ConnectorX? -
如何使用 ConnectorX? -
为什么 ConnectorX 是更好的选择? -
接下来会发生什么?
什么是 ConnectorX?
ConnectorX 是一个开源库,可加速将数据从数据库加载到 pandas.DataFrame 等数据结构,以便进一步处理和分析。
ConnectorX 由两个主要概念组成:Source (例如 PostgreSQL )和 Destination (例如 pandas.DataFrame )。ConnectorX 会将用户给出的 SQL 查询转发给 Source ,然后高效地将查询结果从 Source 传输到 Destination 。上图显示了当前支持的源和目标的示例。
为了提供高效的性能和强大的功能,ConnectorX 中繁重的工作负载是用 Rust 编写的。它还具有与 Python 中的其他数据科学库轻松互操作的简单 API 的 Python 绑定。
接下来,让我们看看如何通过一行代码以不同的方式使用 ConnectorX。
如何使用连接器 X?
第一步:安装 ConnectorX:
pip install connectorx
第 2 步:使用 read_sql 从数据库加载数据。源是使用连接字符串定义的,目标是默认的 pandas.DataFrame 并且可以通过设置_return_type_[1]来改变:
import connectorx as cx
# source: PostgreSQL, destination: pandas.DataFrame
df = cx.read_sql("postgres://postgres:postgres@localhost:5432/tpch", "SELECT * FROM lineitem")
查询结果将在 pandas.DataFrame 中返回,如下所示:
为了进一步加快速度,ConnectorX 支持通过对查询进行分区并并行执行分区查询来提高 CPU 和带宽资源的利用率。用户可以指定分区列或手动定义分区查询:
# 指定分区列和分区数
df = cx.read_sql("postgres://postgres:postgres@localhost:5432/tpch",
"SELECT * FROM lineitem", partition_on="l_orderkey", partition_num=4)
# 手动partition the query
# 下面的分区与上面的分区等效
df = cx.read_sql("postgres://postgres:postgres@localhost:5432/tpch", [
"SELECT * FROM lineitem WHERE l_orderkey > 0 AND l_orderkey <= 15000000 ",
"SELECT * FROM lineitem WHERE l_orderkey > 15000000 AND l_orderkey <= 30000000",
"SELECT * FROM lineitem WHERE l_orderkey > 30000000 AND l_orderkey <= 45000000",
"SELECT * FROM lineitem WHERE l_orderkey > 45000000 AND l_orderkey <= 60000000"
])
ConnectorX 将并行运行所有分区查询,并将所有查询结果连接到一个 pandas.DataFrame 中。至于分区,目前,ConnectorX 支持对 SPJA 查询的整数列进行自动分区。下面是一个更复杂的查询示例:
query = f”””
SELECT l_orderkey,
SUM(l_extendedprice * ( 1 — l_discount )) AS 收入,
o_orderdate,
o_shippriority
FROM customer,
orders,
lineitem
WHERE c_mktsegment = 'BUILDING'
AND c_custkey = o_custkey
AND l_orderkey = o_orderkey
AND o_orderdate < DATE ' 1995–03–15'
AND l_shipdate > DATE '1995–03–15'
GROUP BY l_orderkey,
o_orderdate,
o_shippriority
“””
df = read_sql(“postgresql://postgres:postgres@localhost:5432/tpch”,
query, partition_on=”l_orderkey”, partition_num=4)
请查看我们的 Github 存储库[2]以获取更多用法和示例!
为什么 ConnectorX 是更好的选择?
其他常用的 Python 库如 Pandas 已经提供了类似的 read_sql 函数,那么为什么要使用 ConnectorX?为了回答这个问题,让我们看一个简单的基准测试,我们将 ConnectorX 与其他三个现有的解决方案(Pandas[3]、Modin[4] 和 Dask[5])进行了比较,这些解决方案也能够将数据从数据库加载到数据帧。
我们使用 TPC-H 中的 LINEITEM 表并将比例因子设置为 10。该表包含 60M 记录 x 16 列,如果转储到 CSV 文件中,大小为 8 GB,在 PostgreSQL 中为 11 GB。我们测量每个解决方案在不同的并行度(1 到 10 个核心)和带宽条件下从数据库加载整个表并写入 pandas.DataFrame 所需的时间。我们还测量了此过程中每种方法的内存使用峰值。我们在此处展示的测试结果是在 AWS r5.4xlarge 实例上进行的,我们在该实例上运行 ConnectorX 并从运行在 AWS RDS 上的 db.m6g.4xlarge 实例上的 PostgreSQL 加载数据。(有关其他数据库的更多基准测试结果,请在此处查看) 结果表明,ConnectorX 是所有场景中速度最快且内存效率最高的解决方案!
-
更快的数据加载
使用 4 个 CPU 核对不同方法的速度测试结果如下图所示。我们可以看到,ConnectorX 是所有解决方案中最快的,与 PostgreSQL 上的 Modin、Pandas 和 Dask 相比,数据加载速度分别提高了 3 倍、10 倍和 20 倍 !
我们还将用于 read_sql 的核心(分区)数量从 1 变为 10,并在不同的网络条件和机器下对其进行了测试。(由于内存不足 (OOM), Dask 无法完成使用 1 个核心。Pandas 不支持并行性,因此我们只使用 1 个核心。)我们发现 ConnectorX 在所有设置中始终优于其他方法。
2. 更小的内存占用
接下来,我们来看看每个方法的内存使用情况。这里我们绘制了 4 核设置的比较结果。我们可以看到,ConnectorX 使用的内存比其他方法少 3 倍。 在不同的并行度下,结果保持不变。
ConnectorX 是如何做到这一点的?
三个主要原因使 ConnectorX 能够实现这一性能:
-
用本地语言编写: 与其他库不同,ConnectorX 是用 Rust 编写的,这避免了在 Python 中实现数据密集型应用程序的额外成本。 -
只复制一次: 现有解决方案在从数据库下载数据时或多或少会进行多次数据复制,但 ConnectorX 的实现遵循“零复制”原则。即使在并行性下,我们也设法将数据直接从源复制到目标一次。 -
CPU 缓存高效: 我们应用了多项优化来使 ConnectorX CPU 缓存友好。除了“零拷贝”实现,ConnectorX 中的数据处理以流方式进行,以减少缓存未命中。另一个例子是,当我们在 Python 中构造字符串时,我们将一批字符串写入一个预先分配的缓冲区,而不是为每个字符串分配单独的位置。
接下来会发生什么?
到目前为止,ConnectorX 支持广泛使用的 源 ,包括 PostgreSQL、MySQL、SQLite 以及采用相同 wire protocol 的其他数据库(例如通过 PostgreSQL 的 Redshift、通过 MySQL 的 Clickhouse)。我们现在正致力于添加更多流行的数据库和数据仓库[6]。我们还计划在未来支持以不同文件格式(例如 CSV、JSON、Parquet)从数据存储(如 Amazon S3 )传输数据。[7]
至于 Destinations ,ConnectorX 支持 Python 和 Rust 中所有流行的数据帧,包括 Pandas[8]、Apache Arrow[9]、Modin[10]、Dask[11] 和 Polars[12]。可在此处[13]找到已完成和正在进行的来源和目的地的完整列表。如果您有其他建议[14],请告诉我们!
除了支持更多来源和目的地之外,该团队还致力于开发新功能,以帮助最大限度地减少数据加载的时间成本,例如维护客户端数据缓存[15]。我们还对优化查询分区[16]感兴趣,例如支持使用从数据库收集的元数据进行自动分区。您可以在 Github[17] 上关注我们,了解我们最新的实施状态和我们的下一个计划!
参考资料
return_type: https://github.com/sfu-db/connector-x#parameters
[2]Github 存储库: https://github.com/sfu-db/connector-x#detailed-usage-and-examples
[3]Pandas: https://pandas.pydata.org/docs/reference/api/pandas.read_sql.html
[4]Modin: https://modin.readthedocs.io/en/latest/supported_apis/io_supported.html
[5]Dask: https://docs.dask.org/en/latest/dataframe-sql.html#loading-from-sql-with-read-sql-table
[6]流行的数据库和数据仓库: https://github.com/sfu-db/connector-x/discussions/61
[7]Amazon S3 )传输数据。: https://aws.amazon.com/s3/
[8]Pandas: https://pandas.pydata.org/
[9]Apache Arrow: https://arrow.apache.org/
[10]Modin: https://modin.readthedocs.io/en/latest/#
[11]Dask: https://dask.org/
[12]Polars: https://github.com/pola-rs/polars
[13]此处: https://github.com/sfu-db/connector-x#supported-sources–destinations
[14]如果您有其他建议: https://github.com/sfu-db/connector-x/discussions
[15]客户端数据缓存: https://github.com/sfu-db/connector-x/discussions/64
[16]优化查询分区: https://github.com/sfu-db/connector-x/discussions/65
[17]Github: https://github.com/sfu-db/connector-x
原文:
https://towardsdatascience.com/connectorx-the-fastest-way-to-load-data-from-databases-a65d4d4062d5
原文始发于微信公众号(alitrack):一行代码将 Pandas read_sql 加速 10 倍
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/62781.html