ETL 流程生成器 - 大数据专家版
name: etl-generator
by alexmayanjun-collab · published 2026-04-01
$ claw add gh:alexmayanjun-collab/alexmayanjun-collab-etl-generator---
name: etl-generator
description: 大数据 ETL 流程生成器 - 根据源表 DDL 生成标准化 ETL 加工 SQL(HiveSQL/MySQL)
metadata: {"version":"2.0","author":"Hank","updated":"2026-03-06"}
---
# ETL 流程生成器 - 大数据专家版
根据源表 DDL 自动生成标准化的 ETL 加工 SQL,支持 HiveSQL、MySQL、ODPS。
🎯 角色定位
**大数据专家**(20 年经验)
---
🔧 核心功能
1️⃣ 表名规范
2️⃣ 字段类型转换
3️⃣ 时区转换
DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS created_at
DATE_FORMAT(FROM_UTC_TIMESTAMP(updated_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS updated_at4️⃣ 分区字段
DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd") AS ds5️⃣ 增量数据处理
6️⃣ 去重逻辑
ROW_NUMBER() OVER(PARTITION BY id ORDER BY updated_at DESC) as rn
WHERE rn = 17️⃣ 字段排除 rn
SELECT `(rn)?+.+` FROM (...)---
📋 使用方式
方式 1:命令行
# 从文件读取 DDL
python3 skills/etl-generator/etl_generator.py source_table.ddl > etl_sql.sql
# 从标准输入读取
cat source_table.ddl | python3 skills/etl-generator/etl_generator.py > etl_sql.sql方式 2:直接调用
from etl_generator import parse_table_ddl, generate_target_table_ddl, generate_etl_sql
ddl = """
CREATE TABLE IF NOT EXISTS ods_delivery_attempt_di(
id STRING COMMENT '主键',
pno STRING COMMENT '运单号',
client_id STRING COMMENT '客户 ID',
returned BIGINT COMMENT '是否退货件',
delivery_date STRING COMMENT '派送日期',
marker_id BIGINT COMMENT '标记原因',
store_id STRING COMMENT '网点 ID',
created_at TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP COMMENT '更新时间'
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES ("columnar.nested.type"="true", "comment"="有效尝试派送详情")
LIFECYCLE 36500;
"""
table_name, fields, table_comment = parse_table_ddl(ddl)
target_ddl = generate_target_table_ddl(table_name, fields, table_comment)
etl_sql = generate_etl_sql(table_name, fields, table_comment)---
📝 输出示例
输入(源表 DDL)
CREATE TABLE IF NOT EXISTS ods_sap_store_cash_pay_info_di(
id STRING COMMENT "主键",
store_id STRING COMMENT "网点编号",
business_date STRING COMMENT "业务日期",
sap_state BIGINT COMMENT "0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常",
created_at TIMESTAMP COMMENT "创建时间",
updated_at TIMESTAMP COMMENT "更新时间'
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES ("columnar.nested.type"="true", "comment"="SAP 门店现金支付信息")
LIFECYCLE 36500;输出(目标表 DDL + ETL SQL)
-- 目标表 DDL
CREATE TABLE IF NOT EXISTS dwd_sap_store_cash_pay_info_di(
id STRING COMMENT '主键',
store_id STRING COMMENT '网点编号',
business_date STRING COMMENT '业务日期',
sap_state BIGINT COMMENT '0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常',
created_at STRING COMMENT '创建时间',
updated_at STRING COMMENT '更新时间'
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES ("columnar.nested.type"="true", "comment"="SAP 门店现金支付信息")
LIFECYCLE 36500;
-- ETL 加工 SQL
WITH ods_data AS (
SELECT
id,
store_id,
business_date,
sap_state,
DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS created_at,
DATE_FORMAT(FROM_UTC_TIMESTAMP(updated_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS updated_at,
DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd") AS ds
FROM ods_sap_store_cash_pay_info_di
WHERE ds >= "${y-m-d}"
UNION ALL
SELECT
get_json_object(values, "$.id") as id,
get_json_object(values, "$.store_id") as store_id,
get_json_object(values, "$.business_date") as business_date,
get_json_object(values, "$.sap_state") as sap_state,
DATE_FORMAT(FROM_UTC_TIMESTAMP(get_json_object(values, "$.created_at"), "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS created_at,
DATE_FORMAT(FROM_UTC_TIMESTAMP(get_json_object(values, "$.updated_at"), "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS updated_at,
DATE_FORMAT(FROM_UTC_TIMESTAMP(get_json_object(values, "$.created_at"), "${timezone}"), "yyyy-MM-dd") AS ds
FROM ods_data_base_di
WHERE (
(_after_image_ = "Y" AND _operation_ IN ("INSERT", "UPDATE"))
OR (_operation_ = "DELETE" AND _before_image_ = "Y")
OR _id_ IS NULL
)
AND ds >= "${y-m-d}"
AND table_name = "sap_store_cash_pay_info"
AND db_name = "source_db"
)
INSERT OVERWRITE TABLE dwd_sap_store_cash_pay_info_di PARTITION(ds)
SELECT `(rn)?+.+` FROM (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY id ORDER BY updated_at DESC) as rn
FROM (
SELECT * FROM dwd_sap_store_cash_pay_info_di WHERE ds IN (
SELECT DISTINCT ds FROM ods_data
)
UNION ALL
SELECT * FROM ods_data
) a
) t1
WHERE rn = 1;---
🧪 数据质量检查
自动生成以下检查 SQL:
1. **主键空值检查**
2. **退货件比例检查**(如果有 returned 字段)
3. **数据量对比**(源表 vs 目标表)
---
📋 字段映射说明
自动生成字段映射文档:
-- ============================================
-- 字段映射说明
-- ============================================
-- 源表字段 (7 个): id, store_id, business_date, sap_state, created_at, updated_at
-- 目标表字段 (7 个): id, store_id, business_date, sap_state, created_at, updated_at
-- 分区字段:ds
--
-- 字段转换规则:
-- created_at: TIMESTAMP → STRING, 时区转换
-- updated_at: TIMESTAMP → STRING, 时区转换
-- business_date: 直接映射
-- ============================================---
⚙️ 配置参数
| 参数 | 说明 | 默认值 |
|------|------|--------|
| `${timezone}` | 时区 | UTC |
| `${y-m-d}` | 业务日期 | ${yyyymmdd-1} |
| `${bizdate}` | 业务日期(质量检查) | ${yyyymmdd-1} |
---
📁 文件结构
skills/etl-generator/
├── SKILL.md # 技能说明
├── etl_generator.py # 核心脚本
├── README.md # 使用文档
└── examples/ # 示例 DDL
└── delivery_attempt.ddl---
🔧 高级用法
1. 批量生成
# 批量处理多个表
for ddl in ddl/*.ddl; do
python3 skills/etl-generator/etl_generator.py $ddl > etl/$(basename $ddl .ddl)_etl.sql
done2. 自定义模板
修改 `etl_generator.py` 中的模板函数,适配特定业务场景。
3. 集成 DataWorks
# 生成 DataWorks 节点配置
python3 skills/etl-generator/etl_generator.py source.ddl | \
python3 skills/etl-generator/dataworks_adapter.py > node_config.yaml---
⚠️ 注意事项
1. 字段顺序
2. 时区处理
3. 表名规范
4. 增量数据
---
📊 版本历史
v2.0 (2026-03-06)
v1.0 (2026-02-28)
---
**维护者:** 汉克 (Hank)
**更新时间:** 2026-03-06
More tools from the same signal band
Order food/drinks (点餐) on an Android device paired as an OpenClaw node. Uses in-app menu and cart; add goods, view cart, submit order (demo, no real payment).
Sign plugins, rotate agent credentials without losing identity, and publicly attest to plugin behavior with verifiable claims and authenticated transfers.
The philosophical layer for AI agents. Maps behavior to Spinoza's 48 affects, calculates persistence scores, and generates geometric self-reports. Give your...