Data Analysis - Replacing Pandas with SQL

Table of Contents
  1. pandas vs. SQL performance comparison
    1.1. Background
    1.2. Exporting all data, then converting to a DataFrame
    1.3. Exporting selected columns, then converting to a DataFrame
    1.4. Aggregating in the database without exporting data
    1.5. Results comparison table
  2. Replacing Pandas with SQL
    2.1. Descriptive statistics
    2.2. Filling missing values
      2.2.1. Filling with a constant
      2.2.2. Forward and backward fill

Pandas is arguably the most convenient data-analysis tool available today, but its design of loading the entire dataset into memory becomes a weakness once data grows large. There are techniques that let it cope with bigger datasets to some extent, but they treat the symptoms, not the cause. SQL, on the other hand, has been continuously optimized since the 1970s to create, read, update, and delete data at scale. That makes SQL a sharp tool for large-scale data analysis.

pandas vs. SQL performance comparison

Background

The database holds one million records with three columns each: user id, username, and an activation flag. The table occupies 87 MB on disk.

A sample:

postgres=# select * from users;
 id |             username             | activated
----+----------------------------------+-----------
  1 | 2764568ccd3b3cf5fb8dd00bd0c4e95b | f
  2 | 53eb7dc38c4964b2d40d1077b1070da5 | f
  3 | 37c3462bfa0a3f21ff79cff3abdd88ac | f
  4 | d7590c1dad4019eeffdce1b9688d2df8 | f
  5 | c61e1ce08ffbc247b3d74432f58e6c5c | f
postgres=# select pg_size_pretty(pg_total_relation_size('public.users'));
 pg_size_pretty
----------------
 87 MB
(1 row)
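For reproducibility, a table like this could be generated along the following lines (this setup is an assumption, not part of the original benchmark):

-- Hypothetical setup: ~1M rows shaped like the sample above,
-- with roughly 10% of users activated.
CREATE TABLE users (
    id        serial PRIMARY KEY,
    username  text,
    activated boolean
);

INSERT INTO users (username, activated)
SELECT md5(random()::text), random() < 0.1
FROM generate_series(1, 1000000);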

Exporting all data, then converting to a DataFrame

The Python program:

# load_all_then_convert.py
from memory_profiler import profile

@profile
def run():
    import psycopg2
    import pandas as pd

    connection = psycopg2.connect("db")

    with connection.cursor() as cursor:
        cursor.execute('SELECT * FROM users')
        df = pd.DataFrame(
            cursor.fetchall(),
            columns=['id', 'username', 'activated'],
        )

    result = df.groupby(by='activated').count()
    print(result)

if __name__ == "__main__":
    run()

The profiling results:

JinlongLi (master *) mine_rpn $ python load_all_then_convert.py
               id  username
activated
False      900749    900749
True       100251    100251
Filename: load_all_then_convert.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    11     40.1 MiB     40.1 MiB           1   @profile
    12                                         def run():
    13     41.4 MiB      1.3 MiB           1       import psycopg2
    14     72.5 MiB     31.1 MiB           1       import pandas as pd
    15
    16     73.1 MiB      0.6 MiB           1       connection = psycopg2.connect("db")
    17
    18     73.1 MiB      0.0 MiB           1       with connection.cursor() as cursor:
    19    176.0 MiB    102.8 MiB           1           cursor.execute('SELECT * FROM users')
    20    276.2 MiB    -93.6 MiB           2           df = pd.DataFrame(
    21    369.8 MiB    193.8 MiB           1               cursor.fetchall(),
    22    369.8 MiB      0.0 MiB           1               columns=['id', 'username', 'activated'],
    23                                                 )
    24
    25    276.3 MiB      0.1 MiB           1       result = df.groupby(by='activated').count()
    26    276.3 MiB      0.0 MiB           1       print(result)

From the profiling results:

  1. The profiler itself accounts for the 40.1 MiB baseline, and importing pandas costs another 31.1 MiB.
  2. Fetching the rows and converting them into a pandas.DataFrame costs roughly 300 MiB.

Excluding the memory held by the profiling tool, the program peaks at about 330 MiB, for data that occupies only 87 MB in the table.
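As an aside: even when rows must be pulled to the client, psycopg2's named (server-side) cursors stream results in batches instead of materializing everything at once the way fetchall() does. A minimal sketch, where 'users_cur' is an arbitrary cursor name:

import psycopg2

connection = psycopg2.connect("db")

# A named cursor lives server-side; iterating over it fetches rows in
# batches of itersize rather than loading the whole result set at once.
with connection.cursor(name='users_cur') as cursor:
    cursor.itersize = 10000
    cursor.execute('SELECT * FROM users')
    for row in cursor:
        pass  # process one row at a time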

Exporting selected columns, then converting to a DataFrame

What happens to performance if we export only the columns of interest instead of the whole table?
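The script is the same as before except for the query and the column list; reconstructed from the profiled source lines below, it looks like this:

# load_specified_cols.py -- reconstructed from the profiling output below
from memory_profiler import profile

@profile
def run():
    import psycopg2
    import pandas as pd

    connection = psycopg2.connect("db")

    with connection.cursor() as cursor:
        # Fetch only the two columns the analysis needs
        cursor.execute('SELECT id, activated FROM users')
        df = pd.DataFrame(
            cursor.fetchall(),
            columns=['id', 'activated'],
        )

    result = df.groupby(by='activated').count()
    print(result)

if __name__ == "__main__":
    run()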

JinlongLi (master *) mine_rpn $ python load_specified_cols.py
               id
activated
False      900749
True       100251
Filename: load_specified_cols.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9     40.0 MiB     40.0 MiB           1   @profile
    10                                         def run():
    11     41.1 MiB      1.2 MiB           1       import psycopg2
    12     72.7 MiB     31.6 MiB           1       import pandas as pd
    13
    14     73.3 MiB      0.6 MiB           1       connection = psycopg2.connect("db")
    15
    16     73.3 MiB      0.0 MiB           1       with connection.cursor() as cursor:
    17    128.0 MiB     54.7 MiB           1           cursor.execute('SELECT id, activated FROM users')
    18    128.0 MiB   -146.2 MiB           2           df = pd.DataFrame(
    19    228.7 MiB    100.7 MiB           1               cursor.fetchall(),
    20    228.7 MiB      0.0 MiB           1               columns=['id', 'activated'],
    21                                                 )
    22
    23     82.7 MiB    -45.3 MiB           1       result = df.groupby(by='activated').count()
    24     82.8 MiB      0.1 MiB           1       print(result)

Aggregating in the database without exporting data

What if we don't export the data at all and instead run the aggregation directly in the database?
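Reconstructed from the profiled source lines below, the script is as follows. Note that the profiled run passed columns=['cnt', 'activated'], the reverse of the SELECT order, which is why the printed output has its labels swapped; the order is corrected here:

# agg_in_db.py -- reconstructed from the profiling output below
from memory_profiler import profile

@profile
def run():
    import psycopg2
    import pandas as pd

    connection = psycopg2.connect("db")

    with connection.cursor() as cursor:
        # The aggregation runs in the database; only two rows come back
        cursor.execute('''
            SELECT activated, count(*) AS cnt
            FROM users
            GROUP BY activated
        ''')
        result = pd.DataFrame(
            cursor.fetchall(),
            # matches the SELECT order (the profiled run had these reversed)
            columns=['activated', 'cnt'],
        )

    print(result)

if __name__ == "__main__":
    run()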

JinlongLi (master *) mine_rpn $ python agg_in_db.py
     cnt  activated
0  False     900749
1   True     100251
Filename: agg_in_db.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     9     40.0 MiB     40.0 MiB           1   @profile
    10                                         def run():
    11     41.2 MiB      1.2 MiB           1       import psycopg2
    12     72.8 MiB     31.6 MiB           1       import pandas as pd
    13
    14     73.4 MiB      0.6 MiB           1       connection = psycopg2.connect("db")
    15
    16     73.4 MiB      0.0 MiB           1       with connection.cursor() as cursor:
    17     73.4 MiB      0.0 MiB           1           cursor.execute('''
    18                                                     SELECT activated, count(*) as cnt
    19                                                     FROM users
    20                                                     GROUP BY activated
    21                                                 ''')
    22     73.6 MiB      0.1 MiB           2           result = pd.DataFrame(
    23     73.4 MiB      0.0 MiB           1               cursor.fetchall(),
    24     73.4 MiB      0.0 MiB           1               columns=['cnt', 'activated'],
    25                                                 )
    26
    27     74.0 MiB      0.4 MiB           1       print(result)

Excluding the memory held by the profiling tool, the whole program uses only about 34 MiB. Compared with the previous approaches, this cuts memory usage dramatically.

Results comparison table

Method                                        Memory (MB)   Memory excl. pandas (MB)   Time (s)
Export the whole table                                330                        300        3.6
Export selected columns                               188                        157        3.2
Aggregate in the database, export the result           34                        2.4        2.4

In short, pandas needs about 0.3 GB of memory to process a million rows of three columns, while pushing the analysis into SQL reduces that memory footprint by roughly a factor of one hundred.

Replacing Pandas with SQL

Descriptive statistics

How can the descriptive statistics (describe()) that pandas provides be reproduced in SQL?

pandas: numeric Series

>>> s = pd.Series([1, 2, 3])
>>> s.describe()
count    3.0
mean     2.0
std      1.0
min      1.0
25%      1.5
50%      2.0
75%      2.5
max      3.0
dtype: float64

SQL

SELECT
    count(*),
    avg(n),
    stddev(n),
    min(n),
    percentile_cont(array[0.25, 0.5, 0.75]) WITHIN GROUP (ORDER BY n),
    max(n)
FROM
    s;
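To try the query without creating a table, it can run against inline values; the column is assumed to be named n, as above:

WITH s(n) AS (VALUES (1), (2), (3))
SELECT
    count(*),
    avg(n),
    stddev(n),
    min(n),
    -- returns {1.5, 2, 2.5}, matching pandas' 25%/50%/75%
    percentile_cont(array[0.25, 0.5, 0.75]) WITHIN GROUP (ORDER BY n),
    max(n)
FROM s;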

pandas: categorical Series

>>> s = pd.Series(['a', 'a', 'b', 'c'])
>>> s.describe()
count     4
unique    3
top       a
freq      2
dtype: object

SQL

SELECT
    count(*),
    count(DISTINCT v) AS "unique",  -- "unique" is a reserved word and must be quoted
    mode() WITHIN GROUP (ORDER BY v) AS top
FROM
    s;
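pandas' describe() also reports freq, the count of the top value. No single aggregate returns it, but a grouped query yields both top and freq (sketched here against inline values):

WITH s(v) AS (VALUES ('a'), ('a'), ('b'), ('c'))
SELECT v AS top, count(*) AS freq
FROM s
GROUP BY v
ORDER BY freq DESC
LIMIT 1;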

Filling missing values

Filling with a constant

pandas:

>>> import pandas as pd
>>> import numpy as np
>>> df = pd.DataFrame(['A', 'B', np.nan, 'D', np.nan, np.nan, 'G'])
>>> df.fillna('X')
   0
0  A
1  B
2  X
3  D
4  X
5  X
6  G

SQL

SELECT
    n,
    coalesce(v, 'X') AS v
FROM
    tb;

coalesce accepts any number of arguments and returns the first one that is not NULL.
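For reference, a self-contained version against the same values as the pandas example (the next section defines tb the same way):

WITH tb(n, v) AS (VALUES
    (1, 'A'), (2, 'B'), (3, NULL),
    (4, 'D'), (5, NULL), (6, NULL), (7, 'G')
)
SELECT n, coalesce(v, 'X') AS v
FROM tb;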

Forward and backward fill
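For symmetry with the earlier examples, the pandas side uses Series.ffill and Series.bfill:

>>> import pandas as pd
>>> import numpy as np
>>> s = pd.Series(['A', 'B', np.nan, 'D', np.nan, np.nan, 'G'])
>>> s.ffill().tolist()  # forward fill: carry the last valid value forward
['A', 'B', 'B', 'D', 'D', 'D', 'G']
>>> s.bfill().tolist()  # backward fill: carry the next valid value backward
['A', 'B', 'D', 'D', 'G', 'G', 'G']

In SQL, one approach is a correlated subquery per direction: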

WITH tb AS (
    SELECT * FROM (VALUES
        (1, 'A' ),
        (2, 'B' ),
        (3, NULL),
        (4, 'D' ),
        (5, NULL),
        (6, NULL),
        (7, 'G' )
    ) AS t(n, v)
)
SELECT
    *,
    -- forward fill: the nearest non-NULL value before this row
    coalesce(v, (
        SELECT v
        FROM tb AS tb_
        WHERE tb_.n < tb.n AND v IS NOT NULL
        ORDER BY n DESC
        LIMIT 1
    )) AS ffill_v,

    -- backward fill: the nearest non-NULL value after this row
    coalesce(v, (
        SELECT v
        FROM tb AS tb_
        WHERE tb_.n > tb.n AND v IS NOT NULL
        ORDER BY n ASC
        LIMIT 1
    )) AS bfill_v
FROM
    tb;
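The correlated subqueries above rescan tb once per row, which is quadratic on large tables. A window-function formulation is usually much faster; here is a sketch of forward fill that exploits the fact that count(v) ignores NULLs, so a running count puts every NULL row in the same group as the last non-NULL row before it:

WITH tb(n, v) AS (VALUES
    (1, 'A'), (2, 'B'), (3, NULL),
    (4, 'D'), (5, NULL), (6, NULL), (7, 'G')
),
grp AS (
    SELECT
        n, v,
        -- increments only at non-NULL rows, so each group holds one
        -- non-NULL value followed by its trailing NULLs
        count(v) OVER (ORDER BY n) AS g
    FROM tb
)
SELECT
    n, v,
    -- the only non-NULL value in each group is the fill value
    max(v) OVER (PARTITION BY g) AS ffill_v
FROM grp
ORDER BY n;

Backward fill is symmetric: compute the running count with ORDER BY n DESC instead.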