いっぺーちゃんの いろいろやってみよ~

REST to MQTTブリッジを作る(その2)

前回の続きです

アプリケーション本体の説明です。

 

MQTTブローカに関するパラメータです。設定できるパラメータについてはここを参照してください。
ここではlocalhostのデフォルトポート(1883)でanonymous接続可能なMQTTブローカを設定しています。
MQTT通信にはMQTT.js(https://www.npmjs.com/package/mqtt)を使用します。

// ======== MQTT settings =====================================================
var mqtt_opt = {protocol: 'mqtt', host:'localhost'};    // 必要なら port, username, password などを追加
// 本番環境はこちら
// var mqtt_opt = {protocol: 'mqtt', host: '<<アドレス>>', port: <<ポーt番号>>, username: '<<ユーザ名>>', password: '<<パスワード>>'};
・・・
// ==== MQTT ==================================================================
var mqtt = require('mqtt');
var mqtt_client  = mqtt.connect(mqtt_opt);

 

ちょっと順番前後しますが、
MQTTの接続時のイベントハンドラです。特に処理することはありませんが、接続時に接続情報を表示しています。
本来なら、エラー時の処理とかリトライ処理とかいろいろ入れないといけない気がしますが、今回は端折りました

// MQTT 接続時の処理
mqtt_client.on('connect', function (connack) {
  console.log('MQTT client connected to brocker');
  console.log("    host: " + mqtt_opt.host);
  console.log("    port: " + mqtt_opt.port);
});

 

Webサーバのポート番号を指定します。
PaaSの場合、システムで指定されたポート番号を使用する必要があるため、そちらが指定されていたらそちらを優先して使用します。
Herokuの場合、システムで指定されたポート番号は環境変数PORTに設定されています。

// ======== WebServer settings ================================================
var webservicePort = 8080;
・・・
var port = process.env.PORT || webservicePort;

 

Webサーバはnode.jsのWebアプリケーションフレームワークであるexpress(http://expressjs.com/ja/)を使用します。
これを使うとかなりお手軽にWebアプリを記述できます。

// ==== express ===============================================================
var express = require('express');
var app = express();

 

POST メソッドで送られてくるbody部のパーサです。
通常の使用ではapp.use()で、すべてのendpointを共通で処理するパーサとして登録するのですが、今回はイリーガルなJSONデータを処理したいので共通処理は登録しません。

// ==== body parser (POSTを使うときのお約束) ==================================
var bodyParser = require('body-parser');
// 個別にパーサを設定するので、app.useで登録しない
// app.use(bodyParser.urlencoded({extended: false}));
// app.use(bodyParser.json());
// app.use(bodyParser.text({type:"*/*" }));        // すべてのコンテンツタイプでテキストパーサを使う

 

ちょっと順番前後しますが、
Webサーバの開始処理です。これ一行だけ。

// Webサーバのスタート
app.listen(port);
console.log('Server started! listen Port: ' + port);

 

QoSの設定値として送られてきたデータのチェック関数です。
0、1、2のみ設定可能なので、それ以外の値や文字列等はすべて0にします。

// QoS設定値の確認
function checkQoS(targetValue) {
    tmp = parseInt(targetValue);
    if (isNaN(tmp))             return 0;     // 数値でないなら0を返す
    if ((tmp < 0) || (tmp > 2)) return 0;     // 範囲外なら0を返す
    return tmp;
}

 

ブール値(false/true)、数値(0/0以外;数値に変換できる文字列を含む)、文字列("true"以外/"true")をブール値(false/true)に変換する関数です。
最初、Boolean(targetValue)とやってたら文字列"false""0"trueになってしまいハマりました。。。

// 文字列"true"/"false" をbooleanに変換する関数
// Boolean(targetValue)だと文字列"false"や"0"がtrueになってしまうので
function parseBoolean(targetValue) {
    var targetType = typeof targetValue;  // 変数の型
    if      (targetType == 'boolean')                           return targetValue;             // booleanならそのまま返す
    else if (targetType == 'number')                            return Boolean(targetValue);    // 数値ならbooleanに変換して返す
    else if (targetType == 'string' || targetValue instanceof String ) {    // 文字列で
        var tmp = parseInt(targetValue);    // 数値に変換してみる
        if (isNaN(tmp)) {
            // 数値に変換できない
            if (targetValue.toLowerCase().trim() == 'true')     return true;                    // 'true' なら trueを返す
            else                                                return false;                   // それ以外はfalseを返す
        }
        else                                                    return Boolean(tmp);            // 数値をbooleanに変換して返す
    }
    else                                                        return false;                   // それ以外の型はfalseを返す
}

 

トップページの表示処理です。
使い方とテストページへのリンクを表示しています。
ejsを使えばよかった気もしますが、この程度の表示なので、直接sendしてしまいました。

// GET rootの処理
app.get('/', function (req, res) {
  // console.log(req);
  var body = 'usage: <br />';
  var server_addr = req.protocol + '://' + req.headers.host;
  body += server_addr + '/mqtt_get/&lt;topic&gt;/&lt;payload&gt;/&lt;QoS: 0-2&gt;/&lt;retain: true or false&gt;<br />';
  body += server_addr + '/mqtt_query?topoc=&lt;topic&gt;&payload=&lt;payload&gt;qos=&lt;QoS: 0-2&gt;&retain=&lt;retain: true or false&gt;<br />';
  body += server_addr + '/mqtt_post<br />';
  body += '&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;';
  body += '{"topic":&lt;topic&gt;,"payload:"&lt;payload&gt;,"qos":&lt;QoS: 0-2&gt;,"retain":&lt;retain: true or false&gt;} <br />';
  body += '<h1><a href="test.html">test</a></h1><br />';
  res.send(body);
});

 

今回は勉強のため、3種類のendpointを用意しました。

まず一つ目。MQTTへ転送するパラメータをGETメソッドのpathとして受け取るendpointです。
/mqtt_get/<<topic>>/<<payload(message)>>/<<qos>>/<<retain>>
として指定します。
例えば、topic="topic1"payload="hello"qos=0retain=false の場合、
/mqtt_get/topic1/hello/0/false
となります。
パラメータはpathとして送られてくるので、URLエンコードされています。
topicとpayloadはURLデコードして使用します。
qosとretainはURLデコードしないといけないような値だと不正値でデフォルト値になるので、ここではデコードしていません。
受け取ったパラメータをレスポンス送信とMQTT publishの共通処理部に渡しています。
この方法はお手軽なのですが、パラメータを省略することができません。
また、topicに階層を指定できません。例えば、階層にtopic1/topicAと指定しようとすると
/mqtt_get/topic1/topicA/hello/0/false
となってしまい、topicAがpayload、helloqos、・・・と認識されてしまいこのendpointにヒットしません。結果、Not Foundエラー(404)が返ります。

// GET mqtt_get endpoint の処理
app.get('/mqtt_get/:topic/:payload/:qos/:retain', function(req, res) {
    // endpointのPATHからパラメータ取り出し
    var mqttTopic   = decodeURI(req.params.topic);
    var mqttPayload = decodeURI(req.params.payload);
    var mqttQos     = req.params.qos;
    var mqttRetain  = req.params.retain;
    var userIP      = req.socket.remoteAddress;

    // console.log(req);    // デバッグ用

    // レスポンス送信 & MQTT publish処理
    responseCommon(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP);
});

テストページではgetボタンをクリックするとこのリクエストが送信されます。

 

二つ目。MQTTへ転送するパラメータをGETメソッドのクエリ文字列(query string)として受け取るendpointです。
/mqtt_query?topic=<<topic>>&payload=<<payload(message)>>&qos=<<qos>>&retain=<<retain>>
として指定します。qosとretainは省略可能です。
例えば、topic="topic1"payload="hello"qos=0retain=false の場合、
/mqtt_query?topic=topic1&payload=hello&qos=0&retain=false
となります。
パラメータはquery stringとして送られてくるので、URLエンコードされています。
topicとpayloadはURLデコードして使用します。
qosとretainはURLデコードしないといけないような値だと不正値でデフォルト値になるので、ここではデコードしていません。
qosとretainは省略されていると、ここではundefinedになっているので、以降の処理でデフォルト値に設定されます。
受け取ったパラメータをレスポンス送信とMQTT publishの共通処理部に渡しています。

// GET mqtt_query endpoint の処理
app.get('/mqtt_query', function(req, res) {
    // queryからパラメータ取り出し
    var mqttTopic   = decodeURI(req.query.topic);
    var mqttPayload = decodeURI(req.query.payload);
    var mqttQos     = req.query.qos;
    var mqttRetain  = req.query.retain;
    var userIP      = req.socket.remoteAddress;

    // console.log(req);    // デバッグ用

    // レスポンス送信 & MQTT publish処理
    responseCommon(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP);
});

テストページではqueryボタンをクリックするとこのリクエストが送信されます。

 

三つ目。MQTTへ転送するパラメータをPOSTメソッドで受け取るendpointです。今回のメインです。
/mqtt_post
でパラメータをメッセージボディで
{ "topic":"<<topic>>", "payload": "<<payload(message)>>", "qos": 0, "retain": <<retain>> }
として指定します。
例えば、topic="topic1"payload="hello"qos=0retain=false の場合、
{ "topic":"topic1", "payload": "hello", "qos": <<qos>>, "retain": false }
となります。
app.postメソッドは第一パラメータのパスに対するPOSTリクエストを受け取ったときに、第二パラメータ以降のコールバック関数を順番に実行していきます。
そこで、第二パラメータにボディパーサを指定し、第三パラメータにユーザ処理を指定しています。
メッセージボディはJSONデータで送られてくるので、ボディパーサには通常JSONパーサ(bodyParser.json())を指定するのですが、それだと改行コードが含まれていたときにエラーになってしまい、今回の目的を達成できません。
そこで今回はテキストパーサ(bodyParser.text())を使用して一旦文字列に変換し、ユーザ処理でJSON文字列をデコードすることにしました。
また、bodyParser.text()はパラメータを省略すると、Content-typeが"text/plain"のリクエストのみ処理することになってしまうので、すべてのContent-typeを処理するように{type:"*/*" }を指定しています。
処理としてはtextとして取り出されたbodyの改行コード(\n)を文字列"\\n"に置換してからJSON.parse()でデコードしてパラメータを取り出しています。
ここはメッセージボディなのでURLエンコードはされていません。
(ちなみに、JSONパーサを使用した場合は、各パラメータはreq.body.パラメータ名に格納されます。)
受け取ったパラメータをレスポンス送信とMQTT publishの共通処理部に渡しています。

// POST mqtt_post endpoint の処理
// note: データに改行が含まれていてもエラーにならないように前処理を入れたいので一旦textで受け取る。
app.post('/mqtt_post', bodyParser.text({type:"*/*" }), function(req, res) {
    // bodyからパラメータ取り出し
/* *************
    // JSONパーサ(bodyParser.json())で受け取っていたらこちら
    var mqttTopic   = req.body.topic;
    var mqttPayload = req.body.payload;
    var mqttQos     = req.body.qos;
    var mqttRetain  = req.body.retain;
************* */
    // TEXTパーサ(bodyParser.text())で受け取っていたらこちら
    var body = req.body;                    // TEXTパーサの時はreq.bodyはString
    // console.log("BEFOR " + body);            // デバッグ用
    body = body.replace(/\n/g, "\\n");      // 改行(\n)を文字列の "\\n"に変更
    // console.log("AFTER " + body);            // デバッグ用
    jsn = JSON.parse(body);                 // JSONパーサ
    var mqttTopic   = jsn.topic;
    var mqttPayload = jsn.payload;
    var mqttQos     = jsn.qos;
    var mqttRetain  = jsn.retain;

    var userIP      = req.socket.remoteAddress;

    // console.log(req);    // デバッグ用

    // レスポンス送信 & MQTT publish処理
    responseCommon(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP);
});

テストページではpostボタンをクリックするとこのリクエストが送信されます。

 

それぞれのendpoint処理からコールされる、レスポンス送信とMQTT publishの共通処理部です。

// レスポンス送信共通処理 & MQTT publish処理
var responseCommon = function(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP) {      

最初にパラメータチェックです。
topicとpayload(message)は文字列でないか、空文字の場合はエラーとしてparam_checkedをfalseにします。
qosとretainは有効な値でなければデフォルト値を設定しています。

    // パラメータチェック
    param_checked = true;
    if (!_.isString(mqttTopic)   || _.isEmpty(mqttTopic))       param_checked = false;      // 必須パラメータ
    if (!_.isString(mqttPayload) || _.isEmpty(mqttPayload))     param_checked = false;      // 必須パラメータ
    mqttQos = checkQoS(mqttQos);                    // 数値化 & 範囲チェック
    mqttRetain = parseBoolean(mqttRetain);          // boolean化

レスポンスは送信後接続が切断されるようにHTTPヘッダに"Connection: close"を設定します。

    // レスポンス送信後、connectionを切断するための設定
    res.set("Connection", "close");

パラメータエラーがなければ(topicとpayloadが正常に設定されていれば)、レスポンスを送信し、
このレスポンスデータ自体には意味はありません。確認用に受け取った値を整形して送り返しています。

    if (param_checked) {
        // パラメータチェックOK
        var dt = moment().tz("Asia/Tokyo");                 // 現在時刻(リクエストパラメータの確認用)
        // レスポンスの作成
        var body = dt.format("YYYY/MM/DD(ddd) HH:mm:ss z\n");
        body += 'received request is ...\n';
        body += '  topic: '   + mqttTopic;
        body += '  payload: ' + mqttPayload;
        body += '  QOS: '     + mqttQos;
        body += '  retain: '  + mqttRetain;
        body += '  from: '    + userIP;
        body += '\n';
        // レスポンスの送信
        res.send(body);
        // console.log(body);      // デバッグ用

MQTTブローカへデータをpublishします。

        // MQTTへpublsh
        mqtt_client.publish(mqttTopic, mqttPayload, mqttQos);

なお、レスポンスに入れている時刻が不要なら、最初の方でモジュールを読み込んでいるvar moment = require('moment-timezone');の部分は不要です。

パラメータエラーがあれば(topicとpayloadが正常に設定されていなければ)、エラーステータス400(Bad request)でレスポンスを送信します。

    } else {
        // パラメータチェックNG
        var body = 'Parameter error';
        res.status(400);        // 400:Bad request
        res.send(body);
        // console.log(body);      // デバッグ用
    }
}

 

 htmlファイルをそのまま見せるファイルを送ディレクトリを指定します。
この設定ではpublicを指定していますが、public/test.htmlファイルはhttp://locahost:8080/public/test.html ではなく、http://locahost:8080/test.html でアクセスします。間違えないよう注意しましょう。

// serve static site
// publicの下にHTMLをそのまま見せるファイルを置く
app.use(express.static('public'));

 

外部のサーバ(Herokuなど)にdeployする方法はググってください。

最後にIFTTTでWebhooksに設定するパラメータはこんな感じです。

URL << http or https >> :// <<サーバアドレス>>/mqtt_post  
Method POST  
Content Type (optional) application/json どれでも同じだけど一応これで
Body (optional) {"topic":"topic1","payload":" {{Text}}"}