Importing COVID-19 data into Elasticsearch

UPDATE

Because Johns Hopkins University changed the way it organizes its COVID-19 data, both the pipeline and the index template have been changed. The configurations below for the pipeline and the template have been updated so that importing COVID-19 data into Elasticsearch works properly. These changes should not impact the dashboard.

We launched an interactive Kibana dashboard for the COVID-19 outbreak. You can use the dashboard to filter and compare COVID-19 data from different countries, regions and counties, so you can run your own analysis of the outbreak.

The COVID-19 outbreak has had a huge impact on both personal lives and business activities. In light of this, access to data is essential for everyone to make informed decisions about day-to-day activities. Johns Hopkins University, which is deeply involved in the international response to the virus, collects and publishes data on the outbreak through GitHub, daily reports and a live dashboard.

In this blog post we provide the details needed to create a Logstash pipeline that brings the data collected by Johns Hopkins University into your own Elasticsearch cluster. We also added the details for this process to a GitHub repository that can be found here.

If you are interested in seeing how Siscale can help you during these uncertain times, check out this page. For more information on ingesting COVID-19 data, read on!

Logstash

Below is the Logstash code required to collect the data, transform it into time series data and import it into Elasticsearch. The code contains comments that explain what each part does.

-- CODE markdown--
input {
   
   #Use the generator input to update the dataset when the pipeline is first started
   generator {
       lines => [ "first-time-run" ]
       count => 1
       tags => "check_github"
   }

   #The http_poller input plugin is used to schedule checks for dataset updates
   #The "schedule" setting is defined to check for updates every new hour (at minute 0)
   http_poller {
       urls => {
           check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports"
       }
       tags => "check_github"
       request_timeout => 60
       schedule => { "cron" => "0 * * * * UTC" }
       codec => "plain"
       metadata_target => "http_poller_metadata"
   }
}

filter {

   #The pipeline will treat two types of events:
   #The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data
   #The second type is the time series data itself
   #This 'if' discriminates between the two types. The time series data is treated later
   if "check_github" in [tags] {
       ruby {
           init => '
               require "csv"
               require "open-uri"
               require "digest"
               require "json"

               #Path to store the hashes (SHA-256 digests) of the downloaded CSV files
               #This is used to keep track of any changes to the Github repo and optimize the update process
               @github_stored_hashes_path = "/etc/logstash/covid-19-hashes.json"

               #Base URL for downloading the time series CSVs
               @github_daily_reports_base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/"

               @states = { "AL" => "Alabama", "AK" => "Alaska", "AS" => "American Samoa", "AZ" => "Arizona", "AR" => "Arkansas", "CA" => "California", "CO" => "Colorado", "CT" => "Connecticut", "DE" => "Delaware", "DC" => "District of Columbia", "FM" => "Federated States of Micronesia", "FL" => "Florida", "GA" => "Georgia", "GU" => "Guam", "HI" => "Hawaii", "ID" => "Idaho", "IL" => "Illinois", "IN" => "Indiana", "IA" => "Iowa", "KS" => "Kansas", "KY" => "Kentucky", "LA" => "Louisiana", "ME" => "Maine", "MH" => "Marshall Islands", "MD" => "Maryland", "MA" => "Massachusetts", "MI" => "Michigan", "MN" => "Minnesota", "MS" => "Mississippi", "MO" => "Missouri", "MT" => "Montana", "NE" => "Nebraska", "NV" => "Nevada", "NH" => "New Hampshire", "NJ" => "New Jersey", "NM" => "New Mexico", "NY" => "New York", "NC" => "North Carolina", "ND" => "North Dakota", "MP" => "Northern Mariana Islands", "OH" => "Ohio", "OK" => "Oklahoma", "OR" => "Oregon", "PW" => "Palau", "PA" => "Pennsylvania", "PR" => "Puerto Rico", "RI" => "Rhode Island", "SC" => "South Carolina", "SD" => "South Dakota", "TN" => "Tennessee", "TX" => "Texas", "UT" => "Utah", "VT" => "Vermont", "VI" => "Virgin Islands", "VA" => "Virginia", "WA" => "Washington", "WV" => "West Virginia", "WI" => "Wisconsin", "WY" => "Wyoming" }

               #Input: a CSV and the date of the CSV
               #Output: an array of Logstash events extracted from the CSV that will be sent to Elasticsearch
               def create_events(csv, current_date)
                   totals_by_country = Hash.new
                   totals_by_country["WORLD"] = LogStash::Event.new
                   totals_by_country["WORLD"].set("total_confirmed", 0)
                   totals_by_country["WORLD"].set("total_recovered", 0)
                   totals_by_country["WORLD"].set("total_deaths", 0)
                   totals_by_country["WORLD"].tag("total_world")
                   totals_by_country["WORLD"].tag("covid_time_series")
                   totals_by_country["WORLD"].set("timestamp", current_date.iso8601(3).to_s)
                   totals_by_country["WORLD"].set("unique_id", "totals_for_world_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
                   all_country_region = Array.new
                   all_province_state = Array.new
                   all_county = Array.new
                   all_country_region.append("WORLD")

                   events_array = Array.new
                   #For each event..
                   csv.each do |row|
                       #..create a new Logstash event that will store the data
                       new_event = LogStash::Event.new

                       #..add current row data to the current event
                       row.each do |k, v|
                           #new_event[k] = v
                           new_event.set(k, v&.strip)
                       end

                       #..store the date of the reporting
                       new_event.set("timestamp", current_date.iso8601(3).to_s)
                       #..store the date when Logstash processed the event
                       new_event.set("last_process", Time.now.iso8601(3).to_s)

                       #..some early events contain empty fields when the reported numbers are 0. Fix this here
                       new_event.set("confirmed", 0) if new_event.get("confirmed").nil?
                       new_event.set("recovered", 0) if new_event.get("recovered").nil?
                       new_event.set("deaths", 0) if new_event.get("deaths").nil?
                       #..calculate the number of active cases and store it when the result is not negative
                       active = new_event.get("confirmed").to_i - new_event.get("recovered").to_i - new_event.get("deaths").to_i
                       new_event.set("active", active) if active >= 0

                       #..calculate ratios and store them
                       ratio_deaths_to_confirmed = new_event.get("deaths").to_f / new_event.get("confirmed").to_f
                       ratio_recovered_to_confirmed = new_event.get("recovered").to_f / new_event.get("confirmed").to_f
                       ratio_deaths_to_recovered = new_event.get("deaths").to_f / new_event.get("recovered").to_f
                       new_event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
                       new_event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
                       new_event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?

                       if new_event.get("province_state")&.match?(/(.*?),\s(\w{2})/)
                           matched = new_event.get("province_state").match(/(.*?),\s(\w{2})/)
                           @states[matched[2]].nil? ? new_event.set("province_state", matched[2]) : new_event.set("province_state", @states[matched[2]])
                           new_event.set("admin2", matched[1].gsub(/ [Cc]ounty/, ""))
                           new_event.tag("added_county")
                       end

                       #..generate a unique key for the current event
                       key = ((new_event.get("admin2") || "").downcase + "_" + (new_event.get("province_state") || "").downcase + "_" + (new_event.get("country_region") || "").downcase + "_" + (current_date.strftime("%m-%d-%Y") || "").to_s).gsub(/[^\w\d]/, "_")
                       new_event.set("unique_id", key)

                       #..add a tag to identify the event later
                       new_event.tag("covid_time_series")
                       events_array.append(new_event)

                       if ["Channel Islands", "Cayman Islands"].include?(new_event.get("country_region"))
                           new_event.set("province_state", new_event.get("country_region"))
                           new_event.set("country_region", "United Kingdom")
                       end
                       if ["Saint Martin", "St. Martin"].include?(new_event.get("country_region"))
                           new_event.set("province_state", "St Martin")
                           new_event.set("country_region", "France")
                       end

                       country_normalization = {
                           "Mainland China" => "China",
                           "South Korea" => "Korea, South",
                           "Republic of Korea" => "Korea, South",
                           "Iran \(Islamic Republic of\)" => "Iran",
                           "Bahamas, The" => "Bahamas",
                           "Gambia, The" => "Gambia",
                           "Hong Kong SAR" => "China",
                           "Hong Kong" => "China",
                           "Macao SAR" => "China",
                           "Macau" => "China",
                           "Republic of Ireland" => "Ireland",
                           "Republic of Moldova" => "Moldova",
                           "Republic of the Congo" => "Congo (Brazzaville)",
                           "Russian Federation" => "Russia",
                           "Taipei and environs" => "Taiwan",
                           "Taiwan\*" => "Taiwan",
                           "UK" => "United Kingdom",
                           "Vatican City" => "Holy See",
                           "Viet Nam" => "Vietnam",
                           "occupied Palestinian territory" => "West Bank and Gaza",
                           "Ivory Coast" => "Cote d\'Ivoire",
                           "East Timor" => "Timor-Leste",
                           "Czech Republic" => "Czechia",
                           "The Bahamas" => "Bahamas",
                           "The Gambia" => "Gambia",
                           "Cape Verde" => "Cabo Verde",
                           "North Ireland" => "Ireland",
                           "Palestine" => "West Bank and Gaza",
                       }
                       new_event.set("country_region", new_event.get("country_region").gsub(/.*/, country_normalization)) unless country_normalization[new_event.get("country_region")].nil?

                       if totals_by_country[new_event.get("country_region")].nil?
                           totals_by_country[new_event.get("country_region")] = LogStash::Event.new
                           totals_by_country[new_event.get("country_region")].set("confirmed", 0)
                           totals_by_country[new_event.get("country_region")].set("recovered", 0)
                           totals_by_country[new_event.get("country_region")].set("deaths", 0)
                           totals_by_country[new_event.get("country_region")].set("country_region", new_event.get("country_region"))
                           totals_by_country[new_event.get("country_region")].set("timestamp", new_event.get("timestamp"))
                           totals_by_country[new_event.get("country_region")].tag("total_by_country_region")
                           totals_by_country[new_event.get("country_region")].tag("covid_time_series")
                           totals_by_country[new_event.get("country_region")].set("unique_id", "totals_for_" + new_event.get("country_region").downcase + "_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
                       end
                       totals_by_country[new_event.get("country_region")].set("confirmed", new_event.get("confirmed").to_i + totals_by_country[new_event.get("country_region")].get("confirmed").to_i)
                       totals_by_country[new_event.get("country_region")].set("deaths", new_event.get("deaths").to_i + totals_by_country[new_event.get("country_region")].get("deaths").to_i)
                       totals_by_country[new_event.get("country_region")].set("recovered", new_event.get("recovered").to_i + totals_by_country[new_event.get("country_region")].get("recovered").to_i)
                       ratio_deaths_to_confirmed = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
                       ratio_recovered_to_confirmed = totals_by_country[new_event.get("country_region")].get("recovered").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
                       ratio_deaths_to_recovered = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("recovered").to_f
                       totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
                       totals_by_country[new_event.get("country_region")].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
                       totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?

                       totals_by_country["WORLD"].set("total_confirmed", totals_by_country["WORLD"].get("total_confirmed").to_i + new_event.get("confirmed").to_i)
                       totals_by_country["WORLD"].set("total_recovered", totals_by_country["WORLD"].get("total_recovered").to_i + new_event.get("recovered").to_i)
                       totals_by_country["WORLD"].set("total_deaths", totals_by_country["WORLD"].get("total_deaths").to_i + new_event.get("deaths").to_i)
                       all_country_region.append(new_event.get("country_region")) if !new_event.get("country_region").nil? and !all_country_region.include?(new_event.get("country_region"))
                       all_province_state.append(new_event.get("province_state")) if !new_event.get("province_state").nil? and !all_province_state.include?(new_event.get("province_state"))
                       all_county.append(new_event.get("admin2")) if !new_event.get("admin2").nil? and !all_county.include?(new_event.get("admin2"))
                   end

                   ratio_deaths_to_confirmed = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
                   ratio_recovered_to_confirmed = totals_by_country["WORLD"].get("total_recovered").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
                   ratio_deaths_to_recovered = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_recovered").to_f
                   totals_by_country["WORLD"].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
                   totals_by_country["WORLD"].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
                   totals_by_country["WORLD"].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
                   totals_by_country["WORLD"].set("country_region", all_country_region)
                   totals_by_country["WORLD"].set("province_state", all_province_state)
                   totals_by_country["WORLD"].set("admin2", all_county)
                   totals_by_country.each do |k, v|
                       events_array.append(v)
                   end

                   return events_array
               end
           '
           code => '
               #First day in the dataset
               github_daily_reports_first_day = Time.gm(2020, 01, 22, 12, 00)
               #Current day
               github_daily_reports_last_day = Time.now
               current_date = github_daily_reports_first_day

               #If the CSV hashes are stored in a file, parse them..
               if File.file?(@github_stored_hashes_path)
                   stored_hashes = JSON.parse(File.read(@github_stored_hashes_path))
                   #..else, create a new hash
               else
                   stored_hashes = Hash.new
               end

               #For each day since the beginning of the dataset, until the current day..
               while current_date <= github_daily_reports_last_day
                   begin
                       #..generate the URL based on the current_day
                       current_url = @github_daily_reports_base_url + current_date.strftime("%m-%d-%Y") + ".csv"
                       #..parse the CSV while normalizing the headers
                       #(e.g. until 21.03.2020, the country is stored under "Country/Region". After this date, it is stored under "Country_Region".
                       #We normalize the headers such that both variations are ultimately "country_region". This applies to other fields as well)
                       current_csv = CSV.parse(open(current_url).read.gsub("\uFEFF", ""), headers: true, header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") })
                       #..hash the CSV
                       current_md5 = Digest::SHA256.hexdigest(current_csv.to_s)
                   rescue
                       #The pipeline will usually fail to fetch the data for the current day, if the data was not already published
                       logger.warn? and logger.warn("Failed to fetch data for date " + current_date.strftime("%m-%d-%Y"))
                   else
                       #..if the hash is different than the stored one, process the events
                       if stored_hashes[current_url] != current_md5
                           events_array = create_events(current_csv, current_date)
                           #..store the new hash
                           stored_hashes[current_url] = current_md5
                           #..and send each event back through the pipeline
                           events_array.each do |event|
                               new_event_block.call(event)
                           end
                       end
                   end
                   #Check the next day (a Time advances in seconds, so add 24 hours)
                   current_date += 86400
               end

               #Attempt to write the hashes to disk
               begin
                   File.write(@github_stored_hashes_path, stored_hashes.to_json)
                   logger.info? and logger.info("Successfully updated cache at #{@github_stored_hashes_path}")
               rescue StandardError => ex
                   logger.error? and logger.error("(#{ex.class} - #{ex.message}) Failed to write hashes to file #{@github_stored_hashes_path}. Does Logstash have read/write access to this location?")
               ensure
                   #..and cancel this event. We are only interested in the time series data, which was already sent back through the pipeline.
                   event.cancel
               end
           '
       }#end ruby
   }#end if
   
   #Each time series data event will be sent back through the pipeline.
   #This 'if' discriminates between the original event that triggered the downloading and processing of the CSV, and the time series data
   if "covid_time_series" in [tags] {
       #Parse date fields
       date {
           #When the github dataset was last updated by the maintainers
           match => [ "last_update", "yyyy-MM-dd HH:mm:ss", "M/d/yy HH:mm", "M/d/yyyy HH:mm", "ISO8601" ]
           target => "last_update"
       }#end date
       date {
           #When the event was processed by Logstash
           match => ["last_process", "ISO8601" ]
           target => "last_process"
       }#end date
       date {
           #The true date of the report
           match => [ "timestamp", "ISO8601" ]
           target => "@timestamp"
       }#end date


     if "total_by_country_region" not in [tags] and "total_world" not in [tags] {
       mutate {
          #Normalize the location field
           rename => {
               "lat" => "[location][lat]"
               "latitude" => "[location][lat]"
               "lon" => "[location][lon]"
               "long" => "[location][lon]"
               "long_" => "[location][lon]"
               "longitude" => "[location][lon]"
           }
       }
       ruby {
       code => '
           @state_coordinates = { "Wisconsin" => { "lat" => "44.5", "lon" => "-89.5" }, "West Virginia" => { "lat" => "39", "lon" => "-80.5" }, "Vermont" => { "lat" => "44", "lon" => "-72.699997" }, "Texas" => { "lat" => "31", "lon" => "-100" }, "South Dakota" => { "lat" => "44.5", "lon" => "-100" }, "Rhode Island" => { "lat" => "41.700001", "lon" => "-71.5" }, "Oregon" => { "lat" => "44", "lon" => "-120.5" }, "New York" => { "lat" => "43", "lon" => "-75" }, "New Hampshire" => { "lat" => "44", "lon" => "-71.5" }, "Nebraska" => { "lat" => "41.5", "lon" => "-100" }, "Kansas" => { "lat" => "38.5", "lon" => "-98" }, "Mississippi" => { "lat" => "33", "lon" => "-90" }, "Illinois" => { "lat" => "40", "lon" => "-89" }, "Delaware" => { "lat" => "39", "lon" => "-75.5" }, "Connecticut" => { "lat" => "41.599998", "lon" => "-72.699997" }, "Arkansas" => { "lat" => "34.799999", "lon" => "-92.199997" }, "Indiana" => { "lat" => "40.273502", "lon" => "-86.126976" }, "Missouri" => { "lat" => "38.573936", "lon" => "-92.60376" }, "Florida" => { "lat" => "27.994402", "lon" => "-81.760254" }, "Nevada" => { "lat" => "39.876019", "lon" => "-117.224121" }, "Maine" => { "lat" => "45.367584", "lon" => "-68.972168" }, "Michigan" => { "lat" => "44.182205", "lon" => "-84.506836" }, "Georgia" => { "lat" => "33.247875", "lon" => "-83.441162" }, "Hawaii" => { "lat" => "19.741755", "lon" => "-155.844437" }, "Alaska" => { "lat" => "66.160507", "lon" => "-153.369141" }, "Tennessee" => { "lat" => "35.860119", "lon" => "-86.660156" }, "Virginia" => { "lat" => "37.926868", "lon" => "-78.024902" }, "New Jersey" => { "lat" => "39.833851", "lon" => "-74.871826" }, "Kentucky" => { "lat" => "37.839333", "lon" => "-84.27002" }, "North Dakota" => { "lat" => "47.650589", "lon" => "-100.437012" }, "Minnesota" => { "lat" => "46.39241", "lon" => "-94.63623" }, "Oklahoma" => { "lat" => "36.084621", "lon" => "-96.921387" }, "Montana" => { "lat" => "46.96526", "lon" => "-109.533691" }, "Washington" => { "lat" => 
"47.751076", "lon" => "-120.740135" }, "Utah" => { "lat" => "39.41922", "lon" => "-111.950684" }, "Colorado" => { "lat" => "39.113014", "lon" => "-105.358887" }, "Ohio" => { "lat" => "40.367474", "lon" => "-82.996216" }, "Alabama" => { "lat" => "32.31823", "lon" => "-86.902298" }, "Iowa" => { "lat" => "42.032974", "lon" => "-93.581543" }, "New Mexico" => { "lat" => "34.307144", "lon" => "-106.018066" }, "South Carolina" => { "lat" => "33.836082", "lon" => "-81.163727" }, "Pennsylvania" => { "lat" => "41.203323", "lon" => "-77.194527" }, "Arizona" => { "lat" => "34.048927", "lon" => "-111.093735" }, "Maryland" => { "lat" => "39.045753", "lon" => "-76.641273" }, "Massachusetts" => { "lat" => "42.407211", "lon" => "-71.382439" }, "California" => { "lat" => "36.778259", "lon" => "-119.417931" }, "Idaho" => { "lat" => "44.068203", "lon" => "-114.742043" }, "Wyoming" => { "lat" => "43.07597", "lon" => "-107.290283" }, "North Carolina" => { "lat" => "35.782169", "lon" => "-80.793457" }, "Louisiana" => { "lat" => "30.39183", "lon" => "-92.329102" } }

           if (event.get("[location][lat]").nil? || event.get("[location][lon]").nil?) || (event.get("[location][lat]").to_f == 0 && event.get("[location][lon]").to_f == 0)
               if (event.get("[country_region]") == "US") and (!@state_coordinates[event.get("[province_state]")].nil?)
                   event.set("[location][lat]", @state_coordinates[event.get("province_state")]["lat"])
                   event.set("[location][lon]", @state_coordinates[event.get("province_state")]["lon"])
               else
                   event.set("[location][lat]", 0)
                   event.set("[location][lon]", 0)
               end
           end
           '
       }
     }

       #Rename the fields if you want to correlate with other datasets
       mutate {
           rename => {
               "province_state" => "province_state"
               "country_region" => "country_region"
               "admin2" => "county"
               "fips" => "fips"
               
               "confirmed" => "confirmed"
               "deaths" => "deaths"
               "recovered" => "recovered"
               "active" => "active"
               
               "[location][lat]" => "[location][lat]"
               "[location][lon]" => "[location][lon]"

               "last_update" => "last_update"
               "last_process" => "last_process"
               "combined_key" => "combined_key"

               "ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
               "ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
               "ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
               
               "unique_id" => "unique_id"
           }
       }#end mutate
  }#end if
}#end filter

output {
   #Send the data to Elasticsearch
   elasticsearch {
       #Add your Elasticsearch hosts
       hosts => [""]
       #Add the index name
       index => "covid-19-live-update"
       #Add Elasticsearch credentials
       user => ""
       password => ""
       #Add SSL details
       #ssl => true
       #cacert => "/path/to/cert/"

       #Update documents based on the unique id that we defined
       action => "update"
       document_id => "%{unique_id}"
       #..or create a new document if it doesn't already exist
       doc_as_upsert => true
       manage_template => false
   }
}
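
The pipeline depends on two small ideas that are easier to see outside Logstash: headers are normalized so that older and newer CSV layouts produce the same field names, and each row gets a deterministic unique_id so that re-importing a file updates documents instead of duplicating them. The following is a minimal Ruby sketch of both steps, not the pipeline itself. The sample rows and the helper names parse_report and unique_id are made up for illustration.

```ruby
require "csv"

# Two hypothetical reports, one in the older and one in the newer header layout.
OLD_LAYOUT = "Province/State,Country/Region,Confirmed\nHubei,Mainland China,444\n"
NEW_LAYOUT = "Admin2,Province_State,Country_Region,Confirmed\nKing,Washington,US,1170\n"

# Same converter as in the pipeline: lowercase the header and replace
# every non-word character with an underscore.
NORMALIZE_HEADER = lambda { |h| h.downcase.gsub(/[^\w]/, "_") }

def parse_report(text)
  CSV.parse(text, headers: true, header_converters: NORMALIZE_HEADER)
end

# Builds the document ID from county, province, country and report date,
# mirroring the key expression used in create_events.
def unique_id(row, date_string)
  parts = [row["admin2"], row["province_state"], row["country_region"]]
  key = parts.map { |part| (part || "").downcase }.join("_") + "_" + date_string
  key.gsub(/[^\w\d]/, "_")
end

old_row = parse_report(OLD_LAYOUT).first
new_row = parse_report(NEW_LAYOUT).first

puts old_row.headers.inspect
puts unique_id(old_row, "01-22-2020")
puts unique_id(new_row, "03-25-2020")
```

Because the ID depends only on the location fields and the report date, sending the same row twice with action "update" and doc_as_upsert targets the same document.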

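The hourly check stays cheap because the pipeline keeps a digest per CSV URL and only reprocesses a file whose content has changed. A minimal Ruby sketch of that change detection follows, under stated assumptions: the cache path, the example URL and the helper names load_hashes and changed? are hypothetical, and the CSV content is passed in as a string rather than downloaded.

```ruby
require "digest"
require "json"
require "tmpdir"

# Loads the digest cache from disk, or starts empty when the file is missing.
def load_hashes(path)
  File.file?(path) ? JSON.parse(File.read(path)) : {}
end

# Returns true (and records the new digest) only when the content for this
# URL differs from what was seen on a previous run.
def changed?(stored_hashes, url, content)
  digest = Digest::SHA256.hexdigest(content)
  return false if stored_hashes[url] == digest
  stored_hashes[url] = digest
  true
end

# Hypothetical cache location and URL, used only for this illustration.
cache_path = File.join(Dir.mktmpdir, "covid-19-hashes.json")
url = "https://example.org/01-22-2020.csv"

hashes = load_hashes(cache_path)
first_run = changed?(hashes, url, "Country,Confirmed\nChina,548\n")
File.write(cache_path, hashes.to_json)

# A later run reloads the cache and sees identical content: nothing to do.
hashes = load_hashes(cache_path)
second_run = changed?(hashes, url, "Country,Confirmed\nChina,548\n")
# Revised content for the same day is detected and would be reprocessed.
third_run = changed?(hashes, url, "Country,Confirmed\nChina,549\n")

puts [first_run, second_run, third_run].inspect
```

Keying the cache by URL means a correction to an already published daily report is picked up on the next scheduled check, while unchanged days are skipped.
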
Elasticsearch index

In addition to the Logstash code, you will also need an index template in Elasticsearch. The field mappings can be found below (feel free to change the name of the index, but remember to change it in the Logstash code as well):

-- CODE markdown--
PUT _template/covid-19-live-update
{
   "order" : 0,
   "index_patterns" : [
     "covid-19-live-update"
   ],
   "settings" : { },
   "mappings" : {
     "properties" : {
       "recovered" : {
         "type" : "long"
       },
       "updated_at" : {
         "type" : "date"
       },
       "ratio_deaths_to_recovered" : {
         "type" : "float"
       },
       "ratio_recovered_to_confirmed" : {
         "type" : "float"
       },
       "ratio_deaths_to_confirmed" : {
         "type" : "float"
       },
       "location" : {
         "type" : "geo_point"
       },
       "confirmed" : {
         "type" : "long"
       },
       "deaths" : {
         "type" : "long"
       },
       "last_update" : {
         "type" : "date"
       },
       "last_process" : {
         "type" : "date"
       },
       "@timestamp" : {
         "type" : "date"
       }
     }
   },
   "aliases" : { }
 }
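
To make the mapping more concrete, here is a rough Ruby sketch of a document shaped the way the template expects. The field names come from the template above; the values and the helper name build_document are invented for illustration, and this is not code taken from the pipeline.

```ruby
require "json"
require "time"

# Builds a hash shaped like the documents the template expects.
# Field names come from the template; the values passed in are made up.
def build_document(confirmed:, deaths:, recovered:, lat:, lon:, reported_at:)
  {
    "confirmed" => confirmed,
    "deaths" => deaths,
    "recovered" => recovered,
    # A geo_point field accepts an object with "lat" and "lon" keys.
    "location" => { "lat" => lat, "lon" => lon },
    # Date fields are sent as ISO 8601 strings.
    "@timestamp" => reported_at.iso8601(3),
    "last_process" => Time.now.utc.iso8601(3)
  }
end

doc = build_document(
  confirmed: 10, deaths: 1, recovered: 2,
  lat: 44.5, lon: -89.5,
  reported_at: Time.gm(2020, 3, 25, 12, 0)
)
puts JSON.pretty_generate(doc)
```

Fields that the template does not list, such as province_state and country_region, are left to Elasticsearch's dynamic mapping, which by default indexes strings as text with a keyword sub-field.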

Kibana dashboard

In the GitHub repository you will also find the dashboard from this article, which you can import into your own Kibana. The dashboard was built using version 7.6.1 of the Elastic Stack, the latest at the time of writing. To import the dashboard, navigate in the Kibana interface to Management > Saved Objects and click Import.

The GitHub repository can be found here.

Watcher alerting

Once these steps are done, the data will be in Elasticsearch and available through Kibana.

We are also attaching two Watcher definitions (alerting) to this article. Each one sends an email to the configured address with the number of infected, recovered and deceased people for the region of interest. Because not all countries have information at the province/region level (this is how the data is collected by Johns Hopkins University), there is one Watcher for regional data and one for country-wide data. In the code you will find the placeholder “Change this”, which should be replaced with the desired value.

A few considerations for the alerts:

  • Make sure to change just the text within the double quotes without deleting them
  • Province and country names should be capitalized (e.g. New York, China, etc.)
  • Feel free to change the interval value at the top of the code to the interval at which you want the alert to run (values take the form of a number followed by m for minutes, h for hours or d for days; do not delete the double quotes)
  • Keep in mind that, from our observations, the data from Johns Hopkins University is updated once or twice a day
  • The following countries have regional information:
      • US
      • China
      • Australia
      • Canada
      • France
      • United Kingdom
      • Denmark
      • Cruise Ship
  • In order to be able to send emails you need to configure your Elastic Stack accordingly. Details can be found here.
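
The arithmetic behind both alerts is simple: the search returns the two most recent documents for the chosen region, newest first, and the alert reports today's numbers plus the change since the previous document. The Watchers do this in a Painless script; the Ruby sketch below only illustrates the same calculation. The helper name daily_differences and the sample hits are made up.

```ruby
# Computes day-over-day differences from the two most recent documents,
# ordered newest first, the way the Watcher search input sorts them.
def daily_differences(hits)
  today, yesterday = hits.map { |hit| hit["_source"] }
  %w[confirmed deaths recovered].each_with_object({}) do |field, result|
    result["#{field}_today"] = today[field]
    # Values may arrive as strings, so convert before subtracting.
    result["difference_#{field}"] = today[field].to_i - yesterday[field].to_i
  end
end

# Made-up hits, for illustration only.
hits = [
  { "_source" => { "confirmed" => "120", "deaths" => "4", "recovered" => "30" } },
  { "_source" => { "confirmed" => "100", "deaths" => "3", "recovered" => "25" } }
]

puts daily_differences(hits).inspect
```

This is also why the Watcher condition requires more than one hit: with a single document there is no previous day to compare against.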

Province/Region Watcher alert

-- CODE markdown--
{
 "trigger": {
   "schedule": {
     "interval": "24h"
   }
 },
 "input": {
   "search": {
     "request": {
       "body": {
         "query" : {
           "bool" : {
             "must" : [
               {
                   "match" : {
                       "province_state.keyword" : "Change this"
                   }
               }
             ]
           }
         },
         "size" : 2,
         "sort" : {
           "@timestamp" : {
             "order" : "desc"
           }
         }
       },
       "indices": [
         "covid-19-live-update"
       ]
     }
   }
 },
 "condition": {
     "compare" : {
         "ctx.payload.hits.total" : {
             "gt" : 1
         }
     }
 },
 "transform" : {
     "script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed)), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths)), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered))]"
 },
 "actions": {
     "email" : {
       "email" : {
         "to" : "username@example.org",
         "subject" : "COVID-19 Status for {{ctx.payload.province}}, {{ctx.payload.country}}",
         "body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})"
       }
     }
 }
}

Country level Watcher alert

-- CODE markdown--
{
 "trigger": {
   "schedule": {
     "interval": "24h"
   }
 },
 "input": {
   "search": {
     "request": {
       "body": {
         "query" : {
           "bool" : {
             "must" : [
               {
                   "match" : {
                       "country_region.keyword" : "Change this"
                   }
               }
             ]
           }
         },
         "size" : 2,
         "sort" : {
           "@timestamp" : {
             "order" : "desc"
           }
         }
       },
       "indices": [
         "covid-19-live-update"
       ]
     }
   }
 },
 "condition": {
     "compare" : {
         "ctx.payload.hits.total" : {
             "gt" : 1
         }
     }
 },
 "transform" : {
     "script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed)), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths)), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered))]"
 },
 "actions": {
     "email" : {
       "email" : {
         "to" : "username@example.org",
         "subject" : "COVID-19 Status for {{ctx.payload.country}}",
         "body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})"
       }
     }
 }
}

In part 2 of this blog post we will also show how you can correlate the coronavirus data with other data sets. This will help businesses relate their operational and business data to the COVID-19 outbreak in order to adjust their activities.

If you require help or assistance with this process, please contact us at contact@siscale.com.