# twitter_stream_agent_spec.rb
  1. require 'rails_helper'
  2. describe Agents::TwitterStreamAgent do
  # Builds and saves a TwitterStreamAgent before every example. The option
  # hash is kept in @opts and the saved agent in @agent for the examples below.
  before do
    @opts = {
      consumer_key: "---",
      consumer_secret: "---",
      oauth_token: "---",
      oauth_token_secret: "---",
      filters: ['keyword1', 'keyword2'],
      expected_update_period_in_days: "2",
      generate: "events",
      include_retweets: "false"
    }

    @agent = Agents::TwitterStreamAgent.new(name: "HuginnBot", options: @opts).tap do |agent|
      # Service and user come from the :generic and :bob fixtures.
      agent.service = services(:generic)
      agent.user = users(:bob)
      agent.save!
    end
  end
  describe '#process_tweet' do
    # Returns a new status hash on every call, equal to the literal payload
    # each example hands to #process_tweet.
    def sample_tweet
      { text: "something", user: { name: "Mr. Someone" } }
    end

    # Calls #process_tweet on @agent once per given filter, in the given order.
    def feed(*filters)
      filters.each { |filter| @agent.process_tweet(filter, sample_tweet) }
    end

    context "when generate is set to 'counts'" do
      # The option is changed in memory only; this hook does not save the agent.
      before { @agent.options[:generate] = 'counts' }

      it 'records counts' do
        feed('keyword1', 'keyword2', 'keyword1')
        @agent.reload

        counts = @agent.memory[:filter_counts]
        expect(counts[:keyword1]).to eq(2)
        expect(counts[:keyword2]).to eq(1)
      end

      it 'records counts for keyword sets as well' do
        @agent.options[:filters][0] = ['keyword1-1', 'keyword1-2', 'keyword1-3']
        @agent.save!

        feed('keyword2', 'keyword2', 'keyword1-1', 'keyword1-2', 'keyword1-3', 'keyword1-1')
        @agent.reload

        counts = @agent.memory[:filter_counts]
        expect(counts[:'keyword1-1']).to eq(4) # it stores on the first keyword
        expect(counts[:keyword2]).to eq(2)
      end

      it 'removes unused keys' do
        # keyword3 is not one of the agent's filters, so it should be dropped.
        @agent.memory[:filter_counts] = { keyword1: 2, keyword2: 3, keyword3: 4 }
        @agent.save!

        feed('keyword1')

        expect(@agent.reload.memory[:filter_counts]).to eq({ 'keyword1' => 3, 'keyword2' => 3 })
      end
    end

    context "when generate is set to 'events'" do
      it 'emits events immediately' do
        expect { feed('keyword1') }.to change { @agent.events.count }.by(1)

        expected_payload = {
          'filter' => 'keyword1',
          'text' => "something",
          'user' => { 'name' => "Mr. Someone" }
        }
        expect(@agent.events.last.payload).to eq(expected_payload)
      end

      it 'handles keyword sets too' do
        @agent.options[:filters][0] = ['keyword1-1', 'keyword1-2', 'keyword1-3']
        @agent.save!

        expect { feed('keyword1-2') }.to change { @agent.events.count }.by(1)

        # The event is tagged with the first keyword of the set.
        expected_payload = {
          'filter' => 'keyword1-1',
          'text' => "something",
          'user' => { 'name' => "Mr. Someone" }
        }
        expect(@agent.events.last.payload).to eq(expected_payload)
      end
    end
  end
  describe '#check' do
    context "when generate is set to 'counts'" do
      before do
        @agent.options[:generate] = 'counts'
        @agent.save!
      end

      it 'emits events' do
        # Two tweets for keyword1 and one for keyword2.
        ['keyword1', 'keyword2', 'keyword1'].each do |filter|
          @agent.process_tweet(filter, { text: "something", user: { name: "Mr. Someone" } })
        end

        expect { @agent.reload.check }.to change { @agent.events.count }.by(2)

        last_payload = @agent.events[-1].payload
        expect(last_payload[:filter]).to eq('keyword1')
        expect(last_payload[:count]).to eq(2)

        second_to_last_payload = @agent.events[-2].payload
        expect(second_to_last_payload[:filter]).to eq('keyword2')
        expect(second_to_last_payload[:count]).to eq(1)

        # The counters are expected to be empty once #check has run.
        expect(@agent.memory[:filter_counts]).to eq({})
      end
    end

    context "when generate is not set to 'counts'" do
      it 'does nothing' do
        @agent.memory[:filter_counts] = { keyword1: 2 }
        @agent.save!

        expect { @agent.reload.check }.not_to change { Event.count }

        # No event is created, but the stored counts are still expected to be emptied.
        expect(@agent.memory[:filter_counts]).to eq({})
      end
    end
  end
  # Examples for the class-level Agents::TwitterStreamAgent.setup_worker.
  context "#setup_worker" do
    it "ensures the dependencies are available" do
      # The expected message is read before dependencies_missing? is stubbed.
      expect(STDERR).to receive(:puts).with(Agents::TwitterStreamAgent.twitter_dependencies_missing)
      expect(Agents::TwitterStreamAgent).to receive(:dependencies_missing?) { true }
      expect(Agents::TwitterStreamAgent.setup_worker).to eq(false)
    end

    it "returns no workers if no agent is active" do
      @agent.destroy
      expect(Agents::TwitterStreamAgent.active).to be_empty
      expect(Agents::TwitterStreamAgent.setup_worker).to eq([])
    end

    it "returns a worker for an active agent" do
      expect(Agents::TwitterStreamAgent.active).to eq([@agent])

      workers = Agents::TwitterStreamAgent.setup_worker
      expect(workers).to be_a(Array)
      expect(workers.length).to eq(1)
      expect(workers.first).to be_a(Agents::TwitterStreamAgent::Worker)

      # Each configured filter maps to the list of agents tracking it.
      filter_to_agent_map = workers.first.config[:filter_to_agent_map]
      expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2'])
      expect(filter_to_agent_map.values).to eq([[@agent], [@agent]])
    end

    it "correctly maps keywords to agents" do
      # A second agent, copied from @agent, tracking a different filter.
      agent2 = @agent.dup
      agent2.options[:filters] = ['agent2']
      agent2.save!

      expect(Agents::TwitterStreamAgent.active.order(:id).pluck(:id)).to eq([@agent.id, agent2.id])

      workers = Agents::TwitterStreamAgent.setup_worker
      filter_to_agent_map = workers.first.config[:filter_to_agent_map]
      expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2', 'agent2'])
      expect(filter_to_agent_map['keyword1']).to eq([@agent])
      expect(filter_to_agent_map['agent2']).to eq([agent2])
    end
  end
  # Examples for the streaming worker, built around the 'agent' filter mapped
  # to a test double.
  describe Agents::TwitterStreamAgent::Worker do
    before(:each) do
      @mock_agent = double
      @config = { agent: @agent, config: { filter_to_agent_map: { 'agent' => [@mock_agent] } } }
      @worker = Agents::TwitterStreamAgent::Worker.new(@config)
      # Start every example with an empty list of recent tweets.
      @worker.instance_variable_set(:@recent_tweets, [])
      @worker.setup!(nil, Mutex.new)
    end

    # Reads the worker's @recent_tweets list, which the #handle_status examples inspect.
    def recent_tweets
      @worker.instance_variable_get(:@recent_tweets)
    end

    context "#run" do
      before(:each) do
        # Stub EventMachine so #run yields straight into its block and sets no real timer.
        allow(EventMachine).to receive(:run).and_yield
        allow(EventMachine).to receive(:add_periodic_timer).with(3600)
      end

      it "starts the stream" do
        expect(@worker).to receive(:stream!).with(['agent'], @agent)
        expect(Thread).to receive(:stop)
        @worker.run
      end

      it "yields received tweets" do
        expect(@worker).to receive(:stream!).with(['agent'], @agent).and_yield('status' => 'hello')
        expect(@worker).to receive(:handle_status).with('status' => 'hello')
        expect(Thread).to receive(:stop)
        @worker.run
      end
    end

    context "#stop" do
      it "stops the thread" do
        expect(@worker).to receive(:terminate_thread!)
        @worker.stop
      end
    end

    context "#stream!" do
      # Returns a stream double whose callback registrations do nothing, and
      # stubs Twitter::JSONStream.connect to return it.
      def stream_stub
        stream = double(
          each_item: nil,
          on_error: nil,
          on_no_data: nil,
          on_max_reconnects: nil,
        )
        allow(Twitter::JSONStream).to receive(:connect) { stream }
        stream
      end

      # Like stream_stub, but each named callback registration is expected to
      # be called and immediately yields the given arguments.
      def stream_stub_yielding(**pairs)
        stream = stream_stub
        pairs.each do |method, args|
          expect(stream).to receive(method).and_yield(*args)
        end
        stream
      end

      it "initializes Twitter::JSONStream" do
        expect(Twitter::JSONStream).to receive(:connect).with({
          :path => "/1.1/statuses/filter.json?track=agent",
          :ssl => true,
          :oauth => {
            :consumer_key => "twitteroauthkey",
            :consumer_secret => "twitteroauthsecret",
            :access_key => "1234token",
            :access_secret => "56789secret"
          }
        }) { stream_stub }
        @worker.send(:stream!, ['agent'], @agent)
      end

      context "callback handling" do
        it "logs error messages" do
          stream_stub_yielding(on_error: ['woups'])
          expect(STDERR).to receive(:puts).with(anything) { |text| expect(text).to match(/woups/) }
          expect(STDERR).to receive(:puts).with(anything) { |text| expect(text).to match(/Sleeping/) }
          expect(@worker).to receive(:sleep).with(15)
          expect(@worker).to receive(:restart!)
          @worker.send(:stream!, ['agent'], @agent)
        end

        it "stops when no data was received" do
          stream_stub_yielding(on_no_data: ['woups'])
          expect(@worker).to receive(:restart!)
          expect(STDERR).to receive(:puts).with(anything)
          @worker.send(:stream!, ['agent'], @agent)
        end

        it "sleeps for 60 seconds on_max_reconnects" do
          stream_stub_yielding(on_max_reconnects: [1, 1])
          expect(STDERR).to receive(:puts).with(anything)
          expect(@worker).to receive(:sleep).with(60)
          expect(@worker).to receive(:restart!)
          @worker.send(:stream!, ['agent'], @agent)
        end

        it "yields every status received" do
          stream_stub_yielding(each_item: [{ 'text' => 'hello' }])
          @worker.send(:stream!, ['agent'], @agent) do |status|
            expect(status).to eq({ 'text' => 'hello' })
          end
        end
      end
    end

    context "#handle_status" do
      it "skips retweets" do
        @worker.send(:handle_status, { 'text' => 'retweet', 'retweeted_status' => { one: true }, 'id_str' => '123' })
        expect(recent_tweets).not_to include('123')
      end

      it "includes retweets if configured" do
        @agent.options[:include_retweets] = 'true'
        @agent.save!
        @worker.send(:handle_status, { 'text' => 'retweet', 'retweeted_status' => { one: true }, 'id_str' => '1234' })
        expect(recent_tweets).to include('1234')
      end

      it "deduplicates tweets" do
        @worker.send(:handle_status, { 'text' => 'dup', 'id_str' => '1' })
        # The second delivery of the same id should be reported as skipped.
        expect(@worker).to receive(:puts).with(anything) { |text| expect(text).to match(/Skipping/) }
        @worker.send(:handle_status, { 'text' => 'dup', 'id_str' => '1' })
        expect(recent_tweets.select { |str| str == '1' }.length).to eq 1
      end

      it "calls the agent to process the tweet" do
        expect(@mock_agent).to receive(:name) { 'mock' }
        expect(@mock_agent).to receive(:process_tweet).with('agent', { 'text' => 'agent', 'id_str' => '123' })
        expect(@worker).to receive(:puts).with(a_string_matching(/received/))
        @worker.send(:handle_status, { 'text' => 'agent', 'id_str' => '123' })
        expect(recent_tweets).to include('123')
      end
    end
  end
  256. end