123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- require 'rails_helper'
- describe Agents::TwitterStreamAgent do
- before do
- @opts = {
- consumer_key: "---",
- consumer_secret: "---",
- oauth_token: "---",
- oauth_token_secret: "---",
- filters: %w[keyword1 keyword2],
- expected_update_period_in_days: "2",
- generate: "events",
- include_retweets: "false"
- }
- @agent = Agents::TwitterStreamAgent.new(name: "HuginnBot", options: @opts)
- @agent.service = services(:generic)
- @agent.user = users(:bob)
- @agent.save!
- end
- describe '#process_tweet' do
- context "when generate is set to 'counts'" do
- before do
- @agent.options[:generate] = 'counts'
- end
- it 'records counts' do
- @agent.process_tweet('keyword1', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword2', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword1', { text: "something", user: { name: "Mr. Someone" } })
- @agent.reload
- expect(@agent.memory[:filter_counts][:keyword1]).to eq(2)
- expect(@agent.memory[:filter_counts][:keyword2]).to eq(1)
- end
- it 'records counts for keyword sets as well' do
- @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
- @agent.save!
- @agent.process_tweet('keyword2', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword2', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword1-1', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword1-2', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword1-3', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword1-1', { text: "something", user: { name: "Mr. Someone" } })
- @agent.reload
- expect(@agent.memory[:filter_counts][:'keyword1-1']).to eq(4) # it stores on the first keyword
- expect(@agent.memory[:filter_counts][:keyword2]).to eq(2)
- end
- it 'removes unused keys' do
- @agent.memory[:filter_counts] = { keyword1: 2, keyword2: 3, keyword3: 4 }
- @agent.save!
- @agent.process_tweet('keyword1', { text: "something", user: { name: "Mr. Someone" } })
- expect(@agent.reload.memory[:filter_counts]).to eq({ 'keyword1' => 3, 'keyword2' => 3 })
- end
- end
- context "when generate is set to 'events'" do
- it 'emits events immediately' do
- expect {
- @agent.process_tweet('keyword1', { text: "something", user: { name: "Mr. Someone" } })
- }.to change { @agent.events.count }.by(1)
- expect(@agent.events.last.payload).to eq({
- 'filter' => 'keyword1',
- 'text' => "something",
- 'user' => { 'name' => "Mr. Someone" }
- })
- end
- it 'handles keyword sets too' do
- @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
- @agent.save!
- expect {
- @agent.process_tweet('keyword1-2', { text: "something", user: { name: "Mr. Someone" } })
- }.to change { @agent.events.count }.by(1)
- expect(@agent.events.last.payload).to eq({
- 'filter' => 'keyword1-1',
- 'text' => "something",
- 'user' => { 'name' => "Mr. Someone" }
- })
- end
- end
- end
- describe '#check' do
- context "when generate is set to 'counts'" do
- before do
- @agent.options[:generate] = 'counts'
- @agent.save!
- end
- it 'emits events' do
- @agent.process_tweet('keyword1', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword2', { text: "something", user: { name: "Mr. Someone" } })
- @agent.process_tweet('keyword1', { text: "something", user: { name: "Mr. Someone" } })
- expect {
- @agent.reload.check
- }.to change { @agent.events.count }.by(2)
- expect(@agent.events[-1].payload[:filter]).to eq('keyword1')
- expect(@agent.events[-1].payload[:count]).to eq(2)
- expect(@agent.events[-2].payload[:filter]).to eq('keyword2')
- expect(@agent.events[-2].payload[:count]).to eq(1)
- expect(@agent.memory[:filter_counts]).to eq({})
- end
- end
- context "when generate is not set to 'counts'" do
- it 'does nothing' do
- @agent.memory[:filter_counts] = { keyword1: 2 }
- @agent.save!
- expect {
- @agent.reload.check
- }.not_to change { Event.count }
- expect(@agent.memory[:filter_counts]).to eq({})
- end
- end
- end
- context "#setup_worker" do
- it "ensures the dependencies are available" do
- expect(Agents::TwitterStreamAgent).to receive(:warn).with(Agents::TwitterStreamAgent.twitter_dependencies_missing)
- expect(Agents::TwitterStreamAgent).to receive(:dependencies_missing?) { true }
- expect(Agents::TwitterStreamAgent.setup_worker).to eq(false)
- end
- it "returns now workers if no agent is active" do
- @agent.destroy
- expect(Agents::TwitterStreamAgent.active).to be_empty
- expect(Agents::TwitterStreamAgent.setup_worker).to eq([])
- end
- it "returns a worker for an active agent" do
- expect(Agents::TwitterStreamAgent.active).to eq([@agent])
- workers = Agents::TwitterStreamAgent.setup_worker
- expect(workers).to be_a(Array)
- expect(workers.length).to eq(1)
- expect(workers.first).to be_a(Agents::TwitterStreamAgent::Worker)
- filter_to_agent_map = workers.first.config[:filter_to_agent_map]
- expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2'])
- expect(filter_to_agent_map.values).to eq([[@agent], [@agent]])
- end
- it "correctly maps keywords to agents" do
- agent2 = @agent.dup
- agent2.options[:filters] = ['agent2']
- agent2.save!
- expect(Agents::TwitterStreamAgent.active.order(:id).pluck(:id)).to eq([@agent.id, agent2.id])
- workers = Agents::TwitterStreamAgent.setup_worker
- filter_to_agent_map = workers.first.config[:filter_to_agent_map]
- expect(filter_to_agent_map.keys).to eq(['keyword1', 'keyword2', 'agent2'])
- expect(filter_to_agent_map['keyword1']).to eq([@agent])
- expect(filter_to_agent_map['agent2']).to eq([agent2])
- end
- end
- describe Agents::TwitterStreamAgent::Worker do
- before(:each) do
- @mock_agent = double
- @config = { agent: @agent, config: { filter_to_agent_map: { 'agent' => [@mock_agent] } } }
- @worker = Agents::TwitterStreamAgent::Worker.new(@config)
- @worker.instance_variable_set(:@recent_tweets, [])
- # mock(@worker).schedule_in(Agents::TwitterStreamAgent::Worker::RELOAD_TIMEOUT)
- @worker.setup!(nil, Mutex.new)
- end
- context "#run" do
- before(:each) do
- allow(EventMachine).to receive(:run).and_yield
- allow(EventMachine).to receive(:add_periodic_timer).with(3600)
- end
- it "starts the stream" do
- expect(@worker).to receive(:stream!).with(['agent'], @agent)
- expect(Thread).to receive(:stop)
- @worker.run
- end
- it "yields received tweets" do
- expect(@worker).to receive(:stream!).with(['agent'], @agent).and_yield('status' => 'hello')
- expect(@worker).to receive(:handle_status).with({ 'status' => 'hello' })
- expect(Thread).to receive(:stop)
- @worker.run
- end
- end
- context "#stop" do
- it "stops the thread" do
- expect(@worker).to receive(:terminate_thread!)
- @worker.stop
- end
- end
- context "stream!" do
- def stream_stub
- stream = double(
- each_item: nil,
- on_error: nil,
- on_no_data: nil,
- on_max_reconnects: nil,
- )
- allow(Twitter::JSONStream).to receive(:connect) { stream }
- stream
- end
- def stream_stub_yielding(**pairs)
- stream = stream_stub
- pairs.each do |method, args|
- expect(stream).to receive(method).and_yield(*args)
- end
- stream
- end
- it "initializes Twitter::JSONStream" do
- expect(Twitter::JSONStream).to receive(:connect).with({ path: "/1.1/statuses/filter.json?track=agent",
- ssl: true, oauth: { consumer_key: "twitteroauthkey",
- consumer_secret: "twitteroauthsecret",
- access_key: "1234token",
- access_secret: "56789secret" } }) { stream_stub }
- @worker.send(:stream!, ['agent'], @agent)
- end
- context "callback handling" do
- it "logs error messages" do
- stream_stub_yielding(on_error: ['woups'])
- expect(@worker).to receive(:warn).with(anything) { |text| expect(text).to match(/woups/) }
- expect(@worker).to receive(:warn).with(anything) { |text| expect(text).to match(/Sleeping/) }
- expect(@worker).to receive(:sleep).with(15)
- expect(@worker).to receive(:restart!)
- @worker.send(:stream!, ['agent'], @agent)
- end
- it "stop when no data was received" do
- stream_stub_yielding(on_no_data: ['woups'])
- expect(@worker).to receive(:restart!)
- expect(@worker).to receive(:warn).with(anything)
- @worker.send(:stream!, ['agent'], @agent)
- end
- it "sleeps for 60 seconds on_max_reconnects" do
- stream_stub_yielding(on_max_reconnects: [1, 1])
- expect(@worker).to receive(:warn).with(anything)
- expect(@worker).to receive(:sleep).with(60)
- expect(@worker).to receive(:restart!)
- @worker.send(:stream!, ['agent'], @agent)
- end
- it "yields every status received" do
- stream_stub_yielding(each_item: [{ 'text' => 'hello' }])
- @worker.send(:stream!, ['agent'], @agent) do |status|
- expect(status).to eq({ 'text' => 'hello' })
- end
- end
- end
- end
- context "#handle_status" do
- it "skips retweets" do
- @worker.send(:handle_status, { 'text' => 'retweet', 'retweeted_status' => { one: true }, 'id_str' => '123' })
- expect(@worker.instance_variable_get(:'@recent_tweets')).not_to include('123')
- end
- it "includes retweets if configured" do
- @agent.options[:include_retweets] = 'true'
- @agent.save!
- @worker.send(:handle_status, { 'text' => 'retweet', 'retweeted_status' => { one: true }, 'id_str' => '1234' })
- expect(@worker.instance_variable_get(:'@recent_tweets')).to include('1234')
- end
- it "deduplicates tweets" do
- @worker.send(:handle_status, { 'text' => 'dup', 'id_str' => '1' })
- expect(@worker).to receive(:puts).with(anything) { |text| expect(text).to match(/Skipping/) }
- @worker.send(:handle_status, { 'text' => 'dup', 'id_str' => '1' })
- expect(@worker.instance_variable_get(:'@recent_tweets').select { |str| str == '1' }.length).to eq 1
- end
- it "calls the agent to process the tweet" do
- expect(@mock_agent).to receive(:name) { 'mock' }
- expect(@mock_agent).to receive(:process_tweet).with('agent',
- { text: 'agent', id_str: '123', expanded_text: 'agent' })
- expect(@worker).to receive(:puts).with(a_string_matching(/received/))
- @worker.send(:handle_status, { 'text' => 'agent', 'id_str' => '123' })
- expect(@worker.instance_variable_get(:'@recent_tweets')).to include('123')
- end
- end
- end
- end
|