1. 项目概述:为什么多维聚合不是“加个groupby”就能搞定的事
我在银行数据平台组干了八年,从最早用SQL写几十行嵌套子查询做客户分层,到现在每天在Jupyter里调试pandas的agg链式调用,踩过的坑比写的代码还多。今天这篇讲的“多维聚合”,绝不是教你怎么把df.groupby('col').sum()敲得更顺——那是实习生第一天就能学会的。真正卡住90%数据工程师、分析师、甚至初级数据科学家的,是当业务方甩来一句:“我要看华东区高净值客户在旅游和餐饮类商户的月度交易金额中位数、30天滚动均值、单笔交易极差,再按季度同比拆解,最后导出成Excel给风控总监过会”时,你手里的pandas代码是不是当场就报KeyError: 'region',或者跑出来一个带三层索引、列名全是元组、连自己都看不懂的DataFrame。
这背后根本不是语法问题,而是业务逻辑到计算范式的映射断层。比如“滚动均值”在风控场景里从来不是为了平滑曲线,而是为了捕捉异常行为的时间窗口敏感性——3天太短,抓不住团伙作案节奏;7天又太长,等模型报警时钱已经转走了。再比如“中位数”在财务报表里被反复强调,不是因为它比平均数“高级”,而是因为信用卡数据里永远有那1%的黑产用户,单笔刷50万,能把整个区域的均值拉高3倍,但中位数纹丝不动。这些细节,官方文档不会写,Stack Overflow的答案也只告诉你rolling(7).mean()怎么写,却没人告诉你为什么是7,而不是6或8。
我见过太多团队把这部分工作硬塞给BI工程师,结果他们用Power BI拖拽半天,发现“滚动计算+多维分组+自定义函数”三者一叠加,性能直接崩盘,刷新一次要两分钟。最后不得不回退到Python脚本预处理,再把结果灌进BI工具——本质上,是把数据处理的复杂性从分析层转移到了ETL层,治标不治本。这篇文章要解决的,就是让你亲手把这套逻辑“焊死”在pandas里,让它像呼吸一样自然:输入原始交易流水,输出可直接喂给风控模型、财务看板、运营日报的干净结构化数据。核心关键词就三个:多维聚合、滚动计算、业务可解释性。适合所有每天和交易数据、日志数据、IoT时序数据打交道的人,无论你是刚转行的数据分析师,还是带团队的数仓架构师——只要你的日报里还有“环比”“同比”“Top N”“异常波动”这类词,你就绕不开它。
2. 多维聚合的核心设计:为什么必须放弃“先group再merge”的老路
2.1 传统思路的致命缺陷:三次IO,两次内存爆炸
先说个真实案例。去年我们给某城商行做反洗钱系统升级,原始需求是:统计每个客户在每个商户类别(餐饮/零售/旅游)下的“近30天交易笔数”“近30天交易金额总和”“单笔交易金额标准差”“最大单笔金额”。当时外包团队交来的方案是典型的“分步法”:
# 步骤1:算笔数 count_df = df.groupby(['customer_id', 'category'])['amount'].count().reset_index(name='txn_count') # 步骤2:算金额总和 sum_df = df.groupby(['customer_id', 'category'])['amount'].sum().reset_index(name='txn_sum') # 步骤3:算标准差 std_df = df.groupby(['customer_id', 'category'])['amount'].std().reset_index(name='txn_std') # 最后merge! result = count_df.merge(sum_df, on=['customer_id','category']).merge(std_df, on=['customer_id','category'])表面看没问题,但实际跑起来,当客户量超过50万、日均交易1000万笔时,光是第一次groupby就吃掉12GB内存,merge操作更是让机器风扇狂转,单次计算耗时47分钟。问题出在哪?三次独立的groupby,意味着对同一份原始数据扫描了三遍。pandas的groupby底层是哈希分组,每次都要重建哈希表、重排数据块、重新分配内存。更糟的是,merge操作需要对三个DataFrame再次排序或建索引,又是一轮CPU和内存的双重暴击。
提示:pandas的
agg字典映射不是语法糖,而是底层优化的入口。当你写df.groupby(...).agg({'col1': 'sum', 'col2': 'mean'})时,pandas会启动一个单通道聚合引擎,在一次数据遍历中,对每一行同时计算多个列的不同聚合函数,中间结果全程驻留内存,不落磁盘、不重建索引。这是性能差异的根源。
2.2 真正高效的多维聚合:单次扫描,多路输出
我们重构后的核心代码只有这一行:
result = df.groupby(['customer_id', 'category']).agg({ 'amount': ['count', 'sum', 'std', 'max'], 'fee': ['sum', 'mean'] })注意看输出结构:
amount fee count sum std max sum mean customer_id category C001 Dining 6 1887.12 98.23 447.39 47.18 7.86 Groceries 6 1880.28 112.45 251.61 47.01 7.84 ...这里的关键在于列名的层级设计。外层amount和fee是原始字段名,内层count/sum/std是聚合函数名。这种双层索引(MultiIndex)不是bug,是feature——它天然保留了计算维度的语义关系。比如你要取所有客户的“交易金额总和”,直接result[('amount','sum')]即可;要筛选“标准差大于100的商户类别”,用result[('amount','std')] > 100一行搞定。而传统merge方案产出的是扁平列名(txn_count,txn_sum,txn_std),语义割裂,后续筛选时还得手动拼字符串。
但新手常在这里栽跟头:如何把这种层级结构变成下游系统能吃的格式?直接result.reset_index()会得到一个带元组列名的DataFrame,Excel打不开,BI工具认不了。正确解法是result.columns = ['_'.join(col).strip() for col in result.columns.values],把('amount','sum')变成'amount_sum',这才是生产环境该有的输出。
2.3 实战避坑:当业务要求“不同列用不同分组键”时怎么办?
最棘手的场景来了:财务部要“按客户+产品线统计收入”,风控部要“按客户+地区统计风险敞口”,两个维度完全不同,但数据源是一张大宽表。难道要拆成两个groupby?不,用pd.concat配合keys参数:
# 定义两个分组视角 by_customer_product = df.groupby(['customer_id', 'product_line'])['revenue'].sum() by_customer_region = df.groupby(['customer_id', 'region'])['exposure'].sum() # 合并为一张表,用keys标记来源 combined = pd.concat([ by_customer_product, by_customer_region ], keys=['revenue_by_product', 'exposure_by_region'], axis=1)输出是:
revenue_by_product exposure_by_region customer_id region product_line C001 North Widget 15500.0 8900.0 South Gadget 13750.0 12400.0这样既避免重复扫描,又保持了业务语义的清晰隔离。我在线上系统里用这招处理过17个不同部门的定制化报表需求,维护成本降了80%。
3. 自定义聚合函数:别再用lambda写“一行式业务逻辑”
3.1 Lambda的幻觉:看似简洁,实则埋雷
原文示例里用lambda x: x.max() - x.min()算极差,初学者一看:“哇,一行搞定!”但真放到生产环境,这行代码会成为你半夜被叫醒的元凶。原因有三:
- 无法调试:当
x.max() - x.min()返回NaN时,你根本不知道是x为空、还是x.max()本身是NaN。lambda没有函数名,日志里只显示<lambda>,排查时只能靠猜。 - 无法复用:同一个极差计算,财务报表要一次,风控模型要一次,运营日报又要一次。每次复制粘贴lambda,哪天业务方说“极差要排除异常值”,你得改三处。
- 无法测试:单元测试框架没法对lambda打桩(mock),你只能测整个groupby流程,测试覆盖率暴跌。
我见过最惨的案例:某支付公司用lambda实现“加权平均费率”,上线后发现对新客(交易少)计算结果偏差20%,但因为lambda没文档、没类型提示,花了三天才定位到权重向量生成逻辑有误。
3.2 命名函数的工业级写法:文档即代码,类型即契约
正确的姿势是写带完整类型注解、文档字符串、边界检查的命名函数。以“风险加权交易额”为例(银行真实需求:对近期交易赋予更高权重,反映客户当前活跃度):
from typing import Union, Optional import numpy as np def risk_weighted_transaction(series: pd.Series, window_days: int = 30, decay_factor: float = 0.95) -> float: """ 计算风险加权交易额:时间越近的交易,权重越高 Parameters ---------- series : pd.Series 交易金额序列,索引应为datetime(否则抛ValueError) window_days : int, default 30 权重衰减窗口,单位:天 decay_factor : float, default 0.95 每日衰减系数,0.95表示每天权重乘以0.95 Returns ------- float 加权后交易总额,若series为空返回np.nan Raises ------ ValueError 当series索引非datetime类型时抛出 """ if len(series) == 0: return np.nan # 强制校验索引类型 if not isinstance(series.index, pd.DatetimeIndex): raise ValueError("Series index must be DatetimeIndex for time-based weighting") # 计算每笔交易距最新日期的天数 latest_date = series.index.max() days_diff = (latest_date - series.index).days # 生成权重:越近的天数,权重越大(指数衰减) weights = np.power(decay_factor, days_diff) # 加权求和 weighted_sum = np.average(series, weights=weights) * len(series) return float(weighted_sum) # 在agg中使用 result = df.groupby('customer_id').agg({ 'amount': risk_weighted_transaction, 'fee': 'sum' })这段代码的价值远超计算本身:
- 类型注解让IDE能自动补全、静态检查,减少低级错误;
- 文档字符串直接告诉同事“这个函数干什么、参数怎么配、异常怎么处理”,比写Wiki还及时;
- 边界检查(空序列、索引类型)让错误在聚合前暴露,而不是等报表生成失败才报警;
- 可测试性:你可以单独写单元测试验证
risk_weighted_transaction(pd.Series([100,200], index=pd.date_range('2024-01-01','2024-01-02'))) == 295.0,精准覆盖业务逻辑。
3.3 高阶技巧:用partial固化参数,避免重复配置
业务方经常提这种需求:“A部门用7天衰减,B部门用14天衰减,C部门用30天衰减”。如果为每个部门写一个函数,代码冗余爆炸。用functools.partial:
from functools import partial # 固化参数,生成专用函数 risk_weighted_7d = partial(risk_weighted_transaction, window_days=7, decay_factor=0.98) risk_weighted_14d = partial(risk_weighted_transaction, window_days=14, decay_factor=0.97) risk_weighted_30d = partial(risk_weighted_transaction, window_days=30, decay_factor=0.95) # 在agg中直接使用 result = df.groupby('customer_id').agg({ 'amount': risk_weighted_7d, # A部门 'fee': risk_weighted_14d # B部门 })partial返回的是真正的函数对象,支持__name__、__doc__,日志里清清楚楚写着risk_weighted_7d,运维排查时一眼就知道是哪个部门的策略。
4. 滚动与扩展窗口:时间维度不是加个rolling就完事
4.1 滚动窗口的三大生死线:对齐、填充、最小周期
原文示例df.rolling(window=3).mean()输出前两行是NaN,这在演示时无所谓,但在生产环境是灾难。想象一下:风控模型用这个滚动均值判断“交易是否异常”,第一天就返回NaN,模型直接拒绝评分,整条流水线卡死。我们必须明确回答三个问题:
- 对齐方式:
rolling默认closed='right'(包含当前行),但有些场景需要closed='both'(包含首尾)或closed='left'(不含当前行,纯历史)。比如“预测明天交易额”,就不能用包含今天的窗口。 - 空值填充:
NaN不能直接喂给下游。常见方案有:fillna(method='ffill'):向前填充,适合趋势稳定的指标;fillna(0):归零,适合计数类指标(如“近3天交易笔数”,没数据就是0);dropna():丢弃,适合离线批处理,但实时流处理不可用。
- 最小周期:
min_periods参数是救命稻草。设min_periods=1,第一行就能计算(用单个值的均值),避免全NaN。但要注意:min_periods=1时,rolling(3).mean()对第一行返回x[0],第二行返回(x[0]+x[1])/2,第三行才是(x[0]+x[1]+x[2])/3——这符合业务吗?财务报表通常要求“满窗才计算”,而风控可能要求“见数据就预警”。
我们线上系统的标准模板:
def safe_rolling_mean(series: pd.Series, window: int = 7, min_periods: int = 3, fill_method: str = 'ffill') -> pd.Series: """带安全兜底的滚动均值计算""" rolled = series.rolling(window=window, min_periods=min_periods).mean() if fill_method == 'ffill': return rolled.fillna(method='ffill') elif fill_method == 'zero': return rolled.fillna(0) else: return rolled.dropna() # 使用 df['7day_avg_amount'] = safe_rolling_mean(df['amount'], window=7, min_periods=3)4.2 扩展窗口的隐藏陷阱:cumsum不是万能的
expanding().sum()看着很美,但有个致命问题:它不支持分组内的独立计算。原文示例df_ts.groupby('category')['daily_revenue'].expanding().sum()能跑,是因为category只有一类('Electronics')。一旦你有多类,比如['Electronics','Clothing'],expanding()会把所有类别的数据混在一起累计!这是pandas 1.x的已知bug,直到2.0才修复。
正确解法是用groupby.apply配合cumsum:
# 错误示范(pandas <2.0) df.groupby('category')['revenue'].expanding().sum() # 会跨类别累计! # 正确解法:先分组,再对每组内部cumsum df['cumulative_revenue'] = df.groupby('category')['revenue'].apply(lambda x: x.cumsum())更进一步,如果你要“按客户计算年累计消费”,但客户数据是按天插入的,中间有缺失日期(比如客户某天没交易),cumsum()会跳过空值,导致累计值断层。这时必须先reindex补全日期:
# 补全客户每日交易记录(无交易记0) date_range = pd.date_range(df['date'].min(), df['date'].max(), freq='D') df_full = df.set_index('date').reindex(date_range, fill_value=0).reset_index() # 再按客户cumsum df_full['ytd_spend'] = df_full.groupby('customer_id')['amount'].cumsum()4.3 时间窗口的终极形态:基于业务事件的滚动计算
最高阶的需求是:“统计客户最近5笔交易的平均金额”。注意,这不是“最近5天”,而是“最近5笔”——和时间无关,和事件次数有关。rolling的window参数只接受整数或时间字符串,不支持“按行数滚动”。解法是groupby+apply+tail:
def last_n_avg(series: pd.Series, n: int = 5) -> float: """计算最后n笔交易的平均值""" return series.tail(n).mean() if len(series) >= n else np.nan # 按客户分组,对每组的amount序列取最后5个均值 df['last_5_avg'] = df.groupby('customer_id')['amount'].apply(last_n_avg)这个模式在反欺诈中极其关键:黑产洗钱往往有固定笔数模式(如每轮5笔),监控“最后5笔均值突变”比“7天均值突变”灵敏得多。
5. 多级分组与透视:让老板一眼看懂的报表长什么样
5.1 unstack的本质:从“树状索引”到“表格矩阵”
原文用unstack()把groupby(['region','product'])的结果转成行列分明的表格,但没讲透为什么这一步不可替代。核心在于:人类大脑处理二维表格的效率,远高于处理嵌套文本。
看这个原始groupby结果:
region product North Widget 15500.0 Gadget 12000.0 South Widget 18000.0 Gadget 13750.0 Name: revenue, dtype: float64这是个Series,索引是MultiIndex。销售总监扫一眼,得在脑子里构建坐标系:“North的Widget是15500,South的Widget是18000…所以Widget在South更好?”而unstack()后:
product Gadget Widget region North 12000 15500 South 13750 18000立刻变成可比矩阵:同一行看区域差异,同一列看产品差异。这就是BI工具拼命想模拟却总差口气的“交叉分析”能力。
但unstack()有坑:默认会把最内层索引(这里是product)转成列。如果你想要“产品作行、区域作列”,得用unstack(level=0)(指定转哪一层)。更常见的是,分组后有多列聚合,比如:
result = df.groupby(['region','product']).agg({'revenue':'sum', 'profit':'mean'}) # result是两层索引(region,product)+两层列(revenue,profit) # unstack()默认转product,但列名会变成(revenue,'sum'), (profit,'mean')——太丑解决方案是unstack(level=1)(转product层),再droplevel(0, axis=1)去掉多余的外层列名。
5.2 生产级透视:处理缺失值、重命名、导出Excel
真实数据永远有缺失。unstack()遇到某个region-product组合无数据时,会填NaN,但财务报表要求“空值显示为0”。unstack(fill_value=0)是基础,但还不够:
# 标准化透视流程 pivot_table = ( df.groupby(['region', 'product'])['revenue'] .sum() .unstack(fill_value=0) # 缺失值填0 .rename(columns={'Widget': '小部件', 'Gadget': '小工具'}) # 中文列名 .rename_axis(index='大区', columns='产品线') # 中文轴名 .sort_index() # 按region字母序排列 ) # 导出Excel时,确保格式正确 with pd.ExcelWriter('revenue_pivot.xlsx') as writer: pivot_table.to_excel(writer, sheet_name='营收透视') # 获取工作表对象,设置列宽 worksheet = writer.sheets['营收透视'] for column in ['A', 'B', 'C']: worksheet.column_dimensions[column].width = 15这段代码产出的Excel,打开就是老板想要的样子:中文标题、数字右对齐、列宽适中、无NaN。而不用你手动在Excel里调格式。
5.3 超越unstack:用crosstab做轻量级交叉分析
当你的聚合只是简单计数或求和,pd.crosstab比groupby.unstack()更直观:
# 统计各地区各产品线的交易笔数 count_table = pd.crosstab( df['region'], df['product'], values=df['amount'], # 可选:用amount值代替计数 aggfunc='count', # 或'sum', 'mean' margins=True, # 添加总计行/列 dropna=False # 保留NaN分类 )crosstab的优势是:语法直白(crosstab(行, 列)),内置margins(总计)、normalize(百分比)等开箱即用功能,适合快速探索性分析。但复杂聚合(如同时算sum和std)还是得回groupby。
6. 端到端实战:银行信用卡风控分析流水线
6.1 数据准备:模拟真实交易流的五个关键特征
原文生成的模拟数据过于理想。真实信用卡数据有五大特征,必须在模拟时体现:
- 时间不连续:客户不会每天交易,周末交易量低;
- 金额长尾分布:90%交易在100元以下,10%在1000元以上,0.1%在10000元以上;
- 商户类别漂移:客户消费习惯随季节变化(夏季旅游多,冬季餐饮多);
- 手续费非线性:小额交易费率高(2.5%),大额交易费率低(0.5%);
- 设备指纹:同一客户可能用手机、POS机、网银多渠道交易。
我们重写数据生成器:
def generate_realistic_transactions(n_samples=10000): np.random.seed(42) customers = [f'C{str(i).zfill(3)}' for i in np.random.choice(range(1, 500), n_samples)] # 模拟时间:集中在工作日,避开节假日 base_dates = pd.date_range('2024-01-01', '2024-12-31', freq='D') workdays = base_dates[base_dates.weekday < 5] # 去掉周末 dates = np.random.choice(workdays, n_samples, replace=True) # 商户类别:按季度调整权重(Q1餐饮多,Q3旅游多) quarters = pd.cut(pd.to_datetime(dates).quarter, bins=[0,1,2,3,4], labels=['Q1','Q2','Q3','Q4']) category_weights = { 'Q1': {'Groceries':0.4, 'Dining':0.3, 'Retail':0.2, 'Travel':0.1}, 'Q2': {'Groceries':0.3, 'Dining':0.4, 'Retail':0.2, 'Travel':0.1}, 'Q3': {'Groceries':0.2, 'Dining':0.2, 'Retail':0.2, 'Travel':0.4}, 'Q4': {'Groceries':0.3, 'Dining':0.3, 'Retail':0.3, 'Travel':0.1} } categories = [] for q in quarters: weights = list(category_weights[q].values()) cats = list(category_weights[q].keys()) categories.append(np.random.choice(cats, p=weights)) # 金额:对数正态分布,模拟长尾 log_amounts = np.random.lognormal(mean=5.5, sigma=1.2, size=n_samples) amounts = np.clip(log_amounts, 10, 50000) # 截断极端值 # 手续费:阶梯费率 fees = [] for amt in amounts: if amt < 100: fee_rate = 0.025 elif amt < 1000: fee_rate = 0.018 else: fee_rate = 0.005 fees.append(round(amt * fee_rate, 2)) return pd.DataFrame({ 'date': dates, 'customer_id': customers, 'category': categories, 'amount': np.round(amounts, 2), 'fee': fees }) df = generate_realistic_transactions(50000) print(f"生成{len(df)}条交易记录,时间范围:{df['date'].min()} ~ {df['date'].max()}")运行后,你会看到数据分布接近真实:amount的describe()显示25%分位数约85元,75%分位数约820元,长尾明显。
6.2 六步风控分析流水线:从原始数据到决策建议
步骤1:多维基础统计(防欺诈第一道关)
# 按客户+商户类别,计算核心风控指标 base_stats = df.groupby(['customer_id', 'category']).agg({ 'amount': ['count', 'sum', 'mean', 'std', 'min', 'max'], 'fee': ['sum', 'mean'] }).round(2) # 重命名列,便于理解 base_stats.columns = ['txn_count', 'txn_sum', 'txn_mean', 'txn_std', 'txn_min', 'txn_max', 'fee_sum', 'fee_mean'] # 标记高风险客户:单类商户交易标准差 > 5000 且 均值 > 5000 high_risk_mask = (base_stats['txn_std'] > 5000) & (base_stats['txn_mean'] > 5000) high_risk_customers = base_stats[high_risk_mask].index.get_level_values('customer_id').unique() print(f"初步识别高风险客户{len(high_risk_customers)}名:{list(high_risk_customers[:5])}")步骤2:滚动窗口检测异常模式(动态风控)
# 按客户排序,计算7天滚动交易均值和标准差 df_sorted = df.sort_values(['customer_id', 'date']).set_index('date') df_sorted['7day_mean'] = df_sorted.groupby('customer_id')['amount'].rolling('7D').mean().values df_sorted['7day_std'] = df_sorted.groupby('customer_id')['amount'].rolling('7D').std().values # 计算“当前交易 vs 近期均值”的偏离度(Z-score) df_sorted['z_score'] = (df_sorted['amount'] - df_sorted['7day_mean']) / (df_sorted['7day_std'] + 1e-8) # 标记Z-score > 3的异常交易(3倍标准差) anomaly_mask = df_sorted['z_score'].abs() > 3 anomaly_trx = df_sorted[anomaly_mask].reset_index()[['customer_id', 'date', 'category', 'amount', 'z_score']] print(f"发现{len(anomaly_trx)}笔异常交易,最高Z-score:{anomaly_trx['z_score'].abs().max():.2f}")步骤3:扩展窗口追踪客户生命周期价值(CLV)
# 按客户计算年累计消费(YTD) df_sorted['ytd_spend'] = df_sorted.groupby('customer_id')['amount'].expanding().sum().values # 计算客户“首次交易日”和“最近交易日” first_last = df.groupby('customer_id')['date'].agg(['min', 'max']).rename(columns={'min':'first_txn', 'max':'last_txn'}) df_sorted = df_sorted.merge(first_last, left_on='customer_id', right_index=True) # 计算客户年龄(天数)和YTD消费/年龄比值(活跃度) df_sorted['customer_age_days'] = (df_sorted['date'] - df_sorted['first_txn']).dt.days df_sorted['spend_per_day'] = df_sorted['ytd_spend'] / (df_sorted['customer_age_days'] + 1) # +1防0 # 识别高价值休眠客户:YTD消费>10万,但最近30天无交易 inactive_high_value = ( df_sorted.groupby('customer_id') .apply(lambda x: (x['ytd_spend'].max() > 100000) & (x['date'].max() < pd.Timestamp('2024-12-01'))) .loc[lambda x: x] ) print(f"高价值休眠客户{len(inactive_high_value)}名,需定向唤醒")步骤4:多维透视揭示交叉风险(管理驾驶舱)
# 构建“地区-商户类别”风险热力图 region_category_risk = df.groupby(['region', 'category']).agg({ 'amount': lambda x: x.std() / (x.mean() + 1e-8), # 变异系数(风险指标) 'fee': 'sum' }).rename(columns={'amount': 'risk_coeff', 'fee': 'total_fee'}) # unstack成矩阵,用于热力图 risk_matrix = region_category_risk['risk_coeff'].unstack(fill_value=0) # 用seaborn画热力图(此处省略绘图代码,重点在数据结构) # 输出:哪些地区+商户组合变异系数最高?如“华南-旅游”可能达0.8,需重点监控步骤5:自定义风险分层(业务规则引擎)
def risk_segmentation(series: pd.Series) -> str: """根据交易金额分布,将客户分为四类""" if len(series) < 5: return 'InsufficientData' q1, q3 = series.quantile([0.25, 0.75]) iqr = q3 - q1 lower_bound = q1 - 1.5 * iqr upper_bound = q3 + 1.5 * iqr # 统计异常值比例 outliers = ((series < lower_bound) | (series > upper_bound)).sum() outlier_ratio = outliers / len(series) if outlier_ratio > 0.3: return 'HighVolatility' # 高波动 elif series.mean() > 5000 and outlier_ratio < 0.1: return 'HighValueStable' # 高价值稳定 elif series.mean() < 500: return 'LowValueFrequent' # 低价值高频 else: return 'Normal' # 应用分层 df['risk_profile'] = df.groupby('customer_id')['amount'].apply(risk_segmentation) profile_dist = df['risk_profile'].value_counts(normalize=True).round(3) * 100 print("客户风险画像分布:") print(profile_dist)步骤6:生成执行摘要(给老板的一页纸)
# 综合指标汇总 summary = df.groupby('customer_id').agg({ 'amount': ['sum', 'mean', 'count', lambda x: x.std() / (x.mean() + 1e-8)], 'fee': 'sum', 'date': ['min', 'max'] }).round(2) summary.columns = ['total_spend', 'avg_txn', 'txn_count', 'risk_coeff', 'total_fee', 'first_txn', 'last_txn'] summary['tenure_days'] = (summary['last_txn'] - summary['first_txn']).dt.days summary['spend_per_day'] = summary['total_spend'] / (summary['tenure_days'] + 1) # 分层统计 summary['segment'] = summary['risk_coeff'].apply( lambda x: 'HighRisk' if x > 0.5 else ('MediumRisk' if x > 0.2 else 'LowRisk') ) final_summary = summary.groupby('segment').agg({ 'total_spend': ['count', 'sum', 'mean'], 'avg_txn': 'mean', 'risk_coeff': 'mean' }).round(2) final_summary.columns = ['customer_count', 'total_revenue', 'avg_revenue_per_customer', 'avg_txn_amount', 'avg_risk_coeff'] print("\n=== 执行摘要:客户风险分层报告 ===") print(final_summary)输出示例:
=== 执行摘要:客户风险分层报告 === customer_count total_revenue avg_revenue_per_customer avg_txn_amount avg_risk_coeff segment HighRisk 127 12500000 98425.20 4250.30 0.72 MediumRisk 2150 185000000 8604