Importing COVID-19 data into Elasticsearch

UPDATE

Johns Hopkins University changed the way it organizes its COVID-19 data, so both the pipeline and the index template have been changed. The pipeline and template configurations below have been updated to work properly when importing COVID-19 data into Elasticsearch. These changes should not impact the dashboard.

We have launched an interactive Kibana dashboard for the COVID-19 outbreak. You can use it to filter and compare COVID-19 data from different countries, regions and counties, and to run your own analysis of the outbreak.

The COVID-19 outbreak has had a huge impact on both personal lives and business activities. In light of this, access to data is essential for everyone to make informed decisions about day-to-day activities. Johns Hopkins University is deeply involved in the international response to the virus. It collects and publishes data on the outbreak through GitHub, as well as through daily reports and a live dashboard.

In this blog post we provide the details needed to create a Logstash pipeline that brings the data collected by Johns Hopkins University into your own Elasticsearch cluster. We have also published the details of this process in a GitHub repository that can be found here.

If you are interested in seeing how Siscale can help you during these uncertain times, check out this page. For more information on ingesting COVID-19 data, read on!

Logstash

Below is the Logstash configuration required to collect the data, transform it into time series data and import it into Elasticsearch. Comments in the code explain what each part does.


input {
    
    #Use the generator input to update the dataset when the pipeline is first started
    generator {
        lines => [ "first-time-run" ]
        count => 1
        tags => "check_github"
    }

    #The http_poller input plugin is used to schedule checks for dataset updates
    #The "schedule" setting triggers a check at the start of every hour (minute 0, UTC)
    #The response itself is not used: the event only triggers the update logic in the filter section
    http_poller {
        urls => {
            check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports"
        }
        tags => "check_github"
        request_timeout => 60
        schedule => { "cron" => "0 * * * * UTC" }
        codec => "plain"
        metadata_target => "http_poller_metadata"
    }
}

filter {

    #The pipeline will treat two types of events:
    #The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data 
    #The second type is the time series data itself
    #This 'if' discriminates between the two types. The time series data is treated later
    if "check_github" in [tags] {
        ruby {
            init => '
                require "csv"
                require "open-uri"
                require "digest"
                require "json"

                #Path to store the SHA-256 hashes of the downloaded CSV files
                #This is used to keep track of any changes to the GitHub repository and optimize the update process
                @github_stored_hashes_path = "/etc/logstash/covid-19-hashes.json"

                #Base URL for downloading the daily report CSVs
                @github_daily_reports_base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/"

                @states = { "AL" => "Alabama", "AK" => "Alaska", "AS" => "American Samoa", "AZ" => "Arizona", "AR" => "Arkansas", "CA" => "California", "CO" => "Colorado", "CT" => "Connecticut", "DE" => "Delaware", "DC" => "District of Columbia", "FM" => "Federated States of Micronesia", "FL" => "Florida", "GA" => "Georgia", "GU" => "Guam", "HI" => "Hawaii", "ID" => "Idaho", "IL" => "Illinois", "IN" => "Indiana", "IA" => "Iowa", "KS" => "Kansas", "KY" => "Kentucky", "LA" => "Louisiana", "ME" => "Maine", "MH" => "Marshall Islands", "MD" => "Maryland", "MA" => "Massachusetts", "MI" => "Michigan", "MN" => "Minnesota", "MS" => "Mississippi", "MO" => "Missouri", "MT" => "Montana", "NE" => "Nebraska", "NV" => "Nevada", "NH" => "New Hampshire", "NJ" => "New Jersey", "NM" => "New Mexico", "NY" => "New York", "NC" => "North Carolina", "ND" => "North Dakota", "MP" => "Northern Mariana Islands", "OH" => "Ohio", "OK" => "Oklahoma", "OR" => "Oregon", "PW" => "Palau", "PA" => "Pennsylvania", "PR" => "Puerto Rico", "RI" => "Rhode Island", "SC" => "South Carolina", "SD" => "South Dakota", "TN" => "Tennessee", "TX" => "Texas", "UT" => "Utah", "VT" => "Vermont", "VI" => "Virgin Islands", "VA" => "Virginia", "WA" => "Washington", "WV" => "West Virginia", "WI" => "Wisconsin", "WY" => "Wyoming" }

                #Input: a CSV and the date of the CSV
                #Output: an array of Logstash events extracted from the CSV that will be sent to Elasticsearch
                def create_events(csv, current_date)
                    totals_by_country = Hash.new
                    totals_by_country["WORLD"] = LogStash::Event.new
                    totals_by_country["WORLD"].set("total_confirmed", 0)
                    totals_by_country["WORLD"].set("total_recovered", 0)
                    totals_by_country["WORLD"].set("total_deaths", 0)
                    totals_by_country["WORLD"].tag("total_world")
                    totals_by_country["WORLD"].tag("covid_time_series")
                    totals_by_country["WORLD"].set("timestamp", current_date.iso8601(3).to_s)
                    totals_by_country["WORLD"].set("unique_id", "totals_for_world_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
                    all_country_region = Array.new
                    all_province_state = Array.new
                    all_county = Array.new
                    all_country_region.append("WORLD")

                    events_array = Array.new
                    #For each CSV row..
                    csv.each do |row|
                        #..create a new Logstash event that will store the data
                        new_event = LogStash::Event.new

                        #..add current row data to the current event
                        row.each do |k, v|
                            #new_event[k] = v
                            new_event.set(k, v&.strip)
                        end

                        #..store the date of the reporting
                        new_event.set("timestamp", current_date.iso8601(3).to_s)
                        #..store the date when Logstash processed the event
                        new_event.set("last_process", Time.now.iso8601(3).to_s)

                        #..some early events contain empty fields when the reported numbers are 0. Fix this here
                        new_event.set("confirmed", 0) if new_event.get("confirmed").nil?
                        new_event.set("recovered", 0) if new_event.get("recovered").nil?
                        new_event.set("deaths", 0) if new_event.get("deaths").nil?
                        #..calculate the number of active cases (confirmed - recovered - deaths); a non-negative result overrides any value from the CSV
                        active = new_event.get("confirmed").to_i - new_event.get("recovered").to_i - new_event.get("deaths").to_i
                        new_event.set("active", active) if active >= 0

                        #..calculate ratios and store them
                        ratio_deaths_to_confirmed = new_event.get("deaths").to_f / new_event.get("confirmed").to_f
                        ratio_recovered_to_confirmed = new_event.get("recovered").to_f / new_event.get("confirmed").to_f
                        ratio_deaths_to_recovered = new_event.get("deaths").to_f / new_event.get("recovered").to_f
                        new_event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
                        new_event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
                        new_event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?

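                        #..some early rows store the location as "County, ST"; split these into the county (admin2) and the full state name
                        if new_event.get("province_state")&.match?(/(.*?),\s(\w{2})/)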
                            matched = new_event.get("province_state").match(/(.*?),\s(\w{2})/)
                            @states[matched[2]].nil? ? new_event.set("province_state", matched[2]) : new_event.set("province_state", @states[matched[2]])
                            new_event.set("admin2", matched[1].gsub(/ [Cc]ounty/, ""))
                            new_event.tag("added_county")
                        end

                        #..generate a unique key for the current event
                        key = ((new_event.get("admin2") || "").downcase + "_" + (new_event.get("province_state") || "").downcase + "_" + (new_event.get("country_region") || "").downcase + "_" + (current_date.strftime("%m-%d-%Y") || "").to_s).gsub(/[^\w\d]/, "_")
                        new_event.set("unique_id", key)

                        #..add a tag to identify the event later
                        new_event.tag("covid_time_series")
                        events_array.append(new_event)

                        if ["Channel Islands", "Cayman Islands"].include?(new_event.get("country_region"))
                            new_event.set("province_state", new_event.get("country_region"))
                            new_event.set("country_region", "United Kingdom")
                        end
                        if ["Saint Martin", "St. Martin"].include?(new_event.get("country_region"))
                            new_event.set("province_state", "St Martin")
                            new_event.set("country_region", "France")
                        end

                        country_normalization = {
                            "Mainland China" => "China",
                            "South Korea" => "Korea, South",
                            "Republic of Korea" => "Korea, South",
                            "Iran \(Islamic Republic of\)" => "Iran",
                            "Bahamas, The" => "Bahamas",
                            "Gambia, The" => "Gambia",
                            "Hong Kong SAR" => "China",
                            "Hong Kong" => "China",
                            "Macao SAR" => "China",
                            "Macau" => "China",
                            "Republic of Ireland" => "Ireland",
                            "Republic of Moldova" => "Moldova",
                            "Republic of the Congo" => "Congo (Brazzaville)",
                            "Russian Federation" => "Russia",
                            "Taipei and environs" => "Taiwan",
                            "Taiwan\*" => "Taiwan",
                            "UK" => "United Kingdom",
                            "Vatican City" => "Holy See",
                            "Viet Nam" => "Vietnam",
                            "occupied Palestinian territory" => "West Bank and Gaza",
                            "Ivory Coast" => "Cote d\'Ivoire",
                            "East Timor" => "Timor-Leste",
                            "Czech Republic" => "Czechia",
                            "The Bahamas" => "Bahamas",
                            "The Gambia" => "Gambia",
                            "Cape Verde" => "Cabo Verde",
                            "North Ireland" => "Ireland",
                            "Palestine" => "West Bank and Gaza",
                        }
                        new_event.set("country_region", country_normalization[new_event.get("country_region")]) unless country_normalization[new_event.get("country_region")].nil?

                        if totals_by_country[new_event.get("country_region")].nil?
                            totals_by_country[new_event.get("country_region")] = LogStash::Event.new
                            totals_by_country[new_event.get("country_region")].set("confirmed", 0)
                            totals_by_country[new_event.get("country_region")].set("recovered", 0)
                            totals_by_country[new_event.get("country_region")].set("deaths", 0)
                            totals_by_country[new_event.get("country_region")].set("country_region", new_event.get("country_region"))
                            totals_by_country[new_event.get("country_region")].set("timestamp", new_event.get("timestamp"))
                            totals_by_country[new_event.get("country_region")].tag("total_by_country_region")
                            totals_by_country[new_event.get("country_region")].tag("covid_time_series")
                            totals_by_country[new_event.get("country_region")].set("unique_id", "totals_for_" + new_event.get("country_region").downcase + "_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
                        end
                        totals_by_country[new_event.get("country_region")].set("confirmed", new_event.get("confirmed").to_i + totals_by_country[new_event.get("country_region")].get("confirmed").to_i)
                        totals_by_country[new_event.get("country_region")].set("deaths", new_event.get("deaths").to_i + totals_by_country[new_event.get("country_region")].get("deaths").to_i)
                        totals_by_country[new_event.get("country_region")].set("recovered", new_event.get("recovered").to_i + totals_by_country[new_event.get("country_region")].get("recovered").to_i)
                        ratio_deaths_to_confirmed = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
                        ratio_recovered_to_confirmed = totals_by_country[new_event.get("country_region")].get("recovered").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
                        ratio_deaths_to_recovered = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("recovered").to_f
                        totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
                        totals_by_country[new_event.get("country_region")].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
                        totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?

                        totals_by_country["WORLD"].set("total_confirmed", totals_by_country["WORLD"].get("total_confirmed").to_i + new_event.get("confirmed").to_i)
                        totals_by_country["WORLD"].set("total_recovered", totals_by_country["WORLD"].get("total_recovered").to_i + new_event.get("recovered").to_i)
                        totals_by_country["WORLD"].set("total_deaths", totals_by_country["WORLD"].get("total_deaths").to_i + new_event.get("deaths").to_i)
                        all_country_region.append(new_event.get("country_region")) if !new_event.get("country_region").nil? and !all_country_region.include?(new_event.get("country_region"))
                        all_province_state.append(new_event.get("province_state")) if !new_event.get("province_state").nil? and !all_province_state.include?(new_event.get("province_state"))
                        all_county.append(new_event.get("admin2")) if !new_event.get("admin2").nil? and !all_county.include?(new_event.get("admin2"))
                    end

                    ratio_deaths_to_confirmed = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
                    ratio_recovered_to_confirmed = totals_by_country["WORLD"].get("total_recovered").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
                    ratio_deaths_to_recovered = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_recovered").to_f
                    totals_by_country["WORLD"].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
                    totals_by_country["WORLD"].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
                    totals_by_country["WORLD"].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
                    totals_by_country["WORLD"].set("country_region", all_country_region)
                    totals_by_country["WORLD"].set("province_state", all_province_state)
                    totals_by_country["WORLD"].set("admin2", all_county)
                    totals_by_country.each do |k, v|
                        events_array.append(v)
                    end

                    return events_array
                end
            '
            code => '
                #First day in the dataset
                github_daily_reports_first_day = Time.gm(2020, 01, 22, 12, 00)
                #Current day
                github_daily_reports_last_day = Time.now
                current_date = github_daily_reports_first_day

                #If the CSV hashes are stored in a file, parse them..
                if File.file?(@github_stored_hashes_path)
                    stored_hashes = JSON.parse(File.read(@github_stored_hashes_path))
                    #..else, create a new hash
                else
                    stored_hashes = Hash.new
                end

                #For each day since the beginning of the dataset, until the current day..
                while current_date <= github_daily_reports_last_day
                    begin
                        #..generate the URL based on the current day
                        current_url = @github_daily_reports_base_url + current_date.strftime("%m-%d-%Y") + ".csv"
                        #..parse the CSV while normalizing the headers
                        #(e.g. until 21.03.2020 the country is stored under "Country/Region"; after this date it is "Country_Region".
                        #We normalize this so that both variations are ultimately stored under "country_region". This applies to other fields as well)
                        #URI.open is used because Kernel#open no longer opens URLs in Ruby 3 compatible JRuby versions
                        current_csv = CSV.parse(URI.open(current_url).read.gsub("\uFEFF", ""), headers: true, header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") })
                        #..generate the SHA-256 hash of the CSV
                        current_hash = Digest::SHA256.hexdigest(current_csv.to_s)
                    rescue
                        #The pipeline will usually fail to fetch data for the current day if it has not been published yet
                        logger.warn? and logger.warn("Failed to fetch CSV for date " + current_date.strftime("%m-%d-%Y"))
                    else
                        #..if the hash of the current CSV differs from the stored one, process the events
                        if stored_hashes[current_url] != current_hash
                            events_array = create_events(current_csv, current_date)
                            #..store the new hash
                            stored_hashes[current_url] = current_hash
                            #..and send each event back through the pipeline
                            events_array.each do |time_series_event|
                                new_event_block.call(time_series_event)
                            end
                        end
                    end
                    #Check the next day
                    current_date += 86400
                end

                #Attempt to write the new CSV hashes to disk
                begin
                    File.write(@github_stored_hashes_path, stored_hashes.to_json)
                    logger.info? and logger.info("Successfully updated cache at #{@github_stored_hashes_path}")
                rescue StandardError => ex
                    logger.error? and logger.error("(#{ex.class} - #{ex.message}) Failed to write hashes to file #{@github_stored_hashes_path}. Does Logstash have read/write access to this location?")
                ensure
                    #..and cancel this event. We are only interested in the time series data, which was already sent back through the pipeline.
                    event.cancel
                end
            '
        }#end ruby 
    }#end if
    
    #Each time series data event will be sent back through the pipeline.
    #This 'if' discriminates between the original event that triggered the downloading and processing of the CSV, and the time series data
    if "covid_time_series" in [tags] {
        #Parse date fields
        date {
            #When the github dataset was last updated by the maintainers
            match => [ "last_update", "yyyy-MM-dd HH:mm:ss", "M/d/yy HH:mm", "M/d/yyyy HH:mm", "ISO8601" ] 
            target => "last_update"
        }#end date
        date {
            #When the event was processed by Logstash
            match => ["last_process", "ISO8601" ]
            target => "last_process"
        }#end date
        date {
            #The true date of the report
            match => [ "timestamp", "ISO8601" ]
            target => "@timestamp"
        }#end date


      if "total_by_country_region" not in [tags] and "total_world" not in [tags] {
        mutate {
           #Normalize the location field
            rename => {
                "lat" => "[location][lat]"
                "latitude" => "[location][lat]"
                "lon" => "[location][lon]"
                "long" => "[location][lon]"
                "long_" => "[location][lon]"
                "longitude" => "[location][lon]"
            }
        } 
        ruby {
            #Approximate coordinates of US states, used as a fallback location
            #Defined in init so that the lookup table is built once instead of for every event
            init => '
                @state_coordinates = {
                    "Wisconsin" => { "lat" => "44.5", "lon" => "-89.5" },
                    "West Virginia" => { "lat" => "39", "lon" => "-80.5" },
                    "Vermont" => { "lat" => "44", "lon" => "-72.699997" },
                    "Texas" => { "lat" => "31", "lon" => "-100" },
                    "South Dakota" => { "lat" => "44.5", "lon" => "-100" },
                    "Rhode Island" => { "lat" => "41.700001", "lon" => "-71.5" },
                    "Oregon" => { "lat" => "44", "lon" => "-120.5" },
                    "New York" => { "lat" => "43", "lon" => "-75" },
                    "New Hampshire" => { "lat" => "44", "lon" => "-71.5" },
                    "Nebraska" => { "lat" => "41.5", "lon" => "-100" },
                    "Kansas" => { "lat" => "38.5", "lon" => "-98" },
                    "Mississippi" => { "lat" => "33", "lon" => "-90" },
                    "Illinois" => { "lat" => "40", "lon" => "-89" },
                    "Delaware" => { "lat" => "39", "lon" => "-75.5" },
                    "Connecticut" => { "lat" => "41.599998", "lon" => "-72.699997" },
                    "Arkansas" => { "lat" => "34.799999", "lon" => "-92.199997" },
                    "Indiana" => { "lat" => "40.273502", "lon" => "-86.126976" },
                    "Missouri" => { "lat" => "38.573936", "lon" => "-92.60376" },
                    "Florida" => { "lat" => "27.994402", "lon" => "-81.760254" },
                    "Nevada" => { "lat" => "39.876019", "lon" => "-117.224121" },
                    "Maine" => { "lat" => "45.367584", "lon" => "-68.972168" },
                    "Michigan" => { "lat" => "44.182205", "lon" => "-84.506836" },
                    "Georgia" => { "lat" => "33.247875", "lon" => "-83.441162" },
                    "Hawaii" => { "lat" => "19.741755", "lon" => "-155.844437" },
                    "Alaska" => { "lat" => "66.160507", "lon" => "-153.369141" },
                    "Tennessee" => { "lat" => "35.860119", "lon" => "-86.660156" },
                    "Virginia" => { "lat" => "37.926868", "lon" => "-78.024902" },
                    "New Jersey" => { "lat" => "39.833851", "lon" => "-74.871826" },
                    "Kentucky" => { "lat" => "37.839333", "lon" => "-84.27002" },
                    "North Dakota" => { "lat" => "47.650589", "lon" => "-100.437012" },
                    "Minnesota" => { "lat" => "46.39241", "lon" => "-94.63623" },
                    "Oklahoma" => { "lat" => "36.084621", "lon" => "-96.921387" },
                    "Montana" => { "lat" => "46.96526", "lon" => "-109.533691" },
                    "Washington" => { "lat" => "47.751076", "lon" => "-120.740135" },
                    "Utah" => { "lat" => "39.41922", "lon" => "-111.950684" },
                    "Colorado" => { "lat" => "39.113014", "lon" => "-105.358887" },
                    "Ohio" => { "lat" => "40.367474", "lon" => "-82.996216" },
                    "Alabama" => { "lat" => "32.31823", "lon" => "-86.902298" },
                    "Iowa" => { "lat" => "42.032974", "lon" => "-93.581543" },
                    "New Mexico" => { "lat" => "34.307144", "lon" => "-106.018066" },
                    "South Carolina" => { "lat" => "33.836082", "lon" => "-81.163727" },
                    "Pennsylvania" => { "lat" => "41.203323", "lon" => "-77.194527" },
                    "Arizona" => { "lat" => "34.048927", "lon" => "-111.093735" },
                    "Maryland" => { "lat" => "39.045753", "lon" => "-76.641273" },
                    "Massachusetts" => { "lat" => "42.407211", "lon" => "-71.382439" },
                    "California" => { "lat" => "36.778259", "lon" => "-119.417931" },
                    "Idaho" => { "lat" => "44.068203", "lon" => "-114.742043" },
                    "Wyoming" => { "lat" => "43.07597", "lon" => "-107.290283" },
                    "North Carolina" => { "lat" => "35.782169", "lon" => "-80.793457" },
                    "Louisiana" => { "lat" => "30.39183", "lon" => "-92.329102" }
                }
            '
            code => '
                #If the location is missing or 0,0: use the state coordinates for US rows, otherwise fall back to 0,0
                if (event.get("[location][lat]").nil? || event.get("[location][lon]").nil?) || (event.get("[location][lat]").to_f == 0 && event.get("[location][lon]").to_f == 0)
                    if (event.get("[country_region]") == "US") and (!@state_coordinates[event.get("[province_state]")].nil?)
                        event.set("[location][lat]", @state_coordinates[event.get("[province_state]")]["lat"])
                        event.set("[location][lon]", @state_coordinates[event.get("[province_state]")]["lon"])
                    else
                        event.set("[location][lat]", 0)
                        event.set("[location][lon]", 0)
                    end
                end
            '
        }
      }

        #Apart from admin2 => county, these renames keep the field names unchanged; edit the target names if you want to correlate with other datasets
        mutate { 
            rename => {
                "province_state" => "province_state"
                "country_region" => "country_region"
                "admin2" => "county"
                "fips" => "fips" 
                
                "confirmed" => "confirmed"
                "deaths" => "deaths"
                "recovered" => "recovered"
                "active" => "active"
                
                "[location][lat]" => "[location][lat]"
                "[location][lon]" => "[location][lon]"

                "last_update" => "last_update"
                "last_process" => "last_process"
                "combined_key" => "combined_key"

                "ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
                "ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
                "ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
                
                "unique_id" => "unique_id"
            }
        }#end mutate
   }#end if
}#end filter

output {
    #Send the data to Elasticsearch
    elasticsearch {
        #Add your Elasticsearch hosts
        hosts => [""]
        #Add the index name
        index => "covid-19-live-update"
        #Add Elasticsearch credentials
        user => ""
        password => ""
        #Add SSL details
        #ssl => true
        #cacert => "/path/to/cert/"

        #Update documents based on the unique id that we defined
        action => "update"
        document_id => "%{unique_id}"
        #..or create a new document if it doesn't already exist
        doc_as_upsert => true
        manage_template => false
    }
}
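
The update logic in the first ruby filter comes down to two steps. First, it walks the calendar from the first day of the dataset to today and builds one daily-report URL per day. Second, it skips any CSV whose content hash has not changed since the last run. The following standalone Ruby sketch reproduces those two steps outside Logstash so they can be tried in isolation. The helper names daily_report_urls and changed? are hypothetical. The pipeline hashes the parsed CSV table, but this sketch simply hashes the raw CSV text.

```ruby
require "digest"

# Base URL of the daily reports, as used by the pipeline
BASE_URL = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/"

# Hypothetical helper: one daily-report URL per day, starting on
# 22 Jan 2020 at 12:00 UTC (as in the pipeline) and ending at `last_day`
def daily_report_urls(last_day)
  current_date = Time.gm(2020, 1, 22, 12, 0)
  urls = []
  while current_date <= last_day
    urls << BASE_URL + current_date.strftime("%m-%d-%Y") + ".csv"
    current_date += 86_400 # one day, in seconds
  end
  urls
end

# Hypothetical helper: returns true (and records the new digest) only when
# the CSV content differs from what was seen on the previous run
def changed?(stored_hashes, url, csv_text)
  digest = Digest::SHA256.hexdigest(csv_text)
  return false if stored_hashes[url] == digest

  stored_hashes[url] = digest
  true
end

urls = daily_report_urls(Time.gm(2020, 1, 24, 18, 0))
puts urls.map { |u| File.basename(u) }.inspect
# → ["01-22-2020.csv", "01-23-2020.csv", "01-24-2020.csv"]

cache = {}
puts changed?(cache, urls[0], "a,b\n1,2\n") # first run: true
puts changed?(cache, urls[0], "a,b\n1,2\n") # same content: false
puts changed?(cache, urls[0], "a,b\n1,3\n") # content edited upstream: true
```

In the pipeline the digests are persisted to /etc/logstash/covid-19-hashes.json between runs. After a restart, only CSVs that were edited upstream are processed again.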

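For each row, create_events normalizes the CSV headers, fills empty counts with 0, stores only finite ratios and derives a deterministic unique_id. The deterministic ID is what lets the elasticsearch output update the same document on every run instead of creating duplicates. Below is a minimal Ruby sketch of those steps. It uses a plain Hash instead of a LogStash::Event. normalize_row is a hypothetical helper, and the two CSV rows are made-up values in the early daily-report layout, not real figures.

```ruby
require "csv"

# Header normalization used by the pipeline: "Country/Region" and
# "Country_Region" both become "country_region", "Last Update" becomes "last_update"
HEADER_CONVERTER = lambda { |h| h.downcase.gsub(/[^\w]/, "_") }

# Hypothetical helper mirroring part of create_events for one CSV row,
# using a plain Hash instead of a LogStash::Event
def normalize_row(row, date_string)
  doc = row.to_h.transform_values { |v| v&.strip }

  # Empty cells (nil) are treated as 0, as in the pipeline
  %w[confirmed recovered deaths].each { |field| doc[field] ||= 0 }

  # 0/0 gives NaN and n/0 gives Infinity; only finite ratios are stored
  ratios = {
    "ratio_deaths_to_confirmed" => doc["deaths"].to_f / doc["confirmed"].to_f,
    "ratio_recovered_to_confirmed" => doc["recovered"].to_f / doc["confirmed"].to_f
  }
  ratios.each { |name, value| doc[name] = value if value.finite? }

  # Deterministic id built from county, province, country and report date
  parts = [doc["admin2"], doc["province_state"], doc["country_region"]].map { |s| (s || "").downcase }
  doc["unique_id"] = (parts + [date_string]).join("_").gsub(/[^\w\d]/, "_")
  doc
end

# Made-up rows in the early daily-report layout, with a UTF-8 BOM in front
raw = "\uFEFFProvince/State,Country/Region,Last Update,Confirmed,Deaths,Recovered\n" \
      "Example Province,Example Country,1/22/2020 17:00,100,5,10\n" \
      ",Other Country,1/22/2020 17:00,,,\n"

table = CSV.parse(raw.gsub("\uFEFF", ""), headers: true, header_converters: HEADER_CONVERTER)
docs = table.map { |row| normalize_row(row, "01-22-2020") }

p table.headers
# → ["province_state", "country_region", "last_update", "confirmed", "deaths", "recovered"]
p docs.map { |d| d["unique_id"] }
# → ["_example_province_example_country_01_22_2020", "__other_country_01_22_2020"]
```

The second row shows why the finite? guard matters. With no confirmed cases, both ratios are 0/0, so they are simply left out of the document.
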
Elasticsearch index

In addition to the Logstash configuration, you will also need an index template in Elasticsearch. The field mappings can be found below (feel free to change the index name, but remember to change it in the Logstash code as well):


PUT _template/covid-19-live-update
{
    "order" : 0,
    "index_patterns" : [
      "covid-19-live-update"
    ],
    "settings" : { },
    "mappings" : {
      "properties" : {
        "recovered" : {
          "type" : "long"
        },
        "updated_at" : {
          "type" : "date"
        },
        "ratio_deaths_to_recovered" : {
          "type" : "float"
        },
        "ratio_recovered_to_confirmed" : {
          "type" : "float"
        },
        "ratio_deaths_to_confirmed" : {
          "type" : "float"
        },
        "location" : {
          "type" : "geo_point"
        },
        "confirmed" : {
          "type" : "long"
        },
        "deaths" : {
          "type" : "long"
        },
        "last_update" : {
          "type" : "date"
        },
        "last_process" : {
          "type" : "date"
        },
        "@timestamp" : {
          "type" : "date"
        }
      }
    },
    "aliases" : { }
  }
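
The ratio fields are mapped as float in this template. Elasticsearch numeric fields accept only finite values, and standard JSON has no literal for NaN or Infinity. This is why the pipeline sets a ratio only when it is finite. For example, deaths/recovered is Infinity while recovered is still 0. A minimal Ruby sketch of that guard follows. finite_ratio is a hypothetical helper.

```ruby
require "json"

# Hypothetical helper: return the ratio only when it is a finite number,
# mirroring the `if ratio.finite?` guards in the pipeline
def finite_ratio(numerator, denominator)
  ratio = numerator.to_f / denominator.to_f
  ratio.finite? ? ratio : nil
end

doc = {
  "ratio_deaths_to_confirmed" => finite_ratio(3, 60),
  "ratio_deaths_to_recovered" => finite_ratio(3, 0) # Infinity, dropped by compact
}.compact

puts doc.to_json

# Without the guard, serializing the document fails outright
begin
  JSON.generate({ "ratio" => 0.0 / 0.0 })
rescue JSON::GeneratorError => e
  puts "rejected: #{e.class}"
end
```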

Kibana dashboard

In the GitHub repository you will also find the dashboard from this article, which you can import into your own Kibana. The dashboard was built with version 7.6.1 of the Elastic Stack, the latest at the time of writing. To import it, navigate to Management > Saved Objects in Kibana and click Import.

The GitHub repository can be found here.

Watcher alerting

Once these steps are done, you will have the data in Elasticsearch and available through Kibana.

We are also including two Watcher definitions (alerts) in this article. They send an email to the configured address with the number of confirmed cases, recoveries and deaths for the region of interest. Not all countries report data at the province/region level (this is how the data is collected by Johns Hopkins University). For this reason, there is one Watcher for regional data and one for country-wide data. In the code you will find the placeholder “Change this”, which should be replaced with the desired value. Also replace the recipient address username@example.org with your own.

A few considerations for the alerts:

  • Make sure to change just the text within the double quotes without deleting them
  • Province and country names must be spelled exactly as they appear in the dataset, including capitalization (e.g. New York, China), because the Watchers match the exact value of the keyword field
  • Feel free to change the interval value at the top of the code to set how often the alert runs (a number followed by m for minutes, h for hours or d for days, kept within the double quotes)
  • Keep in mind that, from our observations, the Johns Hopkins University data is updated once or twice a day
  • The following countries have regional information:
    • US
    • China
    • Australia
    • Canada
    • France
    • United Kingdom
    • Denmark
    • Cruise Ship
  • To send emails, you need to configure an email account for Watcher in your Elastic Stack. Details can be found here.
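
Both Watchers compare the two most recent matching documents. They report today's value together with the change since the previous report. The following Ruby sketch reproduces that arithmetic outside Elasticsearch, which can help when checking an alert by hand. It assumes hits shaped like the search response's hits.hits array, newest first. status_line is a hypothetical helper. Values are converted through strings because the pipeline stores counts read from the CSV as strings, but stores defaults and totals as integers.

```ruby
# Hypothetical helper reproducing the Watchers' transform and email body in Ruby.
# `hits` is assumed to be the search response's hits.hits array, newest first.
def status_line(hits)
  today = hits[0]["_source"]
  previous = hits[1]["_source"]
  parts = %w[confirmed recovered deaths].map do |field|
    # Counts may be strings (read from the CSV) or integers (defaults and totals)
    current = Integer(today[field].to_s)
    change = current - Integer(previous[field].to_s)
    "#{current} (+#{change})"
  end
  "Confirmed/Recovered/Dead: " + parts.join(" / ")
end

# Illustrative hits, not real figures
hits = [
  { "_source" => { "confirmed" => "120", "recovered" => "30", "deaths" => 4 } },
  { "_source" => { "confirmed" => 100, "recovered" => "25", "deaths" => "3" } }
]
puts status_line(hits)
# → Confirmed/Recovered/Dead: 120 (+20) / 30 (+5) / 4 (+1)
```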

Province/Region Watcher alert


{
  "trigger": {
    "schedule": {
      "interval": "24h"
    }
  },
  "input": {
    "search": {
      "request": {
        "body": {
          "query" : {
            "bool" : {
              "must" : [
                {
                    "match" : {
                        "province_state.keyword" : "Change this"
                    }
                }
              ],
              "must_not" : [
                {
                    "match" : {
                        "tags.keyword" : "total_world"
                    }
                }
              ]
            }
          },
          "size" : 2,
          "sort" : {
            "@timestamp" : {
              "order" : "desc"
            }
          }
        },
        "indices": [
          "covid-19-live-update"
        ]
      }
    }
  },
  "condition": {
      "compare" : {
          "ctx.payload.hits.total" : {
              "gt" : 1
          }
      }
  },
  "transform" : {
      "script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed.toString()) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed.toString())), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths.toString()) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths.toString())), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered.toString()) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered.toString()))]"
  },
  "actions": {
      "email" : { 
        "email" : { 
          "to" : "username@example.org", 
          "subject" : "COVID-19 Status for {{ctx.payload.province}}, {{ctx.payload.country}}", 
          "body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})" 
        }
      }
  }
}
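
The watch definitions can be registered with the Elasticsearch Watcher API at PUT _watcher/watch/<watch_id>. A minimal Ruby sketch using the standard library's Net::HTTP follows. The cluster URL, watch ID and credentials are placeholders. The request is only built, not sent, unless you uncomment the last lines.

```ruby
require "json"
require "net/http"
require "uri"

# Placeholders: replace with your own cluster URL, watch id and credentials
ES_URL   = "http://localhost:9200"
WATCH_ID = "covid-19-province-alert"

# Build (without sending) a PUT _watcher/watch/<id> request for a watch definition
def build_watch_request(watch)
  uri = URI("#{ES_URL}/_watcher/watch/#{WATCH_ID}")
  request = Net::HTTP::Put.new(uri)
  request["Content-Type"] = "application/json"
  request.basic_auth("elastic", "changeme") # placeholder credentials
  request.body = JSON.generate(watch)
  [uri, request]
end

# Abbreviated watch; in practice pass the full definition shown above
watch = { "trigger" => { "schedule" => { "interval" => "24h" } } }
uri, request = build_watch_request(watch)
puts "#{request.method} #{uri}"

# Uncomment to send the request to a reachable cluster:
# Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
#   response = http.request(request)
#   puts "#{response.code} #{response.body}"
# end
```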

Country-level Watcher alert


{
  "trigger": {
    "schedule": {
      "interval": "24h"
    }
  },
  "input": {
    "search": {
      "request": {
        "body": {
          "query" : {
            "bool" : {
              "must" : [
                {
                    "match" : {
                        "country_region.keyword" : "Change this"
                    }
                },
                {
                    "match" : {
                        "tags.keyword" : "total_by_country_region"
                    }
                }
              ]
            }
          },
          "size" : 2,
          "sort" : {
            "@timestamp" : {
              "order" : "desc"
            }
          }
        },
        "indices": [
          "covid-19-live-update"
        ]
      }
    }
  },
  "condition": {
      "compare" : {
          "ctx.payload.hits.total" : {
              "gt" : 1
          }
      }
  },
  "transform" : {
      "script" : "return [ 'country' : ctx.payload.hits.hits[0]._source.country_region, 'province' : ctx.payload.hits.hits[0]._source.province_state, 'confirmed_today' : ctx.payload.hits.hits[0]._source.confirmed, 'deaths_today' : ctx.payload.hits.hits[0]._source.deaths, 'recovered_today' : ctx.payload.hits.hits[0]._source.recovered, 'confirmed_yesterday' : ctx.payload.hits.hits[1]._source.confirmed, 'deaths_yesterday' : ctx.payload.hits.hits[1]._source.deaths, 'recovered_yesterday' : ctx.payload.hits.hits[1]._source.recovered, 'difference_confirmed' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.confirmed.toString()) - Integer.parseInt(ctx.payload.hits.hits[1]._source.confirmed.toString())), 'difference_deaths' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.deaths.toString()) - Integer.parseInt(ctx.payload.hits.hits[1]._source.deaths.toString())), 'difference_recovered' : (Integer.parseInt(ctx.payload.hits.hits[0]._source.recovered.toString()) - Integer.parseInt(ctx.payload.hits.hits[1]._source.recovered.toString()))]"
  },
  "actions": {
      "email" : { 
        "email" : { 
          "to" : "username@example.org", 
          "subject" : "COVID-19 Status for {{ctx.payload.country}}", 
          "body" : "Confirmed/Recovered/Dead: {{ctx.payload.confirmed_today}} (+{{ctx.payload.difference_confirmed}}) / {{ctx.payload.recovered_today}} (+{{ctx.payload.difference_recovered}}) / {{ctx.payload.deaths_today}} (+{{ctx.payload.difference_deaths}})" 
        }
      }
  }
}
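
Instead of editing the “Change this” placeholder by hand, you can fill in the watch programmatically. A minimal Ruby sketch follows. fill_watch is a hypothetical helper, and the abbreviated watch keeps only the fields it touches. "Italy" and alerts@example.org are example values. Because the query matches the keyword value exactly, the name must use the dataset's spelling (for example "US" or "Korea, South").

```ruby
require "json"

PLACEHOLDER = "Change this"

# Hypothetical helper: replace the placeholder in every "match" clause of the
# query and set the email recipient, returning the updated watch as a Hash
def fill_watch(watch_json, value, recipient)
  watch = JSON.parse(watch_json)
  clauses = watch.dig("input", "search", "request", "body", "query", "bool", "must") || []
  clauses.each do |clause|
    clause["match"]&.transform_values! { |v| v == PLACEHOLDER ? value : v }
  end
  watch["actions"]["email"]["email"]["to"] = recipient
  watch
end

# Abbreviated country-level watch: only the fields used by fill_watch
watch_json = <<~WATCH
  {
    "trigger": { "schedule": { "interval": "24h" } },
    "input": { "search": { "request": {
      "indices": ["covid-19-live-update"],
      "body": { "query": { "bool": { "must": [
        { "match": { "country_region.keyword": "Change this" } }
      ] } } }
    } } },
    "actions": { "email": { "email": { "to": "username@example.org" } } }
  }
WATCH

watch = fill_watch(watch_json, "Italy", "alerts@example.org")
puts JSON.pretty_generate(watch)
```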

In part 2 of this blog post we will show how you can correlate the COVID-19 data with other datasets. This will help businesses correlate their operational and business data with the COVID-19 outbreak in order to adjust their activities.

If you need help with this process, please contact us at contact@siscale.com.