123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #!/usr/bin/env python
- #-*- coding: utf-8 -*-
- from optparse import OptionParser
- import sys
- import json
- import tabulate
- import zlib
- from ots2 import *
class ConsoleConfig:
    '''
    Connection settings for the console, loaded from a JSON config file.

    The JSON document must contain the keys 'endpoint', 'accessId',
    'accessKey', 'instanceName' and 'statusTable'; a missing key raises
    KeyError. An OTSClient is created from the first four values.
    '''

    def __init__(self, config_file):
        '''
        Read ``config_file`` and build the OTS client.

        config_file: path of the JSON configuration file.
        '''
        # Use a context manager so the file handle is closed even when the
        # JSON is malformed (the handle used to be leaked).
        with open(config_file, 'r') as f:
            config = json.load(f)

        # Values are coerced with str(); presumably the SDK wants plain
        # (byte) strings rather than the unicode json yields on Python 2
        # -- TODO confirm.
        self.endpoint = str(config['endpoint'])
        self.accessid = str(config['accessId'])
        self.accesskey = str(config['accessKey'])
        self.instance_name = str(config['instanceName'])
        # Name of the table that job descriptions and checkpoints are read from.
        self.status_table = str(config['statusTable'])
        self.ots = OTSClient(self.endpoint, self.accessid, self.accesskey, self.instance_name)
def describe_job(config, options):
    '''
    Print the description and the checkpoint progress of one stream job.

    1. Fetch the job description row ('DataxJobDesc') identified by
       ``options.stream_id`` and ``options.timestamp`` and dump it as JSON.
    2. List all checkpoints of the job in a table and report the job as
       finished when there is one checkpoint per shard and every checkpoint
       carries the job's version.

    Exits the process with status -1 when the stream id or the timestamp
    is missing, or when no job row is found.
    '''
    stream_id = options.stream_id
    if not stream_id:
        print("Error: Should set the stream id using '-s' or '--streamid'.")
        sys.exit(-1)
    if not options.timestamp:
        print("Error: Should set the timestamp using '-t' or '--timestamp'.")
        sys.exit(-1)

    job_timestamp = int(options.timestamp)

    # The StatusValue key is the timestamp right-aligned in 16 characters.
    job_pk = [
        ('StreamId', stream_id),
        ('StatusType', 'DataxJobDesc'),
        ('StatusValue', '%16d' % job_timestamp),
    ]
    # Only the attribute columns of the returned 4-tuple are needed.
    _, _, attrs, _ = config.ots.get_row(config.status_table, job_pk, [], None, 1)
    if not attrs:
        print('Stream job is not found.')
        sys.exit(-1)

    job_detail = parse_job_detail(attrs)
    print('----------JobDescriptions----------')
    print(json.dumps(job_detail, indent=2))
    print('-----------------------------------')

    stream_checkpoints = _list_checkpoints(config, stream_id, job_timestamp)
    cps_headers = ['ShardId', 'SendRecordCount', 'Checkpoint', 'SkipCount', 'Version']
    table_content = [
        [cp[column] for column in cps_headers]
        for cp in stream_checkpoints
    ]
    print(tabulate.tabulate(table_content, headers=cps_headers))

    # The job is finished when every shard has a checkpoint and no
    # checkpoint belongs to a different version than the job itself.
    shard_count = len(job_detail['ShardIds'])
    checkpoint_count = len(stream_checkpoints)
    version_mismatch = any(
        cp['Version'] != job_detail['Version'] for cp in stream_checkpoints
    )
    finished = shard_count == checkpoint_count and not version_mismatch

    if finished:
        status = 'Finished'
    else:
        status = 'NotFinished'

    print('----------JobSummary----------')
    print('ShardsCount: %s' % shard_count)
    print('CheckPointsCount: %s' % checkpoint_count)
    print('JobStatus: %s' % status)
    print('------------------------------')
def _list_checkpoints(config, stream_id, timestamp):
    '''
    Return all reader checkpoints stored for one stream job.

    Scans the status table for rows of type 'CheckpointForDataxReader'
    whose StatusValue lies between the 16-character padded ``timestamp``
    (inclusive start key) and ``timestamp + 1`` (end key).

    config:    object exposing ``ots`` (the client) and ``status_table``.
    stream_id: value of the 'StreamId' primary key column.
    timestamp: integer timestamp identifying the job.

    Returns a list of dicts, one per checkpoint row, holding the row's
    attribute columns plus:
      * 'SendRecordCount' -- defaulted to 0 when the row has no such column;
      * 'ShardId'         -- the second tab-separated field of the row's
                             third primary key value.
    '''
    start_pk = [
        ('StreamId', stream_id),
        ('StatusType', 'CheckpointForDataxReader'),
        ('StatusValue', '%16d' % timestamp),
    ]
    end_pk = [
        ('StreamId', stream_id),
        ('StatusType', 'CheckpointForDataxReader'),
        ('StatusValue', '%16d' % (timestamp + 1)),
    ]
    consumed_counter = CapacityUnit(0, 0)
    columns_to_get = []

    # NOTE(review): the row-count argument used to be 100. In the OTS SDK
    # that argument is understood to cap the TOTAL number of rows yielded,
    # so jobs with more than 100 shards lost checkpoints and could never be
    # reported as finished. Pass None (as list_job does) to read every row.
    range_iter = config.ots.xget_range(
        config.status_table, Direction.FORWARD,
        start_pk, end_pk,
        consumed_counter, columns_to_get, None,
        column_filter=None, max_version=1
    )

    checkpoints = []
    for (primary_key, attrs) in range_iter:
        # Each attribute is indexable as (name, value, ...).
        checkpoint = dict((attr[0], attr[1]) for attr in attrs)
        checkpoint.setdefault('SendRecordCount', 0)
        checkpoint['ShardId'] = primary_key[2][1].split('\t')[1]
        checkpoints.append(checkpoint)
    return checkpoints
def list_job(config, options):
    '''
    Print a table of stream jobs found in the status table.

    Two modes:
    1. ``options.stream_id`` is set: list the jobs of that stream only.
    2. otherwise: list the jobs of all streams.

    Only rows whose StatusType key equals 'DataxJobDesc' are listed.
    '''
    consumed_counter = CapacityUnit(0, 0)

    # Pin the StreamId bounds to the requested stream, or leave them open.
    if options.stream_id:
        stream_lower = options.stream_id
        stream_upper = options.stream_id
    else:
        stream_lower = INF_MIN
        stream_upper = INF_MAX
    start_pk = [('StreamId', stream_lower), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
    end_pk = [('StreamId', stream_upper), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]

    columns_to_get = []
    range_iter = config.ots.xget_range(
        config.status_table, Direction.FORWARD,
        start_pk, end_pk,
        consumed_counter, columns_to_get, None,
        column_filter=None, max_version=1
    )

    # Job fields shown per row, in header order. The 'Timestamp' column is
    # filled from 'EndTime', so it repeats the 'EndTime' column's value.
    fields = ('TableName', 'JobStreamId', 'EndTime', 'StartTime', 'EndTime', 'Version')
    headers = ['TableName', 'JobStreamId', 'Timestamp', 'StartTime', 'EndTime', 'Version']

    rows = []
    for (primary_key, attrs) in range_iter:
        if primary_key[1][1] != 'DataxJobDesc':
            continue
        job_detail = parse_job_detail(attrs)
        rows.append([job_detail[field] for field in fields])

    print(tabulate.tabulate(rows, headers=headers))
def parse_job_detail(attrs):
    '''
    Build a job-detail dict from the attribute columns of a job row.

    attrs: iterable of attributes, each indexable as (name, value, ...).

    Columns whose name starts with 'ShardIds_' are chunks of one
    zlib-compressed JSON document; they are concatenated in the order
    given, decompressed and stored under 'ShardIds'. All other columns
    are copied as name -> value.

    Returns the dict, with 'Version' defaulting to '' and 'SkipCount'
    defaulting to 0 when the row lacks them. When the row has no
    'ShardIds_' data at all, 'ShardIds' is an empty list.

    Raises zlib.error / ValueError when the shard data is corrupt.
    '''
    job_details = {}
    # A bytearray accepts str chunks on Python 2 and bytes/bytearray chunks
    # on Python 2 and 3, and avoids repeated string concatenation.
    shard_ids_buffer = bytearray()
    for attr in attrs:
        name, value = attr[0], attr[1]
        if name.startswith('ShardIds_'):
            shard_ids_buffer += value
        else:
            job_details[name] = value

    if shard_ids_buffer:
        raw = zlib.decompress(bytes(shard_ids_buffer))
        shard_ids = json.loads(raw.decode('utf-8'))
    else:
        # No shard chunks: decompressing an empty buffer would raise, which
        # made a single incomplete row abort a whole job listing.
        shard_ids = []

    job_details.setdefault('Version', '')
    job_details.setdefault('SkipCount', 0)
    job_details['ShardIds'] = shard_ids
    return job_details
def parse_time(value):
    '''
    Convert ``value`` to an integer timestamp.

    ``value`` is first tried as an integer (anything int() accepts). If
    that fails it is parsed as a 'YYYY-MM-DD HH:MM:SS' string and converted
    with time.mktime, i.e. interpreted as local time, to whole seconds.

    Raises ValueError when ``value`` is a string in neither format.
    '''
    # Imported here because `time` is not among this file's explicit
    # imports; without it the fallback path failed with NameError.
    import time

    try:
        return int(value)
    except (TypeError, ValueError):
        return int(time.mktime(time.strptime(value, '%Y-%m-%d %H:%M:%S')))
if __name__ == '__main__':
    # Command-line entry point: parse the options, load the configuration
    # and run the requested action.
    parser = OptionParser()
    parser.add_option(
        '-c', '--config', dest='config_file',
        help='path of config file', metavar='tablestore_streamreader_config.json')
    parser.add_option(
        '-a', '--action', dest='action',
        help='the action to do', choices = ['describe_job', 'list_job'], metavar='')
    parser.add_option(
        '-t', '--timestamp', dest='timestamp',
        help='the timestamp', metavar='')
    parser.add_option(
        '-s', '--streamid', dest='stream_id',
        help='the id of stream', metavar='')
    parser.add_option(
        '-d', '--shardid', dest='shard_id',
        help='the id of shard', metavar='')
    options, args = parser.parse_args()

    # Mandatory options, checked in this order; the first missing one
    # prints its message and terminates with status -1.
    required_options = (
        (options.config_file, "Error: Should set the path of config file using '-c' or '--config'."),
        (options.action, "Error: Should set the action using '-a' or '--action'."),
    )
    for option_value, error_message in required_options:
        if not option_value:
            print(error_message)
            sys.exit(-1)

    console_config = ConsoleConfig(options.config_file)

    # Map each action name to the function implementing it.
    action_handlers = {
        'list_job': list_job,
        'describe_job': describe_job,
    }
    handler = action_handlers.get(options.action)
    if handler is not None:
        handler(console_config, options)
|