ProxyCrawl.py

# coding:utf-8
from gevent import monkey
monkey.patch_all()

import sys
import time
import gevent
from gevent.pool import Pool
from multiprocessing import Queue, Process, Value
from api.apiServer import start_api_server
from config import THREADNUM, parserList, UPDATE_TIME, MINNUM, MAX_CHECK_CONCURRENT_PER_PROCESS, MAX_DOWNLOAD_CONCURRENT
from db.DataStore import store_data, sqlhelper
from spider.HtmlDownloader import Html_Downloader
from spider.HtmlPraser import Html_Parser
from validator.Validator import validator, getMyIP, detect_from_db

'''
This class implements the crawler logic.
'''
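# Process layout, as wired up in the __main__ block at the bottom of this file:
#
#     crawler process (startProxyCrawl) --q1--> validator process --q2--> store_data process
#
# start_api_server runs the HTTP API in a separate process, and DB_PROXY_NUM is a
# shared Value('i') holding the number of proxies currently known to be in the database.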
def startProxyCrawl(queue, db_proxy_num, myip):
    crawl = ProxyCrawl(queue, db_proxy_num, myip)
    crawl.run()


class ProxyCrawl(object):
    proxies = set()

    def __init__(self, queue, db_proxy_num, myip):
        self.crawl_pool = Pool(THREADNUM)
        self.queue = queue
        self.db_proxy_num = db_proxy_num
        self.myip = myip
    def run(self):
        while True:
            self.proxies.clear()
            output = 'IPProxyPool----->>>>>>>>beginning'
            sys.stdout.write(output + "\r\n")
            sys.stdout.flush()

            # Re-check every proxy already stored in the database, in batches of
            # MAX_CHECK_CONCURRENT_PER_PROCESS greenlets; detect_from_db adds the
            # proxies that are still alive to self.proxies.
            proxylist = sqlhelper.select()
            spawns = []
            for proxy in proxylist:
                spawns.append(gevent.spawn(detect_from_db, self.myip, proxy, self.proxies))
                if len(spawns) >= MAX_CHECK_CONCURRENT_PER_PROCESS:
                    gevent.joinall(spawns)
                    spawns = []
            gevent.joinall(spawns)
            self.db_proxy_num.value = len(self.proxies)
            output = 'IPProxyPool----->>>>>>>>db exists ip:%d' % len(self.proxies)

            if len(self.proxies) < MINNUM:
                output += '\r\nIPProxyPool----->>>>>>>>now ip num < MINNUM, start crawling...'
                sys.stdout.write(output + "\r\n")
                sys.stdout.flush()

                # Too few usable proxies: run every parser in parserList, again in
                # batches, this time capped at MAX_DOWNLOAD_CONCURRENT greenlets.
                spawns = []
                for p in parserList:
                    spawns.append(gevent.spawn(self.crawl, p))
                    if len(spawns) >= MAX_DOWNLOAD_CONCURRENT:
                        gevent.joinall(spawns)
                        spawns = []
                gevent.joinall(spawns)
            else:
                output += '\r\nIPProxyPool----->>>>>>>>now ip num meets the requirement, wait UPDATE_TIME...'
                sys.stdout.write(output + "\r\n")
                sys.stdout.flush()

            time.sleep(UPDATE_TIME)
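    # For reference: crawl() below only requires each parserList entry to be a dict
    # with a 'urls' key; the rest of the schema lives in config.py and is consumed by
    # Html_Parser. A purely hypothetical entry (illustrative, not the actual config)
    # might look like:
    #
    #     {
    #         'urls': ['http://example.com/proxies/1', 'http://example.com/proxies/2'],
    #         'type': 'xpath',           # assumed field: parsing strategy for Html_Parser
    #         'pattern': '//table//tr',  # assumed field: selector for the proxy rows
    #     }
    #
    # Whatever the schema, Html_Parser.parse() is expected to yield dicts that contain
    # at least 'ip' and 'port', which is all crawl() reads.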
    def crawl(self, parser):
        html_parser = Html_Parser()
        for url in parser['urls']:
            response = Html_Downloader.download(url)
            if response is not None:
                proxylist = html_parser.parse(response, parser)
                if proxylist is not None:
                    for proxy in proxylist:
                        proxy_str = '%s:%s' % (proxy['ip'], proxy['port'])
                        if proxy_str not in self.proxies:
                            self.proxies.add(proxy_str)
                            # Hand the raw proxy to the validator process; back off
                            # briefly whenever the inter-process queue is full.
                            while True:
                                if self.queue.full():
                                    time.sleep(0.1)
                                else:
                                    self.queue.put(proxy)
                                    break
  77. if __name__ == "__main__":
  78. DB_PROXY_NUM = Value('i', 0)
  79. q1 = Queue()
  80. q2 = Queue()
  81. p0 = Process(target=start_api_server)
  82. p1 = Process(target=startProxyCrawl, args=(q1, DB_PROXY_NUM))
  83. p2 = Process(target=validator, args=(q1, q2))
  84. p3 = Process(target=store_data, args=(q2, DB_PROXY_NUM))
  85. p0.start()
  86. p1.start()
  87. p2.start()
  88. p3.start()
  89. # spider = ProxyCrawl()
  90. # spider.run()