Collecting 911 Data with OpenWhisk Cron Triggers

Today I'm sharing probably the most complex thing I've built with OpenWhisk. While I'm proud of it, I will remind people I'm still the newbie to this world, so keep that in mind as I explain what I did.

Many years ago, like seven (holy crap), I built a ColdFusion demo that parsed local 911 data and persisted it locally to a database: Proof of Concept 911 Viewer.

I used a Yahoo Pipe to suck in the HTML data provided by a local police web site and convert into something I could store. It wasn't necessarily rocket science, but it was fun to build. It was even more fun when I forgot I had automated it and came back months later to look at all the data I collected: Update to my 911 Viewer

That demo was on my mind recently and I thought it would be an excellent thing to try building with OpenWhisk. With that in mind, I built the following:

  • First, an action that parses the data.
  • Second, an action that takes input data, sees if the data exists in a Cloudant data store, and then if not, adds it.
  • A sequence to connect the two.
  • A Cron-based schedule to periodically check the data.

That sounds like a lot, and pretty complex, but breaking it down into component parts/features made it simpler to work with and let me try some parts of OpenWhisk that I had not played with yet, specifically the Cron-trigger aspect. Let's take it step by step.

Parsing Raw HTML Data

The data I'm parsing lives at http://lafayette911.org. As you can see, it is a table of incident reports:

Web site

I began by doing a quick view source to see how the HTML was created. Turned out the table was driven by an iframe pointing to http://apps.lafayettela.gov/L911/default.aspx. Looking at the source code there I saw that the data was driven by an Ajax call to http://apps.lafayettela.gov/L911/Service2.svc/getTrafficIncidents. I got excited because I thought - for a moment - that I wouldn't have to parse anything. Turns out, the JSON was actually formatted HTML (I slimmed it down a bit):


{"d":" <center><a href=\"#KEY\">KEY<\/a><table border=0 bgcolor=\"white\"><tr bgcolor=\"#99FF99\"><td><b>&nbsp;<a href='http:\/\/maps.google.com\/maps?q=2909+NW+EVANGELINE+THROUGHWAY+,LAFAYETTE+LA' target='_new'>2909 NW EVANGELINE TW<\/a>&nbsp;<BR>&nbsp;LAFAYETTE,LA&nbsp;<\/b><\/td><td><b>Vehicle Accident w\/ Injuries<\/b><\/td><td><b>02\/14\/2017 - 11:59 AM<\/b><\/td><td><b>P F M <\/b><\/td><\/tr><tr bgcolor=\"#FFFF99\"><td><b>&nbsp;<a href='http:\/\/maps.google.com\/maps?q=1100+SE+EVANGELINE+THROUGHWAY+,LAFAYETTE+LA' target='_new'>1100 SE EVANGELINE TW<\/a>&nbsp;<BR>&nbsp;LAFAYETTE,LA&nbsp;<\/b><\/td><td><b>Vehicle Accident w\/ Injuries<\/b><\/td><td><b>02\/14\/2017 - 11:40 AM<\/b><\/td><td><b>P F M <\/b><\/td><\/tr><\/table><small>Data Updated at 02\/14\/2017 - 1:12:38 PM <\/small><\/center><script>$('dateline').innerHTML = '02\/14\/2017 - 1:12:38 PM'; <\/script>"}

Sigh

Ok, so luckily, I knew how to work with this. I did a demo last year involving web scraping with Cheerio (Scraping a web page in Node with Cheerio) and I knew that worked well, so my action focused around working with that. Remember, to include random npm packaged with OpenWhisk, you have to use a zipped action that includes the package.json and node_modules directory. It's a bit more work, but marginally so.

The other slightly complex aspect was that I wanted to geocode the addresses. For that I used Google's excellent Geocode API that is part of the Maps SDK. Here is the entire action.


let cheerio = require('cheerio');
let request = require('request');

function main(args) {

    return new Promise((resolve, reject) => {

        request('http://apps.lafayettela.gov/L911/Service2.svc/getTrafficIncidents', {method:'post'}, function(err, response, body) {

            if(err) reject(err);

            let results = [];
            // body is a json packet, html is in d
            let $ = cheerio.load(JSON.parse(body).d);
            let channels = $('tr');
            //channel 0 is the header
            for(let i=1;i<channels.length;i++) {
                let channelRow = channels.get(i);
                let cells = $(channelRow).children();
                //console.log(channelRow);
                let loc = $(cells.get(0)).text().trim();
                let reason = $(cells.get(1)).text().trim();
                let timestamp = $(cells.get(2)).text().trim();
                let [daypart,timepart] = timestamp.split(' - ');
                let incidentDate = new Date(daypart + ' '+timepart);
                let assisting = $(cells.get(3)).text().trim().split(' ');
                //package it up
                results.push({location:loc, reason:reason, timestamp:incidentDate, assisting: assisting});
            }

            /*
            New logic - for each, geocode
            */
            let promises = [];
            results.forEach(function(res) {

                console.log('need to work on '+res.location);
                promises.push(new Promise( (resolve, reject) => {
                    let geourl = 'https://maps.googleapis.com/maps/api/geocode/json?address='+encodeURIComponent(res.location);
                    console.log(geourl);
                    request(geourl, function(err, response, body) {
                        if(err) reject(err);
                        let geoResult = {};
                        let geodata = JSON.parse(body);
                        if(geodata.status === 'OK') {
                            geoResult.geostatus = true;
                            geoResult.geo = geodata.results[0].geometry.location;
                        } else {
                            geoResult.geostatus = false;
                        }
                        resolve(geoResult);
                        //console.log(body);
                    });

                }));

            });
            Promise.all(promises).then(function(geodata) {
                console.log('done with all promises');
                //brittle code here, geodata len != results
                for(var i=0;i<geodata.length;i++) {
                    results[i].geo = geodata[i];
                }
                resolve({ traffic:results });
            });


        });

    });

}

exports.main = main;

So from the top - we begin with a generic request for the data. Once we've got that, we can ask Cheerio to turn into a DOM, just like HTML in the browser. I then grab all the table rows, and then fetch the cells inside each row. I do a bit of manipulation of the time to turn it into a JavaScript data and convert the "assisting" cell into an array.

The next part is a bit complex. I need to geocode all the addresses and this involves N async processes. So I use an array of promises to get all the results and then update the original data. Unfortunately, it looks like the service has an issue with intersections. So for example, an accident at "Johnston and Camelia" isn't properly geocoded even though the map links from the site seem to work well. This could be my fault. Sometimes it worked, sometimes it didn't.

In the end, I get a nice set of data:


{
        "traffic": [
                {
                        "location": "LEE AV & E CYPRESS ST  LAFAYETTE,LA",
                        "reason": "Vehicle Accident",
                        "timestamp": "2017-02-14T19:08:00.000Z",
                        "assisting": [
                                "P",
                                "M"
                        ],
                        "geo": {
                                "geostatus": true,
                                "geo": {
                                        "lat": 30.2256757,
                                        "lng": -92.0149277
                                }
                        }
                },
                {
                        "location": "E UNIVERSITY AV & W PINHOOK RD  LAFAYETTE,LA",
                        "reason": "Vehicle Accident",
                        "timestamp": "2017-02-14T19:02:00.000Z",
                        "assisting": [
                                "S",
                                "P"
                        ],
                        "geo": {
                                "geostatus": true,
                                "geo": {
                                        "lat": 30.21055,
                                        "lng": -92.0097742
                                }
                        }
                },
                {
                        "location": "6801  JOHNSTON ST  LAFAYETTE,LA",
                        "reason": "Vehicle Accident",
                        "timestamp": "2017-02-14T19:00:00.000Z",
                        "assisting": [
                                "P"
                        ],
                        "geo": {
                                "geostatus": true,
                                "geo": {
                                        "lat": 30.150066,
                                        "lng": -92.0934762
                                }
                        }
                },
                {
                        "location": "W PINHOOK RD &  BENDEL RD  LAFAYETTE,LA",
                        "reason": "Vehicle Accident",
                        "timestamp": "2017-02-14T18:44:00.000Z",
                        "assisting": [
                                "P"
                        ],
                        "geo": {
                                "geostatus": true,
                                "geo": {
                                        "lat": 30.1990935,
                                        "lng": -92.0163944
                                }
                        }
                }
        ]
}

Not bad! Ok, on to step two - storing the data.

Persisting the Data with Cloudant

To store the data, I provisioned a new Cloudant service with Bluemix. OpenWhisk can automatically pick up new Cloudant services and add a package to your account with actions/triggers to interact with that service. To work with those actions, I built my own action tasked with handling an input of data, checking to see if it's new, and then adding it. Here is that action.


var openWhisk = require('openwhisk');
var ow = openWhisk({
    apihost:'openwhisk.ng.bluemix.net',
    api_key:'my secret is so secret it doesnt know it is a secret'
});

var actionBase = '/rcamden@us.ibm.com_My Space/Bluemix_Cloudant Traffic_Credentials-1';

function main(args) {

    /*
    hard coded for now
    args.traffic = [
        {
            location:"W CONGRESS ST &  CAJUNDOME BL  LAFAYETTE,LA",
            reason:"Flood",
            timestamp:"2017-02-08T20:59:00.000Z"
        },
        {
            location:"ssss W CONGRESS ST &  CAJUNDOME BL  LAFAYETTE,LA",
            reason:"Vehicle Accident",
            timestamp:"2017-02-08T20:59:00.000Z"
        },
        {
            location:"W CONGRESS ST &  CAJUNDOME BL  LAFAYETTE,LA",
            reason:"Monster",
            timestamp:"2017-02-08T20:59:00.000Z"
        },

    ];
    */

    if(!args.traffic) args.traffic = [];
    
    return new Promise((resolve, reject) => {

        let promises = [];
        args.traffic.forEach(function(d) {
            promises.push(addIfNew(d));
        });
        Promise.all(promises).then((results) => {
            console.log('all done like a boss');
            resolve({results:results});
        });

    });

}

function addIfNew(d) {

    return new Promise((resolve, reject) => {
        
        ow.actions.invoke({
            actionName:actionBase+'/exec-query-find',
            blocking:true,
            params:{
                "dbname":"traffic",
                "query":
                    {
                    "selector": {
                        "location": {
                        "$eq": d.location
                        },
                        "reason":{
                        "$eq":d.reason
                        },
                        "timestamp":{
                        "$eq":d.timestamp
                        }
                    },
                    "fields": [
                        "_id"
                    ]
                    }
            }
        }).then(function(res) {
            let numMatches = res.response.result.docs.length;
            if(numMatches === 0) {
                console.log('data is new, so add it');
                ow.actions.invoke({
                    actionName:actionBase+'/write',
                    blocking:true,
                    params:{
                        "dbname":"traffic",
                        "doc":d
                    }
                }).then(function(res) {
                    resolve({result:1});
                }); 
            } else {
                resolve({result:0});
            }
        });
        
    });
    
}

exports.main = main;

From the top - notice I'm using the OpenWhisk package. It basically lets me use OpenWhisk from my action much like I use it from the CLI. This still feels... wrong to me a bit, but I honestly don't know another way to do it. In theory, I could just make REST calls directly to my Cloudant service, but for now, I'm going to use the package. I definitely think I'll probably be doing things differently here in the future.

In the main section, note I've got some hard coded data there commented out. During testing, this is how I handled getting sample data into the action. In the end, it all comes down to the addIfNew block. My Cloudant skills are somewhat weak, but my logic seemed to work well. I query on location, reason, and timestamp, but not the assisting data as I wasn't sure if I could query on array values like that. On the off chance that two accidents happen at the same time at the same location but with different responders I'll just assume the entire multiverse is breaking down and life, as we know it, is pretty much over. (Hey, I won't have to write unit tests!)

If no matches are returned, I then simply pass the data to the write action and - that's it!

Connecting the Dots

Let's recap. I've got an action that can suck down the HTML string and turn it into data. I've got a second action that can take that input and store it, if it is new. Now we need to get this together, scheduled, and run with it.

First, connecting them is trivial - just use a sequence! I called mine handleTraffic and I simply passed it the name of my two actions - getTraffic and addTraffic. The command looks like so:

wsk action create handleTraffic --sequence getTraffic,addTrafic

I then fired off a call to the sequence to make sure that was working. Remember, baby steps.

Alright - here is where things get a bit tricky. I began by creating an Alarm-based trigger. This is a trigger available on the Bluemix OpenWhisk platform that lets you define a Cron-based time to fire. I created mine like so:

wsk trigger create checkTraffic --feed /whisk.system/alarms/alarm --param cron "5 * * * *" 

I always find Cron to be an incomprehensible syntax, so I used http://crontab-generator.org to generate the string for me.

All this does is make an alarm clock - even 5 minutes the trigger will fire. But by itself, that does nothing. In order to make it do something, I then made a rule. The rule simply said, when checkTraffic fires, run my sequence. I called my rule newTrafficRule because I have no imagination.

The OpenWhisk UI does a nice job of representing this - although for the life of me I can't figure out how to get the original Cron setting out. I guess it's something you want to make sure you don't forget.

Visual Representation

I plan on covering the OpenWhisk UI stuff in more detail later, but I want to point out that the monitor was extremely helpful while I was working on this demo. It let me see my actions fire in real time and watch their results.

UI

Wrap Up

All in all, it is working pretty good. I've discovered there is a limit to the amount of queries I can do per second with the free tier of Cloudant, but a paid account wouldn't have this issue of course. I've had this running for a few days now (although initially I didn't have geocoding) and I'm nearing 400 data points. I plan to let this be for a while and come back once I've got a good thousand or so entries and do some fun charting/analyzing of the data.

If anyone has any questions, or suggestions for improvement, let me know below!

Like This?

If you like this article, please consider visiting my Amazon Wishlist or donating via PayPal to show your support. You can also subscribe to the email feed to get notified of new posts.

See Also