transformer.md 11 KB

DataX Transformer

Transformer定义

在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。DataX包含了完整的E(Extract)、T(Transformer)、L(Load)支持。

运行模型

image

UDF手册

  1. dx_substr

    • 参数:3个
      • 第一个参数:字段编号,对应record中第几个字段。
      • 第二个参数:字段值的开始位置。
      • 第三个参数:目标字段长度。
    • 返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
    • 举例:

      dx_substr(1,"2","5")  column 1的value为“dataxTest”=>"taxTe"
      dx_substr(1,"5","10")  column 1的value为“dataxTest”=>"Test"
      
  2. dx_pad

    • 参数:4个
      • 第一个参数:字段编号,对应record中第几个字段。
      • 第二个参数:"l","r", 指示是在头进行pad,还是尾进行pad。
      • 第三个参数:目标字段长度。
      • 第四个参数:需要pad的字符。
    • 返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
    • 举例:

       dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
       dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz
      
  3. dx_replace

    • 参数:4个
      • 第一个参数:字段编号,对应record中第几个字段。
      • 第二个参数:字段值的开始位置。
      • 第三个参数:需要替换的字段长度。
      • 第四个参数:需要替换的字符串。
    • 返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
    • 举例:

      dx_replace(1,"2","4","****")  column 1的value为“dataxTest”=>"da****est"
      dx_replace(1,"5","10","****")  column 1的value为“dataxTest”=>"datax****"
      
  4. dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。)

    • 参数:
      • 第一个参数:字段编号,对应record中第几个字段。
      • 第二个参数:运算符,支持以下运算符:like, not like, >, =, <, >=, !=, <=
      • 第三个参数:正则表达式(java正则表达式)、值。
    • 返回:
      • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果.
      • like , not like是将字段转换成String,然后和目标正则表达式进行全匹配。
      • >, =, <, >=, !=, <= 对于DoubleColumn比较double值,对于LongColumn和DateColumn比较long值,其他StringColumn,BooleanColumn以及ByteColumn均比较的是StringColumn值。
      • 如果目标colunn为空(null),对于 = null的过滤条件,将满足条件,被过滤。!=null的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和not like,字段为null满足条件,被过滤。
    • 举例:

      dx_filter(1,"like","dataTest")  
      dx_filter(1,">=","10")  
      
  5. dx_digest

  6. 参数:3个

    • 第一个参数:字段编号,对应record中第几个字段。
    • 第二个参数:hash类型,md5、sha1
    • 第三个参数:hash值大小写 toUpperCase(大写)、toLowerCase(小写)
  7. 返回: 返回指定类型的hashHex,如果字段为空,则转为空字符串,再返回对应hashHex

  8. 举例:

    dx_digest(1,"md5","toUpperCase"), column 1的值为 xyzzzzz => 9CDFFC4FA4E45A99DB8BBCD762ACFFA2
    
  9. dx_groovy

    • 参数。
      • 第一个参数: groovy code
      • 第二个参数(列表或者为空):extraPackage
    • 备注:
      • dx_groovy只能调用一次。不能多次调用。
      • groovy code中支持java.lang, java.util的包,可直接引用的对象有record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
      • groovy code中,返回更新过的Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
      • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil),目前GroovyTransformerStaticUtil的方法列表:
      • md5(String):String
      • sha1(String):String
    • 举例:

      groovy 实现的subStr:
      String code = "Column column = record.getColumn(1);\n" +
              " String oriValue = column.asString();\n" +
              " String newValue = oriValue.substring(0, 3);\n" +
              " record.setColumn(1, new StringColumn(newValue));\n" +
              " return record;";
      dx_groovy(record);
      
      groovy 实现的Replace
      String code2 = "Column column = record.getColumn(1);\n" +
              " String oriValue = column.asString();\n" +
              " String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
              " record.setColumn(1, new StringColumn(newValue));\n" +
              " return record;";
      
      groovy 实现的Pad
      String code3 = "Column column = record.getColumn(1);\n" +
              " String oriValue = column.asString();\n" +
              " String padString = \"12345\";\n" +
              " String finalPad = \"\";\n" +
              " int NeedLength = 8 - oriValue.length();\n" +
              "        while (NeedLength > 0) {\n" +
              "\n" +
              "            if (NeedLength >= padString.length()) {\n" +
              "                finalPad += padString;\n" +
              "                NeedLength -= padString.length();\n" +
              "            } else {\n" +
              "                finalPad += padString.substring(0, NeedLength);\n" +
              "                NeedLength = 0;\n" +
              "            }\n" +
              "        }\n" +
              " String newValue= finalPad + oriValue;\n" +
              " record.setColumn(1, new StringColumn(newValue));\n" +
              " return record;";
      

Job定义

  • 本例中,配置4个UDF。

    {
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 1724154616370,
                                "type": "long"
                            },
                            {
                                "value": "2024-01-01 00:00:00",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "TestRawData",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 100
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_substr",
                        "parameter": {
                            "columnIndex": 5,
                            "paras": [
                                "1",
                                "3"
                            ]
                        }
                    },
                    {
                        "name": "dx_replace",
                        "parameter": {
                            "columnIndex": 4,
                            "paras": [
                                "3",
                                "4",
                                "****"
                            ]
                        }
                    },
                    {
                        "name": "dx_digest",
                        "parameter": {
                            "columnIndex": 3,
                            "paras": [
                                "md5",
                                "toLowerCase"
                            ]
                        }
                    },
                    {
                        "name": "dx_groovy",
                        "parameter": {
                            "code": "//groovy code//",
                            "extraPackage": [
                                "import somePackage1;",
                                "import somePackage2;"
                            ]
                        }
                    }
                ]
            }
        ]
    }
    }
    
    

计量和脏数据

Transform过程涉及到数据的转换,可能造成数据的增加或减少,因此更加需要精确度量,包括:

  • Transform的入参Record条数、字节数。
  • Transform的出参Record条数、字节数。
  • Transform的脏数据Record条数、字节数。
  • 如果是多个Transform,某一个发生脏数据,将不会再进行后面的transform,直接统计为脏数据。
  • 目前只提供了所有Transform的计量(成功,失败,过滤的count,以及transform的消耗时间)。

涉及到运行过程的计量数据展现定义如下:

Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%

注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。

涉及到最终作业的计量数据展现定义如下:

任务启动时刻                    : 2015-03-10 17:34:21
任务结束时刻                    : 2015-03-10 17:34:31
任务总计耗时                    :                 10s
任务平均流量                    :            2.10MB/s
记录写入速度                    :         100000rec/s
转换输入总数		      :             1000000
转换输出总数		      :             1000000
读出记录总数                    :             1000000
同步失败总数                    :                   0

注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。