在AWS上构建AI驱动的客户流失预测管道(逐步指南)

发布: (2026年1月1日 GMT+8 08:56)
7 分钟阅读
原文: Dev.to

Source: Dev.to

我们的成果

  • 84.2% AUC 在验证数据上
  • ✅ 通过 SageMaker 端点进行实时预测
  • ✅ 由 Claude(Bedrock)提供支持的自然语言解释

Source:

🎯 我们正在构建的内容

一个端到端的机器学习流水线:

步骤操作
摄取将客户数据导入 S3
训练使用 SageMaker XGBoost 进行流失预测模型训练
部署实时推理端点
解释使用 Amazon Bedrock(Claude)解释预测
暴露通过 API Gateway + Lambda 提供所有功能

先决条件:AWS 账户,基础 Python 知识

🏗️ 架构概览

AWS Churn Prediction Architecture

该管道由 5 层组成

层级服务目的
数据摄取S3存储原始客户数据
机器学习训练SageMaker Training训练 XGBoost 模型
模型存储S3存储模型制品
推理与 AISageMaker Endpoint, Bedrock实时预测 + 自然语言解释
API 层API Gateway, Lambda提供 REST API

第一步 – 设置 S3 并上传数据

# 使用您的账户 ID 设置存储桶名称
export BUCKET_NAME=churn-prediction-$(aws sts get-caller-identity --query Account --output text)

# 创建存储桶
aws s3 mb s3://$BUCKET_NAME

# 上传您的数据
aws s3 cp WA_Fn-UseC_-Telco-Customer-Churn.csv s3://$BUCKET_NAME/raw/

📥 数据集:从 Kaggle 下载 Telco Customer Churn 数据集

Step 2 – Create SageMaker IAM Role

  1. 打开 IAM 控制台 → RolesCreate role
  2. 选择 SageMaker – Execution 作为受信实体。
  3. 附加策略:AmazonSageMakerFullAccessAmazonS3FullAccess
  4. 将角色命名为 SageMakerChurnRole

第三步 – 训练模型

创建一个名为 train_churn.py 的文件,内容如下:

import boto3
import sagemaker
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from sagemaker.inputs import TrainingInput

# Config
BUCKET = os.environ['BUCKET_NAME']
ROLE   = os.environ['ROLE_ARN']
PREFIX = 'churn-prediction'

session = sagemaker.Session()
region  = session.boto_region_name

# Load and prepare data
df = pd.read_csv('WA_Fn-UseC_-Telco-Customer-Churn.csv')
df['TotalCharges'] = pd.to_numeric(df['TotalCharges'], errors='coerce').fillna(0)
df['Churn'] = (df['Churn'] == 'Yes').astype(int)

# Encode categorical columns
cat_cols = [
    'gender', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines',
    'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
    'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract',
    'PaperlessBilling', 'PaymentMethod'
]

for col in cat_cols:
    df[col] = df[col].astype('category').cat.codes

# Feature set
feature_cols = ['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges'] + cat_cols
X = df[feature_cols]
y = df['Churn']

# Train‑test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

train_df = pd.concat([y_train.reset_index(drop=True), X_train.reset_index(drop=True)], axis=1)
test_df  = pd.concat([y_test.reset_index(drop=True),  X_test.reset_index(drop=True)], axis=1)

train_df.to_csv('train.csv', index=False, header=False)
test_df.to_csv('test.csv',   index=False, header=False)

# Upload CSVs to S3
s3 = boto3.client('s3')
s3.upload_file('train.csv', BUCKET, f'{PREFIX}/train/train.csv')
s3.upload_file('test.csv',  BUCKET, f'{PREFIX}/test/test.csv')

# XGBoost container
container = sagemaker.image_uris.retrieve('xgboost', region, '1.7-1')

xgb = sagemaker.estimator.Estimator(
    image_uri=container,
    role=ROLE,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    output_path=f's3://{BUCKET}/{PREFIX}/output',
    sagemaker_session=session
)

xgb.set_hyperparameters(
    objective='binary:logistic',
    num_round=100,
    max_depth=5,
    eta=0.2,
    eval_metric='auc'
)

xgb.fit({
    'train': TrainingInput(f's3://{BUCKET}/{PREFIX}/train', content_type='csv'),
    'validation': TrainingInput(f's3://{BUCKET}/{PREFIX}/test', content_type='csv')
})

# Deploy endpoint
predictor = xgb.deploy(
    initial_instance_count=1,
    instance_type='ml.t2.medium',
    endpoint_name='churn-prediction-endpoint',
    serializer=sagemaker.serializers.CSVSerializer()
)

运行脚本

export BUCKET_NAME=churn-prediction-YOUR_ACCOUNT_ID
export ROLE_ARN=arn:aws:iam::YOUR_ACCOUNT_ID:role/SageMakerChurnRole
python3 train_churn.py

示例训练输出

2026-01-01 00:24:27 Uploading - Uploading generated training model
2026-01-01 00:24:27 Completed - Training job completed
Training seconds: 103
Billable seconds: 103

✅ Training complete!
Model artifact: s3://churn-prediction-905418352184/churn-prediction/output/sagemaker-xgboost-2026-01-01-00-22-03-339/output/model.tar.gz

Deploying endpoint (3‑5 min)...
INFO:sagemaker:Creating model with name: sagemaker-xgboost-2026-01-01-00-24-53-959
INFO:sagemaker:Creating endpoint-config with name churn-prediction-endpoint
INFO:sagemaker:Creating endpoint with name churn-prediction-endpoint
✅ Endpoint deployed: churn-prediction-endpoint
Test prediction: 0.4% churn probability

现在,你已经拥有一个完整功能的流失预测模型、一个实时的 SageMaker 端点,并且(在接下来的步骤中)还有一个调用 Bedrock 为每个预测生成简明英文解释的 API 层。敬请期待本教程的其余部分!

第 4 步 – 创建带 Bedrock 集成的 Lambda

创建一个名为 ChurnPredictionAPI 的 Lambda 函数,代码如下:

import json
import boto3
import os

sagemaker_runtime = boto3.client('sagemaker-runtime')
bedrock = boto3.client('bedrock-runtime')

ENDPOINT_NAME = os.environ.get('SAGEMAKER_ENDPOINT', 'churn-prediction-endpoint')

def lambda_handler(event, context):
    body = json.loads(event['body']) if isinstance(event.get('body'), str) else event

    # Get prediction from SageMaker
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType='text/csv',
        Body=body['features']
    )
    churn_prob = float(response['Body'].read().decode())

    # Generate explanation with Bedrock Claude
    prompt = f"""A customer has {churn_prob:.1%} churn probability.
Customer: Tenure {body.get('tenure', 'N/A')} months, ${body.get('monthly_charges', 'N/A')}/month, {body.get('contract', 'N/A')} contract.
In 2 sentences, explain the risk and suggest one retention action."""

    bedrock_response = bedrock.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 100,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    explanation = json.loads(bedrock_response['body'].read())['content'][0]['text']
    risk = "High" if churn_prob > 0.7 else "Medium" if churn_prob > 0.4 else "Low"

    return {
        'statusCode': 200,
        'headers': {'Content-Type': 'application/json'},
        'body': json.dumps({
            'churn_probability': f"{churn_prob:.1%}",
            'risk_level': risk,
            'explanation': explanation
        })
    }

测试成功

Lambda 配置

  • 运行时(Runtime): Python 3.11
  • 超时时间(Timeout): 30 秒
  • 角色(Role): LambdaChurnRole(具备 SageMaker + Bedrock 权限)
  • 环境变量(Environment variable): SAGEMAKER_ENDPOINT=churn-prediction-endpoint

步骤 5 – 创建 API 网关

  1. 在 API Gateway 中创建一个 HTTP API
  2. 添加 Lambda 集成 → ChurnPredictionAPI
  3. 创建一个 POST 路由:/predict
  4. 部署并记录调用 URL。

Create API Gateway
Configure Integration

🧪 测试 API

curl -X POST "https://YOUR_API_URL/predict" \
  -H "Content-Type: application/json" \
  -d '{
    "features": "0,24,65.5,1500.0,1,0,1,2,0,0,1,1,0,0,1,0,2,1,1",
    "tenure": 24,
    "monthly_charges": 65.5,
    "contract": "Month-to-month"
}'

流失 API 响应

示例响应

{
  "churn_probability": "0.6%",
  "risk_level": "Low",
  "explanation": "The customer's high churn probability of 0.6% and the month-to-month contract indicate a significant risk of losing the customer. To mitigate this risk, a retention action could be to offer the customer a longer-term contract with a discounted monthly rate or additional benefits, which may help increase their loyalty and reduce the likelihood of churn."
}

🧹 清理

完成后删除资源以避免产生费用:

# Delete SageMaker endpoint (most expensive!)
aws sagemaker delete-endpoint --endpoint-name churn-prediction-endpoint
aws sagemaker delete-endpoint-config --endpoint-config-name churn-prediction-endpoint

# Delete Lambda
aws lambda delete-function --function-name ChurnPredictionAPI

# Delete S3 bucket
aws s3 rb s3://$BUCKET_NAME --force

💡 关键经验教训

  • SageMaker XGBoost 已具备生产就绪能力 – 通过最小调优实现约 84 % 的 AUC。
  • Bedrock 带来真实的业务价值 – 将原始预测转化为可操作的洞察,使非技术利益相关者也能使用机器学习。
  • IAM 权限很棘手 – 通过控制台创建角色可以帮助避免 “explicit deny” 错误。
  • 成本意识很重要 – 当不再需要时,务必删除昂贵的资源(例如 SageMaker 端点)。

未使用时的端点

(~$0.05/小时累计!)

资源

感谢阅读!如果对您有帮助,请关注我获取更多 AWS + 数据工程 内容。

有问题吗? 在下方留言!

Back to Blog

相关文章

阅读更多 »

RGB LED 支线任务 💡

markdown !Jennifer Davishttps://media2.dev.to/dynamic/image/width=50,height=50,fit=cover,gravity=auto,format=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%...

Mendex:我为何构建

介绍 大家好。今天我想分享一下我是谁、我在构建什么以及为什么。 早期职业生涯与倦怠 我在 17 年前开始我的 developer 生涯……