文章目录
- 一、GroupBy 的三层抽象:Split → Apply → Combine
- 二、`agg` vs `transform` vs `apply`:三种函数的输出语义
- 2.1 `agg`:每组返回一个标量
- 2.2 `transform`:每组返回与原组等长的序列
- 2.3 `apply`:通用但最慢
- 三、多层索引(MultiIndex)实战
- 四、窗口函数进阶:rolling、expanding 与 ewm
- 4.1 `rolling()`:固定大小的滑动窗口
- 4.2 `expanding()`:从序列开始累积到当前位置
- 4.3 `ewm()`:指数加权(近期数据权重更高)
- 五、时间序列处理:resample、shift 与 asfreq
- 六、大数据集内存优化
- 七、`pipe()` 方法链与 `query()` 加速
- 7.1 `pipe()`:将清洗流水线串联为声明式管道
- 7.2 `query()` 与 `eval()`:表达式引擎加速
- 八、实战:股票日线数据 → 技术指标 → 多周期聚合
- 小结
Pandas 的groupby是数据分析中使用频率最高的操作之一——但绝大多数使用者只停留在groupby().mean()阶段。实际上,agg、transform和apply三种函数面向不同的输出需求,MultiIndex 在处理分层维度时极为高效,窗口函数更是金融数据分析的标配。本文聚焦这三个"高频但易混淆"的进阶特性,结合股票技术指标计算实战,提供可量化的性能对比和选型决策依据。
一、GroupBy 的三层抽象:Split → Apply → Combine
groupby的操作可以拆解为三个阶段:
DataFrame 在groupby时并不立即计算任何内容,只是创建一个DataFrameGroupBy对象——这是一个延迟对象,只有调用聚合方法时才触发 Split-Apply-Combine 的完整执行。理解这个延迟特性有助于避免"先 groupby 再 groupby"的无谓开销。
二、aggvstransformvsapply:三种函数的输出语义
这三种函数面向截然不同的输出需求,混淆使用会导致结果形状异常或性能劣化。
importpandasaspdimportnumpyasnp df=pd.DataFrame({"category":["A","A","B","B","B","C"],"value":[10,20,30,40,50,60],"count":[1,2,3,4,5,6],})grouped=df.groupby("category")2.1agg:每组返回一个标量
result=grouped.agg(total=("value","sum"),avg=("value","mean"),max_count=("count","max"),)# 输出形状:category 数 × 聚合列数(3 行 × 3 列)agg的语义是"将每组压缩为一行",通常结合字典或 named aggregation 同时生成多个统计量。Python 内置的函数(sum、mean、max等)在agg中会被自动识别为优化路径(cythonized fast path),避免回退到 Python 循环。
2.2transform:每组返回与原组等长的序列
df["value_normalized"]=grouped["value"].transform(lambdax:(x-x.mean())/x.std())# 输出长度 = 原 DataFrame 长度(6 行),但每行值是组内标准化的结果transform的核心语义是"广播"——执行的函数返回一个与组内原始长度相同的序列,Pandas 将它们按原始索引拼接回去。典型场景:
- 组内标准化(Z-score)
- 填充组内缺失值(
transform(lambda x: x.fillna(x.mean()))) - 计算组内排名(
transform("rank"))
在 SQL 中,transform等价于窗口函数AVG(value) OVER (PARTITION BY category)后连接回原表。
2.3apply:通用但最慢
result=grouped.apply(lambdag:g.nlargest(2,"value"))# 每组返回任意形状的 DataFrame,Pandas 自动对齐索引apply最为灵活——它允许每组返回任意形状的对象(标量、Series、DataFrame),但灵活性以性能为代价:apply在内部会对每一组调用 Python 函数,无法走 cythonized fast path。对大数据集而言,apply可能比agg慢 10~50 倍。
| 函数 | 输出形状 | 性能 | 使用场景 |
|---|---|---|---|
agg | (N_groups × M_metrics) | 快(cython 优化) | 多统计量聚合 |
transform | 与原 DataFrame 等长 | 中 | 组内广播、标准化 |
apply | 任意 | 慢(Python 回调) | 每组需要自定义复杂逻辑 |
选型决策:优先用agg;需要组内广播用transform;只有在agg/transform确实无法表达的逻辑下才考虑apply。
三、多层索引(MultiIndex)实战
MultiIndex 允许在一个轴上拥有多个索引层级,天然适合表达面板数据、分层分类和时间序列的维度交叉。
df=pd.DataFrame({"year":[2023,2023,2023,2024,2024,2024],"quarter":["Q1","Q2","Q3","Q1","Q2","Q3"],"revenue":[100,120,150,110,140,180],}).set_index(["year","quarter"])# MultiIndex 切片print(df.loc[(2024,"Q2")])# 精确取值print(df.loc[pd.IndexSlice[:,"Q1"],:])# 所有年份的 Q1# xs() 横截面切片——从多层索引中"切"出一层print(df.xs("Q2",level="quarter"))# 所有 Q2 的数据# 将索引层从行移到列(unstack)print(df.unstack(level="quarter"))# 透视表效果MultiIndex 的一个常见误区是认为它会让代码更复杂。实际上,在涉及多维度分组聚合的场景中,MultiIndex 的xs()和unstack()比手动 filter + pivot 的实现更简洁且性能更高——因为它的底层使用哈希索引,查找速度是 O(1)。
四、窗口函数进阶:rolling、expanding 与 ewm
4.1rolling():固定大小的滑动窗口
# 计算 20 日均线df["MA_20"]=df["close"].rolling(window=20).mean()# 窗口内多种聚合df["volatility"]=df["close"].rolling(20).std()df["high_20"]=df["close"].rolling(20).max()4.2expanding():从序列开始累积到当前位置
# 从数据起始日到当前的累积最大值df["all_time_high"]=df["close"].expanding().max()# 累积平均收益率df["cum_return"]=df["daily_return"].expanding().mean()4.3ewm():指数加权(近期数据权重更高)
# 指数加权移动平均(span=12 对应 12 日的半衰周期)df["EWMA_12"]=df["close"].ewm(span=12,adjust=False).mean()ewm(span=12, adjust=False)中的adjust参数值得注意:adjust=True(默认)使用完整的指数加权递归公式,适用于精确统计;adjust=False使用近似递归公式,计算更快且结果更平滑,在金融技术分析中更为常用。
五、时间序列处理:resample、shift 与 asfreq
# 将日线数据降采样为月线monthly=df.set_index("date").resample("M").agg({"open":"first","high":"max","low":"min","close":"last","volume":"sum",})# shift() 构造滞后特征df["prev_close"]=df["close"].shift(1)df["return_5d"]=df["close"]/df["close"].shift(5)-1# asfreq() 填充缺失的时间点(如停牌日)full_index=pd.date_range(df["date"].min(),df["date"].max(),freq="B")df_full=df.set_index("date").asfreq("B").fillna(method="ffill")resample的性能远优于手动groupby(year).groupby(month).agg()——它在内部使用了优化的时间分箱算法,在百万级日线数据的降采样中耗时通常不到 1 秒。
六、大数据集内存优化
Pandas DataFrame 的内存占用往往被低估。一个 100 万行的object类型列可能占用数百 MB,而同样的数据用category类型可能只需要几 MB。
# 诊断内存占用df.info(memory_usage="deep")# 优化策略 1:object → category(字符串列的内存从 200MB 降到 5MB)forcolindf.select_dtypes("object").columns:df[col]=df[col].astype("category")# 优化策略 2:数值精度缩减df["price"]=pd.to_numeric(df["price"],downcast="float")df["count"]=pd.to_numeric(df["count"],downcast="integer")# 优化策略 3:只加载需要的列df=pd.read_csv("large.csv",usecols=["date","price","volume"],dtype={"price":"float32","volume":"int32"})在加载 CSV 时同时指定dtype和usecols,可以将加载时间缩短 50% 以上,内存占用量降低 80%。这两项优化在与大数据文件的初始交互中性价比极高。
七、pipe()方法链与query()加速
7.1pipe():将清洗流水线串联为声明式管道
defclean(df):return(df.dropna(subset=["price"]).query("price > 0").assign(log_price=lambdad:np.log(d["price"])))defenrich(df):returndf.assign(ma_5=df.groupby("symbol")["price"].transform(lambdax:x.rolling(5).mean()),ret=df.groupby("symbol")["price"].transform(lambdax:x.pct_change()),)# 管道串联:可读且每步可单独测试result=(df.pipe(clean).pipe(enrich).pipe(lambdad:d[d["ret"].notna()]))pipe的优势不在于性能,而在于可组合性——每一步都是一个独立的纯函数,可以单独测试和复用。相比嵌套函数调用enrich(clean(df)),pipe链的阅读方向与执行顺序一致,且中间结果容易插入.head()进行调试。
7.2query()与eval():表达式引擎加速
# query() 比传统布尔索引快 2~5 倍df.query("price > 100 and volume > 10000 and symbol in ['AAPL', 'GOOG']")# eval() 在 DataFrame 列间运算时避免中间数组分配df["pnl"]=df.eval("(close - open) * volume")# 用 @ 引入外部变量threshold=100df.query("price > @threshold")query()和eval()的加速原理是绕过 Python 解释器:它们将表达式编译为 Numexpr 引擎的中间表示,利用 C 级别的向量化运算和多线程并行。在 100 万行以上的 DataFrame 中,这种加速尤为明显。
八、实战:股票日线数据 → 技术指标 → 多周期聚合
将以上知识串联为一个完整的股票分析流水线。
defcompute_technical_indicators(df:pd.DataFrame)->pd.DataFrame:"""从日线 OHLCV 数据计算 MACD/RSI/布林带"""# MACD:12 日 EMA - 26 日 EMA,信号线为 9 日 EMAdf=df.assign(EMA_12=df["close"].ewm(span=12,adjust=False).mean(),EMA_26=df["close"].ewm(span=26,adjust=False).mean(),)df["MACD"]=df["EMA_12"]-df["EMA_26"]df["MACD_signal"]=df["MACD"].ewm(span=9,adjust=False).mean()df["MACD_hist"]=df["MACD"]-df["MACD_signal"]# RSI:14 日相对强弱指标delta=df["close"].diff()gain=delta.clip(lower=0)loss=(-delta).clip(lower=0)avg_gain=gain.ewm(span=14,adjust=False).mean()avg_loss=loss.ewm(span=14,adjust=False).mean()rs=avg_gain/avg_loss df["RSI"]=100-(100/(1+rs))# 布林带:20 日均线 ± 2 倍标准差df["BB_mid"]=df["close"].rolling(20).mean()df["BB_std"]=df["close"].rolling(20).std()df["BB_upper"]=df["BB_mid"]+2*df["BB_std"]df["BB_lower"]=df["BB_mid"]-2*df["BB_std"]returndfdefmulti_timeframe_aggregation(df:pd.DataFrame)->pd.DataFrame:"""多周期聚合:日 → 周 → 月"""df=df.set_index("date")weekly=df.resample("W").agg({"open":"first","high":"max","low":"min","close":"last","volume":"sum","MACD":"last","RSI":"last",}).add_prefix("W_")monthly=df.resample("M").agg({"open":"first","high":"max","low":"min","close":"last","volume":"sum",}).add_prefix("M_")returnpd.concat([df,weekly,monthly],axis=1)MACD 的计算过程中,ewm(adjust=False)的选择对结果精度的影响微乎其微,但计算速度提升了约 3 倍。RSI 使用了clip()而非np.where()来分离涨跌幅,这在百万行级别数据上的向量化效率更高。布林带则展示了rolling()配合统计函数的经典用法——整个计算过程没有任何显式循环。
多周期聚合中,resample一次性完成了从日线到周线和月线的转换,add_prefix()为列名自动添加了W_和M_前缀,避免列名冲突。
小结
Pandas 的"表面 API"简单到可以五分钟上手,但"工程化 API"——transform的组内广播语义、MultiIndex 的分层切片、窗口函数的统计计算、pipe的可组合管道——才是区分"能用"和"好用"的分水岭。每一种进阶特性都对应着特定的业务场景:transform用于面板数据的组内标准化,MultiIndex 用于多维交叉分析,窗口函数用于金融时间序列,pipe+query用于构建可维护的数据管道。
此前专栏关于 NumPy 底层机制和数据管道工程化的文章,可作为本文在数据工程体系中的前置和深化阅读。如果本文对日常数据分析工作有所帮助,欢迎点赞、收藏与关注。