1
0

tablestore_streamreader_console.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
#!/usr/bin/env python
  2. #-*- coding: utf-8 -*-
  3. from optparse import OptionParser
  4. import sys
  5. import json
  6. import tabulate
  7. import zlib
  8. from ots2 import *
  9. class ConsoleConfig:
  10. def __init__(self, config_file):
  11. f = open(config_file, 'r')
  12. config = json.loads(f.read())
  13. self.endpoint = str(config['endpoint'])
  14. self.accessid = str(config['accessId'])
  15. self.accesskey = str(config['accessKey'])
  16. self.instance_name = str(config['instanceName'])
  17. self.status_table = str(config['statusTable'])
  18. self.ots = OTSClient(self.endpoint, self.accessid, self.accesskey, self.instance_name)
  19. def describe_job(config, options):
  20. '''
  21. 1. get job's description
  22. 2. get all job's checkpoints and check if it is done
  23. '''
  24. if not options.stream_id:
  25. print "Error: Should set the stream id using '-s' or '--streamid'."
  26. sys.exit(-1)
  27. if not options.timestamp:
  28. print "Error: Should set the timestamp using '-t' or '--timestamp'."
  29. sys.exit(-1)
  30. pk = [('StreamId', options.stream_id), ('StatusType', 'DataxJobDesc'), ('StatusValue', '%16d' % int(options.timestamp))]
  31. consumed, pk, attrs, next_token = config.ots.get_row(config.status_table, pk, [], None, 1)
  32. if not attrs:
  33. print 'Stream job is not found.'
  34. sys.exit(-1)
  35. job_detail = parse_job_detail(attrs)
  36. print '----------JobDescriptions----------'
  37. print json.dumps(job_detail, indent=2)
  38. print '-----------------------------------'
  39. stream_checkpoints = _list_checkpoints(config, options.stream_id, int(options.timestamp))
  40. cps_headers = ['ShardId', 'SendRecordCount', 'Checkpoint', 'SkipCount', 'Version']
  41. table_content = []
  42. for cp in stream_checkpoints:
  43. table_content.append([cp['ShardId'], cp['SendRecordCount'], cp['Checkpoint'], cp['SkipCount'], cp['Version']])
  44. print tabulate.tabulate(table_content, headers=cps_headers)
  45. # check if stream job has finished
  46. finished = True
  47. if len(job_detail['ShardIds']) != len(stream_checkpoints):
  48. finished = False
  49. for cp in stream_checkpoints:
  50. if cp['Version'] != job_detail['Version']:
  51. finished = False
  52. print '----------JobSummary----------'
  53. print 'ShardsCount:', len(job_detail['ShardIds'])
  54. print 'CheckPointsCount:', len(stream_checkpoints)
  55. print 'JobStatus:', 'Finished' if finished else 'NotFinished'
  56. print '------------------------------'
  57. def _list_checkpoints(config, stream_id, timestamp):
  58. start_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % timestamp)]
  59. end_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % (timestamp + 1))]
  60. consumed_counter = CapacityUnit(0, 0)
  61. columns_to_get = []
  62. checkpoints = []
  63. range_iter = config.ots.xget_range(
  64. config.status_table, Direction.FORWARD,
  65. start_pk, end_pk,
  66. consumed_counter, columns_to_get, 100,
  67. column_filter=None, max_version=1
  68. )
  69. rows = []
  70. for (primary_key, attrs) in range_iter:
  71. checkpoint = {}
  72. for attr in attrs:
  73. checkpoint[attr[0]] = attr[1]
  74. if not checkpoint.has_key('SendRecordCount'):
  75. checkpoint['SendRecordCount'] = 0
  76. checkpoint['ShardId'] = primary_key[2][1].split('\t')[1]
  77. checkpoints.append(checkpoint)
  78. return checkpoints
  79. def list_job(config, options):
  80. '''
  81. Two options:
  82. 1. list all jobs of stream
  83. 2. list all jobs and all streams
  84. '''
  85. consumed_counter = CapacityUnit(0, 0)
  86. if options.stream_id:
  87. start_pk = [('StreamId', options.stream_id), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
  88. end_pk = [('StreamId', options.stream_id), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]
  89. else:
  90. start_pk = [('StreamId', INF_MIN), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
  91. end_pk = [('StreamId', INF_MAX), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]
  92. columns_to_get = []
  93. range_iter = config.ots.xget_range(
  94. config.status_table, Direction.FORWARD,
  95. start_pk, end_pk,
  96. consumed_counter, columns_to_get, None,
  97. column_filter=None, max_version=1
  98. )
  99. rows = []
  100. for (primary_key, attrs) in range_iter:
  101. if primary_key[1][1] == 'DataxJobDesc':
  102. job_detail = parse_job_detail(attrs)
  103. rows.append([job_detail['TableName'], job_detail['JobStreamId'], job_detail['EndTime'], job_detail['StartTime'], job_detail['EndTime'], job_detail['Version']])
  104. headers = ['TableName', 'JobStreamId', 'Timestamp', 'StartTime', 'EndTime', 'Version']
  105. print tabulate.tabulate(rows, headers=headers)
  106. def parse_job_detail(attrs):
  107. job_details = {}
  108. shard_ids_content = ''
  109. for attr in attrs:
  110. if attr[0].startswith('ShardIds_'):
  111. shard_ids_content += attr[1]
  112. else:
  113. job_details[attr[0]] = attr[1]
  114. shard_ids = json.loads(zlib.decompress(shard_ids_content))
  115. if not job_details.has_key('Version'):
  116. job_details['Version'] = ''
  117. if not job_details.has_key('SkipCount'):
  118. job_details['SkipCount'] = 0
  119. job_details['ShardIds'] = shard_ids
  120. return job_details
  121. def parse_time(value):
  122. try:
  123. return int(value)
  124. except Exception,e:
  125. return int(time.mktime(time.strptime(value, '%Y-%m-%d %H:%M:%S')))
  126. if __name__ == '__main__':
  127. parser = OptionParser()
  128. parser.add_option('-c', '--config', dest='config_file', help='path of config file', metavar='tablestore_streamreader_config.json')
  129. parser.add_option('-a', '--action', dest='action', help='the action to do', choices = ['describe_job', 'list_job'], metavar='')
  130. parser.add_option('-t', '--timestamp', dest='timestamp', help='the timestamp', metavar='')
  131. parser.add_option('-s', '--streamid', dest='stream_id', help='the id of stream', metavar='')
  132. parser.add_option('-d', '--shardid', dest='shard_id', help='the id of shard', metavar='')
  133. options, args = parser.parse_args()
  134. if not options.config_file:
  135. print "Error: Should set the path of config file using '-c' or '--config'."
  136. sys.exit(-1)
  137. if not options.action:
  138. print "Error: Should set the action using '-a' or '--action'."
  139. sys.exit(-1)
  140. console_config = ConsoleConfig(options.config_file)
  141. if options.action == 'list_job':
  142. list_job(console_config, options)
  143. elif options.action == 'describe_job':
  144. describe_job(console_config, options)