いっぺーちゃんの いろいろやってみよ~

REST to MQTTブリッジを作る(その1)

前回Twitterの改行を含む本文をIFTTTからbeebotte経由でESP32に送れなかったのが悔しかったので、どうにかして送れるようにしたいと考えました(←かなりの負けず嫌い)

まず、エラーになる原因を考察します。
といっても、ここにそのものズバリの答えがありました。
JSON文字列値に改行を埋め込むには、改行コードを文字列"\n" に置き換えないといけないのです。

 

IFTTTのレシピ内に文字列置換コマンドがあればイッパツで解決しますが、ないので他の手で。。。

Webhooksは送っているデータがJSONデータか否かを認識していないので、ここまでは問題なく動いているはずです。
データを受け取ったbeebotte側がJSONデータの規則に則っていないデータなので、エラーとしてはじいていると思われます。
ということは、JSONデータの規則に則っていない改行を含むJSONデータを受け取れるサーバがあれば良いということになります。

ということで、webhooksの送信先を自分で作ってしまおうと考えました。(←鳴かぬなら、作ってしまえホトトギス)
やはり、外部サーバからアクセスできるサーバである必要がありますが、PaaSを使えば作れそうです。
とはいえ、beebotteをそのまま置き換えるのはかなり重たいので(当然だ)、WebhooksからREST APIでデータを受け取って、既存のMQTTブローカに転送するREST to MQTT ブリッジの構成とします。

データの流れはこんな感じ。

    Twitter  → IFTTT(webhooks) → REST2MQTT(今回作るのはここ) → MQTTブローカ → ESP32

 

私はPaaSに無料で使えるHerokuを使いました。色々制限はありますが、ちょっと勉強のために試すくらいなら問題ないと思います。
Herokuの使い方はググってください(←他力本願)

ESP32とのインタフェースには今まで同様、MQTTを使用することにします。beebotteをそのまま使っても良いですし、test.mosquitto.orgなどの無料で使えるMQTTブローカーを使ったり、PaaSに付属しているMQTTブローカを使うこともできます。
私はHerokuのCloudMQTTというのを使いましたが、これを使うには無料プランでもクレジットカードの登録が必要なようで、ちょっと躊躇してしまいました。

  MQTTまで含めたHerokuの使い方の参考になるサイトはこちら(←ちょっと後ろめたいのでURL貼り付け。でも、やっぱり他力本願)

MQTTでドローンの自動操縦を考える(1) | Webエンジニア × ドローン

今回はHerokuにdeployする前のローカル環境での動作確認だけにしておきます。
(MQTTの設定を変更してdeployすればそのまま使えます)
環境は、virtualbox上のUbuntu 16.04でnode.js、npm、mosquitto(MQTT ブローカ)、gitなどがインストール済みとします。

私はnodebrewを使ってnode.jsのv7.10.1を使っています。
nodebrewを使ったnode.jsのインストールはここらあたりを参考に
NodeBrewインストール編 - Qiita

大体こんな感じ

wget https://raw.github.com/hokaccha/nodebrew/master/nodebrew
perl nodebrew setup
# ~/.bashrc か ~/.profile でPATH設定&再ログイン
# export PATH=$HOME/.nodebrew/current/bin:$PATH

# バイナリ版のインストール
nodebrew install-binary v7.10.1
# 通常使用するバージョンとして設定
nodebrew use v7.10.1

 

mosquittoのインストールはこんな感じで特に難しい設定とかはありません。

apt install mosquitto  mosquitto-clients

 

Herokuの場合、gitがあった方がdeployがやりやすいので必要ならインストールします。

apt-get install git

まず、作業用ディレクトリを作成し、移動します。

mkdir /proj/r2m
cd /proj/r2m

 

本来なら、npm init して、nmp install <モジュール>していくのですが、めんどっちいので
まず、以下内容を 「package.json」 というファイル名で保存します。

{
  "name": "rest2mqtt",
  "version": "0.0.1",
  "description": "REST to MQTT server",
  "main": "app.js",
  "scripts": {
    "start": "node app.js",
    "test": "node app.js"
  },
  "repository": {
    "type": "git",
    "url": "test"
  },
  "engines": {
    "node": "7.10.1",
    "npm": "5.4.2"
  },
  "author": "",
  "license": "MIT",
  "dependencies": {
    "body-parser": "^1.18.1",
    "express": "^4.15.4",
    "lodash": "^4.17.4",
    "moment-timezone": "^0.5.13",
    "mqtt": "^2.13.0"
  }

 

「npm start」と実行すると、"scripts" の下の "start" に設定されたコマンドが実行されます。
Herokuの場合、これが書いてあるとProcfileが要らなくなるらしい。
「npm test」と実行すると、"scripts" の下の "test" に設定されたコマンドが実行されます。
本来、テストスクリプトが動くようにするのですが、ここではstartと同じにしてあります。

"repository" の設定はこの内容は正しくないけど、ワーニングを黙らせるためだけに記述してあります。
"engines"の下は使ってるバージョンです。
"dependencies" の下はnpm install したときに追加されていきます。
また、「npm install」(モジュール名を指定しない) すると、インストールされていないパッケージがインストールされます。
ということで、以下のコマンドでモジュールをインストールします。

npm install

 

gitを使う場合は、以下の内容を 「.gitignore」 というファイル名で保存します。
今回重要なのは「node_modules」「package-lock.json」です。
他にもgitの管理対象外にしたいファイルがあったら追記してください。

# Node build artifacts
node_modules
npm-debug.log
package-lock.json

# Local development
*.env
*.dev
.DS_Store

# Docker
Dockerfile
docker-compose.yml

 

以下がアプリケーション本体です。「app.js」というファイル名で保存してください。

// ======== MQTT settings =====================================================
var mqtt_opt = {protocol: 'mqtt', host:'localhost'};    // 必要なら port, username, password などを追加
// 本番環境はこちら
// var mqtt_opt = {protocol: 'mqtt', host: '<<アドレス>>', port: <<ポーt番号>>, username: '<<ユーザ名>>', password: '<<パスワード>>'};

// ======== WebServer settings ================================================
var webservicePort = 8080;

// ======== utility ===========================================================
var _ = require('lodash');

// ==== moment-timezone =======================================================
var moment = require('moment-timezone');

// ==== express ===============================================================
var express = require('express');
var app = express();
var port = process.env.PORT || webservicePort;

// ==== body parser (POSTを使うときのお約束) ==================================
var bodyParser = require('body-parser');
// 個別にパーサを設定するので、app.useで登録しない
// app.use(bodyParser.urlencoded({extended: false}));
// app.use(bodyParser.json());
// app.use(bodyParser.text({type:"*/*" }));        // すべてのコンテンツタイプでテキストパーサを使う

// ==== MQTT ==================================================================
var mqtt = require('mqtt');
var mqtt_client  = mqtt.connect(mqtt_opt);

// MQTT 接続時の処理
mqtt_client.on('connect', function (connack) {
  console.log('MQTT client connected to brocker');
  console.log("    host: " + mqtt_opt.host);
  console.log("    port: " + mqtt_opt.port);
});

// ==== MISC ==================================================================
// QoS設定値の確認
function checkQoS(targetValue) {
    tmp = parseInt(targetValue);
    if (isNaN(tmp))             return 0;     // 数値でないなら0を返す
    if ((tmp < 0) || (tmp > 2)) return 0;     // 範囲外なら0を返す
    return tmp;
}

// 文字列"true"/"false" をbooleanに変換する関数
// Boolean(targetValue)だと文字列"false"や"0"がtrueになってしまうので
function parseBoolean(targetValue) {
    var targetType = typeof targetValue;  // 変数の型
    if      (targetType == 'boolean')                           return targetValue;             // booleanならそのまま返す
    else if (targetType == 'number')                            return Boolean(targetValue);    // 数値ならbooleanに変換して返す
    else if (targetType == 'string' || targetValue instanceof String ) {    // 文字列で
        var tmp = parseInt(targetValue);    // 数値に変換してみる
        if (isNaN(tmp)) {
            // 数値に変換できない
            if (targetValue.toLowerCase().trim() == 'true')     return true;                    // 'true' なら trueを返す
            else                                                return false;                   // それ以外はfalseを返す
        }
        else                                                    return Boolean(tmp);            // 数値をbooleanに変換して返す
    }
    else                                                        return false;                   // それ以外の型はfalseを返す
}
// ============================================================================

// Webサーバのスタート
app.listen(port);
console.log('Server started! listen Port: ' + port);

// GET rootの処理
app.get('/', function (req, res) {
  // console.log(req);
  var body = 'usage: <br />';
  var server_addr = req.protocol + '://' + req.headers.host;
  body += server_addr + '/mqtt_get/&lt;topic&gt;/&lt;payload&gt;/&lt;QoS: 0-2&gt;/&lt;retain: true or false&gt;<br />';
  body += server_addr + '/mqtt_query?topoc=&lt;topic&gt;&payload=&lt;payload&gt;qos=&lt;QoS: 0-2&gt;&retain=&lt;retain: true or false&gt;<br />';
  body += server_addr + '/mqtt_post<br />';
  body += '&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;';
  body += '{"topic":&lt;topic&gt;,"payload:"&lt;payload&gt;,"qos":&lt;QoS: 0-2&gt;,"retain":&lt;retain: true or false&gt;} <br />';
  body += '<h1><a href="test.html">test</a></h1><br />';
  res.send(body);
});

// GET mqtt_get endpoint の処理
app.get('/mqtt_get/:topic/:payload/:qos/:retain', function(req, res) {
    // endpointのPATHからパラメータ取り出し
    var mqttTopic   = decodeURI(req.params.topic);
    var mqttPayload = decodeURI(req.params.payload);
    var mqttQos     = req.params.qos;
    var mqttRetain  = req.params.retain;
    var userIP      = req.socket.remoteAddress;

    // console.log(req);    // デバッグ用

    // レスポンス送信 & MQTT publish処理
    responseCommon(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP);
});


// GET mqtt_query endpoint の処理
app.get('/mqtt_query', function(req, res) {
    // queryからパラメータ取り出し
    var mqttTopic   = decodeURI(req.query.topic);
    var mqttPayload = decodeURI(req.query.payload);
    var mqttQos     = req.query.qos;
    var mqttRetain  = req.query.retain;
    var userIP      = req.socket.remoteAddress;

    // console.log(req);    // デバッグ用

    // レスポンス送信 & MQTT publish処理
    responseCommon(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP);
});


// POST mqtt_post endpoint の処理
// note: データに改行が含まれていてもエラーにならないように前処理を入れたいので一旦textで受け取る。
app.post('/mqtt_post', bodyParser.text({type:"*/*" }), function(req, res) {
    // bodyからパラメータ取り出し
/* *************
    // JSONパーサ(bodyParser.json())で受け取っていたらこちら
    var mqttTopic   = req.body.topic;
    var mqttPayload = req.body.payload;
    var mqttQos     = req.body.qos;
    var mqttRetain  = req.body.retain;
************* */
    // TEXTパーサ(bodyParser.text())で受け取っていたらこちら
    var body = req.body;                    // TEXTパーサの時はreq.bodyはString
    // console.log("BEFOR " + body);            // デバッグ用
    body = body.replace(/\n/g, "\\n");      // 改行(\n)を文字列の "\\n"に変更
    // console.log("AFTER " + body);            // デバッグ用
    jsn = JSON.parse(body);                 // JSONパーサ
    var mqttTopic   = jsn.topic;
    var mqttPayload = jsn.payload;
    var mqttQos     = jsn.qos;
    var mqttRetain  = jsn.retain;

    var userIP      = req.socket.remoteAddress;

    // console.log(req);    // デバッグ用

    // レスポンス送信 & MQTT publish処理
    responseCommon(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP);
})

// レスポンス送信共通処理 & MQTT publish処理
var responseCommon = function(res, mqttTopic, mqttPayload , mqttQos, mqttRetain, userIP) {      
    // パラメータチェック
    param_checked = true;
    if (!_.isString(mqttTopic)   || _.isEmpty(mqttTopic))       param_checked = false;      // 必須パラメータ
    if (!_.isString(mqttPayload) || _.isEmpty(mqttPayload))     param_checked = false;      // 必須パラメータ
    mqttQos = checkQoS(mqttQos);                    // 数値化 & 範囲チェック
    mqttRetain = parseBoolean(mqttRetain);          // boolean化

    // レスポンス送信後、connectionを切断するための設定
    res.set("Connection", "close");

    if (param_checked) {
        // パラメータチェックOK
        var dt = moment().tz("Asia/Tokyo");                 // 現在時刻(リクエストパラメータの確認用)
        // レスポンスの作成
        var body = dt.format("YYYY/MM/DD(ddd) HH:mm:ss z\n");
        body += 'received request is ...\n';
        body += '  topic: '   + mqttTopic;
        body += '  payload: ' + mqttPayload;
        body += '  QOS: '     + mqttQos;
        body += '  retain: '  + mqttRetain;
        body += '  from: '    + userIP;
        body += '\n';
        // レスポンスの送信
        res.send(body);
        // console.log(body);      // デバッグ用
        // MQTTへpublsh
        mqtt_client.publish(mqttTopic, mqttPayload, mqttQos);
    } else {
        // パラメータチェックNG
        var body = 'Parameter error';
        res.status(400);        // 400:Bad request
        res.send(body);
        // console.log(body);      // デバッグ用
    }
}


// serve static site
// publicの下にHTMLをそのまま見せるファイルを置く
app.use(express.static('public'));

 

以下がテストページのHTMLファイルです。「public/test.html」というファイル名で保存してください。

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  <title>テスト</title>
  <script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/2.2.4/jquery.min.js"></script>
                                <!--プロトコル(http: or https)はつけなければ、このページと同じプロトコルでリクエストが出る  -->
  <script type="text/javascript">
    $(function() {   // Ready function
        $("#req_url").html("");
        $("#req_data").html("");
        $("#result").html("");
        $("#response").html("");

        // POSTボタンクリック時の処理
        $("#post").click( function(){
            var param = getParam();
            var param_str = JSON.stringify(param);    // 文字列化しておかないとクエリ文字列(hoge=xx&fuga=yy)で送られてしまう
            var url = "/mqtt_post";
            $("#req_url").html(url);                  // URLの表示
            $("#req_data").html(param_str);           // データの表示
            // リクエストの送信
            // $.post(url, param_str).done(dispSuccess).fail(dispError).always(dispResponse);
            $.ajax({
                    url: url,
                    type: 'POST',
                    dataType: 'text',
                    contentType: 'application/json',    // postだとこれが指定できないのでajaxで送信
                    data: param_str                     // 送信データは文字列で!
                }).done(dispSuccess).fail(dispError).always(dispResponse);
        });

        // GETボタンクリック時の処理
        $("#get").click( function(){
            var param = getParam();
            var url = "/mqtt_get/" + param.topic +  "/" + param.payload +  "/" + param.qos + "/" + param.retain;
            url = encodeURI(url);                     // エンコードする
            $("#req_url").html(url);                  // URLの表示
            $("#req_data").html("");                  // GETなのでデータはない
            // リクエストの送信
            $.get(url).done(dispSuccess).fail(dispError).always(dispResponse);
        });

        // QUERYボタンクリック時の処理
        $("#query").click( function(){
            var param = getParam();
            var url = "/mqtt_query?topic=" + param.topic + "&payload=" + param.payload + "&qos=" + param.qos + "&retain=" + param.retain;
            url = encodeURI(url);                     // エンコードする
            $("#req_url").html(url);                  // URLの表示
            $("#req_data").html("");                  // GETなのでデータはない
            // リクエストの送信
            $.get(url).done(dispSuccess).fail(dispError).always(dispResponse);
        });
    });   // End of Ready function

    // 成功/失敗表示
    function dispSuccess(data) {
        $("#result").html("success");
    }
    function dispError(data) {
        $("#result").html("error");
    }
    // レスポンス表示関数
    function dispResponse(data) {
        if ($.type(data) === "string") {
            // 成功時はdataに文字列が返ってくる
            txt = data.replace(/\\n/g, "\n");    // 文字列の "\\n" を "\n"(改行)に変更
        }
        else if ($.type(data.responseText) === "string") {
            // 失敗時はdata.responseTextに文字列が返ってくる
            txt = data.responseText.replace(/\\n/g, "\n");    // 文字列の "\\n" を "\n"(改行)に変更
        }
        else {
            // それ以外は現状未サポート
            txt = "Unknown response";
        }
        $("#response").html(txt);
    }
    // パラメータ取得関数
    function getParam() {
        var param = {
            topic:   $("#topic").val().trim(),
            // payload: $("#payload").val().trim(),                       // 通常はこちら
            payload: $("#payload").val().trim().replace(/\\n/g, "\n"),    // テスト用にあえて'\\n'を改行に置き換える
            qos:     Number($("#qos").val()),
            //retain:  Boolean($("#retain").val())  これはNG "false" がTrueになる
            retain:  parseBoolean($("#retain").val())
        };
        return param
    }
    // 文字列"true"/"false" をbooleanに変換する関数
    function parseBoolean(targetValue) {
        targetType = typeof targetValue;  // 変数の型
        if      (targetType == 'boolean')    return targetValue;                // booleanならそのまま返す
        else if (targetType == 'number')     return Boolean(targetValue);       // 数値ならbooleanに変換して返す
        else if (targetType == 'string' || targetValue instanceof String ) {    // 文字列で
            tmp = parseInt(targetValue);
            if (isNaN(tmp)) {
                // 数値に変換できない
                if (targetValue.toLowerCase().trim() == "true")    return true;     // 'true' なら trueを返す
                else                                               return false;    // それ以外はfalseを返す
            }
            else {
                // 数値に変換できる
                return Boolean(tmp);
            }
        }
        else                                 return false;                      // それ以外の型はfalseを返す
    }
  </script>
</head>
<body>
    <h1>TEST</h1>
    <table>
      <tr><td>topic:   </td><td><input type="text" id="topic"   size="100"></td></tr>
      <tr><td>payload: </td><td><input type="text" id="payload" size="100"></td></tr>
      <tr><td>qos:     </td><td><select id="qos">
                                  <option value="0" selected>0</option>
                                  <option value="1"         >1</option>
                                  <option value="2"         >2</option>
                                </select>
      </td></tr>
      <tr><td>retain:  </td><td><select id="retain">
                                  <option value="false" selected>false</option>
                                  <option value="true"          >true</option>
                                </select>
      </td></tr>
    </table>
    <p>
      <button id="post" type="button">post</button>
      <button id="get" type="button">get</button>
      <button id="query" type="button">query</button>
    </p>
    <table>
      <tr><td valign="top">url      </td><td><textarea id="req_url"  cols=120 rows=1  wrap="off" style="overflow:auto;" disabled></textarea></td></tr>
      <tr><td valign="top">data     </td><td><textarea id="req_data" cols=120 rows=1  wrap="off" style="overflow:auto;" disabled></textarea></td></tr>
      <tr><td valign="top">result   </td><td><textarea id="result"   cols=120 rows=1  wrap="off" style="overflow:auto;" disabled></textarea></td></tr>
      <tr><td valign="top">response </td><td><textarea id="response" cols=120 rows=10 wrap="off" style="overflow:auto;" disabled></textarea></td></tr>
    </table>

</body>
</html>

 

 準備が整ったら、以下のコマンドで実行します。

npm start

 

正常に起動されたら以下のように表示され、接続待ちになります。

Server started! listen Port: 8080
MQTT client connected to brocker
    host: localhost
    port: 1883

 

また、MQTTのメッセージが正常に送られたことを確認するために別のターミナルで以下のコマンドを実行します。
これはlocalhostのMQTTブローカに送られたすべてのtopicを購読するものです。-vを指定することでtopic名も表示されます。

mosquitto_sub -v -h localhost -t "#"

 

この状態でブラウザで接続します。

http://localhost:8080

 

「TEST」のリンクをクリックするとテストページに移動します。
topicにメッセージを送信するトピックを、payloadにメッセージ本体を入力し(QoSとretainは省略。何者かはググってください)、
「post」「get」「query」のどれかのボタンをクリックします。
url、data(postのときだけ)、result、responseに結果が表示され(下図参照:正常実行時とエラー時)、mosquitto_subは受け取ったメッセージを表示します。

 

f:id:ippei8jp:20170922153326p:plain

f:id:ippei8jp:20170922153344p:plain

かなり長くなってしまった。。。githubかなにかで公開すれば良かったかな?

 解説は次回、ということで。