大数据工程中的自动化数据质量检查
从 0 到 1 用 Apache Spark + Great Expectations + Airflow 打造可扩展、可复用的数据质量平台
目标读者与前置知识
| 目标读者 | 需要具备的前置知识 |
|---|---|
| 1~3 年经验的大数据开发 / 数据平台工程师 | 熟悉 Linux 命令行、Python 语法、SQL 基础 |
| 对数据治理、数据质量有痛点的产品或数仓同学 | 了解 Hadoop 生态(HDFS/Hive)或云数据湖(S3、OSS)概念 |
| 想将「人工对数」升级为「自动化质检」的团队 | 能跑通最简单的 Spark job,知道 DAG 调度是什么 |
文章目录
- 引言:为什么“数据质量”总在救火?
- 问题背景与动机:当数据量 > 人力肉眼极限
- 核心概念与理论基础
- 3.1 数据质量的 5 大维度
- 3.2 自动化质检在数据管道的位置
- 3.3 Great Expectations 架构速览
- 环境准备:10 分钟可复现的 Docker-Compose 栈
- 分步实现
- 5.1 步骤 1:初始化 Great Expectations 项目
- 5.2 步骤 2:连接 Spark + Hive Catalog
- 5.3 步骤 3:编写第一批 Expectations(字段非空、主键唯一、分布漂移)
- 5.4 步骤 4:把质检任务封装成 Airflow DAG
- 5.5 步骤 5:失败告警与企业微信 / Slack 集成
- 5.6 步骤 6:多表批量生成 Expectations(模板化)
- 5.7 步骤 7:数据质量分数可视化(Superset 仪表盘)
- 关键代码解析与深度剖析
- 6.1 Checkpoint 运行时序
- 6.2 自定义 Expectation(业务规则 DSL)
- 6.3 千万级表 Row-Level 异常采样策略
- 结果展示与验证
- 7.1 运行日志与失败报告示例
- 7.2 性能基准:1 TB 表 200 Expectations 耗时 8 min
- 性能优化与最佳实践
- 8.1 列式统计复用 & 增量检查
- 8.2 并行度与分区裁剪
- 8.3 元数据缓存与 Iceberg/Hudi 快照读
- 常见问题与解决方案(FAQ)
- 未来展望:从“事后检查”到“事前防控”
- 总结
- 参考资料
- 附录:完整 docker-compose.yml & GitHub 仓库
1. 引言:为什么“数据质量”总在救火?
“老板早上看到报表数字对不上,第一反应永远是:‘数据是不是又脏了?’”
在传统数仓时代,数据量小、表少,数据开发同学可以靠“肉眼 SQL 对数”——写几条COUNT(*)、GROUP BY就定位问题。进入大数据时代后:
- 表 > 5000 张,字段 > 10 万列
- 离线 + 实时双链路,同一个指标可能来自 3 个不同作业
- 业务变更频繁,字段增减、枚举值变化无通知
人肉对数彻底失效,数据质量问题从“偶发”变成“常态”。本文要解决的,正是如何把事后救火变成事前拦截,把不可复现的人工检查变成可配置、可调度、可量化的自动化流程。
2. 问题背景与动机:当数据量 > 人力肉眼极限
2.1 现有方案痛点
| 方案 | 优点 | 痛点 |
|---|---|---|
| SQL 脚本 + Cron | 简单 | 无统一报告、无历史趋势、无权限隔离 |
| 自研 Python 脚本 | 灵活 | 重复开发、无 UI、无版本管理 |
| 商业 DQ 工具(Informatica、Talend) | 功能全 | 贵、封闭、难二次开发 |
| 数据湖格式自带统计(Iceberg Metrics) | 轻量 | 仅文件级,缺业务语义规则 |
2.2 技术选型理由
开源、可扩展、与 Spark 生态无缝集成、社区活跃 →Great Expectations(GX)
统一调度、失败重试、告警 →Apache Airflow
计算引擎 →Spark 3.4(已支持 Python 3.11 & ANSI SQL Mode)
可视化 →Superset(原生支持 GX 元数据 SQL)
3. 核心概念与理论基础
3.1 数据质量的 5 大维度
- Completeness(完整性)
例:order_id非空比例 ≥ 99.9% - Uniqueness(唯一性)
例:主键user_id+dt无重复 - Validity(有效性)
例:枚举值status ∈ ['paid','cancelled'] - Consistency(一致性)
例:订单表amount汇总 = 支付表sum(money),误差 < 0.1% - Timeliness(时效性)
例:业务 2 点前上传,质检 3 点前完成,超时即报警
3.2 自动化质检在数据管道的位置
┌──────────┐ ┌──────────┐ ┌──────────┐ | Ingest |-->| Clean |-->| Aggregate| | (Flume) | | (Spark) | | (Spark) | └──────────┘ └──────────┘ └──────────┘ ↓ ↓ ↓ Raw Zone Staging Zone Curated Zone ↓ ↓ ↓ ┌────────────────────────────────────────────┐ | 质检 Checkpoint | | 失败 → 发送告警 & 阻塞下游 | | 成功 → 写 `data_quality_pass` 标记 | └────────────────────────────────────────────┘经验:质检任务必须阻塞下游,否则“脏数据”一旦扩散,修复成本指数级上升。
3.3 Great Expectations 架构速览
DataContext ├── Datasources (Spark, Pandas, SQL) ├── Expectations Suites (JSON) ├── Profilers (自动生成规则) ├── Checkpoints (运行时配置) ├── Validation Results (JSON → 可转存 S3) └── Data Docs (静态 HTML 报告)4. 环境准备:10 分钟可复现的 Docker-Compose 栈
全部镜像使用官方社区版,Mac M1 / Linux x86 均测通。
目录结构:
gx-spark-airflow/ ├── docker-compose.yml ├── spark-defaults.conf ├── great_expectations/ │ ├── great_expectations.yml │ └── expectations/ ├── dags/ │ └── dq_checkpoint_dag.py └── superset/ └── superset_config.py4.1 一键启动
gitclone https://github.com/yourname/gx-spark-airflow.gitcdgx-spark-airflowdockercompose up -d# 默认拉起 7 个服务| 服务 | 端口 | 说明 |
|---|---|---|
| Airflow Web | 8080 | admin/admin |
| Spark Master | 7077 | |
| Spark History | 18080 | |
| Jupyter Lab | 8888 | token:abcd |
| Superset | 8088 | admin/admin |
| Postgres | 5432 | Airflow & GX 元数据库 |
| MinIO | 9000 | S3 API,存 Validation 结果 |
4.2 软件版本锁定
| 组件 | 版本 |
|---|---|
| Spark | 3.4.1(Hadoop 3) |
| Great Expectations | 0.17.9 |
| Airflow | 2.7.0 |
| Python | 3.11-slim |
5. 分步实现
5.1 步骤 1:初始化 Great Expectations 项目
进入 Jupyter 容器:
dockerexec-it jupyterbash# 安装 GX CLIpipinstallgreat_expectations==0.17.9 great_expectations init交互式向导选:
spark作为计算引擎s3作为元数据与结果存储(endpoint 填http://minio:9000,access_key=minioadmin)
完成后目录:
great_expectations/ ├── great_expectations.yml ├── expectations/ ├── checkpoints/ ├── plugins/ └── uncommitted/5.2 步骤 2:连接 Spark + Hive Catalog
编辑great_expectations.yml,追加:
datasources:spark_datasource:class_name:Datasourceexecution_engine:class_name:SparkDFExecutionEnginespark_config:spark.sql.catalogImplementation:hivehive.metastore.uris:thrift://hive-metastore:9083data_connectors:default_inferred_data_connector_name:class_name:InferredAssetSqlDataConnectorinclude_schema_name:true测试连通:
importgreat_expectationsasgx context=gx.get_context()context.test_yaml_config(yaml.dump(datasource_config))5.3 步骤 3:编写第一批 Expectations
以dwd_order表为例,需求:
| 规则 | 期望 |
|---|---|
| 非空 | order_id,user_id,amount |
| 唯一 | order_id |
| 范围 | amount∈ [0.01, 50000] |
| 枚举 | status∈ [‘paid’,‘cancelled’,‘refund’] |
代码:
fromgreat_expectations.checkpointimportSimpleCheckpointfromgreat_expectations.core.batchimportBatchRequest batch_request=BatchRequest(datasource_name="spark_datasource",data_connector_name="default_inferred_data_connector_name",data_asset_name="dwd_order",batch_spec_passthrough={"reader_method":"table"},)suite=context.create_expectation_suite(expectation_suite_name="dwd_order_suite",overwrite_existing=True)# 非空suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_not_be_null",kwargs={"column":"order_id"},))# 唯一suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_be_unique",kwargs={"column":"order_id"},))# 数值范围suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_be_between",kwargs={"column":"amount","min_value":0.01,"max_value":50000},))# 枚举suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_be_in_set",kwargs={"column":"status","value_set":["paid","cancelled","refund"]},))context.save_expectation_suite(suite)运行 Checkpoint:
checkpoint=SimpleCheckpoint(name="dwd_order_checkpoint",data_context=context,batch_request=batch_request,expectation_suite_name="dwd_order_suite",action_list=[{"name":"store_validation_result","action":{"class_name":"StoreValidationResultAction"},},{"name":"update_data_docs","action":{"class_name":"UpdateDataDocsAction"},},],)checkpoint_result=checkpoint.run()成功会输出:
"success": true, "statistics": { "evaluated_expectations": 4, "successful_expectations": 4, "success_percent": 100 }5.4 步骤 4:把质检任务封装成 Airflow DAG
dags/dq_checkpoint_dag.py
fromairflowimportDAGfromairflow.operators.bashimportBashOperatorfromdatetimeimportdatetime,timedelta default_args={"owner":"data_team","depends_on_past":False,"retries":2,"retry_delay":timedelta(minutes=5),}withDAG(dag_id="dq_dwd_order",default_args=default_args,start_date=datetime(2023,10,1),schedule_interval="0 2 * * *",# 每天 02:00 跑catchup=False,)asdag:# 使用 GX CLI 运行 checkpointdq_task=BashOperator(task_id="run_gx_checkpoint",bash_command="cd /opt/great_expectations && great_expectations checkpoint run dwd_order_checkpoint",)把great_expectations目录挂载到 Airflow 容器:
volumes:-./great_expectations:/opt/great_expectations重启 Airflow,Web 里开启 DAG,手动 trigger 一次观察日志。
5.5 步骤 5:失败告警与企业微信 / Slack 集成
在great_expectations.yml末尾加:
validation_operators:action_list_operator:class_name:ActionListValidationOperatoraction_list:-name:send_wechataction:class_name:custom_plugins.WeChatWebhookActionwebhook_url:https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx自定义插件plugins/wechat_action.py(核心片段):
fromgreat_expectations.validation_operatorsimportValidationActionimportrequestsclassWeChatWebhookAction(ValidationAction):def_run(self,validation_result_suite,validation_result_suite_identifier,**kwargs):success=validation_result_suite.success run_id=validation_result_suite.meta["run_id"]ifnotsuccess:msg=f"❌ 数据质量检查失败,run_id={run_id}"requests.post(self.webhook_url,json={"msgtype":"text","text":{"content":msg}})5.6 步骤 6:多表批量生成 Expectations(模板化)
公司有 100+ 张业务表,手工写 JSON 不现实。GX 提供OnboardingDataAssistant:
fromgreat_expectations.datasource.fluent.interfacesimportDatasourcefromgreat_expectations.core.batchimportBatchRequestfromgreat_expectations.rule_based_profilerimportRuleBasedProfiler batch_request=BatchRequest(datasource_name="spark_datasource",data_asset_name="dwd_order",)profiler=RuleBasedProfiler(name="onboarding",config_version=1.0,variables={},rules={},)suite=profiler.run(batch_request=batch_request)context.save_expectation_suite(suite)经验:自动生成后,务必人工 review再上线,避免“过度期望”导致误报。
5.7 步骤 7:数据质量分数可视化(Superset 仪表盘)
GX 每次运行会把结果写进 Postgres 表ge_validations.
Superset 里新建 SQL 数据集:
SELECTexpectation_suite_name,success::int*100.0/evaluated_expectationsASsuccess_rate,run_timeFROMge_validationsWHEREcreated_at>=current_date-7用 Line Chart 展示“最近 7 天质量分数趋势”,挂到仪表盘首页,让老板一眼看到“数据健康度”。
6. 关键代码解析与深度剖析
6.1 Checkpoint 运行时序
- Spark 读取表 → DataFrame
- GX 把 Expectations 翻译成 Catalyst 逻辑计划,下推列统计(如
max(amount)) - 结果写回
validation_results(JSON) - Data Docs 生成静态 HTML,推送 MinIO,生命周期 90 天自动回收
6.2 自定义 Expectation(业务规则 DSL)
例:订单金额小数位 ≤ 2
fromgreat_expectations.coreimportExpectationConfigurationfromgreat_expectations.expectations.custom_expectationimportCustomExpectationfrompyspark.sql.functionsimportregexp_extractclassExpectColumnValuesDecimalPlaces(CustomExpectation):metric_dependencies=("column_values.decimal_places",)success_keys=("max_places",)defvalidate_configuration(self,configuration:ExpectationConfiguration):assert"column"inconfiguration.kwargsassert"max_places"inconfiguration.kwargsdef_validate(self,metrics):places=metrics["column_values.decimal_places"]max_places=self.configuration.kwargs["max_places"]returnplaces<=max_places注册到plugins/后,即可在 Suite 里使用:
suite.add_expectation(ExpectationConfiguration(expectation_type="expect_column_values_decimal_places",kwargs={"column":"amount","max_places":2},))6.3 千万级表 Row-Level 异常采样策略
若一张表 10 亿行,全量扫描成本不可接受。GX 支持:
mostly参数:99 % 通过即可sample子句:只扫 100 万行partition_by:按 dt 分区,只扫最近 3 天
示例:
batch_request=BatchRequest(datasource_name="spark_datasource",data_asset_name="dwd_order",batch_spec_passthrough={"reader_options":{"predicates":["dt >= '2023-10-01'","dt <= '2023-10-03'"]}},)7. 结果展示与验证
7.1 运行日志与失败报告示例
失败场景:枚举值新增'pending'导致expect_column_values_to_be_in_set不通过。
GX Data Docs 自动生成:
| 列 | 失败值 | 计数 | 占比 |
|---|---|---|---|
| status | pending | 12,345 | 0.8 % |
7.2 性能基准
| 表大小 | 行数 | Expectations | 耗时 | 集群规格 |
|---|---|---|---|---|
| 1 TB | 5.2 B | 200 | 8 min | 20 × m5.xlarge (8 vCPU, 32 GB) |
优化手段:
- 列统计复用:一次
agg计算多个期望 - 增量检查:Iceberg
snapshot-id对比,仅扫新增文件 - 动态并发:
spark.sql.shuffle.partitions = min(400, 表大小/128 MB)
8. 性能优化与最佳实践
- Completeness vs. Timeliness 权衡
若表 10 亿行,允许采样 1 %,可把 30 min 降到 3 min,业务可接受即可。 - 把“业务规则”与“技术规则”分层
技术规则(非空、唯一)统一模板;业务规则(金额小数位)放 Git 代码,Code Review 强制合并。 - 失败分级
- BLOCKER:主键重复 → 阻塞下游
- WARN:枚举值新增 → 只报警不阻塞
用 GXaction_list的validation_operator区分处理。
- 元数据与数据分离
Validation 结果写对象存储,不占用 Hive 空间;生命周期 90 天自动删。 - 版本管理
Expectation Suite 放 Git,每次修改走 MR,回滚只需git revert.
9. 常见问题与解决方案(FAQ)
| 问题 | 根因 | 解决 |
|---|---|---|
Great Expectations找不到 Hive 表 | metastore 地址未配置 | spark.sql.catalogImplementation=hive |
| Spark OOM | 采样比例过高 | 调低mostly或增加sample |
Airflow 报great_expectations not found | 镜像缺包 | 在requirements-airflow.txt加great_expectations==0.17.9 |
| MinIO 证书错误 | 用 HTTP 自签 | 加环境变量AWS_CA_BUNDLE="" |
| 中文列名失败 | GX 默认正则[a-zA-Z0-9_] | 改expect_column_names_to_match_regex或重命名列 |
10. 未来展望:从“事后检查”到“事前防控”
- Schema Registry + Kafka
在数据接入时就对字段类型、枚举值做校验,不让脏数据进湖。 - ML 异常检测
用 Facebook Prophet 或 AWS Deequ 的ApproxQuantiles,自动发现趋势突变。 - Data Contract
上游团队签署“数据契约”,违反即回滚版本,实现“左移”理念。 - DataOps 平台化
把 GX、Airflow、Superset、Catalog 集成到统一门户,一键订阅质检报告。
11. 总结
我们完整走过了“环境搭建 → 单表质检 → 调度集成 → 可视化 → 性能优化”全链路。你现在可以:
- 用 GX + Spark 在分钟级完成 TB 级数据质量检查
- 通过 Airflow 把质检嵌入日常管道,失败即阻塞、自动告警
- 用 Superset 让业务方实时看到数据健康度,不再“拍脑袋”决策
数据质量不再是“事后救火”,而是可度量、可复盘、可改进的工程化环节。希望本文能帮你把“数据质量”从运维痛点转变为团队信任基石。
12. 参考资料
- Great Expectations 官方文档:https://docs.greatexpectations.io/
- Spark 3.4 Performance Tuning Guide:https://spark.apache.org/docs/latest/sql-performance-tuning.html
- AWS Deequ Paper:《Automating Large-Scale Data Quality Verification》
- Airbnb Data Quality with Airflow:https://medium.com/airbnb-engineering/dq-airbnb-4f8d8e7f4b7c
- 《Data Quality Fundamentals》O’Reilly 2022
13. 附录
A. 完整 docker-compose.yml(节选)
version:"3.8"services:spark-master:image:apache/spark:3.4.1command:/opt/spark/sbin/start-master.shports:-"7077:7077"-"8080:8080"jupyter:image:jupyter/pyspark-notebook:python-3.11volumes:-./great_expectations:/home/jovyan/gxenvironment:SPARK_MASTER_URL:spark://spark-master:7077airflow:image:apache/airflow:2.7.0-python3.11volumes:-./dags:/opt/airflow/dags-./great_expectations:/opt/great_expectationsenvironment:AIRFLOW__CORE__EXECUTOR:LocalExecutorB. GitHub 仓库(含全部代码 & SQL)
https://github.com/yourname/gx-spark-airflow
记得顺手点个 ⭐,方便后续更新!
“数据质量不是一次项目,而是持续运营。”——祝你早日让团队告别“数据背锅”,用工程化手段把风险消灭在源头!