news 2026/4/3 6:10:50

大数据工程中的自动化数据质量检查

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
大数据工程中的自动化数据质量检查

大数据工程中的自动化数据质量检查

从 0 到 1 用 Apache Spark + Great Expectations + Airflow 打造可扩展、可复用的数据质量平台


目标读者与前置知识

目标读者需要具备的前置知识
1~3 年经验的大数据开发 / 数据平台工程师熟悉 Linux 命令行、Python 语法、SQL 基础
对数据治理、数据质量有痛点的产品或数仓同学了解 Hadoop 生态(HDFS/Hive)或云数据湖(S3、OSS)概念
想将「人工对数」升级为「自动化质检」的团队能跑通最简单的 Spark job,知道 DAG 调度是什么

文章目录

  1. 引言:为什么“数据质量”总在救火?
  2. 问题背景与动机:当数据量 > 人力肉眼极限
  3. 核心概念与理论基础
    • 3.1 数据质量的 5 大维度
    • 3.2 自动化质检在数据管道的位置
    • 3.3 Great Expectations 架构速览
  4. 环境准备:10 分钟可复现的 Docker-Compose 栈
  5. 分步实现
    • 5.1 步骤 1:初始化 Great Expectations 项目
    • 5.2 步骤 2:连接 Spark + Hive Catalog
    • 5.3 步骤 3:编写第一批 Expectations(字段非空、主键唯一、分布漂移)
    • 5.4 步骤 4:把质检任务封装成 Airflow DAG
    • 5.5 步骤 5:失败告警与企业微信 / Slack 集成
    • 5.6 步骤 6:多表批量生成 Expectations(模板化)
    • 5.7 步骤 7:数据质量分数可视化(Superset 仪表盘)
  6. 关键代码解析与深度剖析
    • 6.1 Checkpoint 运行时序
    • 6.2 自定义 Expectation(业务规则 DSL)
    • 6.3 千万级表 Row-Level 异常采样策略
  7. 结果展示与验证
    • 7.1 运行日志与失败报告示例
    • 7.2 性能基准:1 TB 表 200 Expectations 耗时 8 min
  8. 性能优化与最佳实践
    • 8.1 列式统计复用 & 增量检查
    • 8.2 并行度与分区裁剪
    • 8.3 元数据缓存与 Iceberg/Hudi 快照读
  9. 常见问题与解决方案(FAQ)
  10. 未来展望:从“事后检查”到“事前防控”
  11. 总结
  12. 参考资料
  13. 附录:完整 docker-compose.yml & GitHub 仓库

1. 引言:为什么“数据质量”总在救火?

“老板早上看到报表数字对不上,第一反应永远是:‘数据是不是又脏了?’”

在传统数仓时代,数据量小、表少,数据开发同学可以靠“肉眼 SQL 对数”——写几条COUNT(*)GROUP BY就定位问题。进入大数据时代后:

  • 表 > 5000 张,字段 > 10 万列
  • 离线 + 实时双链路,同一个指标可能来自 3 个不同作业
  • 业务变更频繁,字段增减、枚举值变化无通知

人肉对数彻底失效,数据质量问题从“偶发”变成“常态”。本文要解决的,正是如何把事后救火变成事前拦截,把不可复现的人工检查变成可配置、可调度、可量化的自动化流程


2. 问题背景与动机:当数据量 > 人力肉眼极限

2.1 现有方案痛点

方案优点痛点
SQL 脚本 + Cron简单无统一报告、无历史趋势、无权限隔离
自研 Python 脚本灵活重复开发、无 UI、无版本管理
商业 DQ 工具(Informatica、Talend)功能全贵、封闭、难二次开发
数据湖格式自带统计(Iceberg Metrics)轻量仅文件级,缺业务语义规则

2.2 技术选型理由

开源、可扩展、与 Spark 生态无缝集成、社区活跃 →Great Expectations(GX)
统一调度、失败重试、告警 →Apache Airflow
计算引擎 →Spark 3.4(已支持 Python 3.11 & ANSI SQL Mode)
可视化 →Superset(原生支持 GX 元数据 SQL)


3. 核心概念与理论基础

3.1 数据质量的 5 大维度

  1. Completeness(完整性)
    例:order_id非空比例 ≥ 99.9%
  2. Uniqueness(唯一性)
    例:主键user_id+dt无重复
  3. Validity(有效性)
    例:枚举值status ∈ ['paid','cancelled']
  4. Consistency(一致性)
    例:订单表amount汇总 = 支付表sum(money),误差 < 0.1%
  5. Timeliness(时效性)
    例:业务 2 点前上传,质检 3 点前完成,超时即报警

3.2 自动化质检在数据管道的位置

┌──────────┐ ┌──────────┐ ┌──────────┐ | Ingest |-->| Clean |-->| Aggregate| | (Flume) | | (Spark) | | (Spark) | └──────────┘ └──────────┘ └──────────┘ ↓ ↓ ↓ Raw Zone Staging Zone Curated Zone ↓ ↓ ↓ ┌────────────────────────────────────────────┐ | 质检 Checkpoint | | 失败 → 发送告警 & 阻塞下游 | | 成功 → 写 `data_quality_pass` 标记 | └────────────────────────────────────────────┘

经验:质检任务必须阻塞下游,否则“脏数据”一旦扩散,修复成本指数级上升。

3.3 Great Expectations 架构速览

DataContext ├── Datasources (Spark, Pandas, SQL) ├── Expectations Suites (JSON) ├── Profilers (自动生成规则) ├── Checkpoints (运行时配置) ├── Validation Results (JSON → 可转存 S3) └── Data Docs (静态 HTML 报告)

4. 环境准备:10 分钟可复现的 Docker-Compose 栈

全部镜像使用官方社区版,Mac M1 / Linux x86 均测通

目录结构:

gx-spark-airflow/ ├── docker-compose.yml ├── spark-defaults.conf ├── great_expectations/ │ ├── great_expectations.yml │ └── expectations/ ├── dags/ │ └── dq_checkpoint_dag.py └── superset/ └── superset_config.py

4.1 一键启动

gitclone https://github.com/yourname/gx-spark-airflow.gitcdgx-spark-airflowdockercompose up -d# 默认拉起 7 个服务
服务端口说明
Airflow Web8080admin/admin
Spark Master7077
Spark History18080
Jupyter Lab8888token:abcd
Superset8088admin/admin
Postgres5432Airflow & GX 元数据库
MinIO9000S3 API,存 Validation 结果

4.2 软件版本锁定

组件版本
Spark3.4.1(Hadoop 3)
Great Expectations0.17.9
Airflow2.7.0
Python3.11-slim

5. 分步实现

5.1 步骤 1:初始化 Great Expectations 项目

进入 Jupyter 容器:

dockerexec-it jupyterbash
# 安装 GX CLIpipinstallgreat_expectations==0.17.9 great_expectations init

交互式向导选:

  • spark作为计算引擎
  • s3作为元数据与结果存储(endpoint 填http://minio:9000,access_key=minioadmin)

完成后目录:

great_expectations/ ├── great_expectations.yml ├── expectations/ ├── checkpoints/ ├── plugins/ └── uncommitted/

5.2 步骤 2:连接 Spark + Hive Catalog

编辑great_expectations.yml,追加:

datasources:spark_datasource:class_name:Datasourceexecution_engine:class_name:SparkDFExecutionEnginespark_config:spark.sql.catalogImplementation:hivehive.metastore.uris:thrift://hive-metastore:9083data_connectors:default_inferred_data_connector_name:class_name:InferredAssetSqlDataConnectorinclude_schema_name:true

测试连通:

importgreat_expectationsasgx context=gx.get_context()context.test_yaml_config(yaml.dump(datasource_config))

5.3 步骤 3:编写第一批 Expectations

dwd_order表为例,需求:

规则期望
非空order_id,user_id,amount
唯一order_id
范围amount∈ [0.01, 50000]
枚举status∈ [‘paid’,‘cancelled’,‘refund’]

代码:

fromgreat_expectations.checkpointimportSimpleCheckpointfromgreat_expectations.core.batchimportBatchRequest batch_request=BatchRequest(datasource_name="spark_datasource",data_connector_name="default_inferred_data_connector_name",data_asset_name="dwd_order",batch_spec_passthrough={"reader_method":"table"},)suite=context.create_expectation_suite(expectation_suite_name="dwd_order_suite",overwrite_existing=True)# 非空suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_not_be_null",kwargs={"column":"order_id"},))# 唯一suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_be_unique",kwargs={"column":"order_id"},))# 数值范围suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_be_between",kwargs={"column":"amount","min_value":0.01,"max_value":50000},))# 枚举suite.add_expectation(gx.core.ExpectationConfiguration(expectation_type="expect_column_values_to_be_in_set",kwargs={"column":"status","value_set":["paid","cancelled","refund"]},))context.save_expectation_suite(suite)

运行 Checkpoint:

checkpoint=SimpleCheckpoint(name="dwd_order_checkpoint",data_context=context,batch_request=batch_request,expectation_suite_name="dwd_order_suite",action_list=[{"name":"store_validation_result","action":{"class_name":"StoreValidationResultAction"},},{"name":"update_data_docs","action":{"class_name":"UpdateDataDocsAction"},},],)checkpoint_result=checkpoint.run()

成功会输出:

"success": true, "statistics": { "evaluated_expectations": 4, "successful_expectations": 4, "success_percent": 100 }

5.4 步骤 4:把质检任务封装成 Airflow DAG

dags/dq_checkpoint_dag.py

fromairflowimportDAGfromairflow.operators.bashimportBashOperatorfromdatetimeimportdatetime,timedelta default_args={"owner":"data_team","depends_on_past":False,"retries":2,"retry_delay":timedelta(minutes=5),}withDAG(dag_id="dq_dwd_order",default_args=default_args,start_date=datetime(2023,10,1),schedule_interval="0 2 * * *",# 每天 02:00 跑catchup=False,)asdag:# 使用 GX CLI 运行 checkpointdq_task=BashOperator(task_id="run_gx_checkpoint",bash_command="cd /opt/great_expectations && great_expectations checkpoint run dwd_order_checkpoint",)

great_expectations目录挂载到 Airflow 容器:

volumes:-./great_expectations:/opt/great_expectations

重启 Airflow,Web 里开启 DAG,手动 trigger 一次观察日志。

5.5 步骤 5:失败告警与企业微信 / Slack 集成

great_expectations.yml末尾加:

validation_operators:action_list_operator:class_name:ActionListValidationOperatoraction_list:-name:send_wechataction:class_name:custom_plugins.WeChatWebhookActionwebhook_url:https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx

自定义插件plugins/wechat_action.py(核心片段):

fromgreat_expectations.validation_operatorsimportValidationActionimportrequestsclassWeChatWebhookAction(ValidationAction):def_run(self,validation_result_suite,validation_result_suite_identifier,**kwargs):success=validation_result_suite.success run_id=validation_result_suite.meta["run_id"]ifnotsuccess:msg=f"❌ 数据质量检查失败,run_id={run_id}"requests.post(self.webhook_url,json={"msgtype":"text","text":{"content":msg}})

5.6 步骤 6:多表批量生成 Expectations(模板化)

公司有 100+ 张业务表,手工写 JSON 不现实。GX 提供OnboardingDataAssistant

fromgreat_expectations.datasource.fluent.interfacesimportDatasourcefromgreat_expectations.core.batchimportBatchRequestfromgreat_expectations.rule_based_profilerimportRuleBasedProfiler batch_request=BatchRequest(datasource_name="spark_datasource",data_asset_name="dwd_order",)profiler=RuleBasedProfiler(name="onboarding",config_version=1.0,variables={},rules={},)suite=profiler.run(batch_request=batch_request)context.save_expectation_suite(suite)

经验:自动生成后,务必人工 review再上线,避免“过度期望”导致误报。

5.7 步骤 7:数据质量分数可视化(Superset 仪表盘)

GX 每次运行会把结果写进 Postgres 表ge_validations.
Superset 里新建 SQL 数据集:

SELECTexpectation_suite_name,success::int*100.0/evaluated_expectationsASsuccess_rate,run_timeFROMge_validationsWHEREcreated_at>=current_date-7

用 Line Chart 展示“最近 7 天质量分数趋势”,挂到仪表盘首页,让老板一眼看到“数据健康度”


6. 关键代码解析与深度剖析

6.1 Checkpoint 运行时序

  1. Spark 读取表 → DataFrame
  2. GX 把 Expectations 翻译成 Catalyst 逻辑计划,下推列统计(如max(amount)
  3. 结果写回validation_results(JSON)
  4. Data Docs 生成静态 HTML,推送 MinIO,生命周期 90 天自动回收

6.2 自定义 Expectation(业务规则 DSL)

例:订单金额小数位 ≤ 2

fromgreat_expectations.coreimportExpectationConfigurationfromgreat_expectations.expectations.custom_expectationimportCustomExpectationfrompyspark.sql.functionsimportregexp_extractclassExpectColumnValuesDecimalPlaces(CustomExpectation):metric_dependencies=("column_values.decimal_places",)success_keys=("max_places",)defvalidate_configuration(self,configuration:ExpectationConfiguration):assert"column"inconfiguration.kwargsassert"max_places"inconfiguration.kwargsdef_validate(self,metrics):places=metrics["column_values.decimal_places"]max_places=self.configuration.kwargs["max_places"]returnplaces<=max_places

注册到plugins/后,即可在 Suite 里使用:

suite.add_expectation(ExpectationConfiguration(expectation_type="expect_column_values_decimal_places",kwargs={"column":"amount","max_places":2},))

6.3 千万级表 Row-Level 异常采样策略

若一张表 10 亿行,全量扫描成本不可接受。GX 支持:

  • mostly参数:99 % 通过即可
  • sample子句:只扫 100 万行
  • partition_by:按 dt 分区,只扫最近 3 天

示例:

batch_request=BatchRequest(datasource_name="spark_datasource",data_asset_name="dwd_order",batch_spec_passthrough={"reader_options":{"predicates":["dt >= '2023-10-01'","dt <= '2023-10-03'"]}},)

7. 结果展示与验证

7.1 运行日志与失败报告示例

失败场景:枚举值新增'pending'导致expect_column_values_to_be_in_set不通过。
GX Data Docs 自动生成:

失败值计数占比
statuspending12,3450.8 %

7.2 性能基准

表大小行数Expectations耗时集群规格
1 TB5.2 B2008 min20 × m5.xlarge (8 vCPU, 32 GB)

优化手段:

  • 列统计复用:一次agg计算多个期望
  • 增量检查:Icebergsnapshot-id对比,仅扫新增文件
  • 动态并发:spark.sql.shuffle.partitions = min(400, 表大小/128 MB)

8. 性能优化与最佳实践

  1. Completeness vs. Timeliness 权衡
    若表 10 亿行,允许采样 1 %,可把 30 min 降到 3 min,业务可接受即可
  2. 把“业务规则”与“技术规则”分层
    技术规则(非空、唯一)统一模板;业务规则(金额小数位)放 Git 代码,Code Review 强制合并
  3. 失败分级
    • BLOCKER:主键重复 → 阻塞下游
    • WARN:枚举值新增 → 只报警不阻塞
      用 GXaction_listvalidation_operator区分处理。
  4. 元数据与数据分离
    Validation 结果写对象存储,不占用 Hive 空间;生命周期 90 天自动删。
  5. 版本管理
    Expectation Suite 放 Git,每次修改走 MR,回滚只需git revert.

9. 常见问题与解决方案(FAQ)

问题根因解决
Great Expectations找不到 Hive 表metastore 地址未配置spark.sql.catalogImplementation=hive
Spark OOM采样比例过高调低mostly或增加sample
Airflow 报great_expectations not found镜像缺包requirements-airflow.txtgreat_expectations==0.17.9
MinIO 证书错误用 HTTP 自签加环境变量AWS_CA_BUNDLE=""
中文列名失败GX 默认正则[a-zA-Z0-9_]expect_column_names_to_match_regex或重命名列

10. 未来展望:从“事后检查”到“事前防控”

  1. Schema Registry + Kafka
    在数据接入时就对字段类型、枚举值做校验,不让脏数据进湖
  2. ML 异常检测
    用 Facebook Prophet 或 AWS Deequ 的ApproxQuantiles自动发现趋势突变
  3. Data Contract
    上游团队签署“数据契约”,违反即回滚版本,实现“左移”理念。
  4. DataOps 平台化
    把 GX、Airflow、Superset、Catalog 集成到统一门户,一键订阅质检报告

11. 总结

我们完整走过了“环境搭建 → 单表质检 → 调度集成 → 可视化 → 性能优化”全链路。你现在可以:

  • 用 GX + Spark 在分钟级完成 TB 级数据质量检查
  • 通过 Airflow 把质检嵌入日常管道,失败即阻塞、自动告警
  • 用 Superset 让业务方实时看到数据健康度,不再“拍脑袋”决策

数据质量不再是“事后救火”,而是可度量、可复盘、可改进的工程化环节。希望本文能帮你把“数据质量”从运维痛点转变为团队信任基石。


12. 参考资料

  1. Great Expectations 官方文档:https://docs.greatexpectations.io/
  2. Spark 3.4 Performance Tuning Guide:https://spark.apache.org/docs/latest/sql-performance-tuning.html
  3. AWS Deequ Paper:《Automating Large-Scale Data Quality Verification》
  4. Airbnb Data Quality with Airflow:https://medium.com/airbnb-engineering/dq-airbnb-4f8d8e7f4b7c
  5. 《Data Quality Fundamentals》O’Reilly 2022

13. 附录

A. 完整 docker-compose.yml(节选)

version:"3.8"services:spark-master:image:apache/spark:3.4.1command:/opt/spark/sbin/start-master.shports:-"7077:7077"-"8080:8080"jupyter:image:jupyter/pyspark-notebook:python-3.11volumes:-./great_expectations:/home/jovyan/gxenvironment:SPARK_MASTER_URL:spark://spark-master:7077airflow:image:apache/airflow:2.7.0-python3.11volumes:-./dags:/opt/airflow/dags-./great_expectations:/opt/great_expectationsenvironment:AIRFLOW__CORE__EXECUTOR:LocalExecutor

B. GitHub 仓库(含全部代码 & SQL)

https://github.com/yourname/gx-spark-airflow

记得顺手点个 ⭐,方便后续更新!


“数据质量不是一次项目,而是持续运营。”——祝你早日让团队告别“数据背锅”,用工程化手段把风险消灭在源头!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/1 18:40:19

场景、方案与优势的融合,如何正确选择KVM产品?

KVM的本质是一套让用户通过一组外设集中管控多台计算机或服务器&#xff0c;从空管塔台、数据中心&#xff0c;到智能工厂的远程控制&#xff0c;KVM已然成为支撑关键业务高效、安全运作的“神经中枢”。 然而&#xff0c;面对市场上的各类KVM解决方案&#xff0c;如何做出明智…

作者头像 李华
网站建设 2026/4/1 0:04:16

opencv 实现图像拼接

图像拼接问题&#xff0c;也就是把多张有重叠区域的图像&#xff08;比如从不同角度 / 位置拍摄的同一场景&#xff09;拼接成一张更大、更完整的全景图&#xff0c;这在视觉检测、机器人建图、安防监控等场景中非常常用。下面我会从核心原理、主流方法到可执行的代码实现&…

作者头像 李华
网站建设 2026/3/26 22:27:39

如果你运行一个没有电阻的MOSFET,会发生什么?

还没上电就短路&#xff1f;讲真&#xff0c;90%的新手都死在了MOS管的这个细节上&#xff01;兄弟们&#xff0c;如果你手里的MOS管一上电就冒烟&#xff0c;或者电源芯片发烫到能煎蛋&#xff0c;先别急着怀疑人生&#xff0c;也不要急着换芯片。 停下来&#xff0c;检查一下…

作者头像 李华
网站建设 2026/3/27 17:55:51

一文分析:软件测试的底层逻辑是什么?

软件测试的底层逻辑主要基于质量保证和风险控制&#xff0c;下面从几个关键角度来详细阐述&#xff1a; 1. 验证与确认 验证&#xff08;Verification&#xff09;&#xff1a; 这是确保软件产品的构建过程正确的活动。它主要关注软件是否按照预先定义的规范、标准和流程进行…

作者头像 李华
网站建设 2026/4/2 6:10:52

Linux、Windows常用命令

目录 windows常用命令 linuxl常用命令 windows常用命令 netstat -ano |findstr 28080 TCP 127.0.0.1:28080 0.0.0.0:0 LISTENING 10436 taskkill /f /t /pid 10436 linuxl常用命令 通过端口找进程​ lsof -i :8080 直接列出占用指定端口&a…

作者头像 李华