いっぺーちゃんの いろいろやってみよ~

micropython on ESP32 でIFTTT(アクション編)

ジャッキー・チェンがアチョーって、、、またスベった。。。

今回はIFTTTのwebhooksチャネルをアクション(that)で利用し、ESP32側で処理を行うことを試してみます。
ESP32は家庭内LANなどに接続されており、外部から直接アクセスできないので、こちらを参考にESP32とのデータを直接やりとりするのにBeebotteを使用することにします。

データの流れは以下のような感じです。

    IFTTT --REST API--> beebotte --MQTT--> ESP32

まずbeebotteにユーザ登録します。
Beebotteにアクセスし、右上の「Sign up」から必要事項を入力し、「SIGN UP」をクリックすると、登録したe-mailアドレス宛に確認メールが届きます。
このメールに記載されているリンクをクリックすると登録完了です。

 

チャネルとリソースの作成、動作確認はいつものように先人の知恵を拝借。
IFTTTのトリガーおよびアクションをESP8266で実行する - Qiita

beebotteへのアクセス用URLには上記ページにある、
https://api.beebotte.com/v1/data/write/《チャネル》/《リソース》?token=《チャネルトークン》
のほか、
https://api.beebotte.com/v1/data/publish/《チャネル》/《リソース》?token=《チャネルトークン》
も使用できます。

これらの違いは、writeがストレージにデータを記録するのと同時にメッセージ配信を行うのに対し、publishはその場でメッセージを配信するだけだという点です。
その場でメッセージ配信するだけで良いならば、publishで良いでしょう。
ただし、Send on Subscribe(SoS)を使用するにはwriteでなければなりません。

IFTTT側の設定も上記ページを参考にしてください。
注意事項として、以下があります。

  • 送信するデータはダブルクォーテーションで囲む(例:「"送信データ"」)
    • 特に、Add ingredientで項目を選択したときに忘れがちです。
  • 送信するデータに改行が含まれないようにする。含まれているとIFTTTのレシピがエラーになり、データが送信されません。
    • たとえば、トリガをTwitterにした場合、送信データにtweet本文(Text)を指定すると、tweet本文に改行が含まれているとエラーになります。
    • 以下のような回避策が考えられます。
      • 本文に改行を入れないように運用する
      • 本文を送るのを諦め、特定のハッシュタグが含まれていたら固定文字列やなどを送るようにする
        • たとえば、ハッシュタグ#LIGHTが含まれていたらtweet本文を送信、ON/OFFをESP32側で判別する、という方法ではなく、ハッシュタグ#LIGHT_ON が含まれていたらONを送る、#LIGHT_OFFが含まれていたらOFFを送る、といった具合にIFTTTのレシピで分けておく。

beebotteとESP32間の通信はMQTTを使用します。
mqtt通信用モジュールはPypiにumqtt.simpleモジュールがあるので、それを利用します。
あらかじめupip(upipmではない)でumqtt.simpleモジュールをインストールします。

import upip
upip.install("micropython-umqtt.simple")

umqtt.simple についてはこちらを参照してください。
ソースはこちらで。

 

準備ができたら以下のプログラムを実行します。
チャネルトークン、チャネル名、リソース名に使用するデータを設定してください。
IFTTTからトリガがかかるたびにbeebotteにデータが送信され、最終的にcallback_func()が実行されます。

CHANNEL_TOKEN = '取得したチャネルトークン'

CHANNEL_NAME  = 'チャネル名'
RESOURCE_NAME = 'リソース名'

PING_PERIOD   = 120         # pingを打つ間隔(単位:秒)

from umqtt.simple import MQTTClient
import ujson
import utime

MQTT_SERVER = 'mqtt.beebotte.com'                     # サーバ名
MQTT_USER = 'token:' + CHANNEL_TOKEN                  # ユーザ名
MQTT_TOPIC = CHANNEL_NAME + '/' + RESOURCE_NAME       # トピック名

def callback_func(topic, msg):
    print("topic:%s  msg:%s" % (topic, msg))
    json_data= ujson.loads(msg)
    dt = json_data.get("data", "UNKNOWN")
    print("*** " + str(dt) + " ***")

# 初期化
c = MQTTClient("umqtt_client", MQTT_SERVER, user=MQTT_USER, password='', keepalive=PING_PERIOD*2)
# 接続
c.connect()
# コールバックの設定
c.set_callback(callback_func)
# サブスクライブ実行
c.subscribe(MQTT_TOPIC)

prompt = ['-', '\\', '|', '/']
cnt = 0
prevtime = utime.time()        # pingを打つための前回実行時刻を初期化
while  True:
    curtime = utime.time()
    if (curtime - prevtime) > PING_PERIOD :
        # 接続保持のためPING_PERIOD毎にpingを打つ
        c.ping()
        print('ping')
        prevtime = curtime       # 前回時刻の更新
    
    c.check_msg()                 # pingを打つ必要があるので、ノンブロッキング動作
    print(prompt[cnt], end="\r")  # 動作確認用表示
    cnt = (cnt + 1) & 0x03

c.disconnect()                    # 無限ループなのでここには来ない

以下解説です。

 

データを受信したときに実行されるコールバック関数です。
取得したデータのトピックとデータを表示します。
また、データはJSONデータ文字列なので、エンコードし、dataプロパティを取得して表示しています。
実際にはここで変数dtに応じて処理を行います(LEDを点灯/消灯する等)。
データの取得にはdataプロパティが存在しなかった場合に備え、getメソッドを使用しています。

def callback_func(topic, msg):
    print("topic:%s  msg:%s" % (topic, msg))
    json_data= ujson.loads(msg)
    dt = json_data.get("data", "UNKNOWN")
    print("*** " + str(dt) + " ***")

 

MQTTモジュールの初期化です。
ユーザ名には'token:'+チャネルトークン、パスワードには空文字列を指定します。
keepalive時間はPING_PERIOD(pingを打つ間隔)の2倍の値を設定しています。
MQTTブローカはkeepaliveに設定した時間の1.5倍の間接続を維持しますので、実際にはPING_PERIODの3倍の時間 接続が維持されます。
keepaliveを設定しない(or 0を設定)と、接続が切断されたことが通知されません(本来は切断されないのですが、beebotteでは5分くらいで接続断になってしまうようです。もしかしたらうちのネットワーク環境のせいかもしれませんが)。そこで、keepaliveを設定することで接続断時に例外がraiseされるようにしています。
(説明がわかりにくいですが、keepaliveの設定の有無の違いはwhileループ内のc.ping()を削除して10分ぐらい放置したあと、データを送信してみると違いが分かると思います)

また、この設定ではSSLを有効にしていません。SSLを有効にするにはパラメータに「ssl=True」を追加してください。
ただし、linux版micropythonはSSLのノンブロッキング動作に対応していないので、SSLを有効にするとエラー終了します。

c = MQTTClient("umqtt_client", MQTT_SERVER, user=MQTT_USER, password='', keepalive=PING_PERIOD*2)

 

接続~コールバックの設定~サブスクライブの実行の処理です。
トピック名は チャネル名/リソース名 で指定しますが、リソース名にワイルドカード(#)を指定することができるようです。
つまり、トピック名に チャネル名/# と指定すれば、チャネル内のすべてのトピックのデータが取得できるということです(使い道があるかはわかりませんが)。
トピック名に'#'と指定すると、すべてのチャネルのすべてのリソースのデータが取得できそうな気がしますが、beebotteではエラーになるようです。

c.connect()
c.set_callback(callback_func)
c.subscribe(MQTT_TOPIC)

 

pingを打つための時刻を取得するために現在の時刻を取得しておきます。
相対時間が必要なだけなので、時刻合わせを行う必要はありません。

prevtime = utime.time()        # pingを打つための前回実行時刻を初期化

 

データ取得のために無限ループにします。
接続断時やエラー発生時は例外がraiseされるので、try~except~ で囲んでおくのが良いかもしれません。

while  True:

 

keepaliveを設定してあるので、何もしないと接続が切断されてしまいます。それを防ぐために一定時間ごとにpingを打ちます。
他の処理の処理時間によってはPING_PERIODよりも長くなることがありますが、keepaliveがPING_PERIOD*2と余裕を持って設定されているので、大丈夫でしょう(たぶん、おそらく、もしかしたら。。。)

    curtime = utime.time()
    if (curtime - prevtime) > PING_PERIOD :
        # 接続保持のためPING_PERIOD毎にpingを打つ
        c.ping()
        print('ping')
        prevtime = curtime       # 前回時刻の更新

 

定期的に受信処理を呼び出します。
受信データがあったら、コールバック関数が実行されます。ここにデータが返ってくるわけではありません。

    c.check_msg()                 # pingを打つ必要があるので、ノンブロッキング動作

 

メインループで処理が必要な場合はここで処理します。

    print(prompt[cnt], end="\r")  # 動作確認用表示
    cnt = (cnt + 1) & 0x03

 

 

 

micropython on ESP32 でpush通知

TwitterやLINEで通知を試しましたが、twitterやLINEのアプリをインストールしたりアカウントを作成する手間も省きたいというモノグサ魂がムクムクと。。。
モノグサのためなら苦労も厭わないという、本末転倒な感じですが(笑)。
また、通知の痕跡が残らないようにしたいという希望も。「なお、このメッセージは自動的に消滅する」というアレみたいな感じですね。
どうせなら、PCだけでなくスマホにも、と欲求は尽きません。
で、いろいろと調べてみたところ、Web Pusht通知を使うとそんな感じの機能が実現できそうです。

ただし、すべての環境で動作するわけではなさそうです。
試した時点ではWindows10のfirefoxChromeでは動作しましたが、Edgeでは動作しませんでした。その他のブラウザは持ってないので試してません。
MacOSは持ってないのでわかりません(貧乏なので・・・)。
スマホAndroidのみ試しました。
iOSではPush7アプリのインストールが必要なようです。詳細はググってください。

push通知をローカルな環境だけで実現できそうにないので、push通知サービスのPush7を使うことにします。

freeプランだと5000送信/月ですが、趣味でちょっと試すくらいなら十分でしょう。
注意しないといけないのが、制限がリクエスト数ではなく、送信数だというところです。購読者が10人いると、1リクエストあたり10送信行われることになります。
例えば、10分に1回づつ通知して(そんなに通知することはないと思いますが)、1か月で6×24×30=4320回だと思うと大間違いです。さらに購読者数をかけなければいけません。
また、自分で購読要求を制御できない(そもそも他人に勝手に登録して利用してもらうサービスなので)ので、秘密の情報など流さないように注意しましょう。

 

まずはWebプッシュ通知サービス「Push7」の導入方法①アカウント登録編にしたがってアカウント登録~アプリケーション登録を行ってください。

 

 アプリケーションの登録でのいっぺーちゃんなりの補足です。

  • アプリケーション名(↑のヘルプページではサイト名になっている)を入力します
    • 自分の好きな名前で良い
  • URLを入力します。
    • 購読ページの戻るボタンのリンク先です
    • とりあえず自分のブログのURLとか、やほーのトップとか書いておく
  • アイコンが必要なら画像をアップロードします
  • カテゴリを選択します
    • 「娯楽・レジャー・趣味」かな。
  • アプリケーションURLを入力します
    • アプリケーション名と同じにしておくと分かりやすいかな?
    • 既に使用されているとダメ。

 

 ダッシュボードでアプリケーションを開くと、「初期導入」の説明がありますが、今回はESP32からAPIで通知要求を行うので、無視して構いません。
また、APIの実行に必要な「App Number」と「API Key」は左側のメニューの「API」をクリックすると確認できます。

 

 次は通知を受け取る側の設定(購読の設定)です。
何もしないとpush通知を受け取れません。モノグサでもこれだけは我慢してやってください。

  • ブラウザ(スマホでも可)でアプリケーションURLに指定したURLにアクセス
  • 「~~に通知の送信を許可して受信しますか?」と聞かれるので、「通知を許可する」をクリック
  • 再度アプリケーションURLにアクセスし、「購読解除」をクリックすると購読をやめられます。

 

次にテストしてみましょう。

  • Push7のダッシュボードに戻ります。
  • 左側のメニューの一般通知(送信の下にあります)で「新規プッシュ通知」をクリック
  • タイトル、URL、内容を入力し、必要なら「クリックされるまで表示」を選択し、送信をクリック
    • タイトルと内容が通知ウィンドウに表示される文字列です。
    • URLは通知ウィンドウをクリックしたときにジャンプするURLです。
    • すべて入力しないとエラーになりますので、ダミーでも入れてください。
  • 送信をクリック
    • → 通知が表示されます。

 

「クリックされるまで表示」を選択しても、PCのブラウザ(Firefox/chromeで確認)では20秒程度で表示が消えてしまいます。
これはブラウザが表示を消しているようです。
逆にスマホだと、「クリックされるまで表示」を選択しなくても通知領域にずっと表示されているようです。

firefoxの場合、通知をクリックするまで表示させるには、以下の設定を行う必要があります(55.0.3 64bitで確認)。
ただし、どのような副作用が起こるか確認していませんので、設定は自己責任でお願いします。

  • about:config で設定エディタを開く
  • 「dom.webnotifications.requireinteraction.enabled」を「true」に設定

chromeでは対応する設定は見当たりませんでした。

 

なお、PCで通知を受け取るにはブラウザ(複数インストールされている場合は、購読のページで購読を許可したブラウザ)を起動しておく必要があります。どこか特定のページを開いておかなければならない、というようなことはありません。

 

 いよいよESP32から通知を行います。

 

 以下のプログラムをtiny_push7.pyという名前で保存し、いつものようにupipmでインストールしてください。

import usocket as socket
import ussl as ssl

class tiny_push7 :
    def __init__(self, app_no, api_key, icon, url, debug=False) :
        # パラメータチェック
        if type(app_no) is not str:
            raise ValueError("app_no must be string")
        if type(api_key) is not str:
            raise ValueError("api_key must be string")
        if type(icon) is not str:
            raise ValueError("icon must be string")
        if type(url) is not str:
            raise ValueError("url must be string")
        
        self.app_no     = app_no
        self.api_key    = api_key
        self.icon       = icon
        self.url        = url
        self.__DEBUG__  = debug

    def __debug_print(self, str) :
        if self.__DEBUG__ :
            print(str)
    
    def __makeRequestMessage(self, host, title, body, disappear) :
        # disappear の設定を文字列化
        disappear_str = "true" if disappear else "false"
        
        # make request
        request = 'POST /api/v1/' + self.app_no + '/send HTTP/1.1\n'
        # bodyがbytesなので、requestもbytesに
        request = request.encode('utf-8')

        # make message body
        # disappear_instantlyは効かないけどとりあえず設定しておく
        body =     '{"title":"'   + title        + '",' \
                 +   '"body":"'    + body         + '",' \
                 +   '"icon":"'    + self.icon    + '",' \
                 +   '"url":"'     + self.url     + '",' \
                 +   '"disappear_instantly":'+ disappear_str +','      \
                 +   '"apikey":"'  + self.api_key + '"}'

        # bodyは RFC3986 エンコードではなく、バイナリエンコード
        body = body.encode('utf-8')
        
        # make message header
        header  = 'Host: ' + host['host'] + '\n'                    \
                + 'User-Agent: micropython push7 agent v0.1\n'      \
                + 'Content-Type:application/json\n'                   \
                + 'Connection: close\n'                             \
                + 'Accept: */*\n'                                   \
                + 'Content-Length: ' + str(len(body)) + '\n'
        # bodyがbytesなので、headerもbytesに
        header = header.encode('utf-8')

        # request message
        ret = request + header + b'\n' + body + b'\n'
        return(ret)
    
    def __sendmessage(self, host, msg) :
        sock = socket.socket()
        addr = socket.getaddrinfo(host['host'], host['port'])[0][-1]
        
        # connect socket
        sock.connect(addr)
        try :
            # SSL wrap
            ssl_sock = ssl.wrap_socket(sock)
            
            # send data
            ssl_sock.write(msg)
            
            # 受信データの最初の1行
            rcv_line = ssl_sock.readline()
            protover, status, rcv_msg = rcv_line.split(None, 2)
            status = status.decode('utf-8')
            rcv_msg = rcv_msg.decode('utf-8')
            self.__debug_print('%s  %s  %s' % (protover, status, rcv_msg))
            # それ以外のレスポンスヘッダを読む
            while True:
                rcv_line = ssl_sock.readline()
                self.__debug_print(rcv_line)
                if not rcv_line:
                    # なんらかの異常なレスポンス(ヘッダが終わる前にデータがなくなった)
                    ssl_sock.close()
                    raise ValueError("Unexpected EOF in HTTP headers")
                if rcv_line == b'\r\n':
                    # 空行でヘッダ終了
                    break
        except Exception as e:
            # エラーが発生したらクローズして上位へ例外通知
            ssl_sock.close()
            raise e
        # メッセージ本体を受信(とりあえず読み捨て)
        rcv_line = b''
        while True :
            try :
                l = ssl_sock.readline()
            except Exception as e:              # エラーが発生したらクローズして上位へ例外通知
                ssl_sock.close()
                raise e
           
            if not l:                           # データがない → 終了
                break
            
            # 読み込んだデータをためる
            rcv_line += l
        
        self.__debug_print("@@@@" + str(rcv_line))
        self.__debug_print("close!!")
        ssl_sock.close()
        # 受信終了後にエラー判定
        if not ((status == "200") or (status == "201")) :
            # status 200,201 以外はエラー
            raise ValueError(status +' '+rcv_msg)
    
    # ######## notify API ################################
    def push(self, title, body, disappear=True) : 
        host = { 'host': 'api.push7.jp', 'port': 443}
        
        reqMessage = self.__makeRequestMessage(host, title, body, disappear)
        self.__debug_print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        self.__debug_print(reqMessage)
        self.__debug_print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        
        self.__sendmessage(host, reqMessage)

以下解説

 

送信データを作成している部分です。
API仕様にしたがって送信データを作成しています。
TwitterやLINEのときは、body部分はRFC3986でエンコードしていましたが、Push7ではRFC3986ではなく、バイナリデータそのもので送信します。
よって、メッセージヘッダの Content-Length にはバイナリエンコードした後のデータ長を設定します。
(RFC3986でエンコードしたデータはASCII文字のみなので、Stringのままデータ長を取得しても大丈夫でした。ここでハマりました。。。)

    def __makeRequestMessage(self, host, title, body, disappear) :
        ・・・・

なお、API仕様についてはPush7 APIの利用 - Push7 Docsを参照してください

 

データを送信する部分ですが、ほとんどこれまでの使いまわしです。
レスポンスのエラー判定方法と例外をraiseする部分をちょっと変更してあります。

    def __sendmessage(self, host, msg) :
        ・・・・

 

API関数です。
パラメータtitleにタイトル、bodyに本文を設定します。
disappearにTrueを設定する(or設定を省略する)とダッシュボードから送信した時の「クリックされるまで表示」を選択していないのと同等です。
disappearにFalseを設定するとダッシュボードから送信した時の「クリックされるまで表示」を選択したのと同等です。

    def push(self, title, body, disappear=True) : 
        ・・・・

 

以下が通知プログラムです。

app_no  = '<<ダッシュボードで取得したApp Number>>'
api_key = '<<ダッシュボードで取得したAPI Key>>'
icon    = '<<通知に表示するアイコンのURL>>'                  # httpsでなければならない。512文字以下
url     = '<<クリックしたときにジャンプするページのURL>>'    # 512文字以下

from tiny_push7 import tiny_push7

# 初期化
tp = tiny_push7(app_no, api_key, icon, url, debug=True)

title = "緊急指令"
body = "このメッセージは自動的に消滅する"
tp.push(title, body, True)
または
tp.push(title, body, False)         # クリックするまで表示される

 

iconに設定するURLは通知に表示するアイコンのURLですが、HTTPS接続のサイトにある必要があります。
もし用意できない場合は、Push7のアプリケーションで設定したアイコンを使うと良いでしょう。

  • アプリケーションで設定したアイコンのURLは以下にブラウザでアクセスすると調べられます。

urlには通知ウィンドウをクリックしたときにジャンプするページのURLを指定します。
実在しないページでも大丈夫なようですが、URLの形になっていないとエラーになります。
(ちょっと試した感じでは、 "" ⇒ NG  "http://hoge" ⇒ NG   "http://hoge.jp" ⇒ OK)
念のため「やほー」とか「ごーごる」のURLを入れておくのが良いと思います。
ここはHTTPS接続のサイトである必要はありません。
urlはpush API側で設定するようにしてもいいかなとも思いましたが、とりあえずコンストラクタで指定するようにしました。

あとは説明するまでもありませんね。

 

 

 

 

 

 

 

micropython on ESP32 でIFTTT(トリガ編)

micropythonでTwitterやLINE notifyにデータを送信するプログラムを作ってみましたが、IFTTTを使えば通知先を気にせず同じプログラムで送信できることに今更気が付きました。
ということで、IFTTTにデータを送信するプログラムを作ってみました。

まず、IFTTTにアクセスするモジュールをインストールします。
以下のプログラムをtiny_ifttt.pyという名前で保存し、いつものようにupipmでインストールしてください。

import usocket as socket
import ussl as ssl

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ---- percent encoding
def RFC3986_encode(s):
    ret = ''
    bts = s.encode('utf-8')
    for c in bts :
        if c in range(0x30, 0x39 + 1) or \
           c in range(0x41, 0x5a + 1) or \
           c in range(0x61, 0x7a + 1) or \
           c in (0x2d, 0x2e, 0x5f, 0x7e):
            ret += chr(c)
        else :
            ret += '%%%02X' % (c)
    return ret

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class tiny_ifttt :
    def __init__(self, my_key, debug=False) :
        # パラメータチェック
        if type(my_key) is not str:
            raise ValueError("key must be string")
        
        self.my_key      = my_key
        self.__DEBUG__   = debug

    def __debug_print(self, str) :
        if self.__DEBUG__ :
            print(str)
    
    def __makeRequestMessage(self, host, event, value1, value2, value3) :
        # make request
        request = 'POST /trigger/' + event + '/with/key/' + self.my_key
        request += '?'
        if not value1 is None :
            request += 'value1=' + RFC3986_encode(value1) + '&'
        if not value2 is None :
            request += 'value2=' + RFC3986_encode(value2) + '&'
        if not value3 is None :
            request += 'value3=' + RFC3986_encode(value3) + '&'
        # 最後の「?」または「&」を削除して「 HTTP/1.1」をつける 
        request = request[0:-1] + ' HTTP/1.1\n'
        
        # make message header
        header  = 'Host: ' + host['host'] + '\n'                    \
                + 'User-Agent: micropython ifttt agent v0.1\n'      \
                + 'Connection: close\n'                             \
                + 'Accept: */*\n'

        # request message
        ret = request + header + '\n'
        return(ret)
    
    def __sendmessage(self, host, msg) :
        sock = socket.socket()
        addr = socket.getaddrinfo(host['host'], host['port'])[0][-1]
        
        # connect socket
        sock.connect(addr)
        try :
            # SSL wrap
            ssl_sock = ssl.wrap_socket(sock)
            
            # send data
            ssl_sock.write(msg)
            
            # 受信データの最初の1行
            rcv_line = ssl_sock.readline()
            protover, status, msg = rcv_line.split(None, 2)
            # self.__debug_print('%s::::%s::::%s' % (protover, status, msg))
            # status 200以外はエラー
            if status != b"200":
                ssl_sock.close()
                raise ValueError(status)
            # それ以外のレスポンスヘッダを読む
            while True:
                rcv_line = ssl_sock.readline()
                # self.__debug_print(rcv_line)
                if not rcv_line:
                    # なんらかの異常なレスポンス(ヘッダが終わる前にデータがなくなった)
                    ssl_sock.close()
                    raise ValueError("Unexpected EOF in HTTP headers")
                if rcv_line == b'\r\n':
                    # 空行でヘッダ終了
                    break
        except Exception as e:
            # エラーが発生したらクローズして上位へ例外通知
            ssl_sock.close()
            raise e
        # メッセージ本体を受信(とりあえず読み捨て)
        rcv_line = b''
        while True :
            try :
                l = ssl_sock.readline()
            except Exception as e:              # エラーが発生したらクローズして上位へ例外通知
                ssl_sock.close()
                raise e
           
            if not l:                           # データがない → 終了
                break
            
            # 読み込んだデータをためる
            rcv_line += l
        
        self.__debug_print("@@@@" + str(rcv_line))
        self.__debug_print("close!!")
        ssl_sock.close()
    
    # ######## notify API ################################
    def trigger(self, event, value1=None, value2=None, value3=None) : 
        host = { 'host': 'maker.ifttt.com', 'port': 443}
        
        reqMessage = self.__makeRequestMessage(host, event, value1, value2, value3)
        self.__debug_print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        self.__debug_print(reqMessage)
        self.__debug_print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        
        self.__sendmessage(host, reqMessage)

プログラム自体はLINEのときとそんなに変わりません。
大きく違うのは送信するデータの作成部分(__makeRequestMessage メソッド)だけです。

次に、IFTTTのレシピを作成します。
ESP32からのデータを受け取るにはwebhooksというチャネルを使用します。これはREST APIでデータを受け取るチャネルのようです。
旧称は MakerChannel のようで、ネット上にはこの名前で検索すると結構情報が出てきたりします。
トリガ(this)、アクション(that)それぞれに対応していますが、ここではトリガとして使用する方を試してみます。
具体的な設定手順は、例によって先人の知恵を拝借します。
Nefry BTとIFTTTでスイッチを押したらLINEを送る仕組みを作ってみよう | dotstudio
「最後にFinishを押し、IFTTTのレシピの作成は完了です。」までの部分です。

 

準備ができたら以下のプログラムを実行します。

my_keyにはアカウントごとに割り当てられるキーを設定します。
上記ページではsettingをクリックしてほにゃららと書いてありますが、settingの左のDocumentationをクリックすると一番上にデカデカと表示されます。
eventにはレシピに設定したイベント名を設定します。上記ページでは「Nefry」となっているものです。
triggerのパラメータ、value1, value2, value3 は必要なものだけ記述してください。不要ならすべて省略してもかまいません。(eventは必須)

my_key = '取得したキー'
event  = '作成したイベント'

from tiny_ifttt import tiny_ifttt

# 初期化
ti = tiny_ifttt(my_key, debug=True)
ti.trigger(event, value1='へろー', value2='えぶり', value3='わん')

成功すれば、コンソールに

@@@@b"Congratulations! You've fired the <<イベント名>> event"
close!!

と表示され、LINEに以下のようなメッセージが届きます。

[IFTTT] Value 1: へろー
Value 2: えぶり
Value 3: わん

届かない場合、IFTTTのトップページから上のほうにあるメニューの一番右の「Activity」で「Applet ran」という表示が出ているか確認してください。
(失敗している場合は「Applet failed」と表示されます)
まだ表示されていない場合は、メニューの右から2番目の「My Applets」で作成したレシピを選択、下の方にある「Check now」をクリックしてみてください。
(webhooksをトリガとするレシピは大体数秒で実行されるようですが、まれに数分以上かかることがあります)

 

LINEに届いた日本語文字列が%数字%数字~となっていた場合は、シリアルコンソールやファイルの文字コードUTF-8になっているか確認してください。
また、シリアルコンソールからプログラムを入力する場合は、ペーストモードでコピペしてください。通常モードだと日本語部分が消えます。

my_key の設定が間違っていた場合、例外が発生し、

ValueError: b'401'

と表示されます。

 

eventが間違っていた場合は、プログラムは正常に終了しますが、IFTTT側では何も起こりません。
(居ない人が呼ばれたんだから、誰も返事しないのは当然)

これで、ESP32側のプログラムは一つでLINEやTwitterFacebook、e-mailなど色々な通知先を選択(またはすべてに同時配信)することができます。

また、アクションにDropboxやGoogleDriveを使用すればセンサの測定データを定期的に保存するといった使用方法もできるでしょう。

 

micropython on ESP32 で名前解決

別に子供の名前を決めてくれるわけではない。。。(スベった)

インターネットアクセスでマシン名からIPアドレスを取得するアレです。

公開されているサーバであれば、DNSで解決できるのですが、家の中のようなローカルな環境のマシンも名前でアクセスできたらいいですよね。
家でDNSサーバを動かす!というわけにもいかないし、、と思っていたら、
mDNS(Multicast Domain Name Service)とかLLMNR(Link-Local Multicast Name Resolution)というのがあると知りました。

茶番をスキップ

で、micropython上で使えないかなぁ、と色々調べてみました。
どうやら、mDNSはマルチキャスト通信でデータのやり取りを行うらしい(名前にMulticastって書いてあるやん)。
問い合わせをマルチキャストで投げたら、応答が問い合わせ元にユニキャストで返ってくるのかと思っていたんですが、応答もマルチキャストで返ってくるようです。
つまり、mDNSを使うにはマルチキャストアドレスでの待ち受けができないとダメということです。
micropythonのソースを眺めてみたけど、マルチキャストには対応していないようなので、あきらめました。
LLMNRの方はあんまり調べてないけど、似たようなもんだと思います。
どうしてもESP32でmDNS使いたい人は、ESP32のドライバのサンプル
esp-idf/examples/protocols/mdns/
があるので、そっちを参照してください。
まぁ、別タスクでこれを動かしておいて、そっちに問い合わせるという使い方も考えられますが。

ちゃんちゃん、
と終わっては話にならないので、さらに考えました。

mDNSの勉強のためにWiresharkでパケットキャプチャしていたのですが、mDNSとLLMNRのパケットの近くにいつも出ているNBNSというパケット、
中身を見てみるとどうやらこれも名前解決のためのパケットのようです。
で、ググりまくってみたところ、NetBT(NetBIOS over TCP/IP)のName Service らしい。
これだと、問い合わせはブロードキャストで送信、応答はユニキャストで返ってくるので、なんとかなりそう。

NetBTにはあまり良い印象はないのですが(情報も少ないし)、Windowsマシンでは必ず動いていますし、ubuntuでもsamba(windowsマシンへ共有ディレクトリを見せるためのプログラム)が入っていれば動いているので家庭内での環境としては悪くないと思います。
(nmbd というデーモンがこの機能を実現しています)

確かめていませんが、同様にRaspberryPiでもsambaをインストールすれば使えそうな気がします。

さらにググりまくって、仕様をみつけました。

その他、参考になりそうなサイト。

基礎から学ぶWindowsネットワーク:第18回 NetBIOS over TCP/IPプロトコル(その1) (3/3) - @IT

NetBIOSネームサービスでネットワーク内の端末をリアルタイムに列挙する:CodeZine(コードジン)

 

よし、なんとかなりそうだ。

カタカタ・・・カッターン。・・・カタカタ・・・カッターン。←プログラムを入力している音

が、、、、まさかの事実発覚!
ESP32版 micropythonはブロードキャストをサポートしていない!!
ここまで来て。。。。
しかーし、以下の修正でブロードキャスト対応できることを発見。
(正確には設定用のシンボルが定義されていなかっただけ)

diff --git a/esp32/modsocket.c b/esp32/modsocket.c
index fad42e9..6337938 100644
--- a/esp32/modsocket.c
+++ b/esp32/modsocket.c
@@ -559,6 +559,7 @@ STATIC const mp_map_elem_t mp_module_socket_globals_table[] = {
     { MP_OBJ_NEW_QSTR(MP_QSTR_IPPROTO_UDP), MP_OBJ_NEW_SMALL_INT(IPPROTO_UDP) },
     { MP_OBJ_NEW_QSTR(MP_QSTR_SOL_SOCKET), MP_OBJ_NEW_SMALL_INT(SOL_SOCKET) },
     { MP_OBJ_NEW_QSTR(MP_QSTR_SO_REUSEADDR), MP_OBJ_NEW_SMALL_INT(SO_REUSEADDR) },
+    { MP_OBJ_NEW_QSTR(MP_QSTR_SO_BROADCAST), MP_OBJ_NEW_SMALL_INT(SO_BROADCAST) },
 };
 
 STATIC MP_DEFINE_CONST_DICT(mp_module_socket_globals, mp_module_socket_globals_table);

変更しなくても、SO_BROADCASTの代わりに0x0020と数値で指定すればOKな気もする。。。。

で、ソースはこちら。
nbns.pyという名前で保存し、いつものようにupipmでインストールしてください

try:
    import usocket as socket        # for micropython
except:
    import socket                   # for python

try:
    import utime as time            # for micropython
except:
    import time                     # for python

try:
    import ustruct as struct        # for micropython
except:
    import struct                   # for python

# デバッグ用バイナリデータのダンプ
def dumpData(data) :
    # print(data)
    for i in range(len(data)) :
        print("%02x" % (data[i]), end=' ')
        if (i % 0x10) == 0x0f :
            print("")
    print("")


# NBNS
# http://www.ietf.org/rfc/rfc1002.txt
# http://www.atmarkit.co.jp/ait/articles/0405/20/news085_3.html
# NBNSのコンピュータ名のエンコード
# http://codezine.jp/article/detail/192


# 文字列のニブルエンコードを行う
# name   : string
# return : bytes
def NibbleEncode(name) :
    if len(name) > 15 :                 # 文字列は15文字以下でなければならない
        raise ValueError("name too long")

    # 大文字に変換
    name = name.upper()
    
    # 15文字に足りない分はスペースで埋める
                 # 123456789012345
    name = name + "               "     # 後ろにスペースをくっつけて
    name = name[0:15]                   # 15文字にする
    # print('"%s"'% (name))
    
    name_byte = name.encode('ascii')         # バイト列に変換
    name_byte += b'\0'                       # 後ろにidとして00をつける
    
    name_nibble=""              # 文字列初期化
    for i in range(len(name_byte)) :
        name_nibble += chr((name_byte[i] >> 4  ) + 0x41)    # ord('A') = 0x41 
        name_nibble += chr((name_byte[i] & 0x0f) + 0x41)
    # print('"%s"' % (name_nibble))
    Q_name = b'\x20'                            # size
    Q_name += name_nibble.encode('ascii')       # ニブルエンコードしたものをバイト列に変換
    Q_name += b'\x00'                           # terminate
    return Q_name


# ニブルエンコードされたバイト列のデコードを行う
# name   : bytes
# return : string, int
def NibbleDecode(name) :
    if not len(name) == 34 :                            # サイズは34バイトでなければならない(1+32+1)
        raise ValueError("name length is wrong")
    if not (name[0] == 0x20 and name[-1] == 0x00) :     # 先頭は0x20、末尾は0x00でなければならない
        raise ValueError("data is wrong")

    # 名前本体部分を取り出し
    name_byte = name[1:-1]
    
    # 0~15文字目をデコード
    name_decpded = ""
    for i in range(15) :                        # 15 = int(len(name_byte) / 2) - 1
        data_u = name_byte[i * 2    ] - 0x41    # ord('A') = 0x41 
        data_l = name_byte[i * 2 + 1] - 0x41
        name_decpded += chr((data_u << 4) + data_l)
    
    # 16文字目は識別子
    data_u = name_byte[15 * 2    ] - 0x41
    data_l = name_byte[15 * 2 + 1] - 0x41
    id = (data_u << 4) + data_l
    
    # print('"%s" <%02x>' % (name_decpded, id))
    # デコードした名前の右側の空白を削除して返す
    return name_decpded.rstrip(), id

# IPアドレスを32bit数値化(Cのinet_addrと同等の処理)
def inet_addr(addr) :
    if not addr.count('.') == 3 :
        # '.' が3つでないとIPアドレスではない
        raise ValueError("IP address format error")
    tmp = addr.split('.')       # '.' を区切り文字として分割
    n0 = int(tmp[0])            # それぞれを数値化
    n1 = int(tmp[1])
    n2 = int(tmp[2])
    n3 = int(tmp[3])
    if n0 > 255 or n1 > 255 or n2 > 255 or n3 > 255 :
        # 255より大きい値はIPアドレスでない
        raise ValueError("IP address format error")
    return (n0 << 24) | (n1 << 16) | (n2 << 8) | n3 

# 問い合わせパケットの作成
def makePacket(transaction_id, name) :
    OP_flag = 0x0110      # flag           15: R   14-11: OPCODE   10-4: NMflag 3-0: RCODE
        # 15    : R        0:request
        # 14-11 : OPCODE   0000  query
        # 10-4  : NMflag   0010001 
        #                  ||||  +--- B broadcast
        #                  |||+------ RA
        #                  ||+------- RD
        #                  |+-------- TC
        #                  +--------- AA
        # 3-0   : RCODE    0000
    QD_count = 1        # questions
    AN_count = 0        # answers
    NS_count = 0        # authorities
    AR_count = 0        # additionals
    Q_name = NibbleEncode(name)         # ニブルエンコードしてバイト列に変換
    Q_type = 0x0020     # 0x0020:  NB
    Q_class = 0x0001    # 0x0001:  Internet class

    # 質問エントリ
    q_entry  = Q_name
    q_entry += struct.pack("!HH", Q_type, Q_class)

    pack = struct.pack("!HHHHHH", transaction_id, OP_flag, QD_count, AN_count, NS_count, AR_count)
    pack += q_entry
    # dumpData(pack)
    return pack



# 応答パケットの解析
def analysisPacket(pack) :
    # 応答パケットの先頭部分の解析
    (transaction_id, OP_flag, QD_count, AN_count, NS_count, AR_count) = struct.unpack_from("!HHHHHH", pack, 0)
    # print("id=%04x  flag=%04x   QD=%04x   AN=%04x   NS=%04x   AR=%04x" % (transaction_id, OP_flag, QD_count, AN_count, NS_count, AR_count))
    if not (QD_count == 0 and AN_count == 1 and NS_count == 0 and AR_count == 0) :
        # 応答レコード数1だけ対応
        raise ValueError("response value not expected")
    
    # 本来はIDのチェックを行うべきだが、ローカルポートなので、他のデータは入ってこないハズなので省略
    
    res_record = pack[12:]        # 応答レコード部分を取り出し
    # dumpData(res_record)
    RR_name = res_record[0:34]          # 名前部分を取り出して
    name, id = NibbleDecode(RR_name)    # デコード
    # print('"%s" <%02x>' % (name, id))

    (RR_type, RR_class, RR_TTL, RD_length) = struct.unpack_from("!HHLH", res_record, 34)
    # print("RR_type=%04x  RR_class=%04x  RR_TTL=%08x  RD_length=%04x" % (RR_type, RR_class, RR_TTL, RD_length))
    # RD_lengthにIPアドレス部分の応答データサイズが入っている
    tmp = res_record[34+10:]
    ipaddr_strs = []
    for i in range(RD_length // 6) :    # IPアドレス1個あたりのデータは6Byte(Flag2ByteとIPアドレス4Byte)
        # ipaddr = tmp[i*6+2:i*6+6]
        # ipaddr_strs.append(socket.inet_ntoa(ipaddr))
        ipaddr_str = '%d.%d.%d.%d' % (tmp[i*6+2], tmp[i*6+3], tmp[i*6+4], tmp[i*6+5])
        ipaddr_strs.append(ipaddr_str)
    
    return ipaddr_strs

def get_addr(name, address, mask, timeout=10) :
    # 乱数の代わりに時刻を使う
    id = int(time.time()) & 0xffff
    
    # 問い合わせパケットの作成
    pack = makePacket(id, name)
    try :
        # micropythonはこれでaddrを取得しないとエラーになる
        addr = socket.getaddrinfo(address, 137)[0][-1]
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.sendto(pack, addr)
        try :
            s.settimeout(timeout)
        except :
            # linux版 micropythonはsettimeoutできない
            print("can not set socket timeout")
        repl, addr = s.recvfrom(512)
        # print("addr:" + str(addr))
        # dumpData(repl)
        s.close()
    except Exception as e:
        s.close()
        # 失敗したらNoneを返す
        print(e)
        return None
    
    # 受信データの解析
    ipaddrs = analysisPacket(repl)
    # WindowsだとNICが複数あると複数のデータが返ってくる
    # for i in range(len(ipaddrs)) :
    #     print("%d %s" % (i, ipaddrs[i]))
    
    # サブネットマスクをintに変換した値
    mask_addr = inet_addr(mask)
    # サブネットアドレスをintに変換した値
    subnet_addr = inet_addr(address) & mask_addr;
    # print("%08x   %08x" %(subnet_addr, mask_addr))
    
    # サブネットアドレスに一致するアドレスを検索する
    for i in range(len(ipaddrs)) :
        tmp = inet_addr(ipaddrs[i]) & mask_addr
        if tmp == subnet_addr :
            # 見つかった
            return ipaddrs[i]
    
    # 見つからなかったらNoneを返す
    return None

 

で、使い方はこんな感じ。
addressにブロードキャストアドレス、maskにサブネットマスクを指定し
nameにマシン名(コンピュータ名)を指定してnbns.get_addr()を呼び出すと
対象マシンのIPアドレスの「文字列」が返ります。
nameで指定した名前のマシンが見つからない場合は、Noneが返ります。
マシン名は大文字/小文字の区別は行いません。これはNetBTの仕様で、すべて大文字として取り扱われます。
マシン名に全角文字は指定できません。これはNetBTでは全角文字をShiftJISで管理していますがmicropythonではShiftJISを扱えないためです。

import nbns

address = "192.168.78.255"
mask    = "255.255.255.0"
name = "myPC"
print("==== %s ====" % name)
ipaddr = nbns.get_addr(name, address, mask)
print("ipaddr[%s] = %s" % (name, ipaddr))

 

結果はこんな感じで表示されます。

==== myPC ====
ipaddr[myPC] = 192.168.78.66

 

マシンが見つからなかったときなどのタイムアウト時間はパラメータtmoutで指定(単位:秒)できます。
デフォルトは10秒です。
以下の例ではタイムアウトまでの時間を3秒にしています。

ipaddr = nbns.get_addr(name, address, mask, timeout=3)

戻り値は文字列なので、得られたIPアドレスは socket.getaddrinfo()にそのまま渡すことができます。

このプログラムはLinux版micropythonやpython3(3.5.2で確認)でも動作します。
しかし、Linux版micropythonではsocketのタイムアウト時間を指定できません。よって、応答がなければ永遠に待ち続けてしまいますので注意してください。

 

以下解説。

 

ニブルエンコード処理部分です。
文字列を入力すると、ニブル変換したバイト列を返します。

def NibbleEncode(name) :
    ・・・・

ニブルデコード処理部分です。
ニブル変換されたバイト列を入力すると、名前の文字列と識別子を返します。

def NibbleDecode(name) :
    ・・・・

 

ニブル変換については以下のページの真ん中あたりに説明があります。

NetBIOSネームサービスでネットワーク内の端末をリアルタイムに列挙する:CodeZine(コードジン)

 

IPアドレス("xx.xx.xx.xx")の数値部分を接続して32bit整数値を生成します。
C言語のinet_addrと同等の処理です。
サブネットアドレスの比較を行いやすいようにIPアドレスのstring型をint型に変換したいときに使用します。

def inet_addr(addr) :
    ・・・・

 

問い合わせパケットの生成処理です。
名前部分以外固定値なので、b'~~'の固定値で書いても良いのですが、
分かりやすさを重視して各パラメータを設定したあと、struct.pack()でパケットを組み立てています。

def makePacket(transaction_id, name) :
    ・・・・

 

応答パケットの解析処理です。

def analysisPacket(pack) :
    ・・・・

応答パケットの解析処理はちょっと処理が多いので、さらに詳しく。
簡単にするため、応答レコード数=1、その他レコード数=0 のパケットのみ対応してしています。

    (transaction_id, OP_flag, QD_count, AN_count, NS_count, AR_count) = struct.unpack_from("!HHHHHH", pack, 0)
    if not (QD_count == 0 and AN_count == 1 and NS_count == 0 and AR_count == 0) :
        # 応答レコード数1だけ対応
        raise ValueError("response value not expected")

 

名前部分を取り出してデコードしていますが、使ってないのでコメントアウトしてしまっても良いでしょう。

    RR_name = res_record[0:34]          # 名前部分を取り出して
    name, id = NibbleDecode(RR_name)    # デコード

 

名前以降の固定長部分を取り出しています。
以降で参照しているのはRD_lengthだけです。

    (RR_type, RR_class, RR_TTL, RD_length) = struct.unpack_from("!HHLH", res_record, 34)

 

RD_length 以降にフラグ(2Byte)とIPアドレス(4Byte)が入っているのですが、このデータが1つとは限りません。
複数のNICが接続されている場合、NICの数だけデータが入っています(Windowsの場合)。
したがって、(RD_length // 6) でデータ組数を求めて、すべてのIPアドレスをリストに格納しています。
ここではフラグは使用しないので、フラグ部分(tmp[i*6+0]、tmp[i*6+1])は読み込んでいません。
Pythonだと、socket.inet_ntoa()でスマートに変換できるのですが、microputhonにはないので、泥臭い手法で行っています。

    tmp = res_record[34+10:]
    ipaddr_strs = []
    for i in range(RD_length // 6) :    # IPアドレス1個あたりのデータは6Byte(Flag2ByteとIPアドレス4Byte)
        # ipaddr = tmp[i*6+2:i*6+6]
        # ipaddr_strs.append(socket.inet_ntoa(ipaddr))
        ipaddr_str = '%d.%d.%d.%d' % (tmp[i*6+2], tmp[i*6+3], tmp[i*6+4], tmp[i*6+5])
        ipaddr_strs.append(ipaddr_str)

取得したIPアドレスのリストを返します。

    return ipaddr_strs

応答パケットの解析処理はここまで。

 

このモジュールのAPIです。
タイムアウトのデフォルト値はここで10に設定しています。

def get_addr(name, address, mask, timeout=10) :
    ・・・・

APIも中身を詳しく見ていきましょう。
idは問い合わせパケットを識別するためのユニークな値が入っていれば良いので、時刻データで代用しています。
ただ、このモジュールでは複数の問い合わせパケットを同時に送受信することはないので、固定値でも良いかもしれません。

    id = int(time.time()) & 0xffff

 

問い合わせパケットを作成します。

    pack = makePacket(id, name)

 

UDPソケットを生成してブロードキャストに設定し、データを送信しています。

        addr = socket.getaddrinfo(address, 137)[0][-1]
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.sendto(pack, addr)

 

応答がなかったときのために、受信タイムアウトを設定しています。
linux版micropythonではsocket.settimeout()は用意されていないので例外が発生します。
例外が発生したときはタイムアウトを設定できない旨表示して処理を続けます。

        try :
            s.settimeout(timeout)
        except :
            print("can not set socket timeout")

 

応答を受信します。
replに受信データが、addrに送信元アドレスが入ります。

        repl, addr = s.recvfrom(512)

実質、ここのaddrが取得したいIPアドレスだという気もしますが。。。

 

送受信処理の例外処理部です。
実質、受信タイムアウト処理です。ソケットをcloseしてNoneを返しています。

    except Exception as e:
        s.close()
        print(e)
        return None

 

受信パケットを解析してIPアドレスのリストを取得します。

    ipaddrs = analysisPacket(repl)

 

サブネットマスクとサブネットアドレスをint型に変換します。
mask:"255.255.255.0" (0xff.0xff.0xff.0x0) → 0xffffff00
addr:"192.168.78.255" (0xc0.0xa8.0x4e.0xff) → 0xc0a84eff
subnet_addr : 0xc0a84eff & 0xffffff00 → 0xc0a84e00

    mask_addr = inet_addr(mask)
    subnet_addr = inet_addr(address) & mask_addr;

 

取得したIPアドレスのリストからサブネットアドレスに一致するIPアドレスを検索します。
処理としては、IPアドレスサブネットマスクでマスクし、サブネットアドレスと一致するものを探します。
見つかったらそのIPアドレスを返します。
もし、複数のサブネットアドレスに一致するIPアドレスがあっても、最初に出てきたものが有効になります。

    for i in range(len(ipaddrs)) :
        tmp = inet_addr(ipaddrs[i]) & mask_addr
        if tmp == subnet_addr :
            return ipaddrs[i]

 

見つからなかったらNoneを返します(ここには来ないハズ)。

    return None

 

micropython on ESP32 でDAC

今更だけど備忘録として。

ESP32でアナログ電圧を出力するのにDAC(Digital-Analog Converter)を使用できます。
DAC出力端子として、Pin25、Pin26を指定できます。
それ以外の端子を指定すると例外ValueErrorがraise されます。
初期化は以下のように実行します。
初期化直後の出力レベルは0Vです。

import machine

pin25 = machine.Pin(25,machine.Pin.OUT)
dac0 = machine.DAC(pin25)

 

DACの出力レベルを変更するには以下のように実行します。
値は0~255が指定可能で、指定値/256* VDD が出力されます(ま、精度はアレなので、大体ね)。
範囲外の値を指定すると例外ValueErrorがraise されます。

dac0.write(100)

 

設定端子を確認するには以下のように実行します。
現在の出力レベルは取得できないようです。

print(dac0)
    ==> DAC(Pin(25))

 

micropython on ESP32 でPWM

今更だけど備忘録として。

ESP32でパルスを出力するのにPWM(Pulse Width Modulation)を使用できます。

出力端子は出力可能な任意のpinを指定可能ですが、設定できる端子は最大8本です。
周波数はすべてのチャンネルで共通です。
周波数を変更すると、パルス幅も変わります(デューティ比で指定されているので)
初期化方法は以下の通りです。
freq で周波数(単位Hz)、dutyで出力パルス幅(単位(100/1024)%)を指定します。
デフォルトはfreq=5000(他の端子で設定済みの場合はその値を引き継ぐ)、duty=512

import machine
pin19=machine.Pin(19,machine.Pin.OUT)
pwm0=machine.PWM(pin19, freq=2000, duty=100)

 

出力パルス幅(High期間)を変更するには以下のように実行します。
単位は(100/1024)%
設定値に0を指定したとき全期間でLow出力、1023を指定したとき全期間の1023/1024のパルスを出力します。全期間でHigh出力することはできません。
1024以上を設定した場合は、設定値は0x3ffでマスクした値が使用されます。
つまり、1025を指定すると、1を指定したことになります。

pwm0.duty(100)

 

周波数の変更を変更するには以下のように実行します。
周波数はすべてのチャンネルで共通なので、1本だけ周波数を変更することはできません。
設定値の単位はHz。

pwm0.freq(1000)

 

設定値を確認するには以下のように実行します。

print(pwm0)
    ==> PWM(19, freq=2000, duty=100)

 

micropython on ESP32 でADC

今更だけど備忘録として。

ESP32でアナログ電圧を取得するのにADC(Analog-Digital Converter) を使用できます。
ESP32のADCのアナログ入力の電圧範囲は0~VDD(3.3V)です(ESP8266とは異なります)。
ADC入力端子として、Pin36、Pin37、Pin38、Pin39、Pin32、Pin33、Pin34、Pin35 を指定できます。
それ以外の端子を指定すると例外ValueErrorがraise されます。

 

初期化方法は以下の通りです。

import machine

pin36=machine.Pin(36, machine.Pin.IN)
adc0 = machine.ADC(pin36)

 

アナログ電圧値を取得するには以下のように実行します。
取得できる値は、デフォルト設定時、有効桁12bitなので、0~4095(0x0fff)の整数値です。
ただし、デフォルトのattenuation(減衰率?)が0dBに設定されていて、入力電圧が約1V程度で変換結果が最大値になります(ESP8266との互換性??)。

val = adc0.read()

 

attenuation(減衰率?)を変更するには以下のように実行します。
11dBを設定すると、大体フルレンジでの変換になるようです。

adc0.atten(machine.ADC.ATTN_11DB)

 

設定値 attenuation  
machine.ADC.ATTN_0DB 0dB (デフォルト)
machine.ADC.ATTN_2_5DB 2.5dB  
machine.ADC.ATTN_6DB 6dB  
machine.ADC.ATTN_11DB 11dB  

 

 

また、以下のように実行すると、ADCの変換結果の有効桁数を変更できます。

adc0.width(machine.ADC.WIDTH_9BIT)

 

設定値 変換結果  
machine.ADC.WIDTH_9BIT 0~511  
machine.ADC.WIDTH_10BIT 0~1023  
machine.ADC.WIDTH_11BIT 0~2047  
machine.ADC.WIDTH_12BIT 0~4095 (デフォルト)