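During data synchronization and transfer, users sometimes need to customize how the data is handled, for example by trimming or converting columns. This work is done in the T (Transformer) step of ETL. DataX provides full support for E (Extract), T (Transformer), and L (Load).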
dx_substr
Examples:
dx_substr(1,"2","5"): if the value of column 1 is "dataxTest" => "taxTe"
dx_substr(1,"5","10"): if the value of column 1 is "dataxTest" => "Test"
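Both examples are consistent with reading the first parameter as a 0-based start index and the second as a length that is cut off at the end of the value. A minimal Java sketch under that assumption follows; the class and method names are hypothetical and this is not DataX's own implementation.

```java
// Hypothetical helper illustrating the dx_substr examples; not DataX code.
final class SubstrSketch {
    // Assumes a 0-based start index and a length clamped to the end of the value.
    static String substr(String value, int start, int length) {
        if (start < 0 || start > value.length()) {
            throw new IllegalArgumentException("start index outside the value");
        }
        int end = Math.min(value.length(), start + length);
        return value.substring(start, end);
    }

    public static void main(String[] args) {
        System.out.println(substr("dataxTest", 2, 5));  // taxTe
        System.out.println(substr("dataxTest", 5, 10)); // Test
    }
}
```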
dx_pad
Examples:
dx_pad(1,"l","4","A"): if the value of column 1 is xyz => Axyz; if it is xyzzzzz => xyzz
dx_pad(1,"r","4","A"): if the value of column 1 is xyz => xyzA; if it is xyzzzzz => xyzz
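The examples suggest that short values are padded on the left ("l") or right ("r") up to the target length, and that longer values are cut to their first characters. The Java sketch below reproduces that behavior. Its names are hypothetical, and repeating a multi-character pad string is an assumption the examples do not cover.

```java
// Hypothetical helper illustrating the dx_pad examples; not DataX code.
final class PadSketch {
    static String pad(String value, String side, int length, String padString) {
        if (padString.isEmpty()) {
            throw new IllegalArgumentException("padString must not be empty");
        }
        if (!"l".equals(side) && !"r".equals(side)) {
            throw new IllegalArgumentException("side must be \"l\" or \"r\"");
        }
        // Values that already reach the target length are cut to their first `length` characters.
        if (value.length() >= length) {
            return value.substring(0, length);
        }
        int needed = length - value.length();
        StringBuilder padding = new StringBuilder();
        while (padding.length() < needed) {
            padding.append(padString); // assumption: a multi-character pad string repeats
        }
        padding.setLength(needed); // trim any overshoot from the last repetition
        return "l".equals(side) ? padding + value : value + padding;
    }

    public static void main(String[] args) {
        System.out.println(pad("xyz", "l", 4, "A"));     // Axyz
        System.out.println(pad("xyz", "r", 4, "A"));     // xyzA
        System.out.println(pad("xyzzzzz", "l", 4, "A")); // xyzz
    }
}
```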
dx_replace
Examples:
dx_replace(1,"2","4","****"): if the value of column 1 is "dataxTest" => "da****est"
dx_replace(1,"5","10","****"): if the value of column 1 is "dataxTest" => "datax****"
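In both examples the span that starts at a 0-based index and runs for the given length (cut off at the end of the value) is swapped for the replacement string, whatever the replacement's own length. A short Java sketch of that reading, with hypothetical names and no claim to match DataX's internals:

```java
// Hypothetical helper illustrating the dx_replace examples; not DataX code.
final class ReplaceSketch {
    // Replaces `length` characters starting at 0-based `start` with `replacement`.
    static String replace(String value, int start, int length, String replacement) {
        if (start < 0 || start > value.length()) {
            throw new IllegalArgumentException("start index outside the value");
        }
        int end = Math.min(value.length(), start + length);
        return value.substring(0, start) + replacement + value.substring(end);
    }

    public static void main(String[] args) {
        System.out.println(replace("dataxTest", 2, 4, "****"));  // da****est
        System.out.println(replace("dataxTest", 5, 10, "****")); // datax****
    }
}
```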
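dx_filter (combined filters, i.e. a joint condition over several fields, are not supported yet: the function parameters would be too complex for users to work with.)
Examples: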
dx_filter(1,"like","dataTest")
dx_filter(1,">=","10")
dx_digest
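Parameters: 3
Returns: the hex-encoded hash of the specified type. If the field is null, it is first converted to an empty string and the hash of that string is returned.
Example:
dx_digest(1,"md5","toUpperCase"): if the value of column 1 is xyzzzzz => 9CDFFC4FA4E45A99DB8BBCD762ACFFA2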
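The description above (hash the value, hex-encode it, apply the requested case, treat a null field as an empty string) can be sketched with the JDK's `MessageDigest`. The class and method names are hypothetical, and encoding the value as UTF-8 before hashing is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper illustrating the dx_digest description; not DataX code.
final class DigestSketch {
    static String digestHex(String value, String algorithm, boolean upperCase) {
        String input = (value == null) ? "" : value; // a null field is hashed as the empty string
        MessageDigest md;
        try {
            md = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("unsupported digest: " + algorithm, e);
        }
        byte[] hash = md.digest(input.getBytes(StandardCharsets.UTF_8)); // UTF-8 is an assumption
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte b : hash) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return upperCase ? hex.toString().toUpperCase() : hex.toString();
    }

    public static void main(String[] args) {
        System.out.println(digestHex("xyzzzzz", "MD5", true));
        System.out.println(digestHex(null, "MD5", false)); // d41d8cd98f00b204e9800998ecf8427e
    }
}
```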
dx_groovy
Examples:
substr implemented in Groovy:
String code = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = oriValue.substring(0, 3);\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
dx_groovy(record);
replace implemented in Groovy:
String code2 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
pad implemented in Groovy:
String code3 = "Column column = record.getColumn(1);\n" +
" String oriValue = column.asString();\n" +
" String padString = \"12345\";\n" +
" String finalPad = \"\";\n" +
" int NeedLength = 8 - oriValue.length();\n" +
" while (NeedLength > 0) {\n" +
"\n" +
" if (NeedLength >= padString.length()) {\n" +
" finalPad += padString;\n" +
" NeedLength -= padString.length();\n" +
" } else {\n" +
" finalPad += padString.substring(0, NeedLength);\n" +
" NeedLength = 0;\n" +
" }\n" +
" }\n" +
" String newValue= finalPad + oriValue;\n" +
" record.setColumn(1, new StringColumn(newValue));\n" +
" return record;";
This example configures four UDFs.
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 1724154616370,
                                "type": "long"
                            },
                            {
                                "value": "2024-01-01 00:00:00",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "TestRawData",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 100
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": {
                            "columnIndex": 5,
                            "paras": [
                                "1",
                                "3"
                            ]
                        }
                    },
                    {
                        "name": "dx_replace",
                        "parameter": {
                            "columnIndex": 4,
                            "paras": [
                                "3",
                                "4",
                                "****"
                            ]
                        }
                    },
                    {
                        "name": "dx_digest",
                        "parameter": {
                            "columnIndex": 3,
                            "paras": [
                                "md5",
                                "toLowerCase"
                            ]
                        }
                    },
                    {
                        "name": "dx_groovy",
                        "parameter": {
                            "code": "//groovy code//",
                            "extraPackage": [
                                "import somePackage1;",
                                "import somePackage2;"
                            ]
                        }
                    }
                ]
            }
        ]
    }
}
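The transform step converts data and may increase or reduce the number of records, so it calls for more precise measurement.
The metrics shown while the job is running are defined as follows: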
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%
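Note: this line mainly records the transform's input and output; any change in the number of records between input and output needs to be detected.
The metrics shown for the finished job are defined as follows: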
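One way to detect the change in record count is to read the Transform segment of the progress line and compare the two counters. The Java sketch below parses the sample line shown above. The class name and the regular expression are illustrative assumptions based only on that sample, not a documented DataX interface.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical monitor helper; the pattern is derived from the sample progress line only.
final class TransformLineSketch {
    private static final Pattern TRANSFORM =
            Pattern.compile("Transform (\\d+) records\\(in\\), (\\d+) records\\(out\\)");

    // Returns (records out - records in): negative when the transform dropped records.
    static long netChange(String progressLine) {
        Matcher m = TRANSFORM.matcher(progressLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no Transform segment in line");
        }
        long recordsIn = Long.parseLong(m.group(1));
        long recordsOut = Long.parseLong(m.group(2));
        return recordsOut - recordsIn;
    }

    public static void main(String[] args) {
        String line = "Total 1000000 records, 22000000 bytes | "
                + "Transform 100000 records(in), 10000 records(out) | "
                + "Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%";
        System.out.println(netChange(line)); // -90000
    }
}
```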
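Job start time : 2015-03-10 17:34:21
Job end time : 2015-03-10 17:34:31
Total elapsed time : 10s
Average throughput : 2.10MB/s
Record write speed : 100000rec/s
Total records into transform : 1000000
Total records out of transform : 1000000
Total records read : 1000000
Total failed records : 0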
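Note: as with the running metrics, the summary mainly records the transform's input and output; any change in the number of records between input and output needs to be detected.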
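The figures in the summary are mutually consistent, which a few lines of Java can check: the two timestamps are 10 s apart, and 1000000 records over 10 s gives 100000 rec/s. The byte total is not part of the summary, so the sketch borrows the 22000000 bytes from the progress-line example; treating 1 MB as 1024 * 1024 bytes is an assumption that happens to reproduce the 2.10MB/s shown. All names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper that re-derives the summary figures; not DataX code.
final class JobSummarySketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static long elapsedSeconds(String start, String end) {
        return Duration.between(LocalDateTime.parse(start, FMT),
                                LocalDateTime.parse(end, FMT)).getSeconds();
    }

    static long recordsPerSecond(long records, long seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        return records / seconds;
    }

    // Assumption: 1 MB = 1024 * 1024 bytes.
    static String megabytesPerSecond(long bytes, long seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        double mbPerSecond = bytes / (double) seconds / (1024 * 1024);
        return String.format(Locale.ROOT, "%.2fMB/s", mbPerSecond);
    }

    public static void main(String[] args) {
        long seconds = elapsedSeconds("2015-03-10 17:34:21", "2015-03-10 17:34:31");
        System.out.println(seconds + "s");                                  // 10s
        System.out.println(recordsPerSecond(1000000L, seconds) + "rec/s");  // 100000rec/s
        System.out.println(megabytesPerSecond(22000000L, seconds));         // 2.10MB/s
    }
}
```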