OTSWriter插件实现了向OTS写入数据,目前支持了多版本数据的写入、主键自增列的写入等功能。
OTS是构建在阿里云飞天分布式系统之上的 NoSQL数据库服务,提供海量结构化数据的存储和实时访问。OTS 以实例和表的形式组织数据,通过数据分片和负载均衡技术,实现规模上的无缝扩展。
简而言之,OTSWriter通过OTS官方Java SDK连接到OTS服务端,并通过SDK写入OTS服务端。OTSWriter本身对于写入过程做了很多优化,包括写入超时重试、异常写入重试、批量提交等Feature。
normal模式
{
"job": {
"setting": {
},
"content": [
{
"reader": {},
"writer": {
"name": "otswriter",
"parameter": {
"endpoint":"",
"accessId":"",
"accessKey":"",
"instanceName":"",
"table":"",
// 可选 multiVersion||normal,可选配置,默认normal
"mode":"normal",
//newVersion定义是否使用新版本插件 可选值:true || false
"newVersion":"true",
//是否允许向包含主键自增列的ots表中写入数据
//与mode:multiVersion的多版本模式不兼容
"enableAutoIncrement":"true",
// 需要导入的PK列名,区分大小写
// 类型支持:STRING,INT,BINARY
// 必选
// 1. 支持类型转换,注意类型转换时的精度丢失
// 2. 顺序不要求和表的Meta一致
// 3. name全局唯一
"primaryKey":[
"userid",
"groupid"
],
// 需要导入的列名,区分大小写
// 类型支持STRING,INT,DOUBLE,BOOL和BINARY
// 必选
// 1.name全局唯一
"column":[
{"name":"addr", "type":"string"},
{"name":"height", "type":"int"}
],
// 如果用户配置了时间戳,系统将使用配置的时间戳,如果没有配置,使用OTS的系统时间戳
// 可选
"defaultTimestampInMillionSecond": 142722431,
// 写入OTS的方式
// PutRow : 等同于OTS API中PutRow操作,检查条件是ignore
// UpdateRow : 等同于OTS API中UpdateRow操作,检查条件是ignore
"writeMode":"PutRow"
}
}
}
]
}
}
endpoint
描述:OTS Server的EndPoint(服务地址),例如http://bazhen.cn−hangzhou.ots.aliyuncs.com。
必选:是
默认值:无
accessId
描述:OTS的accessId
必选:是
默认值:无
accessKey
描述:OTS的accessKey
必选:是
默认值:无
instanceName
描述:OTS的实例名称,实例是用户使用和管理 OTS 服务的实体,用户在开通 OTS 服务之后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。实例是 OTS 资源管理的基础单元,OTS 对应用程序的访问控制和资源计量都在实例级别完成。
必选:是
默认值:无
table
描述:所选取的需要抽取的表名称,这里有且只能填写一张表。在OTS不存在多表同步的需求。
必选:是
默认值:无
newVersion
描述:version定义了使用的ots SDK版本。
必选:否
默认值:false
mode
描述:是否为多版本数据,目前有两种模式。
必选:否
默认值:normal
enableAutoIncrement
描述:是否允许向包含主键自增列的ots表中写入数据。
必选:否
默认值:false
isTimeseriesTable
描述:写入的对应表是否为时序表,仅在mode=normal模式下生效。
必选:否
默认值:false
在写入时序数据表的模式下,不需要配置primaryKey
字段,只需要配置column
字段,配置样例:
"column": [
{
"name": "_m_name", // 表示度量名称(measurement)字段
},
{
"name": "_data_source", // 表示数据源(dataSource)字段
},
{
"name": "_tags", // 表示标签字段,会被解析为Map<String,String>类型
},
{
"name": "_time", // 表示时间戳字段,会被解析为long类型的值
},
{
"name": "tag_a",
"isTag":"true" // 表示标签内部字段,该字段会被解析到标签的字典内部
},
{
"name": "column1", // 属性列名称
"type": "string" // 属性列类型,支持 bool string int double binary
},
{
"name": "column2",
"type": "int"
}
],
primaryKey
OTS的PrimaryKey只能支持STRING,INT两种类型,因此OTSWriter本身也限定填写上述两种类型。
DataX本身支持类型转换的,因此对于源头数据非String/Int,OTSWriter会进行数据类型转换。
配置实例:
"primaryKey":[
"userid",
"groupid"
]
必选:是
默认值:无
column
描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。使用格式为
{"name":"col2", "type":"INT"},
其中的name指定写入的OTS列名,type指定写入的类型。OTS类型支持STRING,INT,DOUBLE,BOOL和BINARY几种类型 。
写入过程不支持常量、函数或者自定义表达式。
必选:是
默认值:无
writeMode
描述:写入模式,目前支持两种模式,
PutRow,对应于OTS API PutRow,插入数据到指定的行,如果该行不存在,则新增一行;若该行存在,则覆盖原有行。
UpdateRow,对应于OTS API UpdateRow,更新指定行的数据,如果该行不存在,则新增一行;若该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。
必选:是
默认值:无
目前OTSWriter支持所有OTS类型,下面列出OTSWriter针对OTS类型转换列表:
DataX 内部类型 | OTS 数据类型 |
---|---|
Long | Integer |
Double | Double |
String | String |
Boolean | Boolean |
Bytes | Binary |
multiVersion模式解决了ots数据库中多版本数据的导入问题。支持Hbase的全量数据迁移到OTS
当前只有hbase reader 与 ots reader提供这种模式,使用时切记注意
{
"job": {
"setting": {
},
"content": [
{
"reader": {},
"writer": {
"name": "otswriter",
"parameter": {
"endpoint":"",
"accessId":"",
"accessKey":"",
"instanceName":"",
"table":"",
// 多版本模式,插件会按照多版本模式去解析所有配置
"mode":"multiVersion",
"newVersion":"true",
// 配置PK信息
// 考虑到配置成本,并不需要配置PK在Record(Line)中的位置,要求
// Record的格式固定,PK一定在行首,PK之后是columnName,格式如下:
// 如:{pk0,pk1,pk2,pk3}, {columnName}, {timestamp}, {value}
"primaryKey":[
"userid",
"groupid"
],
// 列名前缀过滤
// 描述:hbase导入过来的数据,cf和qulifier共同组成columnName,
// OTS并不支持cf,所以需要将cf过滤掉
// 注意:
// 1.该参数选填,如果没有填写或者值为空字符串,表示不对列名进行过滤。
// 2.如果datax传入的数据columnName列不是以前缀开始,则将该Record放入脏数据回收器中
"columnNamePrefixFilter":"cf:"
}
}
}
]
}
}
OTS写入本身是支持幂等性的,也就是使用OTS SDK同一条数据写入OTS系统,一次和多次请求的结果可以理解为一致的。因此对于OTSWriter多次尝试写入同一条数据与写入一条数据结果是等同的。
由于OTS写入本身是幂等性的,因此可以支持单任务FailOver。即一旦写入Fail,DataX会重新启动相关子任务进行重试。