
update README.md

wuya · 8 years ago
commit c8a073b6c2
28 changed files with 775 additions and 442 deletions
  1. README.md (+235 -2)
  2. bin/hdata (+13 -4)
  3. bin/hdata.bat (+0 -38)
  4. conf/plugins.xml (+21 -49)
  5. hdata-core/src/main/java/com/github/stuxuhai/hdata/CliDriver.java (+137 -132)
  6. hdata-core/src/main/java/com/github/stuxuhai/hdata/common/Constants.java (+7 -0)
  7. hdata-core/src/main/java/com/github/stuxuhai/hdata/common/HDataConfigConstants.java (+12 -5)
  8. hdata-core/src/main/java/com/github/stuxuhai/hdata/config/DefaultEngineConfig.java (+7 -0)
  9. hdata-core/src/main/java/com/github/stuxuhai/hdata/config/DefaultJobConfig.java (+7 -0)
  10. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/DefaultRecord.java (+7 -0)
  11. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/DefaultRecordCollector.java (+7 -0)
  12. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/DefaultStorage.java (+7 -0)
  13. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/HData.java (+7 -0)
  14. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/PluginClassLoader.java (+20 -13)
  15. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/PluginLoader.java (+7 -0)
  16. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/ReaderWorker.java (+7 -0)
  17. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/RecordEvent.java (+7 -1)
  18. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/RecordEventExceptionHandler.java (+35 -27)
  19. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/RecordWorkHandler.java (+7 -0)
  20. hdata-core/src/main/java/com/github/stuxuhai/hdata/core/WaitStrategyFactory.java (+27 -21)
  21. hdata-core/src/main/java/com/github/stuxuhai/hdata/util/JdbcUtils.java (+7 -0)
  22. hdata-core/src/main/java/com/github/stuxuhai/hdata/util/NumberUtils.java (+9 -2)
  23. hdata-core/src/main/java/com/github/stuxuhai/hdata/util/PluginUtils.java (+8 -0)
  24. hdata-core/src/main/java/com/github/stuxuhai/hdata/util/TypeConvertUtils.java (+42 -35)
  25. hdata-core/src/main/java/com/github/stuxuhai/hdata/util/Utils.java (+7 -0)
  26. hdata-jdbc/src/main/java/com/github/stuxuhai/hdata/plugin/writer/jdbc/JBDCWriterProperties.java (+0 -13)
  27. hdata-jdbc/src/main/java/com/github/stuxuhai/hdata/plugin/writer/jdbc/JDBCWriter.java (+112 -100)
  28. hdata-jdbc/src/main/java/com/github/stuxuhai/hdata/plugin/writer/jdbc/JDBCWriterProperties.java (+13 -0)

+ 235 - 2
README.md

@@ -1,6 +1,6 @@
 ## HData
 
-HData is a heterogeneous data transfer tool that aims to solve, with a single tool, the problem of exchanging data between different data sources (RDBMS, Hive, HDFS, HBase, MongoDB, FTP, etc.). Its design draws on the open-source Sqoop and DataX, but the implementation differs. HData uses a "framework + plugin" architecture with good extensibility: the framework acts as a data buffer, while plugins provide access to the individual data sources.
+HData is a heterogeneous data transfer tool that aims to solve, with a single tool, the problem of exchanging data between different data sources (JDBC, Hive, HDFS, HBase, MongoDB, FTP, HTTP, CSV, Excel, Kafka, etc.). Its design draws on the open-source Sqoop and DataX, but the implementation differs. HData uses a "framework + plugin" architecture with good extensibility: the framework acts as a data buffer, while plugins provide access to the individual data sources.
 
 ![HData](./doc/img/1.png)
 
@@ -38,4 +38,237 @@ HData是一个异构的数据传输工具,致力于使用一个工具解决不
 
 - Writer:数据写入模块,负责从RingBuffer中读取数据并写入目标数据源。
 
-The HData framework handles the common concerns of data transfer in a unified way through configuration parsing, the RingBuffer buffer, and thread-pool encapsulation, and provides Reader, Splitter and Writer plugin interfaces, on top of which plugins for all kinds of data sources can easily be developed.
+The HData framework handles the common concerns of data transfer in a unified way through configuration parsing, the RingBuffer buffer, and thread-pool encapsulation, and provides Reader, Splitter and Writer plugin interfaces, on top of which plugins for all kinds of data sources can easily be developed.
+
+
+#### 【Run command】
+
+./bin/hdata --reader READER_NAME -Rk1=v1 -Rk2=v2 --writer WRITER_NAME -Wk1=v1 -Wk2=v2
+
+READER_NAME and WRITER_NAME are the names of the reader and writer plugins, e.g. jdbc, hive.
+Reader plugin parameters are prefixed with -R; writer plugin parameters are prefixed with -W.
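+
+For example, a hypothetical job that dumps a MySQL table to the console (driver, URL, credentials and table name below are placeholders):
+
+./bin/hdata --reader jdbc -Rdriver=com.mysql.jdbc.Driver -Rurl=jdbc:mysql://localhost:3306/db -Rusername=root -Rpassword=secret -Rtable=db.users --writer console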
+
+#### 【Configuration parameters】
+
+The parameter parallelism controls the read/write parallelism; every plugin supports it, and its default value is 1.
+
+Setting a sensible parallelism can improve performance.
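+
+As a sketch, reading with four threads and writing with two might look like this (the remaining -R/-W parameters are omitted):
+
+./bin/hdata --reader jdbc -Rparallelism=4 ... --writer jdbc -Wparallelism=2 ...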
+
+#### 【Reader configuration parameters】
+
+* console
+
+No configuration parameters; generally used for testing only.
+
+* jdbc
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+driver|Yes|JDBC driver class name, e.g. com.mysql.jdbc.Driver|
+url|Yes|JDBC connection URL, e.g. jdbc:mysql://localhost:3306/db|
+username|Yes|Database username|
+password|Yes|Database password|
+table|Yes|Table name (including database or schema name), e.g. db.table; sharded tables are also supported, e.g. table[001-100]|
+columns|No|Column names, separated by commas ","; if omitted, all columns are selected|
+exclude.columns|No|Column names to exclude, separated by commas ","|
+where|No|Query condition, e.g. day='20140418'|
+sql|No|Custom query SQL|
+split.by|No|Column used to split the data for parallel reading|
+max.size.per.fetch|No|Maximum number of records fetched per SQL execution|
+null.string|No|Replacement value when a string-typed column value is NULL|
+null.non.string|No|Replacement value when a non-string-typed column value is NULL|
+field.wrap.replace.string|No|Replacement value for line breaks found in string columns|
+number.format|No|Output format for decimal columns|
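+
+A hypothetical parallel read that splits the table on a numeric column (connection values are placeholders):
+
+./bin/hdata --reader jdbc -Rdriver=com.mysql.jdbc.Driver -Rurl=jdbc:mysql://localhost:3306/db -Rusername=root -Rpassword=secret -Rtable=db.orders -Rsplit.by=id -Rparallelism=4 --writer console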
+
+* hive
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+metastore.uris|Yes|Hive Metastore URI, e.g. thrift://localhost:9083|
+database|No|Database name, default: default|
+table|Yes|Table name|
+partitions|No|Partition spec, e.g. visit_date='2016-07-07'|
+hadoop.user|No|Username with HDFS read permission|
+hdfs.conf.path|No|Path to hdfs-site.xml|
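+
+For example, reading a single partition of a Hive table (URI and names are placeholders):
+
+./bin/hdata --reader hive -Rmetastore.uris=thrift://localhost:9083 -Rdatabase=default -Rtable=visit_log -Rpartitions=visit_date='2016-07-07' --writer console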
+
+* hdfs
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+dir|Yes|HDFS directory path, e.g. hdfs://192.168.1.1:8020/user/dir1|
+filename|Yes|File name; regular expressions are supported|
+schema|No|Output field definition|
+fields.separator|No|Field separator, default: \0001|
+encoding|No|File encoding, default: UTF-8|
+hadoop.user|No|Username with HDFS read permission|
+hdfs.conf.path|No|Path to hdfs-site.xml|
+
+* hbase
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+zookeeper.quorum|Yes|ZooKeeper quorum, e.g. 192.168.1.16,192.168.1.17|
+zookeeper.client.port|No|ZooKeeper client port, default: 2181|
+table|Yes|Table name|
+start.rowkey|No|Start rowkey|
+end.rowkey|No|End rowkey|
+columns|Yes|Columns to read, e.g. :rowkey,cf:start_ip,cf:end_ip|
+schema|Yes|Output field definition, e.g. id,start_ip,end_ip|
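+
+For example, scanning an HBase table into three output fields (addresses and names are placeholders):
+
+./bin/hdata --reader hbase -Rzookeeper.quorum=192.168.1.16,192.168.1.17 -Rtable=ip_table -Rcolumns=:rowkey,cf:start_ip,cf:end_ip -Rschema=id,start_ip,end_ip --writer console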
+
+* http
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+url|Yes|URL to fetch|
+encoding|No|Encoding, default: UTF-8|
+
+* kafka
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+topic|Yes|Topic to consume|
+group.id|Yes|Consumer group id|
+zookeeper.connect|Yes|ZooKeeper connection string, e.g. 198.168.12.34:2181|
+consumer.stream.count|No|Number of consumer streams, default: 1|
+encoding|No|Encoding, default: UTF-8|
+max.fetch.size|No|Maximum number of records to fetch, default: 100000|
+max.wait.second|No|Maximum wait time in seconds, default: 300|
+partition.id|No|Default: 0|
+start.offset|No|Start offset to consume from|
+fields.separator|No|Field separator, default: \t|
+schema|No|Output field definition, e.g. id,start_ip,end_ip|
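+
+For example, draining a topic from offset 0 (topic, group and ZooKeeper address are placeholders):
+
+./bin/hdata --reader kafka -Rtopic=events -Rgroup.id=hdata -Rzookeeper.connect=localhost:2181 -Rstart.offset=0 --writer console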
+
+* ftp
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+host|Yes|FTP host, e.g. 192.168.1.1|
+port|No|FTP port, default: 21|
+username|Yes|Username|
+password|Yes|Password|
+dir|Yes|FTP directory, e.g. /input/dir|
+filename|Yes|File name; regular expressions are supported|
+recursive|No|Whether to search files recursively, default: false|
+encoding|No|File encoding, default: UTF-8|
+fields.separator|No|Field separator, default: \t|
+schema|No|Output field definition|
+fields.count.filter|No|Expected field count; records that do not match are filtered out|
+
+* mongodb
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+uri|Yes|MongoDB connection URI, e.g. mongodb://localhost/test.ip|
+query|No|Query document, e.g. {"city":"XXX"}|
+
+* csv
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+path|Yes|Local file path|
+start.row|No|First data row, default: 1|
+encoding|No|Encoding, default: UTF-8|
+
+* excel
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+path|Yes|File path|
+include.column.names|No|Whether the first row contains column names, default: false|
+
+
+#### 【Writer configuration parameters】
+
+* jdbc
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+driver|Yes|JDBC driver class name, e.g. com.mysql.jdbc.Driver|
+url|Yes|JDBC connection URL, e.g. jdbc:mysql://localhost:3306/db|
+username|Yes|Database username|
+password|Yes|Database password|
+table|Yes|Table name (including database or schema name), e.g. db.table|
+batch.insert.size|No|Number of records per batch insert, default: 10000|
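+
+For example, copying a table between two MySQL instances with larger insert batches (all connection values are placeholders):
+
+./bin/hdata --reader jdbc -Rdriver=com.mysql.jdbc.Driver -Rurl=jdbc:mysql://src-host:3306/db -Rusername=root -Rpassword=secret -Rtable=db.users --writer jdbc -Wdriver=com.mysql.jdbc.Driver -Wurl=jdbc:mysql://dst-host:3306/db -Wusername=root -Wpassword=secret -Wtable=db.users -Wbatch.insert.size=20000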
+
+* hive
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+metastore.uris|Yes|Hive Metastore URI, e.g. thrift://localhost:9083|
+database|No|Database name, default: default|
+table|Yes|Table name|
+partitions|No|Partition spec, e.g. day='20140418'|
+hadoop.user|No|Username with HDFS write permission|
+hdfs.conf.path|No|Path to hdfs-site.xml|
+
+* hdfs
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+path|Yes|HDFS file path, e.g. hdfs://192.168.1.1:8020/user/1.txt|
+fields.separator|No|Field separator, default: \t|
+line.separator|No|Line separator, default: \n|
+encoding|No|File encoding, default: UTF-8|
+compress.codec|No|Compression codec, e.g. org.apache.hadoop.io.compress.GzipCodec|
+hadoop.user|No|Username with HDFS write permission|
+max.file.size.mb|No|Maximum size of a single file (in MB)|
+partition.date.index|No|Index of the date field, starting from 0|
+partition.date.format|No|Date format, e.g. yyyy-MM-dd|
+hdfs.conf.path|No|Path to hdfs-site.xml|
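+
+For example, exporting a table to a gzip-compressed HDFS file (paths and addresses are placeholders):
+
+./bin/hdata --reader jdbc -Rdriver=com.mysql.jdbc.Driver -Rurl=jdbc:mysql://localhost:3306/db -Rusername=root -Rpassword=secret -Rtable=db.users --writer hdfs -Wpath=hdfs://192.168.1.1:8020/user/users.txt -Wfields.separator=',' -Wcompress.codec=org.apache.hadoop.io.compress.GzipCodec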
+
+* hbase
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+zookeeper.quorum|Yes|ZooKeeper quorum, e.g. 192.168.1.16,192.168.1.17|
+zookeeper.client.port|No|ZooKeeper client port, default: 2181|
+table|Yes|Table name|
+columns|Yes|Column names, e.g. :rowkey,cf:start_ip|
+batch.insert.size|No|Number of records per batch insert, default: 10000|
+
+* kafka
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+topic|Yes|Topic to write to|
+fields.separator|No|Field separator, default: \t|
+
+* ftp
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+host|Yes|FTP host, e.g. 192.168.1.1|
+port|No|FTP port, default: 21|
+username|Yes|Username|
+password|Yes|Password|
+path|Yes|Destination directory on the FTP server|
+encoding|No|File encoding, default: UTF-8|
+fields.separator|No|Field separator, default: \t|
+line.separator|No|Line separator, default: \n|
+gzip.compress|No|Whether to enable gzip compression, default: false|
+
+* mongodb
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+uri|Yes|MongoDB connection URI, e.g. mongodb://localhost/test.ip|
+query|No|Query document, e.g. {"city":"XXX"}|
+
+* csv
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+path|Yes|Local file path|
+separator|No|Field separator, default: comma ","|
+encoding|No|Encoding, default: UTF-8|
+
+* excel
+
+Parameter | Required | Description |
+-----------| ----- | ---------------------------------------- |
+path|Yes|File path|
+include.column.names|No|Whether to include column names, default: false|
+
+* console
+
+No configuration parameters; generally used for testing only.
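+
+Putting the pieces together, a hypothetical end-to-end job that copies a MySQL table into a Hive table (all hosts, credentials and table names are placeholders):
+
+./bin/hdata --reader jdbc -Rdriver=com.mysql.jdbc.Driver -Rurl=jdbc:mysql://localhost:3306/db -Rusername=root -Rpassword=secret -Rtable=db.visit_log -Rparallelism=2 --writer hive -Wmetastore.uris=thrift://localhost:9083 -Wdatabase=default -Wtable=visit_log -Wparallelism=2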

+ 13 - 4
bin/hdata

@@ -35,12 +35,21 @@ for f in $HDATA_LIB_DIR/*.jar; do
     HDATA_CLASSPATH=${HDATA_CLASSPATH}:$f;
 done
 
+JAVA_OPTS="$JAVA_OPTS -Xss256k"
+JAVA_OPTS="$JAVA_OPTS -Xms1G -Xmx1G -Xmn512M"
+JAVA_OPTS="$JAVA_OPTS -XX:+UseParNewGC"
+JAVA_OPTS="$JAVA_OPTS -XX:+UseConcMarkSweepGC"
+JAVA_OPTS="$JAVA_OPTS -XX:+CMSClassUnloadingEnabled"
+JAVA_OPTS="$JAVA_OPTS -XX:+CMSParallelRemarkEnabled"
+JAVA_OPTS="$JAVA_OPTS -XX:+DisableExplicitGC"
+JAVA_OPTS="$JAVA_OPTS -XX:CMSInitiatingOccupancyFraction=75"
+JAVA_OPTS="$JAVA_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
+JAVA_OPTS="$JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError"
+JAVA_OPTS="$JAVA_OPTS -XX:SoftRefLRUPolicyMSPerMB=0"
+
 JAVA_OPTS="$JAVA_OPTS -Dhdata.conf.dir=$HDATA_CONF_DIR"
 JAVA_OPTS="$JAVA_OPTS -Dlog4j.configurationFile=file:///$HDATA_CONF_DIR/log4j2.xml"
 
-MAIN_CLASS="com.suning.hdata.CliDriver"
-if [ "$1" = "execute-sql" ]; then
-    MAIN_CLASS="com.suning.hdata.tool.SQLExecuteTool"
-fi
+MAIN_CLASS="com.github.stuxuhai.hdata.CliDriver"
 
 exec "$JAVA" $JAVA_OPTS -cp "$HDATA_CLASSPATH" $MAIN_CLASS "$@"

+ 0 - 38
bin/hdata.bat

@@ -1,38 +0,0 @@
-@echo off
-
-SETLOCAL
-
-if NOT DEFINED JAVA_HOME goto err
-
-set SCRIPT_DIR=%~dp0
-for %%I in ("%SCRIPT_DIR%..") do set HDATA_HOME=%%~dpfI
-
-set MAIN_CLASSPATH=.;%HDATA_HOME%\lib\*
-set HDATA_CONF_DIR=%HDATA_HOME%\conf
-
-set JAVA_OPTS=%JAVA_OPTS% -Xss256k
-set JAVA_OPTS=%JAVA_OPTS% -XX:+UseParNewGC
-set JAVA_OPTS=%JAVA_OPTS% -XX:+UseConcMarkSweepGC
-
-set JAVA_OPTS=%JAVA_OPTS% -XX:CMSInitiatingOccupancyFraction=75
-set JAVA_OPTS=%JAVA_OPTS% -XX:+UseCMSInitiatingOccupancyOnly
-set JAVA_OPTS=%JAVA_OPTS% -XX:+HeapDumpOnOutOfMemoryError
-set JAVA_OPTS=%JAVA_OPTS% -Dhdata.conf.dir="%HDATA_CONF_DIR%"
-set JAVA_OPTS=%JAVA_OPTS% -Dlog4j.configurationFile="file:///%HDATA_CONF_DIR%/log4j2.xml"
-
-set FIRST_ARG=%1
-set MAIN_CLASS="com.suning.hdata.CliDriver"
-if "%FIRST_ARG%"=="execute-sql" (set MAIN_CLASS="com.suning.hdata.tool.SQLExecuteTool")
-
-"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp "%MAIN_CLASSPATH%" %MAIN_CLASS% %*
-
-goto finally
-
-:err
-echo JAVA_HOME environment variable must be set!
-pause
-
-
-:finally
-
-ENDLOCAL

+ 21 - 49
conf/plugins.xml

@@ -4,118 +4,90 @@
 	<readers>
 		<reader>
 			<name>jdbc</name>
-			<class>com.mogujie.hdata.plugin.reader.jdbc.JDBCReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.jdbc.JDBCReader</class>
 		</reader>
 		<reader>
 			<name>hive</name>
-			<class>com.mogujie.hdata.plugin.reader.hive.HiveReader</class>
-		</reader>
-		<reader>
-			<name>dump</name>
-			<class>com.mogujie.hdata.plugin.reader.dump.DumpReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.hive.HiveReader</class>
 		</reader>
 		<reader>
 			<name>hdfs</name>
-			<class>com.mogujie.hdata.plugin.reader.hdfs.HDFSReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.hdfs.HDFSReader</class>
 		</reader>
 		<reader>
 			<name>ftp</name>
-			<class>com.mogujie.hdata.plugin.reader.ftp.FTPReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.ftp.FTPReader</class>
 		</reader>
 		<reader>
 			<name>mongodb</name>
-			<class>com.mogujie.hdata.plugin.reader.mongodb.MongoDBReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.mongodb.MongoDBReader</class>
 		</reader>
 		<reader>
 			<name>hbase</name>
-			<class>com.mogujie.hdata.plugin.reader.hbase.HBaseReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.hbase.HBaseReader</class>
 		</reader>
 		<reader>
 			<name>http</name>
-			<class>com.mogujie.hdata.plugin.reader.http.HttpReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.http.HttpReader</class>
 		</reader>
 		<reader>
 			<name>csv</name>
-			<class>com.mogujie.hdata.plugin.reader.csv.CSVReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.csv.CSVReader</class>
 		</reader>
 		<reader>
 			<name>kafka</name>
-			<class>com.mogujie.hdata.plugin.reader.kafka.KafkaReader</class>
-		</reader>
-		<reader>
-			<name>corgi</name>
-			<class>com.mogujie.hdata.plugin.reader.corgi.CorgiReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.kafka.KafkaReader</class>
 		</reader>
 		<reader>
 			<name>console</name>
-			<class>com.mogujie.hdata.plugin.reader.console.ConsoleReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.reader.console.ConsoleReader</class>
 		</reader>
 		<reader>
 			<name>excel</name>
-			<class>com.mogujie.hdata.plugin.excel.reader.ExcelReader</class>
-		</reader>
-		<reader>
-			<name>elasticsearch2</name>
-			<class>com.mogujie.hdata.plugin.reader.elasticsearch2.ElasticSearchReader</class>
-		</reader>
-		<reader>
-			<name>elasticsearch1</name>
-			<class>com.mogujie.hdata.plugin.reader.elasticsearch1.ElasticSearchReader</class>
+			<class>com.github.stuxuhai.hdata.plugin.excel.reader.ExcelReader</class>
 		</reader>
 	</readers>
 
 	<writers>
 		<writer>
 			<name>console</name>
-			<class>com.mogujie.hdata.plugin.writer.console.ConsoleWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.console.ConsoleWriter</class>
 		</writer>
 		<writer>
 			<name>jdbc</name>
-			<class>com.mogujie.hdata.plugin.writer.jdbc.JDBCWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.jdbc.JDBCWriter</class>
 		</writer>
 		<writer>
 			<name>hive</name>
-			<class>com.mogujie.hdata.plugin.writer.hive.HiveWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.hive.HiveWriter</class>
 		</writer>
 		<writer>
 			<name>hdfs</name>
-			<class>com.mogujie.hdata.plugin.writer.hdfs.HDFSWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.hdfs.HDFSWriter</class>
 		</writer>
 		<writer>
 			<name>ftp</name>
-			<class>com.mogujie.hdata.plugin.writer.ftp.FTPWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.ftp.FTPWriter</class>
 		</writer>
 		<writer>
 			<name>mongodb</name>
-			<class>com.mogujie.hdata.plugin.writer.mongodb.MongoDBWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.mongodb.MongoDBWriter</class>
 		</writer>
 		<writer>
 			<name>hbase</name>
-			<class>com.mogujie.hdata.plugin.writer.hbase.HBaseWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.hbase.HBaseWriter</class>
 		</writer>
 		<writer>
 			<name>csv</name>
-			<class>com.mogujie.hdata.plugin.writer.csv.CSVWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.csv.CSVWriter</class>
 		</writer>
 		<writer>
 			<name>kafka</name>
-			<class>com.mogujie.hdata.plugin.writer.kafka.KafkaWriter</class>
-		</writer>
-		<writer>
-			<name>corgi</name>
-			<class>com.mogujie.hdata.plugin.writer.corgi.CorgiWriter</class>
-		</writer>
-		<writer>
-			<name>elasticsearch2</name>
-			<class>com.mogujie.hdata.plugin.writer.elasticsearch2.ElasticSearchWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.writer.kafka.KafkaWriter</class>
 		</writer>
 		<writer>
 			<name>excel</name>
-			<class>com.mogujie.hdata.plugin.excel.writer.ExcelWriter</class>
-		</writer>
-		<writer>
-			<name>elasticsearch1</name>
-			<class>com.mogujie.hdata.plugin.writer.elasticsearch1.ElasticSearchWriter</class>
+			<class>com.github.stuxuhai.hdata.plugin.excel.writer.ExcelWriter</class>
 		</writer>
 	</writers>
 </plugins>

+ 137 - 132
hdata-core/src/main/java/com/github/stuxuhai/hdata/CliDriver.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata;
 
 import java.util.Map.Entry;
@@ -24,136 +31,134 @@ import com.google.common.base.Throwables;
 
 public class CliDriver {
 
-	private static final String XML_FILE_OPTION = "f";
-	private static final String HDATA_VARS_OPTION = "D";
-	private static final String QUIET_OPTION = "q";
-	private static final String READER_OPTION = "reader";
-	private static final String WRITER_OPTION = "writer";
-	private static final String READER_VARS_OPTION = "R";
-	private static final String WRITER_VARS_OPTION = "W";
-
-	private static final Logger LOGGER = LogManager.getLogger();
-
-	/**
-	 * 创建命令行选项
-	 * 
-	 * @return
-	 */
-	public Options createOptions() {
-		Options options = new Options();
-		options.addOption(XML_FILE_OPTION, null, true, "job xml path");
-		options.addOption(QUIET_OPTION, null, false, "quiet");
-		options.addOption(Option.builder(HDATA_VARS_OPTION).hasArgs().build());
-
-		options.addOption(null, READER_OPTION, true, "reader name");
-		options.addOption(Option.builder(READER_VARS_OPTION).hasArgs().build());
-
-		options.addOption(null, WRITER_OPTION, true, "writer name");
-		options.addOption(Option.builder(WRITER_VARS_OPTION).hasArgs().build());
-		return options;
-	}
-
-	/**
-	 * 打印命令行帮助信息
-	 * 
-	 * @param options
-	 */
-	public void printHelp(Options options) {
-		HelpFormatter formatter = new HelpFormatter();
-		formatter.printHelp(" ", options);
-	}
-
-	/**
-	 * 替换命令行变量
-	 * 
-	 * @param config
-	 * @param vars
-	 */
-	public void replaceConfigVars(PluginConfig config, Properties vars) {
-		for (Entry<Object, Object> confEntry : config.entrySet()) {
-			if (confEntry.getKey().getClass() == String.class && confEntry.getValue().getClass() == String.class) {
-				for (Entry<Object, Object> varEntry : vars.entrySet()) {
-					String replaceVar = "${" + varEntry.getKey() + "}";
-					if (confEntry.getValue().toString().contains(replaceVar)) {
-						config.put(confEntry.getKey(),
-								confEntry.getValue().toString().replace(replaceVar, varEntry.getValue().toString()));
-					}
-				}
-			}
-		}
-	}
-
-	private void putOptionValues(Properties props, String[] values) {
-		if (props != null && values != null) {
-			for (int i = 0, len = values.length; i < len; i++) {
-				props.put(values[i], values[++i]);
-			}
-		}
-	}
-
-	/**
-	 * 主程序入口
-	 * 
-	 * @param args
-	 */
-	public static void main(String[] args) {
-
-		CliDriver cliDriver = new CliDriver();
-		Options options = cliDriver.createOptions();
-		if (args.length < 1) {
-			cliDriver.printHelp(options);
-			System.exit(-1);
-		}
-
-		CommandLineParser parser = new DefaultParser();
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse(options, args);
-			if (cmd.hasOption(QUIET_OPTION)) {
-				LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
-				Configuration conf = ctx.getConfiguration();
-				conf.getLoggerConfig(LogManager.ROOT_LOGGER_NAME).setLevel(Level.WARN);
-				ctx.updateLoggers(conf);
-			}
-
-			final DefaultJobConfig jobConfig;
-			if (cmd.hasOption(XML_FILE_OPTION)) {
-				String jobXmlPath = cmd.getOptionValue(XML_FILE_OPTION);
-				jobConfig = DefaultJobConfig.createFromXML(jobXmlPath);
-				Properties vars = new Properties();
-				cliDriver.putOptionValues(vars, cmd.getOptionValues(HDATA_VARS_OPTION));
-
-				final PluginConfig readerConfig = jobConfig.getReaderConfig();
-				final PluginConfig writerConfig = jobConfig.getWriterConfig();
-
-				cliDriver.replaceConfigVars(readerConfig, vars);
-				cliDriver.replaceConfigVars(writerConfig, vars);
-			} else {
-				if (!cmd.hasOption(READER_OPTION) || !cmd.hasOption(WRITER_OPTION)) {
-					throw new HDataException(
-							"Option --reader and --writer should be both given if -f option not exists.");
-				}
-
-				String readerName = cmd.getOptionValue(READER_OPTION);
-				String writerName = cmd.getOptionValue(WRITER_OPTION);
-
-				PluginConfig readerConfig = new PluginConfig();
-				cliDriver.putOptionValues(readerConfig, cmd.getOptionValues(READER_VARS_OPTION));
-
-				PluginConfig writerConfig = new PluginConfig();
-				cliDriver.putOptionValues(writerConfig, cmd.getOptionValues(WRITER_VARS_OPTION));
-
-				jobConfig = new DefaultJobConfig(readerName, readerConfig, writerName, writerConfig);
-			}
-
-			HData hData = new HData();
-			hData.start(jobConfig);
-		} catch (ParseException e) {
-			cliDriver.printHelp(options);
-			System.exit(-1);
-		} catch (Exception e) {
-			LOGGER.error(Throwables.getStackTraceAsString(e));
-			System.exit(-1);
-		}
-	}
+  private static final String XML_FILE_OPTION = "f";
+  private static final String HDATA_VARS_OPTION = "D";
+  private static final String QUIET_OPTION = "q";
+  private static final String READER_OPTION = "reader";
+  private static final String WRITER_OPTION = "writer";
+  private static final String READER_VARS_OPTION = "R";
+  private static final String WRITER_VARS_OPTION = "W";
+
+  private static final Logger LOGGER = LogManager.getLogger();
+
+  /**
+   * 创建命令行选项
+   * 
+   * @return
+   */
+  public Options createOptions() {
+    Options options = new Options();
+    options.addOption(XML_FILE_OPTION, null, true, "job xml path");
+    options.addOption(QUIET_OPTION, null, false, "quiet");
+    options.addOption(Option.builder(HDATA_VARS_OPTION).hasArgs().build());
+
+    options.addOption(null, READER_OPTION, true, "reader name");
+    options.addOption(Option.builder(READER_VARS_OPTION).hasArgs().build());
+
+    options.addOption(null, WRITER_OPTION, true, "writer name");
+    options.addOption(Option.builder(WRITER_VARS_OPTION).hasArgs().build());
+    return options;
+  }
+
+  /**
+   * 打印命令行帮助信息
+   * 
+   * @param options
+   */
+  public void printHelp(Options options) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp(" ", options);
+  }
+
+  /**
+   * 替换命令行变量
+   * 
+   * @param config
+   * @param vars
+   */
+  public void replaceConfigVars(PluginConfig config, Properties vars) {
+    for (Entry<Object, Object> confEntry : config.entrySet()) {
+      if (confEntry.getKey().getClass() == String.class && confEntry.getValue().getClass() == String.class) {
+        for (Entry<Object, Object> varEntry : vars.entrySet()) {
+          String replaceVar = "${" + varEntry.getKey() + "}";
+          if (confEntry.getValue().toString().contains(replaceVar)) {
+            config.put(confEntry.getKey(), confEntry.getValue().toString().replace(replaceVar, varEntry.getValue().toString()));
+          }
+        }
+      }
+    }
+  }
+
+  private void putOptionValues(Properties props, String[] values) {
+    if (props != null && values != null) {
+      for (int i = 0, len = values.length; i < len; i++) {
+        props.put(values[i], values[++i]);
+      }
+    }
+  }
+
+  /**
+   * 主程序入口
+   * 
+   * @param args
+   */
+  public static void main(String[] args) {
+
+    CliDriver cliDriver = new CliDriver();
+    Options options = cliDriver.createOptions();
+    if (args.length < 1) {
+      cliDriver.printHelp(options);
+      System.exit(-1);
+    }
+
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, args);
+      if (cmd.hasOption(QUIET_OPTION)) {
+        LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
+        Configuration conf = ctx.getConfiguration();
+        conf.getLoggerConfig(LogManager.ROOT_LOGGER_NAME).setLevel(Level.WARN);
+        ctx.updateLoggers(conf);
+      }
+
+      final DefaultJobConfig jobConfig;
+      if (cmd.hasOption(XML_FILE_OPTION)) {
+        String jobXmlPath = cmd.getOptionValue(XML_FILE_OPTION);
+        jobConfig = DefaultJobConfig.createFromXML(jobXmlPath);
+        Properties vars = new Properties();
+        cliDriver.putOptionValues(vars, cmd.getOptionValues(HDATA_VARS_OPTION));
+
+        final PluginConfig readerConfig = jobConfig.getReaderConfig();
+        final PluginConfig writerConfig = jobConfig.getWriterConfig();
+
+        cliDriver.replaceConfigVars(readerConfig, vars);
+        cliDriver.replaceConfigVars(writerConfig, vars);
+      } else {
+        if (!cmd.hasOption(READER_OPTION) || !cmd.hasOption(WRITER_OPTION)) {
+          throw new HDataException("Option --reader and --writer should be both given if -f option not exists.");
+        }
+
+        String readerName = cmd.getOptionValue(READER_OPTION);
+        String writerName = cmd.getOptionValue(WRITER_OPTION);
+
+        PluginConfig readerConfig = new PluginConfig();
+        cliDriver.putOptionValues(readerConfig, cmd.getOptionValues(READER_VARS_OPTION));
+
+        PluginConfig writerConfig = new PluginConfig();
+        cliDriver.putOptionValues(writerConfig, cmd.getOptionValues(WRITER_VARS_OPTION));
+
+        jobConfig = new DefaultJobConfig(readerName, readerConfig, writerName, writerConfig);
+      }
+
+      HData hData = new HData();
+      hData.start(jobConfig);
+    } catch (ParseException e) {
+      cliDriver.printHelp(options);
+      System.exit(-1);
+    } catch (Exception e) {
+      LOGGER.error(Throwables.getStackTraceAsString(e));
+      System.exit(-1);
+    }
+  }
 }
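
As the options above show, a job can also be described in an XML file passed with -f, and ${var} placeholders in its reader/writer configuration are replaced by -D values before the job runs. A hypothetical invocation (the file path and variable name are illustrative):

./bin/hdata -f /path/to/job.xml -Dday=20140418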

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/common/Constants.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.common;
 
 public interface Constants {

+ 12 - 5
hdata-core/src/main/java/com/github/stuxuhai/hdata/common/HDataConfigConstants.java

@@ -1,11 +1,18 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.common;
 
 public interface HDataConfigConstants {
 
-	public static final String STORAGE_BUFFER_SIZE = "hdata.storage.default.buffer.size";
-	public static final String HDATA_STORAGE_DISRUPTOR_WAIT_STRATEGY = "hdata.storage.disruptor.wait.strategy";
-	public static final String HDATA_SLEEP_MILLIS = "hdata.sleep.millis";
-	public static final String HDATA_HIVE_WRITER_TMP_DIR = "hdata.hive.writer.tmp.dir";
-	public static final String JDBC_READER_SQL_METRIC_TIME_MS = "jdbc.reader.sql.metric.time.ms";
+    public static final String STORAGE_BUFFER_SIZE = "hdata.storage.default.buffer.size";
+    public static final String HDATA_STORAGE_DISRUPTOR_WAIT_STRATEGY = "hdata.storage.disruptor.wait.strategy";
+    public static final String HDATA_SLEEP_MILLIS = "hdata.sleep.millis";
+    public static final String HDATA_HIVE_WRITER_TMP_DIR = "hdata.hive.writer.tmp.dir";
+    public static final String JDBC_READER_SQL_METRIC_TIME_MS = "jdbc.reader.sql.metric.time.ms";
 
 }

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/config/DefaultEngineConfig.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.config;
 
 import java.util.List;

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/config/DefaultJobConfig.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.config;
 
 import java.util.Iterator;

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/DefaultRecord.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import com.github.stuxuhai.hdata.api.Record;

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/DefaultRecordCollector.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import java.util.concurrent.TimeUnit;

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/DefaultStorage.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import com.github.stuxuhai.hdata.api.JobContext;

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/HData.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import java.text.DecimalFormat;

+ 20 - 13
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/PluginClassLoader.java

@@ -1,3 +1,11 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2016 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2016年4月10日 下午4:42:43
+ */
+
 package com.github.stuxuhai.hdata.core;
 
 import java.net.URL;
@@ -5,19 +13,18 @@ import java.net.URLClassLoader;
 
 public class PluginClassLoader extends URLClassLoader {
 
-	public PluginClassLoader(URL[] urls, ClassLoader parent) {
-		super(urls, parent);
-	}
+    public PluginClassLoader(URL[] urls, ClassLoader parent) {
+        super(urls, parent);
+    }
 
-	@Override
-	public Class<?> loadClass(String name) throws ClassNotFoundException {
-		if (name.startsWith("com.mogujie.hdata.api.") || name.startsWith("org.apache.logging.")
-				|| name.startsWith("org.apache.log4j.") || name.startsWith("org.slf4j.")
-				|| name.startsWith("org.apache.commons.logging.")) {
-			return getClass().getClassLoader().loadClass(name);
-		} else {
-			return super.loadClass(name);
-		}
-	}
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException {
+        if (name.startsWith("com.github.stuxuhai.hdata.api.") || name.startsWith("org.apache.logging.") || name.startsWith("org.apache.log4j.")
+                || name.startsWith("org.slf4j.") || name.startsWith("org.apache.commons.logging.")) {
+            return getClass().getClassLoader().loadClass(name);
+        } else {
+            return super.loadClass(name);
+        }
+    }
 
 }

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/PluginLoader.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import java.util.HashMap;

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/ReaderWorker.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import java.util.concurrent.Callable;

+ 7 - 1
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/RecordEvent.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import com.github.stuxuhai.hdata.api.Record;
@@ -17,7 +24,6 @@ public class RecordEvent {
 
 	public static final EventFactory<RecordEvent> FACTORY = new EventFactory<RecordEvent>() {
 
-		@Override
 		public RecordEvent newInstance() {
 			return new RecordEvent();
 		}

+ 35 - 27
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/RecordEventExceptionHandler.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年7月2日 下午3:48:21
+ */
 package com.github.stuxuhai.hdata.core;
 
 import org.apache.logging.log4j.LogManager;
@@ -8,33 +15,34 @@ import com.google.common.base.Throwables;
 import com.lmax.disruptor.ExceptionHandler;
 import com.lmax.disruptor.dsl.Disruptor;
 
+/**
+ * @author wuya
+ *
+ */
 public class RecordEventExceptionHandler implements ExceptionHandler<Object> {
 
-	private final Disruptor<RecordEvent> disruptor;
-	private final JobContext context;
-	private static Logger LOGGER = LogManager.getLogger(RecordEventExceptionHandler.class);
-
-	public RecordEventExceptionHandler(Disruptor<RecordEvent> disruptor, JobContext context) {
-		this.disruptor = disruptor;
-		this.context = context;
-	}
-
-	@Override
-	public void handleEventException(Throwable t, long sequence, Object event) {
-		LOGGER.error(Throwables.getStackTraceAsString(t));
-		context.setWriterError(true);
-		disruptor.shutdown();
-	}
-
-	@Override
-	public void handleOnShutdownException(Throwable t) {
-		LOGGER.error(Throwables.getStackTraceAsString(t));
-		disruptor.shutdown();
-	}
-
-	@Override
-	public void handleOnStartException(Throwable t) {
-		LOGGER.error(Throwables.getStackTraceAsString(t));
-		disruptor.shutdown();
-	}
+    private final Disruptor<RecordEvent> disruptor;
+    private final JobContext context;
+    private static Logger LOGGER = LogManager.getLogger(RecordEventExceptionHandler.class);
+
+    public RecordEventExceptionHandler(Disruptor<RecordEvent> disruptor, JobContext context) {
+        this.disruptor = disruptor;
+        this.context = context;
+    }
+
+    public void handleEventException(Throwable t, long sequence, Object event) {
+        LOGGER.error(Throwables.getStackTraceAsString(t));
+        context.setWriterError(true);
+        disruptor.shutdown();
+    }
+
+    public void handleOnShutdownException(Throwable t) {
+        LOGGER.error(Throwables.getStackTraceAsString(t));
+        disruptor.shutdown();
+    }
+
+    public void handleOnStartException(Throwable t) {
+        LOGGER.error(Throwables.getStackTraceAsString(t));
+        disruptor.shutdown();
+    }
 }

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/RecordWorkHandler.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import com.github.stuxuhai.hdata.api.JobContext;

+ 27 - 21
hdata-core/src/main/java/com/github/stuxuhai/hdata/core/WaitStrategyFactory.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.core;
 
 import java.util.List;
@@ -12,26 +19,25 @@ import com.lmax.disruptor.YieldingWaitStrategy;
 
 public class WaitStrategyFactory {
 
-	private static final List<String> WAIT_STRATEGY_SUPPORTED = Lists.newArrayList(BlockingWaitStrategy.class.getName(),
-			BusySpinWaitStrategy.class.getName(), SleepingWaitStrategy.class.getName(),
-			YieldingWaitStrategy.class.getName());
+    private static final List<String> WAIT_STRATEGY_SUPPORTED = Lists.newArrayList(BlockingWaitStrategy.class.getName(),
+            BusySpinWaitStrategy.class.getName(), SleepingWaitStrategy.class.getName(), YieldingWaitStrategy.class.getName());
 
-	/**
-	 * 构造线程等待策略
-	 */
-	public static WaitStrategy build(String name) {
-		if (WAIT_STRATEGY_SUPPORTED.contains(name)) {
-			try {
-				return (WaitStrategy) Class.forName(name).newInstance();
-			} catch (InstantiationException e) {
-				throw new HDataException(e);
-			} catch (IllegalAccessException e) {
-				throw new HDataException(e);
-			} catch (ClassNotFoundException e) {
-				throw new HDataException(e);
-			}
-		} else {
-			throw new HDataException("Invalid wait strategy: " + name);
-		}
-	}
+    /**
+     * 构造线程等待策略
+     */
+    public static WaitStrategy build(String name) {
+        if (WAIT_STRATEGY_SUPPORTED.contains(name)) {
+            try {
+                return (WaitStrategy) Class.forName(name).newInstance();
+            } catch (InstantiationException e) {
+                throw new HDataException(e);
+            } catch (IllegalAccessException e) {
+                throw new HDataException(e);
+            } catch (ClassNotFoundException e) {
+                throw new HDataException(e);
+            }
+        } else {
+            throw new HDataException("Invalid wait strategy: " + name);
+        }
+    }
 }

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/util/JdbcUtils.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.util;
 
 import java.sql.Connection;

+ 9 - 2
hdata-core/src/main/java/com/github/stuxuhai/hdata/util/NumberUtils.java

@@ -1,14 +1,21 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: xingtian
+ * Create Date: 2015年1月16日 下午3:35:16
+ */
 package com.github.stuxuhai.hdata.util;
 
 /**
  * 数字处理工具类
  *
- */
+ * */
 public class NumberUtils {
 	/**
 	 * 获取 起始和结束 范围内的所有 数值
 	 * 
-	 */
+	 * */
 	public static int[] getRange(int before, int after) {
 		int bigger = Math.max(before, after);
 		int smaller = Math.min(before, after);

+ 8 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/util/PluginUtils.java

@@ -1,3 +1,11 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2016 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2016年4月9日 上午11:39:24
+ */
+
 package com.github.stuxuhai.hdata.util;
 
 import java.io.File;

+ 42 - 35
hdata-core/src/main/java/com/github/stuxuhai/hdata/util/TypeConvertUtils.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.util;
 
 import java.math.BigDecimal;
@@ -5,39 +12,39 @@ import java.math.BigInteger;
 
 public class TypeConvertUtils {
 
-	/**
-	 * 数据类型转换
-	 * 
-	 * @param src
-	 * @param clazz
-	 * @return
-	 */
-	public static Object convert(Object src, Class<?> clazz) {
-		if (src == null) {
-			return null;
-		} else if (src instanceof String) {
-			if (clazz == Integer.class) {
-				return Integer.valueOf(src.toString());
-			} else if (clazz == Long.class) {
-				return Long.valueOf(src.toString());
-			} else if (clazz == Double.class) {
-				return Double.valueOf(src.toString());
-			} else if (clazz == Float.class) {
-				return Float.valueOf(src.toString());
-			} else if (clazz == Boolean.class) {
-				return Boolean.valueOf(src.toString());
-			} else if (clazz == Short.class) {
-				return Short.valueOf(src.toString());
-			} else if (clazz == Byte.class) {
-				return Byte.valueOf(src.toString());
-			} else if (clazz == BigInteger.class) {
-				return BigInteger.valueOf(Long.valueOf(src.toString()));
-			} else if (clazz == BigDecimal.class) {
-				return new BigDecimal(src.toString());
-			}
-		} else if (clazz == String.class) {
-			return src.toString();
-		}
-		return src;
-	}
+    /**
+     * 数据类型转换
+     * 
+     * @param src
+     * @param clazz
+     * @return
+     */
+    public static Object convert(Object src, Class<?> clazz) {
+        if (src == null) {
+            return null;
+        } else if (src instanceof String) {
+            if (clazz == Integer.class) {
+                return Integer.valueOf(src.toString());
+            } else if (clazz == Long.class) {
+                return Long.valueOf(src.toString());
+            } else if (clazz == Double.class) {
+                return Double.valueOf(src.toString());
+            } else if (clazz == Float.class) {
+                return Float.valueOf(src.toString());
+            } else if (clazz == Boolean.class) {
+                return Boolean.valueOf(src.toString());
+            } else if (clazz == Short.class) {
+                return Short.valueOf(src.toString());
+            } else if (clazz == Byte.class) {
+                return Byte.valueOf(src.toString());
+            } else if (clazz == BigInteger.class) {
+                return BigInteger.valueOf(Long.valueOf(src.toString()));
+            } else if (clazz == BigDecimal.class) {
+                return new BigDecimal(src.toString());
+            }
+        } else if (clazz == String.class) {
+            return src.toString();
+        }
+        return src;
+    }
 }

+ 7 - 0
hdata-core/src/main/java/com/github/stuxuhai/hdata/util/Utils.java

@@ -1,3 +1,10 @@
+/*
+ * 蘑菇街 Inc.
+ * Copyright (c) 2010-2014 All Rights Reserved.
+ *
+ * Author: wuya
+ * Create Date: 2014年6月26日 下午4:35:16
+ */
 package com.github.stuxuhai.hdata.util;
 
 import java.util.ArrayList;

+ 0 - 13
hdata-jdbc/src/main/java/com/github/stuxuhai/hdata/plugin/writer/jdbc/JBDCWriterProperties.java

@@ -1,13 +0,0 @@
-package com.github.stuxuhai.hdata.plugin.writer.jdbc;
-
-public class JBDCWriterProperties {
-
-	public static final String DRIVER = "driver";
-	public static final String URL = "url";
-	public static final String USERNAME = "username";
-	public static final String PASSWORD = "password";
-	public static final String TABLE = "table";
-	public static final String BATCH_INSERT_SIZE = "batch.insert.size";
-	public static final String PARALLELISM = "parallelism";
-
-}

+ 112 - 100
hdata-jdbc/src/main/java/com/github/stuxuhai/hdata/plugin/writer/jdbc/JDBCWriter.java

@@ -26,104 +26,116 @@ import com.google.common.base.Preconditions;
 
 public class JDBCWriter extends Writer {
 
-	private Connection connection = null;
-	private PreparedStatement statement = null;
-	private int count;
-	private int batchInsertSize;
-	private Fields columns;
-	private String table;
-	private Map<String, Integer> columnTypes;
-	private final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(Constants.DATE_FORMAT_STRING);
-	private static final int DEFAULT_BATCH_INSERT_SIZE = 10000;
-	private static final Logger LOG = LogManager.getLogger(JDBCWriter.class);
-
-	@Override
-	public void prepare(JobContext context, PluginConfig writerConfig) {
-		columns = context.getFields();
-		String driver = writerConfig.getString(JBDCWriterProperties.DRIVER);
-		Preconditions.checkNotNull(driver, "JDBC writer required property: driver");
-
-		String url = writerConfig.getString(JBDCWriterProperties.URL);
-		Preconditions.checkNotNull(url, "HDFS writer required property: url");
-
-		String username = writerConfig.getString(JBDCWriterProperties.USERNAME);
-		String password = writerConfig.getString(JBDCWriterProperties.PASSWORD);
-		String table = writerConfig.getString(JBDCWriterProperties.TABLE);
-		Preconditions.checkNotNull(table, "HDFS writer required property: table");
-
-		this.table = table;
-		batchInsertSize = writerConfig.getInt(JBDCWriterProperties.BATCH_INSERT_SIZE, DEFAULT_BATCH_INSERT_SIZE);
-		if (batchInsertSize < 1) {
-			batchInsertSize = DEFAULT_BATCH_INSERT_SIZE;
-		}
-
-		try {
-			connection = JdbcUtils.getConnection(driver, url, username, password);
-			connection.setAutoCommit(false);
-			columnTypes = JdbcUtils.getColumnTypes(connection, table);
-
-			String sql = null;
-			if (columns != null) {
-				String[] placeholder = new String[columns.size()];
-				Arrays.fill(placeholder, "?");
-				sql = String.format("INSERT INTO %s(%s) VALUES(%s)", table, "`" + Joiner.on("`, `").join(columns) + "`",
-						Joiner.on(", ").join(placeholder));
-				LOG.debug(sql);
-				statement = connection.prepareStatement(sql);
-			}
-		} catch (Exception e) {
-			throw new HDataException(e);
-		}
-	}
-
-	@Override
-	public void execute(Record record) {
-		try {
-			if (statement == null) {
-				String[] placeholder = new String[record.size()];
-				Arrays.fill(placeholder, "?");
-				String sql = String.format("INSERT INTO %s VALUES(%s)", table, Joiner.on(", ").join(placeholder));
-				LOG.debug(sql);
-				statement = connection.prepareStatement(sql);
-			}
-
-			for (int i = 0, len = record.size(); i < len; i++) {
-				if (record.get(i) instanceof Timestamp
-						&& !Integer.valueOf(Types.TIMESTAMP).equals(columnTypes.get(columns.get(i).toLowerCase()))) {
-					statement.setObject(i + 1, DATE_FORMAT.format(record.get(i)));
-				} else {
-					statement.setObject(i + 1, record.get(i));
-				}
-			}
-
-			count++;
-			statement.addBatch();
-
-			if (count % batchInsertSize == 0) {
-				count = 0;
-				statement.executeBatch();
-				connection.commit();
-			}
-		} catch (SQLException e) {
-			throw new HDataException(e);
-		}
-	}
-
-	@Override
-	public void close() {
-		try {
-			if (connection != null && statement != null && count > 0) {
-				statement.executeBatch();
-				connection.commit();
-			}
-
-			if (statement != null) {
-				statement.close();
-			}
-		} catch (SQLException e) {
-			throw new HDataException(e);
-		} finally {
-			DbUtils.closeQuietly(connection);
-		}
-	}
+    private Connection connection = null;
+    private PreparedStatement statement = null;
+    private int count;
+    private int batchInsertSize;
+    private Fields columns;
+    private String[] schema;
+    private String table;
+    private Map<String, Integer> columnTypes;
+    private final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(Constants.DATE_FORMAT_STRING);
+    private static final int DEFAULT_BATCH_INSERT_SIZE = 10000;
+    private static final Logger LOG = LogManager.getLogger(JDBCWriter.class);
+
+    @Override
+    public void prepare(JobContext context, PluginConfig writerConfig) {
+        columns = context.getFields();
+        String driver = writerConfig.getString(JDBCWriterProperties.DRIVER);
+        Preconditions.checkNotNull(driver, "JDBC writer required property: driver");
+
+        String schemaStr = writerConfig.getString("schema");
+        if ((schemaStr != null) && (!schemaStr.trim().isEmpty())) {
+            this.schema = schemaStr.split(",");
+        }
+
+        String url = writerConfig.getString(JDBCWriterProperties.URL);
+        Preconditions.checkNotNull(url, "JDBC writer required property: url");
+
+        String username = writerConfig.getString(JDBCWriterProperties.USERNAME);
+        String password = writerConfig.getString(JDBCWriterProperties.PASSWORD);
+        String table = writerConfig.getString(JDBCWriterProperties.TABLE);
+        Preconditions.checkNotNull(table, "JDBC writer required property: table");
+
+        this.table = table;
+        batchInsertSize = writerConfig.getInt(JDBCWriterProperties.BATCH_INSERT_SIZE, DEFAULT_BATCH_INSERT_SIZE);
+        if (batchInsertSize < 1) {
+            batchInsertSize = DEFAULT_BATCH_INSERT_SIZE;
+        }
+
+        try {
+            connection = JdbcUtils.getConnection(driver, url, username, password);
+            connection.setAutoCommit(false);
+            columnTypes = JdbcUtils.getColumnTypes(connection, table);
+
+            String sql = null;
+            if (this.schema != null) {
+                String[] placeholder = new String[this.schema.length];
+                Arrays.fill(placeholder, "?");
+                sql = String.format("INSERT INTO %s(%s) VALUES(%s)",
+                        new Object[] { table, "`" + Joiner.on("`, `").join(this.schema) + "`", Joiner.on(", ").join(placeholder) });
+                LOG.debug(sql);
+                this.statement = this.connection.prepareStatement(sql);
+            } else if (this.columns != null) {
+                String[] placeholder = new String[this.columns.size()];
+                Arrays.fill(placeholder, "?");
+                sql = String.format("INSERT INTO %s(%s) VALUES(%s)",
+                        new Object[] { table, "`" + Joiner.on("`, `").join(this.columns) + "`", Joiner.on(", ").join(placeholder) });
+                LOG.debug(sql);
+                this.statement = this.connection.prepareStatement(sql);
+            }
+        } catch (Exception e) {
+            throw new HDataException(e);
+        }
+    }
+
+    @Override
+    public void execute(Record record) {
+        try {
+            if (statement == null) {
+                String[] placeholder = new String[record.size()];
+                Arrays.fill(placeholder, "?");
+                String sql = String.format("INSERT INTO %s VALUES(%s)", table, Joiner.on(", ").join(placeholder));
+                LOG.debug(sql);
+                statement = connection.prepareStatement(sql);
+            }
+
+            for (int i = 0, len = record.size(); i < len; i++) {
+                if (record.get(i) instanceof Timestamp && !Integer.valueOf(Types.TIMESTAMP).equals(columnTypes.get(columns.get(i).toLowerCase()))) {
+                    statement.setObject(i + 1, DATE_FORMAT.format(record.get(i)));
+                } else {
+                    statement.setObject(i + 1, record.get(i));
+                }
+            }
+
+            count++;
+            statement.addBatch();
+
+            if (count % batchInsertSize == 0) {
+                count = 0;
+                statement.executeBatch();
+                connection.commit();
+            }
+        } catch (SQLException e) {
+            throw new HDataException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            if (connection != null && statement != null && count > 0) {
+                statement.executeBatch();
+                connection.commit();
+            }
+
+            if (statement != null) {
+                statement.close();
+            }
+        } catch (SQLException e) {
+            throw new HDataException(e);
+        } finally {
+            DbUtils.closeQuietly(connection);
+        }
+    }
 }

+ 13 - 0
hdata-jdbc/src/main/java/com/github/stuxuhai/hdata/plugin/writer/jdbc/JDBCWriterProperties.java

@@ -0,0 +1,13 @@
+package com.github.stuxuhai.hdata.plugin.writer.jdbc;
+
+public class JDBCWriterProperties {
+
+    public static final String DRIVER = "driver";
+    public static final String URL = "url";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String TABLE = "table";
+    public static final String BATCH_INSERT_SIZE = "batch.insert.size";
+    public static final String PARALLELISM = "parallelism";
+    public static final String SCHEMA = "schema";
+}
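
The new schema property above lets the INSERT column list be given explicitly instead of being derived from the reader's fields. A hypothetical command using it (connection details and column names are placeholders):

./bin/hdata --reader csv -Rpath=/tmp/users.csv --writer jdbc -Wdriver=com.mysql.jdbc.Driver -Wurl=jdbc:mysql://localhost:3306/db -Wusername=root -Wpassword=secret -Wtable=db.users -Wschema=id,name,age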