Simple Twitter Monitor with ElasticSearch/OpenDistro

Hi everyone! In this post I will walk through a simple use case: tracking Twitter activity and raising alerts on unexpected events 🤯.

For this use case I will use:

  • Docker Compose
  • Apache NiFi
  • ElasticSearch OSS/OpenDistro

I will assume that:

  • Docker and Docker Compose are installed, along with common command-line tools (e.g. curl, jq, …)
  • this is a development/test environment on a Unix-like system (macOS, Linux, …)

First Step. ElasticSearch

Ok, the first thing to do is create a YAML file to use with docker-compose. I will take the reference file from the OpenDistro site: https://opendistro.github.io/for-elasticsearch-docs/docs/install/docker/

version: '3'
services:
  odfe-node1:
    image: amazon/opendistro-for-elasticsearch:1.1.0
    container_name: odfe-node1
    environment:
      - cluster.name=odfe-cluster
      - node.name=odfe-node1
      - discovery.seed_hosts=odfe-node1,odfe-node2
      - cluster.initial_master_nodes=odfe-node1,odfe-node2
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the Elasticsearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - odfe-data1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
      - 9600:9600 # required for Performance Analyzer
    networks:
      - odfe-net
  odfe-node2:
    image: amazon/opendistro-for-elasticsearch:1.1.0
    container_name: odfe-node2
    environment:
      - cluster.name=odfe-cluster
      - node.name=odfe-node2
      - discovery.seed_hosts=odfe-node1,odfe-node2
      - cluster.initial_master_nodes=odfe-node1,odfe-node2
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - odfe-data2:/usr/share/elasticsearch/data
    networks:
      - odfe-net
  kibana:
    image: amazon/opendistro-for-elasticsearch-kibana:1.1.0
    container_name: odfe-kibana
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      ELASTICSEARCH_URL: https://odfe-node1:9200
      ELASTICSEARCH_HOSTS: https://odfe-node1:9200
    networks:
      - odfe-net

volumes:
  odfe-data1:
  odfe-data2:

networks:
  odfe-net:

Create a docker-compose.yml file in your working directory and put the previous configuration in it. When you have finished, run it with:

> docker-compose up
odfe-node1 is up-to-date
odfe-node2 is up-to-date
odfe-kibana is up-to-date

Now you have ElasticSearch and Kibana (OpenDistro) running. You can access the Kibana user interface at http://localhost:5601/. The default user/password is admin/admin. (Change it 😅)

Second Step. Enabling Data Capture

In the previous step we enabled an indexed data repository (ES) and a “friendly” user interface (Kibana). But we need data 🕵️‍♀️; for this example I will use Twitter info. We’ll use Apache NiFi as the data flow controller: it will take data from Twitter and put it into an ES index.

First, we’ll add the Apache NiFi container to docker-compose.yml:

...
  kibana:
    image: amazon/opendistro-for-elasticsearch-kibana:1.1.0
    ...
    networks:
      - odfe-net
  nifi:
    image: apache/nifi:latest
    ports:
      - 8080:8080 # Unsecured HTTP Web Port
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_ELECTION_MAX_WAIT=1 min
    volumes:
      - ./nifi-data:/opt/nifi-data/
      - ./python:/opt/nifi-python
    extra_hosts:
      - "elasticsearch:odfe-node1"
    networks:
      - odfe-net

volumes:
  odfe-data1:

Bring up NiFi in the docker-compose deployment:

> docker-compose up
Creating twitteralert_nifi_1 ... done

Now we have NiFi running at http://localhost:8080/nifi

One of the features of OpenDistro is its security extension; past versions of ES didn’t include a security plugin by default (it was a premium feature). OpenDistro requires SSL connections by default, and the default deployment enables secure internal communication between ES and Kibana using self-signed certificates. You can replace them, but for easier integration I will disable SSL and use only authenticated HTTP connections (non-secure).

Disabling SSL on the containers is quite easy; just add an environment variable to the spec of both node containers:

  odfe-node1:
    image: amazon/opendistro-for-elasticsearch:1.1.0
    ...
    environment:
      - cluster.name=odfe-cluster
      - ...
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
      - opendistro_security.ssl.http.enabled=false
    ulimits:
      ...

Restart environment 🚨:

> docker-compose restart
Restarting twitteralert_nifi_1 ... done
Restarting odfe-node2 ... done
Restarting odfe-node1 ... done
Restarting odfe-kibana ... done

Third Step. Put the Data Pipes in Place

Now we have a very basic architecture for processing realtime data:

  • NiFi can retrieve info from Twitter
  • ES can store it and provide search tools
  • Kibana enables easy querying

Now we have to create a simple data flow in NiFi. We have to add two NiFi processors:

  • GetTwitter
  • PutElasticsearchHttp
Apache NiFi User Interface

We have to configure these processors; to do it, alt-click on the processor and use the Configure option. Next, we will define the parameters of each processor.

GetTwitter

  • Properties > Twitter EndPoint: Filter Endpoint
  • Properties > Consumer Key, Consumer Secret, Access Token, Access Token Secret: API keys from the Twitter developer portal.
  • Properties > Terms to Filter On: python,machine learning,deep learning

Notes: GetTwitter has several criteria for fetching tweets; see the docs.

To connect the Twitter API with ES you need an API key from Twitter; go to https://developer.twitter.com to get it.

PutElasticsearchHttp

  • Properties > ElasticSearch URL: http://odfe-node1:9200
  • Properties > Username: admin
  • Properties > Password: admin (if not changed before 🤪)
  • Properties > Index: twitter
  • Properties > Type: _doc
  • Settings > Automatically Terminate Relationships: mark all (success, retry, failure)

Connect the two processors on the success relationship. 🏁

Fourth Step. Prepare the Index

ES needs an index and a mapping to host the information. ES creates them by default, but Twitter data has particularities that aren’t interpreted well. Of course this can be fixed easily, but I will use a fixed mapping, based on Christina Boididou’s mapping (published here).

Let’s create an index and add a mapping for the date and location fields:

> curl -X PUT http://localhost:9200/twitter -u admin:admin
{"acknowledged":true,"shards_acknowledged":true,"index":"twitter"}
> curl -XPUT "http://localhost:9200/twitter/_mapping" -u admin:admin -H 'Content-Type: application/json' -d'
{
  "properties": {
    "created_at": {
      "type": "date",
      "format": "EEE MMM dd HH:mm:ss Z yyyy"
    },
    "retweeted_status.created_at": {
      "type": "date",
      "format": "EEE MMM dd HH:mm:ss Z yyyy"
    },
    "user.created_at": {
      "type": "date",
      "format": "EEE MMM dd HH:mm:ss Z yyyy"
    },
    "retweeted_status.user.created_at": {
      "type": "date",
      "format": "EEE MMM dd HH:mm:ss Z yyyy"
    },
    "coordinates.coordinates": {
      "type": "geo_point"
    },
    "place.bounding_box": {
      "type": "geo_shape",
      "coerce": true,
      "ignore_malformed": true
    }
  }
}'
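The custom date format is needed because Twitter’s created_at field isn’t ISO 8601. As a quick sanity check of the pattern, the Python strptime equivalent of the ES format "EEE MMM dd HH:mm:ss Z yyyy" would be "%a %b %d %H:%M:%S %z %Y" (the sample value below is made up for illustration):

```python
from datetime import datetime

# Twitter's created_at format, e.g. "Wed Oct 10 20:19:24 +0000 2018"
# (illustrative sample). The ES mapping pattern
# "EEE MMM dd HH:mm:ss Z yyyy" corresponds to this strptime format:
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

raw = "Wed Oct 10 20:19:24 +0000 2018"
parsed = datetime.strptime(raw, TWITTER_FORMAT)
print(parsed.isoformat())  # 2018-10-10T20:19:24+00:00
```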

Fifth Step. Let the Data Flow

Now we have a very simple data flow in Apache NiFi; it’s time to run it and retrieve some data. We can check what’s happening by watching and refreshing the NiFi UI, and get extended info with the data provenance options.

Also, you can easily check whether tweets are getting into ES with cURL:

> curl http://127.0.0.1:9200/twitter/_search -u admin:admin -s | jq .hits.total.value
197

Sixth Step. Enable Our “Little” Brother

Now we have data in our ES. It’s time to configure Kibana to see what’s happening and to enable the alerting module of OpenDistro.

We need to log into Kibana at http://localhost:5601 (user/pass: admin/admin) and create a new Kibana index pattern.

Steps to create a index pattern on Kibana

Once the Kibana config is added, you can use the Discover interface (compass icon) to explore the retrieved data:

Seventh Step. Looking for a Top Tweet!

Ok, we have a lot of tweets, but no alerts yet. My objective is to alert when a top-class tweet appears, and I think that happens when a lot of people retweet a tweet.

Creating a monitor can be done using the Kibana / OpenDistro interface and has several stages:

  1. Write the query to monitor,
  2. Add a monitor,
  3. Define a trigger,
  4. Create an action and
  5. Select a destination.

Query

After adding a destination we can create our monitor and the rest. Well, I think the query is the primary target; everything revolves around it. Because I’m an old-school developer, I prefer SQL queries to DSL. Fortunately, OpenDistro (and ES X-Pack) can translate an SQL sentence into the DSL query language (needed for the alerting module).

E.g., I will send an alert to my Slack channel when a tweet gets more than 100 RTs in one hour.

> POST /_opendistro/_sql
{
  "query": "select retweeted_status.id_str from twitter group by retweeted_status.id_str"
}

The equivalent DSL query is:

> GET _search
{
  "from": 0,
  "size": 0,
  "_source": {
    "includes": [
      "retweeted_status.id_str"
    ],
    "excludes": []
  },
  "stored_fields": "retweeted_status.id_str",
  "aggregations": {
    "retweeted_status.id_str.keyword": {
      "terms": {
        "field": "retweeted_status.id_str.keyword",
        "size": 200,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          { "_count": "desc" },
          { "_key": "asc" }
        ]
      }
    }
  }
}
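If you don’t want to write the DSL by hand, the OpenDistro SQL plugin also exposes an _opendistro/_sql/_explain endpoint that returns the DSL translation of an SQL statement. A minimal sketch of building that request (the request is only constructed here, not sent; host and credentials are the defaults from the earlier steps):

```python
import json

# Build (but don't send) an explain request for OpenDistro's SQL plugin,
# which returns the DSL translation of the given SQL statement.
def build_sql_explain(host: str, sql: str):
    url = f"{host}/_opendistro/_sql/_explain"
    body = json.dumps({"query": sql})
    return url, body

url, body = build_sql_explain(
    "http://localhost:9200",
    "select retweeted_status.id_str from twitter group by retweeted_status.id_str",
)
print(url)
# To actually run it against the cluster:
# curl -XPOST "$url" -u admin:admin -H 'Content-Type: application/json' -d "$body"
```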

But this query returns results over all time. Our monitor needs a time range to be useful, so we add extra clauses to the query (note the new range filter):

{
  "from": 0,
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "created_at": {
              "from": "{{period_end}}||-1h",
              "to": "{{period_end}}",
              "include_lower": true,
              "include_upper": true,
              "format": "epoch_millis",
              "boost": 1
            }
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  },
  "_source": {
    "includes": [
      "retweeted_status.id_str"
    ],
    "excludes": []
  },
  "stored_fields": "retweeted_status.id_str",
  "aggregations": {
    "retweeted_status.id_str.keyword": {
      "terms": {
        "field": "retweeted_status.id_str.keyword",
        "size": 200,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          { "_count": "desc" },
          { "_key": "asc" }
        ]
      }
    }
  }
}
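The {{period_end}} placeholder is a mustache variable that the alerting plugin replaces with the monitor’s execution time in epoch milliseconds, and "||-1h" is Elasticsearch date math that subtracts one hour from it. A small sketch of how the rendered range clause ends up (the epoch value is illustrative, not real monitor output):

```python
# Render the mustache-style {{period_end}} placeholder the way the
# alerting plugin does, producing the final range filter.
def render_range(template: dict, period_end_millis: int) -> dict:
    rendered = {}
    for key, value in template.items():
        if isinstance(value, str):
            value = value.replace("{{period_end}}", str(period_end_millis))
        rendered[key] = value
    return rendered

template = {"from": "{{period_end}}||-1h", "to": "{{period_end}}", "format": "epoch_millis"}
print(render_range(template, 1600000000000))
# {'from': '1600000000000||-1h', 'to': '1600000000000', 'format': 'epoch_millis'}
```

The window therefore always covers the last hour before each monitor run, which is exactly the "more than 100 RTs in one hour" criterion.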

Monitor

Now that our monitor query is ready, let’s add it to Kibana. Select the Alerting module (“A” icon), click “Monitors” and then “Create monitor”. For the current example we need to specify:

  • “Frequency”: how often our monitor will run, e.g. every 5 minutes
  • “How do you want to define the monitor”: the query mode; a DSL query is more powerful (and more complex) than the visual graph, which is only useful for the simplest cases
  • “Define extraction query”: the place to load our query (written before). You can check it using the “Run” button at the top of this panel.
Monitor creation interface

Trigger

To finish the monitor, click “Create” at the end of the page. Kibana will load a new page with our new monitor’s detail; it’s time to create our trigger by clicking “Create” in the “Trigger” panel.

Monitor detail interface

Now we can define our new trigger. The most important field in this form is “Trigger condition”; in this case it’s quite complex. The default trigger checks total results, but in the current example I’m more interested in the “buckets” info. I want to be informed, e.g., when a tweet is retweeted more than one hundred times in an hour:

ctx.results[0].aggregations["retweeted_status.id_str.keyword"]["buckets"][0]["doc_count"] > 100

The trigger condition must return a true/false value; everything else will produce an error.
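The condition itself is written in Painless, but it just walks the search response that the alerting plugin exposes as ctx.results. A Python sketch of the same check against a mocked response shape (the tweet ids and doc_count values below are made up):

```python
# Simulate the trigger condition: the aggregation buckets are ordered by
# _count desc, so bucket 0 is the most-retweeted tweet in the window.
# The response is mocked with made-up ids and counts for illustration.
ctx = {
    "results": [
        {
            "aggregations": {
                "retweeted_status.id_str.keyword": {
                    "buckets": [
                        {"key": "1180000000000000001", "doc_count": 142},
                        {"key": "1180000000000000002", "doc_count": 37},
                    ]
                }
            }
        }
    ]
}

buckets = ctx["results"][0]["aggregations"]["retweeted_status.id_str.keyword"]["buckets"]
triggered = buckets[0]["doc_count"] > 100
print(triggered)  # True
```

Because the buckets are sorted by count descending, checking only bucket 0 is enough to know whether any tweet crossed the threshold.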

Trigger creation interface

Change the trigger condition to our criteria, and add an action.

Trigger user interface

Destination

Let’s go. Where are we going to publish our alerts? OpenDistro gives three options:

  • Amazon Chime (Amazon selling Amazon!! Wow! 🤪)
  • Slack
  • Custom Webhook

I will use Slack, so we need a webhook from it. (Check the references for help 🔍.)

Now click on “Add destination” and add our Slack webhook like this:

Add destination to alerting module

Now that we have a destination, let’s finish the action by selecting our destination and saving.

Action message interface

Eighth Step. Watching Alerts 🏁

We have finished our simple alert system, covering all the steps without customizations. I will try to extend the different parts in another post.

References
