RedisHelper.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # coding:utf-8
  2. from __future__ import unicode_literals
  3. from redis import Redis
  4. import config
  5. from db.ISqlHelper import ISqlHelper
  6. from db.SqlHelper import Proxy
  7. class RedisHelper(ISqlHelper):
  8. def __init__(self, url=None):
  9. self.index_names = ('types', 'protocol', 'country', 'area', 'score')
  10. self.redis_url = url or config.DB_CONFIG['DB_CONNECT_STRING']
  11. def get_proxy_name(self, ip=None, port=None, protocal=None, proxy=None):
  12. ip = ip or proxy.ip
  13. port = port or proxy.port
  14. protocal = protocal or proxy.protocol
  15. return "proxy::{}:{}:{}".format(ip, port, protocal)
  16. def get_index_name(self, index_name, value=None):
  17. if index_name == 'score':
  18. return 'index::score'
  19. return "index::{}:{}".format(index_name, value)
  20. def get_proxy_by_name(self, name):
  21. pd = self.redis.hgetall(name)
  22. if pd:
  23. return Proxy(**{k.decode('utf8'): v.decode('utf8') for k, v in pd.items()})
  24. def init_db(self, url=None):
  25. self.redis = Redis.from_url(url or self.redis_url)
  26. def drop_db(self):
  27. return self.redis.flushdb()
  28. def get_keys(self, conditions):
  29. select_keys = {self.get_index_name(key, conditions[key]) for key in conditions.keys() if
  30. key in self.index_names}
  31. if 'ip' in conditions and 'port' in conditions:
  32. return self.redis.keys(self.get_proxy_name(conditions['ip'], conditions['port'], '*'))
  33. if select_keys:
  34. return [name.decode('utf8') for name in self.redis.sinter(keys=select_keys)]
  35. return []
  36. def insert(self, value):
  37. proxy = Proxy(ip=value['ip'], port=value['port'], types=value['types'], protocol=value['protocol'],
  38. country=value['country'], area=value['area'],
  39. speed=value['speed'], score=value.get('score', config.DEFAULT_SCORE))
  40. mapping = proxy.__dict__
  41. for k in list(mapping.keys()):
  42. if k.startswith('_'):
  43. mapping.pop(k)
  44. object_name = self.get_proxy_name(proxy=proxy)
  45. # 存结构
  46. insert_num = self.redis.hmset(object_name, mapping)
  47. # 创建索引
  48. if insert_num > 0:
  49. for index_name in self.index_names:
  50. self.create_index(index_name, object_name, proxy)
  51. return insert_num
  52. def create_index(self, index_name, object_name, proxy):
  53. redis_key = self.get_index_name(index_name, getattr(proxy, index_name))
  54. if index_name == 'score':
  55. return self.redis.zadd(redis_key, object_name, int(proxy.score))
  56. return self.redis.sadd(redis_key, object_name)
  57. def delete(self, conditions):
  58. proxy_keys = self.get_keys(conditions)
  59. index_keys = self.redis.keys(u"index::*")
  60. if not proxy_keys:
  61. return 0
  62. for iname in index_keys:
  63. if iname == b'index::score':
  64. self.redis.zrem(self.get_index_name('score'), *proxy_keys)
  65. else:
  66. self.redis.srem(iname, *proxy_keys)
  67. return self.redis.delete(*proxy_keys) if proxy_keys else 0
  68. def update(self, conditions, values):
  69. objects = self.get_keys(conditions)
  70. count = 0
  71. for name in objects:
  72. for k, v in values.items():
  73. if k == 'score':
  74. self.redis.zrem(self.get_index_name('score'), [name])
  75. self.redis.zadd(self.get_index_name('score'), name, int(v))
  76. self.redis.hset(name, key=k, value=v)
  77. count += 1
  78. return count
  79. def select(self, count=None, conditions=None):
  80. count = (count and int(count)) or 1000 # 最多返回1000条数据
  81. count = 1000 if count > 1000 else count
  82. querys = {k: v for k, v in conditions.items() if k in self.index_names} if conditions else None
  83. if querys:
  84. objects = list(self.get_keys(querys))[:count]
  85. redis_name = self.get_index_name('score')
  86. objects.sort(key=lambda x: int(self.redis.zscore(redis_name, x)))
  87. else:
  88. objects = list(
  89. self.redis.zrevrangebyscore(self.get_index_name("score"), '+inf', '-inf', start=0, num=count))
  90. result = []
  91. for name in objects:
  92. p = self.get_proxy_by_name(name)
  93. result.append((p.ip, p.port, p.score))
  94. return result
  95. if __name__ == '__main__':
  96. sqlhelper = RedisHelper()
  97. sqlhelper.init_db('redis://localhost:6379/9')
  98. proxy = {'ip': '192.168.1.1', 'port': 80, 'type': 0, 'protocol': 0, 'country': '中国', 'area': '广州', 'speed': 11.123,
  99. 'types': 1}
  100. proxy2 = {'ip': 'localhost', 'port': 433, 'type': 1, 'protocol': 1, 'country': u'中国', 'area': u'广州', 'speed': 123,
  101. 'types': 0, 'score': 100}
  102. assert sqlhelper.insert(proxy) == True
  103. assert sqlhelper.insert(proxy2) == True
  104. assert sqlhelper.get_keys({'types': 1}) == ['proxy::192.168.1.1:80:0', ], sqlhelper.get_keys({'types': 1})
  105. assert sqlhelper.select(conditions={'protocol': 0}) == [('192.168.1.1', '80', '0')]
  106. assert sqlhelper.update({'types': 1}, {'score': 888}) == 1
  107. assert sqlhelper.select() == [('192.168.1.1', '80', '888'), ('localhost', '433', '100')]
  108. # assert sqlhelper.delete({'types': 1}) == 1
  109. # sqlhelper.drop_db()
  110. print('All pass.')