twitter_stream_agent_spec.rb 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. require 'spec_helper'
  # Specs for Agents::TwitterStreamAgent: per-tweet processing in 'counts' and
  # 'events' modes, the periodic #check that flushes counts, worker setup, and
  # the streaming Worker's lifecycle and callbacks.
  describe Agents::TwitterStreamAgent do
    before do
      @opts = {
        :consumer_key => "---",
        :consumer_secret => "---",
        :oauth_token => "---",
        :oauth_token_secret => "---",
        :filters => %w[keyword1 keyword2],
        :expected_update_period_in_days => "2",
        :generate => "events"
      }
      @agent = Agents::TwitterStreamAgent.new(:name => "HuginnBot", :options => @opts)
      @agent.service = services(:generic)
      @agent.user = users(:bob)
      @agent.save!
    end

    # Returns a fresh minimal tweet on every call, so examples that process
    # several tweets never share a single hash instance between calls.
    def sample_tweet
      { :text => "something", :user => { :name => "Mr. Someone" } }
    end

    describe '#process_tweet' do
      context "when generate is set to 'counts'" do
        before do
          @agent.options[:generate] = 'counts'
        end

        it 'records counts' do
          %w[keyword1 keyword2 keyword1].each do |filter|
            @agent.process_tweet(filter, sample_tweet)
          end
          @agent.reload
          expect(@agent.memory[:filter_counts][:keyword1]).to eq(2)
          expect(@agent.memory[:filter_counts][:keyword2]).to eq(1)
        end

        it 'records counts for keyword sets as well' do
          @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
          @agent.save!
          %w[keyword2 keyword2 keyword1-1 keyword1-2 keyword1-3 keyword1-1].each do |filter|
            @agent.process_tweet(filter, sample_tweet)
          end
          @agent.reload
          # Every member of a keyword set is counted under the set's first keyword.
          expect(@agent.memory[:filter_counts][:'keyword1-1']).to eq(4)
          expect(@agent.memory[:filter_counts][:keyword2]).to eq(2)
        end

        it 'removes unused keys' do
          @agent.memory[:filter_counts] = {:keyword1 => 2, :keyword2 => 3, :keyword3 => 4}
          @agent.save!
          @agent.process_tweet('keyword1', sample_tweet)
          # keyword3 is not among the agent's filters, so it is dropped.
          expect(@agent.reload.memory[:filter_counts]).to eq({ 'keyword1' => 3, 'keyword2' => 3 })
        end
      end

      context "when generate is set to 'events'" do
        it 'emits events immediately' do
          expect {
            @agent.process_tweet('keyword1', sample_tweet)
          }.to change { @agent.events.count }.by(1)
          expect(@agent.events.last.payload).to eq({
            'filter' => 'keyword1',
            'text' => "something",
            'user' => { 'name' => "Mr. Someone" }
          })
        end

        it 'handles keyword sets too' do
          @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
          @agent.save!
          expect {
            @agent.process_tweet('keyword1-2', sample_tweet)
          }.to change { @agent.events.count }.by(1)
          # The event is attributed to the first keyword of the matching set.
          expect(@agent.events.last.payload).to eq({
            'filter' => 'keyword1-1',
            'text' => "something",
            'user' => { 'name' => "Mr. Someone" }
          })
        end
      end
    end

    describe '#check' do
      context "when generate is set to 'counts'" do
        before do
          @agent.options[:generate] = 'counts'
          @agent.save!
        end

        it 'emits events' do
          %w[keyword1 keyword2 keyword1].each do |filter|
            @agent.process_tweet(filter, sample_tweet)
          end
          expect {
            @agent.reload.check
          }.to change { @agent.events.count }.by(2)
          expect(@agent.events[-1].payload[:filter]).to eq('keyword1')
          expect(@agent.events[-1].payload[:count]).to eq(2)
          expect(@agent.events[-2].payload[:filter]).to eq('keyword2')
          expect(@agent.events[-2].payload[:count]).to eq(1)
          expect(@agent.memory[:filter_counts]).to eq({})
        end
      end

      context "when generate is not set to 'counts'" do
        it 'emits no events but still clears the counts' do
          @agent.memory[:filter_counts] = { :keyword1 => 2 }
          @agent.save!
          expect {
            @agent.reload.check
          }.not_to change { Event.count }
          expect(@agent.memory[:filter_counts]).to eq({})
        end
      end
    end

    context "#setup_worker" do
      it "ensures the dependencies are available" do
        mock(STDERR).puts(Agents::TwitterStreamAgent.twitter_dependencies_missing)
        mock(Agents::TwitterStreamAgent).dependencies_missing? { true }
        expect(Agents::TwitterStreamAgent.setup_worker).to eq(false)
      end

      it "returns no workers if no agent is active" do
        mock(Agents::TwitterStreamAgent).active { [] }
        expect(Agents::TwitterStreamAgent.setup_worker).to eq([])
      end

      it "returns a worker for an active agent" do
        mock(Agents::TwitterStreamAgent).active { [@agent] }
        workers = Agents::TwitterStreamAgent.setup_worker
        expect(workers).to be_a(Array)
        expect(workers.length).to eq(1)
        expect(workers.first).to be_a(Agents::TwitterStreamAgent::Worker)
        filter_to_agent_map = workers.first.config[:filter_to_agent_map]
        expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2'])
        expect(filter_to_agent_map.values).to eq([[@agent], [@agent]])
      end

      it "correctly maps keywords to agents" do
        agent2 = @agent.dup
        agent2.id = 123455
        agent2.options[:filters] = ['agent2']
        mock(Agents::TwitterStreamAgent).active { [@agent, agent2] }
        workers = Agents::TwitterStreamAgent.setup_worker
        filter_to_agent_map = workers.first.config[:filter_to_agent_map]
        expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2', 'agent2'])
        expect(filter_to_agent_map['keyword1']).to eq([@agent])
        expect(filter_to_agent_map['agent2']).to eq([agent2])
      end
    end

    describe Agents::TwitterStreamAgent::Worker do
      # Each example gets a worker whose only filter, 'agent', maps to a bare
      # mock agent; the reload scheduled during setup! is mocked out.
      before(:each) do
        @mock_agent = mock!
        @config = {agent: @agent, config: {filter_to_agent_map: {'agent' => [@mock_agent]}}}
        @worker = Agents::TwitterStreamAgent::Worker.new(@config)
        @worker.instance_variable_set(:@recent_tweets, [])
        mock(@worker).schedule_in(Agents::TwitterStreamAgent::Worker::RELOAD_TIMEOUT)
        @worker.setup!(nil, Mutex.new)
      end

      context "#run" do
        it "starts the stream" do
          mock(EventMachine).run.yields
          mock(@worker).stream!(['agent'], @agent)
          mock(Thread).stop
          @worker.run
        end

        it "yields received tweets" do
          mock(EventMachine).run.yields
          mock(@worker).stream!(['agent'], @agent).yields('status' => 'hello')
          mock(@worker).handle_status('status' => 'hello')
          mock(Thread).stop
          @worker.run
        end
      end

      context "#stop" do
        it "stops the thread" do
          mock(@worker.thread).terminate
          @worker.stop
        end
      end

      context "stream!" do
        # Stubs Twitter::JSONStream.connect to return a stream double on which
        # each_item, on_error, on_no_data and on_max_reconnects are stubbed,
        # except `method`, which the caller then defines with its own behaviour.
        def stub_without(method = nil)
          stream_stub = stub!
          stream_stub.each_item if method != :each_item
          stream_stub.on_error if method != :on_error
          stream_stub.on_no_data if method != :on_no_data
          stream_stub.on_max_reconnects if method != :on_max_reconnects
          stub(Twitter::JSONStream).connect { stream_stub }
          stream_stub
        end

        it "initializes Twitter::JSONStream" do
          mock(Twitter::JSONStream).connect({:path=>"/1/statuses/filter.json?track=agent",
                                             :ssl=>true, :oauth=>{:consumer_key=>"twitteroauthkey",
                                                                  :consumer_secret=>"twitteroauthsecret",
                                                                  :access_key=>"1234token",
                                                                  :access_secret=>"56789secret"}
                                            }) { stub_without }
          @worker.send(:stream!, ['agent'], @agent)
        end

        context "callback handling" do
          it "logs error messages" do
            stub_without(:on_error).on_error.yields('woups')
            mock(STDERR).puts(anything) { |text| expect(text).to match(/woups/) }
            @worker.send(:stream!, ['agent'], @agent)
          end

          it "restarts when no data was received" do
            stub_without(:on_no_data).on_no_data.yields
            mock(@worker).restart!
            mock(STDERR).puts(anything)
            @worker.send(:stream!, ['agent'], @agent)
          end

          it "sleeps for 60 seconds on_max_reconnects" do
            stub_without(:on_max_reconnects).on_max_reconnects.yields
            mock(STDERR).puts(anything)
            mock(@worker).sleep(60)
            mock(@worker).restart!
            @worker.send(:stream!, ['agent'], @agent)
          end

          it "yields every status received" do
            stub_without(:each_item).each_item.yields({'text' => 'hello'})
            received = []
            @worker.send(:stream!, ['agent'], @agent) { |status| received << status }
            # Asserting after the call (not inside the block) makes the example
            # fail if stream! never yields, instead of passing vacuously.
            expect(received).to eq([{'text' => 'hello'}])
          end
        end
      end

      context "#handle_status" do
        it "skips retweets" do
          @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}, 'id_str' => '123' })
          expect(@worker.instance_variable_get(:'@recent_tweets')).not_to include('123')
        end

        it "deduplicates tweets" do
          @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
          @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
          expect(@worker.instance_variable_get(:'@recent_tweets').count('1')).to eq 1
        end

        it "calls the agent to process the tweet" do
          mock(@mock_agent).name { 'mock' }
          mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
          @worker.send(:handle_status, {'text' => 'agent', 'id_str' => '123'})
          expect(@worker.instance_variable_get(:'@recent_tweets')).to include('123')
        end
      end
    end
  end