Technical Blog

Streaming SQL via MQTT

So many of you found out about us because of our multi-protocol, MQTT-focused broker. I think this is great: we’ve scaled our broker to handle high message volumes, which has helped keep our customers’ costs low. But a publish/subscribe message broker on its own provides only so much value. Eventually you’ll need to sprinkle some event processing and web APIs onto your streams of raw sensor data.

In this post we introduce our Streaming SQL engine and review a couple of typical use cases. This is just an overview, so if you’re looking for details, head over to the documentation.

Use Case: Extract-Transform-Load (ETL)

Let’s start with Extract. MQTT is often used as a funnel protocol: data from thousands or millions of sensors is funneled into a system better suited to intra-cloud traffic. Another of our blog posts describes in more detail how to funnel MQTT data into DynamoDB.

Our SQL dialect was built with MQTT in mind. MQTT is the only binding at the moment, but we plan to support extracting data from other stream sources. In fact, SELECT * FROM 'com.example/hello' is just shorthand for SELECT * FROM mqtt('com.example/hello'). The string is a full MQTT subscription topic filter, so you can use the ‘+’ (single-level) and ‘#’ (multi-level) wildcards. Keep watching our blog to find out when we add new sources.
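Because the FROM string is an ordinary MQTT topic filter, the standard wildcard rules decide which messages a rule receives. The Python sketch below implements those rules as written in the MQTT 3.1.1 specification to make them concrete; it is not our broker’s code, and the function name topic_matches is hypothetical.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription topic filter.

    '+' matches exactly one topic level; '#' must be the last level and
    matches the parent level plus any number of child levels.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # Per the MQTT spec, a filter starting with a wildcard does not match
    # topic names that begin with '$' (e.g. $SYS/...).
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Valid only as the final level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter has more levels than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level does not match
    # Every filter level matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)
```

For example, a rule on 'com.example/+' sees com.example/hello but not com.example/hello/world, while 'com.example/#' sees both.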

We can do more than simply funnel data into another system. The SELECT clause is useful for Transforming JSON input. It supports most ANSI SQL expressions, including:

  • CAST to convert between data types, e.g. converting a string into a number: SELECT CAST(foo AS NUMBER) AS numberFoo FROM 'com.example/hello'
  • CASE expressions as conditionals, e.g.:
    SELECT 
      CASE qos 
        WHEN 0 THEN 'At Most Once' 
        WHEN 1 THEN 'At Least Once' 
        ELSE 'Exactly Once' 
      END AS qosDescription 
    FROM 'com.example/hello'
  • Arithmetic expressions using the five basic operators (+, -, *, /, %), e.g. SELECT (temp - 32) / 1.8 AS celsius FROM 'com.example/hello'
  • Most scalar ANSI SQL functions, including power, sqrt, sin, cos, lower, trim, replace, and regexp_replace. See the documentation for the full list.
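To make the CAST and CASE examples concrete, here is a small Python sketch of what they compute for one decoded JSON message. It only illustrates the semantics; it is not how the engine evaluates SQL. The field names foo and qos come from the examples, while the helper names and the handling of non-numeric strings (Python raises ValueError) are our own assumptions.

```python
def cast_to_number(value):
    """Rough analogue of CAST(value AS NUMBER).

    Assumption: numbers pass through unchanged and numeric strings are parsed;
    the engine's behaviour for non-numeric input is not modelled here.
    """
    if isinstance(value, (int, float)):
        return value
    return float(value)  # raises ValueError for strings like "abc"


def qos_description(qos: int) -> str:
    """Mirror of the CASE example: 0 and 1 are named, anything else hits ELSE."""
    if qos == 0:
        return "At Most Once"
    if qos == 1:
        return "At Least Once"
    return "Exactly Once"


def transform(message: dict) -> dict:
    """Apply both example SELECT expressions to one decoded JSON message."""
    return {
        "numberFoo": cast_to_number(message["foo"]),
        "qosDescription": qos_description(message["qos"]),
    }


print(transform({"foo": "42.5", "qos": 1}))
# {'numberFoo': 42.5, 'qosDescription': 'At Least Once'}
```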

The SELECT clause also lets you pick properties out of nested JSON objects using dot notation, and the .. operator selects properties at arbitrary depth.

SELECT i.know.what.i.want, arbitrarily..deep 
FROM 'com.example/hello'
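One way to picture the two path forms is the Python sketch below: each dot steps exactly one level down, and the key after .. is searched for at any depth, much like JSONPath’s recursive descent. This is an assumption about the semantics, not the engine’s implementation; in particular, returning the first depth-first match when a key occurs several times is our choice, and select_path is a hypothetical name.

```python
from typing import Any

_MISSING = object()  # sentinel that distinguishes "not found" from JSON null


def _deep_find(node: Any, key: str) -> Any:
    """Depth-first search for `key` at any depth below `node`; first match wins."""
    if isinstance(node, dict):
        if key in node:
            return node[key]
        children = list(node.values())
    elif isinstance(node, list):
        children = node
    else:
        return _MISSING
    for child in children:
        found = _deep_find(child, key)
        if found is not _MISSING:
            return found
    return _MISSING


def select_path(doc: dict, path: str) -> Any:
    """Resolve 'a.b.c' (one level per dot) and 'a..c' (any depth after '..')."""
    node: Any = doc
    for seg_index, segment in enumerate(path.split("..")):
        for key_index, key in enumerate(segment.split(".")):
            if seg_index > 0 and key_index == 0:
                node = _deep_find(node, key)  # first key after '..' may be anywhere below
            elif isinstance(node, dict):
                node = node.get(key, _MISSING)
            else:
                node = _MISSING
            if node is _MISSING:
                return None  # path not present in this message
    return node
```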

You can also append calculated fields onto the result JSON document.

With this SQL statement:

SELECT *, (temp - 32) / 1.8 AS celsius 
FROM 'com.example/environment'

Given input that contains temp in Fahrenheit:

{
  "pressure": 152.877,
  "temp": 68.9
}

the rule outputs every original property with the Celsius conversion appended:

{
  "pressure": 152.877,
  "temp": 68.9,
  "celsius": 20.5
}
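The same transformation in plain Python shows what SELECT *, ... AS celsius does: copy every property, then add the computed one. This is only an illustration, and the helper name with_celsius is hypothetical. Floating-point division can leave a tiny error, so the sketch rounds to one decimal place only when printing; the post doesn’t say how the engine rounds.

```python
import json


def with_celsius(message: dict) -> dict:
    """Like SELECT *, (temp - 32) / 1.8 AS celsius: keep all fields, add one."""
    result = dict(message)  # SELECT * keeps every original property
    result["celsius"] = (message["temp"] - 32) / 1.8
    return result


incoming = json.loads('{"pressure": 152.877, "temp": 68.9}')
outgoing = with_celsius(incoming)
# Round only for display; the computed value may differ from 20.5 in the last bits.
print(json.dumps({**outgoing, "celsius": round(outgoing["celsius"], 1)}))
# {"pressure": 152.877, "temp": 68.9, "celsius": 20.5}
```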

Loading data into other systems is handled by integrations. The output of the SQL statement is piped directly into every integration on the rule. Integrations are specified in JSON when you create the rule, and you can also add and remove them after the rule has been created:

  • POST /3/sql/:rule_id/integrations adds (pushes) a new integration.
  • DELETE /3/sql/:rule_id/integrations removes (pops) the most recently created integration.
  • DELETE /3/sql/:rule_id/integrations/:integration_id removes a specific integration by ID.
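A minimal Python sketch of calls to these three endpoints is below. It only builds urllib request objects, so it runs without network access. Several details are assumptions rather than documented API: the base URL is a placeholder, authentication isn’t covered here so no credentials are attached, the rule ID "my-rule" is hypothetical, and we assume the POST body uses the same integration object shape as the rule JSON shown later in this post.

```python
import json
import urllib.request

# Placeholder base URL (assumption): substitute your account's API endpoint.
BASE_URL = "https://api.example.com"


def _integrations_url(rule_id: str) -> str:
    return f"{BASE_URL}/3/sql/{rule_id}/integrations"


def add_integration(rule_id: str, integration: dict) -> urllib.request.Request:
    """POST /3/sql/:rule_id/integrations: push a new integration onto the rule."""
    return urllib.request.Request(
        _integrations_url(rule_id),
        data=json.dumps(integration).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def remove_last_integration(rule_id: str) -> urllib.request.Request:
    """DELETE /3/sql/:rule_id/integrations: pop the most recently created integration."""
    return urllib.request.Request(_integrations_url(rule_id), method="DELETE")


def remove_integration(rule_id: str, integration_id: str) -> urllib.request.Request:
    """DELETE /3/sql/:rule_id/integrations/:integration_id: remove one integration by ID."""
    return urllib.request.Request(
        f"{_integrations_url(rule_id)}/{integration_id}", method="DELETE"
    )


req = add_integration("my-rule", {
    "type": "http",
    "url": "http://kyleroche.elasticbeanstalk.com",
    "method": "POST",
    "responseTopic": "com.example/hello3",
})
# To send it for real (with whatever auth your account requires):
# urllib.request.urlopen(req)
```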

We provide several standard integrations; read about them here. The most interesting one, in my opinion, is the DynamoDB integration.

Use Case: Web Hooks

It’s become incredibly easy to throw together a powerful web API in Node.js or Python and deploy it to Heroku within a few minutes. On that note, Kyle Roche, our CEO, wrote a blog post on how to inject custom logic into an MQTT stream. It covers the basics of setting up a SQL rule that interacts with live streams of MQTT data via a quick-n-dirty Node.js app on Elastic Beanstalk.

The idea is that his app, deployed on a PaaS such as Heroku or Elastic Beanstalk, accepts messages via HTTP POST requests. The HTTP service can store the message or call other web APIs, but in the end its HTTP response is published back over MQTT to the integration’s responseTopic.
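Kyle’s app is written in Node.js; since the post mentions Python as an equally quick option, here is a minimal Python sketch of such a web hook using only the standard library. It assumes the integration sends the SELECT output as a JSON request body; the process function is a hypothetical stand-in for your own logic, and whatever it returns becomes the HTTP response that gets published to the responseTopic.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def process(message: dict) -> dict:
    """Hypothetical custom logic. The returned dict becomes the HTTP response
    body, which the SQL engine publishes back over MQTT to the responseTopic."""
    return {"status": "ok", "received": message}


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        raw = self.rfile.read(length)
        try:
            # Parse the body as JSON regardless of the Content-Type header.
            message = json.loads(raw or b"{}")
        except json.JSONDecodeError:
            self.send_error(400, "request body is not valid JSON")
            return
        body = json.dumps(process(message)).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def make_server(port: int = 8080) -> HTTPServer:
    # 8080 is an arbitrary default; PaaS hosts usually tell you which port to bind.
    return HTTPServer(("", port), WebhookHandler)


# To run locally: make_server().serve_forever()
```

Keeping the business logic in process, separate from the HTTP plumbing, makes it easy to test without starting a server.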

The SQL engine is useful for redirecting MQTT traffic into web APIs, but it can do much more than that. First, let’s walk through Kyle’s SQL rule. Here is the JSON needed to create the rule via our API:

{
  "query":"SELECT * FROM 'com.example/hello'",
  "integrations": [
    {
      "type":"http",
      "url":"http://kyleroche.elasticbeanstalk.com",
      "method":"POST",
      "responseTopic":"com.example/hello3"
    }
  ]
}

The query is very basic: SELECT * FROM 'com.example/hello'. It subscribes to the “com.example/hello” MQTT topic and forwards every message to the rule’s integrations. We can filter out messages that don’t match a condition by adding a WHERE clause: SELECT * FROM 'com.example/hello' WHERE test = 'value'. Here test is a property of the MQTT message’s JSON payload. See the SQL syntax reference for an up-to-date guide to what SQL can do.
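The effect of that WHERE clause can be sketched in a few lines of Python. Because this FROM clause has no wildcards, an exact topic comparison stands in for the subscription. We assume, as in standard SQL where a missing value is NULL and NULL = 'value' is not true, that a message without a test property is dropped; rule_matches is a hypothetical name.

```python
def rule_matches(topic: str, message: dict) -> bool:
    """Emulates SELECT * FROM 'com.example/hello' WHERE test = 'value'."""
    # No wildcards in this FROM clause, so exact topic equality suffices.
    # Assumption: a missing 'test' property never equals 'value'.
    return topic == "com.example/hello" and message.get("test") == "value"


traffic = [
    ("com.example/hello", {"test": "value", "n": 1}),  # forwarded
    ("com.example/hello", {"test": "other", "n": 2}),  # WHERE fails
    ("com.example/hello", {"n": 3}),                   # property missing
    ("com.example/other", {"test": "value", "n": 4}),  # different topic
]
forwarded = [msg for topic, msg in traffic if rule_matches(topic, msg)]
print(forwarded)  # [{'test': 'value', 'n': 1}]
```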

The rule’s single integration posts each message to his web service on elasticbeanstalk.com. Other HTTP headers, such as Content-Type or even custom headers, can be added via the headers property:

{
  "query":"SELECT * FROM 'com.example/hello'",
  "integrations": [
    {
      "type":"http",
      "url":"http://kyleroche.elasticbeanstalk.com",
      "method":"POST",
      "responseTopic":"com.example/hello3",
      "headers": {
        "Content-Type": "application/json"
      }
    }
  ]
}
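To show how the headers property shapes the outgoing call, the Python sketch below builds (but does not send) the request we’d expect the http integration to make for one SELECT result. It assumes the result is sent as a JSON body and that method defaults to POST when omitted; neither is documented here, and build_request is a hypothetical helper. The responseTopic isn’t part of the request: the engine uses it when publishing the response.

```python
import json
import urllib.request

# The integration from the rule JSON above.
integration = {
    "type": "http",
    "url": "http://kyleroche.elasticbeanstalk.com",
    "method": "POST",
    "responseTopic": "com.example/hello3",
    "headers": {"Content-Type": "application/json"},
}


def build_request(integration: dict, row: dict) -> urllib.request.Request:
    """Approximate the HTTP call an 'http' integration makes for one SELECT result."""
    return urllib.request.Request(
        integration["url"],
        data=json.dumps(row).encode("utf-8"),      # assumption: result sent as JSON
        headers=integration.get("headers", {}),    # configured headers on every request
        method=integration.get("method", "POST"),  # assumption: POST if unspecified
    )


req = build_request(integration, {"test": "value"})
print(req.get_method(), req.full_url, req.get_header("Content-type"))
```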

We could also add more integrations to post to other web services, publish to another messaging system like Apache Kafka or Amazon Kinesis, or store the data. There’s a more complete guide to working with integrations in the SQL API reference.

Summary

Streaming SQL rules can provide a lot of value to your business with very little effort. They can ease mundane ETL tasks, or work in conjunction with other web APIs so you can extend them in whatever language or technology you prefer. Also check out the series of blog posts we’ve been writing on integrating our ThingFabric service with AWS Lambda.