Importing COVID-19 data into Elasticsearch

 

UPDATE

Due to a change in the way that Johns Hopkins University organizes the COVID-19 data, the pipeline and the index template have been changed. The configurations below for both the pipeline and the template have been updated to work properly when importing COVID-19 data into Elasticsearch. These changes should not impact the dashboard.

We have launched an interactive Kibana dashboard for the COVID-19 outbreak. You can use the dashboard to filter and compare COVID-19 data from different countries, regions, and counties, enabling you to run your own analysis of the outbreak.

SEE DASHBOARD

The outbreak of the COVID-19 virus has had a huge impact on both personal lives and business activities, and in light of this, access to data is essential for allowing everyone to make informed decisions about day-to-day activities. Johns Hopkins University, which is deeply involved in the international response to the virus, collects and publishes data on the outbreak through GitHub, as well as through daily reports and a live dashboard.

In this blog post we will provide you with the details needed to create a Logstash pipeline that brings the data collected by Johns Hopkins University into your own Elasticsearch cluster. We have also added the details of this process to a GitHub repository, which can be found here.

If you are interested in seeing how Siscale can help you during these uncertain times, check out this page. For more information on ingesting COVID-19 data, read on!

 

Logstash

Below is the Logstash code required to collect the data, transform it into time series data, and import it into Elasticsearch. The code contains comments that explain what each part does.


input {

    #Use the generator input to update the dataset when the pipeline is first started
    generator {
        lines => [ "first-time-run" ]
        count => 1
        tags => "check_github"
    }

    #The http_poller input plugin is used to schedule checks for dataset updates
    #The "schedule" setting checks for updates every hour, at minute 0
    http_poller {
        urls => {
            check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_time_series"
        }
        tags => "check_github"
        request_timeout => 60
        schedule => { "cron" => "0 * * * * UTC" }
        codec => "plain"
        metadata_target => "http_poller_metadata"
    }
}

filter {

    #The pipeline will handle two types of events:
    #The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data
    #The second type is the time series data itself
    #This 'if' discriminates between the two types. The time series data is treated later

    if "check_github" in [tags] {
        ruby {
            init => '
                require "csv"
                require "open-uri"

                #Use a hashmap to store the time series data
                @event_map = Hash.new

                #Function used for extracting time series data from the CSV
                #Arguments: The CSV, the type of the CSV (confirmed/deaths/recovered)
                def parse_csv(csv, type)
                    csv.each do |csv_row|
                        #Drop the first four headers (Province/State, Country/Region, Lat, Lon) - we only want the date headers
                        csv.headers.drop(4).each do |date_header|
                            #Create a unique id from the Province/State, Country/Region, and the date
                            #This will be used for updating the ES index without duplicating data
                            key = ((csv_row[0]||"").downcase + "_" + (csv_row[1]||"").downcase + "_" + (date_header||"").to_s).gsub(/[^\w\d]/, "_")
                            
                            #If the key is already used, then the event is already created
                            #E.g. if the "Confirmed cases" CSV has already been parsed, and Logstash is currently processing the "Deaths" CSV
                            #then the event will be updated with the number of deaths for that Province/Region/Date, based on the unique key generated above
                            if @event_map.key?(key)
                                @event_map[key].set(type, csv_row[date_header])
                            #..else, create a new Logstash event and add it to the Hashmap
                            else
                                @event_map[key] = LogStash::Event.new
                                @event_map[key].set("updated_at", Time.now)
                                @event_map[key].set("province_state", csv_row[0])
                                @event_map[key].set("country_region", csv_row[1])
                                @event_map[key].set("[location][lat]", csv_row[2])
                                @event_map[key].set("[location][lon]", csv_row[3])
                                @event_map[key].set("timestamp_from_header", date_header)
                                @event_map[key].set("unique_id", key)
                                @event_map[key].set(type, csv_row[date_header])
                            end
                        end
                    end
                end
            '
            code => '
                begin
                    #Download the CSV files and parse them as CSV objects
                    url_confirmed = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv"
                    url_deaths = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv"
                    #url_recovered = ""
                    
                    csv_confirmed = CSV.parse(open(url_confirmed).read, headers: true)
                    csv_deaths = CSV.parse(open(url_deaths).read, headers: true)
                    #csv_recovered = CSV.parse(open(url_recovered).read, headers: true)

                    #Parse the CSVs using the function defined above
                    parse_csv(csv_confirmed, "confirmed")
                    parse_csv(csv_deaths, "deaths")
                    #parse_csv(csv_recovered, "recovered")

                    #For each event in the hashmap:
                    #Calculate ratios, then use "new_event_block.call(event)" to create a new event. 
                    #These new events represent the time series data extracted from the CSVs
                    @event_map.each do |key, event|
                        ratio_deaths_to_confirmed = event.get("deaths").to_f / event.get("confirmed").to_f
                        ratio_recovered_to_confirmed = event.get("recovered").to_f / event.get("confirmed").to_f
                        ratio_deaths_to_recovered = event.get("deaths").to_f / event.get("recovered").to_f
                        if ratio_deaths_to_confirmed.finite?
                            event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed)
                        else
                            event.set("ratio_deaths_to_confirmed", 0.0)
                        end
                        if ratio_recovered_to_confirmed.finite?
                            event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed)
                        else
                            event.set("ratio_recovered_to_confirmed", 0.0)
                        end
                        if ratio_deaths_to_recovered.finite?
                            event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered)							    
                        else
                            event.set("ratio_deaths_to_recovered", 0.0)
                        end
                        new_event_block.call(event)
                    end
                    #After all the parsing is done, cancel this event. 
                    #We only need the time series data (that was already extracted and sent back through the pipeline)
                    #not the initial event itself.
                    event.cancel
                rescue
                    #In case anything goes wrong, log an error
                    @logger.error? and @logger.error("Something went wrong while processing the CSV. Does Logstash have internet access? Are the URLs correct?")
                    event.cancel
                end
            '
        }#end ruby 
    }#end if
    
    #Each time series data event will be sent back through the pipeline.
    #This 'if' discriminates between the original event that triggered the downloading and processing of the CSV, and the time series data
    if [timestamp_from_header] {
        #Transform the date extracted from the CSV into a timefield. 
        #By default, the parsed date will be stored in the '@timestamp' field
        date {
            match => [ "timestamp_from_header", "M/d/yy" ]
        }#end date
           
        #Extract county from the "province_state" field, where possible
        if [province_state] =~ /(.*?),\s(\w{2})/ {
            ruby {
                code => '
                    matched = event.get("province_state").match(/(.*?),\s(\w{2})/)
                    event.set("province_state", matched[2])
                    event.set("county", matched[1])
                    event.tag("added_county")
                '
            }
        }
 
        #This is used to rename the fields if you want to correlate the data with other data sources.
        #Modify the right-hand part of the hash
        #E.g. modify '"confirmed" => "confirmed"' to '"confirmed" => "[cases][confirmed]"' to store the number of confirmed cases under the field 'cases.confirmed'	
        mutate {
            rename => { 
                "confirmed" => "confirmed" 
                "country_region" => "country_region"
                "county" => "county" 
                "deaths" => "deaths"
                "[location][lat]" => "[location][lat]"
                "[location][lon]" => "[location][lon]"
                "province_state" => "province_state"
                "ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
                "ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
                "ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
                "recovered" => "recovered"
                "timestamp_from_header" => "timestamp_from_header"
                "updated_at" => "updated_at"
                "unique_id" => "unique_id"
            }
        }#end mutate
    }#end if
}#end filter

output {
    #Send the data to Elasticsearch
    elasticsearch {
        #Add your Elasticsearch hosts
        hosts => [""]
        #Add the index name
        index => "covid-19-live-update"
        #Add Elasticsearch credentials
        user => ""
        password => ""
        #Add SSL details
        #ssl => true
        #cacert => "/path/to/cert/"

        #Update documents based on the unique id that we defined
        action => "update"
        document_id => "%{unique_id}"
        #..or create a new document if it doesn't already exist
        doc_as_upsert => true
        manage_template => false
    }
}


 

Elasticsearch index

In addition to the Logstash code, you will also need a template for the index in Elasticsearch. The field mappings can be found below (feel free to change the name of the index, but remember to change it in the Logstash code as well):


PUT _template/covid-19-live-update
{
    "order" : 0,
    "index_patterns" : [
      "covid-19-live-update"
    ],
    "settings" : { },
    "mappings" : {
      "properties" : {
        "recovered" : {
          "type" : "long"
        },
        "updated_at" : {
          "type" : "date"
        },
        "ratio_deaths_to_recovered" : {
          "type" : "float"
        },
        "ratio_recovered_to_confirmed" : {
          "type" : "float"
        },
        "ratio_deaths_to_confirmed" : {
          "type" : "float"
        },
        "location" : {
          "type" : "geo_point"
        },
        "confirmed" : {
          "type" : "long"
        },
        "deaths" : {
          "type" : "long"
        },
        "last_update" : {
          "type" : "date"
        },
        "last_process" : {
          "type" : "date"
        },
        "@timestamp" : {
          "type" : "date"
        }
      }
    },
    "aliases" : { }
}
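
To confirm that everything is wired up correctly, you can check that the template exists and, once the pipeline has run at least once, look at the most recent document in the index. Below is a minimal sketch using Kibana Dev Tools, assuming the default index name covid-19-live-update:


#Check that the index template exists
GET _template/covid-19-live-update

#Return the most recent document in the index
GET covid-19-live-update/_search
{
  "size": 1,
  "sort": [
    { "@timestamp": { "order": "desc" } }
  ]
}


If the search returns a document with the confirmed, deaths, and location fields populated, the pipeline and the template are working together as expected.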

 

Kibana dashboard

In the GitHub repository you will also find the dashboard from this article, which you can import into your own Kibana. The dashboard was built using the latest version of the Elastic Stack (7.6.1). To import the dashboard, navigate in the Kibana interface to Management > Saved Objects and click Import.

The GitHub repository can be found here.

 

Watcher alerting

Once these steps are done, you will have the data in Elasticsearch and available through Kibana.

We are also attaching two Watcher definitions (alerts) to this article, which will send an email to the configured address with details on the number of confirmed, recovered, and deceased people for the region of interest. Because not all countries have information at the province/region level (this is how the data is collected by Johns Hopkins University), there is one Watcher for regional data and one for country-wide data. In the code you will find the placeholder "Change this", which should be replaced with the desired value.

A few considerations for the alerts:

  • Make sure to change only the text within the double quotes, without deleting the quotes themselves
  • Province and country names should be capitalized exactly as they appear in the data (e.g. New York, China); a query that lists the exact values present in the index is shown after this list
  • Feel free to change the interval value at the top of the code to the desired alerting interval (values are a number followed by m for minutes, h for hours, or d for days, without deleting the double quotes)
  • Keep in mind that, from our observations, the data from Johns Hopkins University is updated once or twice a day
  • The following countries have regional information:
    • US
    • China
    • Australia
    • Canada
    • France
    • United Kingdom
    • Denmark
    • Cruise Ship
  • To be able to send emails, you need to configure your Elastic Stack accordingly. Details can be found here.
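
If you are unsure of the exact country or province values used in the dataset, a terms aggregation lists the values present in the index. Below is a minimal sketch against the covid-19-live-update index, assuming the default dynamic mapping created the .keyword sub-fields (the same fields the Watchers below rely on); swap country_region.keyword for province_state.keyword to list provinces instead:


#List the country values present in the index
GET covid-19-live-update/_search
{
  "size": 0,
  "aggs": {
    "countries": {
      "terms": {
        "field": "country_region.keyword",
        "size": 200
      }
    }
  }
}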

 

Province/Region Watcher alert


{
  "trigger": {
    "schedule": {
      "interval": "24h"
    }
  },
  "input": {
    "search": {
      "request": {
        "body": {
          "query" : {
            "bool" : {
              "must" : [
                {
                    "match" : {
                        "province_state.keyword" : "Change this"
                    }
                }
              ]
            }
          },
          "size" : 2,
          "sort" : {
            "@timestamp" : {
              "order" : "desc"
            }
          }
        },
        "indices": [
          "covid-19-live-update"
        ]
      }
    }
  },
  "condition": {
      "compare" : {
          "ctx.payload.hits.total" : {
              "gt" : 1
          }
      }
  },
  "transform" : {
      "script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed)), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths)), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered))]"
  },
  "actions": {
      "email" : { 
        "email" : { 
          "to" : "username@example.org", 
          "subject" : "COVID-19 Status for {{ctx.payload.province}}, {{ctx.payload.country}}", 
          "body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})" 
        }
      }
  }
}

 

Country-level Watcher alert


{
  "trigger": {
    "schedule": {
      "interval": "24h"
    }
  },
  "input": {
    "search": {
      "request": {
        "body": {
          "query" : {
            "bool" : {
              "must" : [
                {
                    "match" : {
                        "country_region.keyword" : "Change this"
                    }
                }
              ]
            }
          },
          "size" : 2,
          "sort" : {
            "@timestamp" : {
              "order" : "desc"
            }
          }
        },
        "indices": [
          "covid-19-live-update"
        ]
      }
    }
  },
  "condition": {
      "compare" : {
          "ctx.payload.hits.total" : {
              "gt" : 1
          }
      }
  },
  "transform" : {
      "script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed)), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths)), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered))]"
  },
  "actions": {
      "email" : { 
        "email" : { 
          "to" : "username@example.org", 
          "subject" : "COVID-19 Status for {{ctx.payload.country}}", 
          "body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})" 
        }
      }
  }
}
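
The watch definitions above are only JSON bodies; they still need to be registered with Elasticsearch. Below is a minimal sketch using the Watcher API, assuming the example watch ID covid19_country_alert (the province/region watch can be registered the same way under a different ID):


#Register the watch; paste the full country-level definition from above as the request body
PUT _watcher/watch/covid19_country_alert
{ ...country-level watch JSON... }

#Run the watch once, without waiting for the schedule, to test the query, condition, and email action
POST _watcher/watch/covid19_country_alert/_execute


The same JSON can also be pasted into a new advanced watch in the Kibana Watcher UI.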

 

In part 2 of this blog post we will show how you can correlate the coronavirus data with other data sets. This will help businesses correlate their operational and business data with the COVID-19 outbreak and adjust their activities accordingly.

If you require help or assistance with this process, please contact us at contact@siscale.com.