いっぺーちゃんの いろいろやってみよ~

micropython on ESP32 でPWM

今更だけど備忘録として。

ESP32でパルスを出力するのにPWM(Pulse Width Modulation)を使用できます。

出力端子は出力可能な任意のpinを指定可能ですが、設定できる端子は最大8本です。
周波数はすべてのチャンネルで共通です。
周波数を変更すると、パルス幅も変わります(デューティ比で指定されているので)
初期化方法は以下の通りです。
freq で周波数(単位Hz)、dutyで出力パルス幅(単位(100/1024)%)を指定します。
デフォルトはfreq=5000(他の端子で設定済みの場合はその値を引き継ぐ)、duty=512

import machine
pin19=machine.Pin(19,machine.Pin.OUT)
pwm0=machine.PWM(pin19, freq=2000, duty=100)

 

出力パルス幅(High期間)を変更するには以下のように実行します。
単位は(100/1024)%
設定値に0を指定したとき全期間でLow出力、1023を指定したとき全期間の1023/1024のパルスを出力します。全期間でHigh出力することはできません。
1024以上を設定した場合は、設定値は0x3ffでマスクした値が使用されます。
つまり、1025を指定すると、1を指定したことになります。

pwm0.duty(100)

 

周波数の変更を変更するには以下のように実行します。
周波数はすべてのチャンネルで共通なので、1本だけ周波数を変更することはできません。
設定値の単位はHz。

pwm0.freq(1000)

 

設定値を確認するには以下のように実行します。

print(pwm0)
    ==> PWM(19, freq=2000, duty=100)

 

micropython on ESP32 でADC

今更だけど備忘録として。

ESP32でアナログ電圧を取得するのにADC(Analog-Digital Converter) を使用できます。
ESP32のADCのアナログ入力の電圧範囲は0~VDD(3.3V)です(ESP8266とは異なります)。
ADC入力端子として、Pin36、Pin37、Pin38、Pin39、Pin32、Pin33、Pin34、Pin35 を指定できます。
それ以外の端子を指定すると例外ValueErrorがraise されます。

 

初期化方法は以下の通りです。

import machine

pin36=machine.Pin(36, machine.Pin.IN)
adc0 = machine.ADC(pin36)

 

アナログ電圧値を取得するには以下のように実行します。
取得できる値は、デフォルト設定時、有効桁12bitなので、0~4095(0x0fff)の整数値です。
ただし、デフォルトのattenuation(減衰率?)が0dBに設定されていて、入力電圧が約1V程度で変換結果が最大値になります(ESP8266との互換性??)。

val = adc0.read()

 

attenuation(減衰率?)を変更するには以下のように実行します。
11dBを設定すると、大体フルレンジでの変換になるようです。

adc0.atten(machine.ADC.ATTN_11DB)

 

設定値 attenuation  
machine.ADC.ATTN_0DB 0dB (デフォルト)
machine.ADC.ATTN_2_5DB 2.5dB  
machine.ADC.ATTN_6DB 6dB  
machine.ADC.ATTN_11DB 11dB  

 

 

また、以下のように実行すると、ADCの変換結果の有効桁数を変更できます。

adc0.width(machine.ADC.WIDTH_9BIT)

 

設定値 変換結果  
machine.ADC.WIDTH_9BIT 0~511  
machine.ADC.WIDTH_10BIT 0~1023  
machine.ADC.WIDTH_11BIT 0~2047  
machine.ADC.WIDTH_12BIT 0~4095 (デフォルト)

 

 

 

micropython on ESP32 でLINE

Twitterに飽きたらLINEもね。ということで、Espruino on ESP32 で LINE(その1) のmicprpython版です。

準備として、上のリンクにある通り、アクセストークンを取得しておきます。

 

 以下がLINE Notifyにメッセージを送信するモジュールです。
tiny_line.py という名前で保存し、upipmなどでインストールしておきます。

import usocket as socket
import ussl as ssl

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# ---- percent encoding
def RFC3986_encode(s):
    ret = ''
    bts = s.encode('utf-8')
    for c in bts :
        if c in range(0x30, 0x39 + 1) or \
           c in range(0x41, 0x5a + 1) or \
           c in range(0x61, 0x7a + 1) or \
           c in (0x2d, 0x2e, 0x5f, 0x7e):
            ret += chr(c)
        else :
            ret += '%%%02X' % (c)
    return ret

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class tiny_line :
    def __init__(self, access_token, debug=False) :
        # パラメータチェック
        if type(access_token) is not str:
            raise ValueError("access key must be string")
        
        self.access_token      = access_token
        self.__DEBUG__         = debug

    def __debug_print(self, str) :
        if self.__DEBUG__ :
            print(str)
    
    def __makeRequestMessage(self, method, host, message) :
        method = method.upper()                                     # to upper
        
        # make request
        request = method + ' ' + host['endpoint'] + ' HTTP/1.0\n'   # HTTP/1.0 を指定するとChunked Transferにならないので、レスポンスが1行で済む
        
        # make message body
        body = 'message=' + RFC3986_encode(message)
        
        # make message header
        header  = 'Host: ' + host['host'] + '\n'                        \
                + 'User-Agent: micropython line Bot v0.1\n'             \
                + 'Accept: */*\n'                                       \
                + 'Authorization: Bearer ' + self.access_token +'\n'    \
                + 'Content-Type: application/x-www-form-urlencoded\n'   \
                + 'Content-Length: ' + str(len(body)) + '\n'
                # HTTP/1.0の場合は以下は要らない
                #+ 'Connection: close\n'                                 \

        # request message
        ret = request + header + '\n' + body + '\n'
        return(ret)
    
    def __sendmessage(self, host, msg) :
        sock = socket.socket()
        addr = socket.getaddrinfo(host['host'], host['port'])[0][-1]
        
        # connect socket
        sock.connect(addr)
        try :
            # SSL wrap
            ssl_sock = ssl.wrap_socket(sock)
            
            # send data
            ssl_sock.write(msg)
            
            # 受信データの最初の1行
            rcv_line = ssl_sock.readline()
            protover, status, msg = rcv_line.split(None, 2)
            # self.__debug_print('%s::::%s::::%s' % (protover, status, msg))
            # status 200以外はエラー
            if status != b"200":
                raise ValueError(status)
            # それ以外のレスポンスヘッダを読む
            while True:
                rcv_line = ssl_sock.readline()
                # self.__debug_print(rcv_line)
                if not rcv_line:
                    # なんらかの異常なレスポンス(ヘッダが終わる前にデータがなくなった)
                    raise ValueError("Unexpected EOF in HTTP headers")
                if rcv_line == b'\r\n':
                    # 空行でヘッダ終了
                    break
        except Exception as e:
            # エラーが発生したらクローズして上位へ例外通知
            ssl_sock.close()
            raise e
        # メッセージ本体を受信(とりあえず読み捨て)
        # 制限回数を超えた場合なんかはここでチェックが必要?
        rcv_line = b''
        while True :
            try :
                l = ssl_sock.readline()
            except Exception as e:              # エラーが発生したらクローズして上位へ例外通知
                ssl_sock.close()
                raise e
           
            if not l:                           # データがない → 終了
                break
            
            # 読み込んだデータをためる
            rcv_line += l
        
        # self.__debug_print("@@@@" + str(rcv_line))
        self.__debug_print("close!!")
        ssl_sock.close()
    
    # ######## notify API ################################
    def notify(self, msg) : 
        host = { 'host': 'notify-api.line.me', 'port': 443, 'endpoint': '/api/notify'}
        
        reqMessage = self.__makeRequestMessage('POST', host, msg)
        self.__debug_print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        self.__debug_print(reqMessage)
        self.__debug_print('%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%')
        
        self.__sendmessage(host, reqMessage)

 送信側だけなので、ブロッキングモードのみです。

 準備ができたら、以下のプログラムをコンソールでコピペ実行するか、pyboard.pyで実行します。
しつこいですが、日本語を使うときは文字コードUTF-8でファイル保存してください。
実行すると、LINEに「へろー」と送信されます。

access_token    = '取得したアクセストークン'

from tiny_line import tiny_line

# 初期化
tl = tiny_line(access_token, debug=True)
# メッセージ送信
tl.notify("へろー")

 

 以下、解説。。。と思ったのですが、何も解説することがありません(笑)。

LINEの場合、時刻情報は不要なので、時刻合わせをする必要はありません。と、無理やり解説。。。

 

micropython on ESP32 でTwitter(その4)

前回の受信プログラムはブロッキング動作のため、受信動作を行うとコールバック以外の処理が行えなくなってしまいます。
そこで、今回はノンブロッキング動作に変更して他の処理を行えるように変更してみます。

とはいえ、tiny_twitterモジュールには既にノンブロッキング動作を行える仕組みが組み込んであります。
以下のプログラムをコンソールでコピペ実行するか、pyboard.pyで実行します。

 

consumer_key    = '取得した Consumer key'
consumer_secret = '取得した Consumer secret'
access_token    = '取得した Access Token'
access_secret   = '取得した Access Token Secret'

import sys
platform = sys.platform
# platform :    ESP32 => 'esp32'
#               Linux => 'linux'

from tiny_twitter import tiny_twitter
import ujson as json

if platform == 'esp32' :
    # ESP32のときは時刻合わせする
    import ntptime
    import utime
    utime.set_time(ntptime.time())

# データ受信時のコールバック関数
def CallbackFunc(jsn) :
    # print("#### " + json.dumps(jsn))
    print("--------\n" + jsn.get("text", "NO_DATA") + "\n--------")

# 初期化
tw = tiny_twitter(consumer_key, consumer_secret, access_token, access_secret, CallbackFunc, blocking=False, debug=True)


# twitterに接続
param = {"with":"user", "track":"てすとつぃーと"};
tw.userstream(param);


# プロンプト
prmpt = ['-', '\\', '|', '/']
# 表示するプロンプトのインデックス
idx = 0
# 受信ループ
while True :
    if tw.recieve() < 0 :
        # クローズされたら終了
        break
    # プロンプトをくるくる回す
    print(prmpt[idx], end="\r")
    idx = (idx + 1) & 0x03

tiny_twitter.userstream()は、twitterに接続し、レスポンスヘッダを受信した後戻ってきます。

後は定期的にtiny_twitter.recieve()を実行します。
受信データがあれば、ブロッキング動作時と同様にcallbackが呼び出されます。
このプログラムでは、特にほかにやることがないので、プロンプトをくるくると回しています。
フォントが円マーク(¥)でなく、バックスラッシュ(\)なら棒がくるくる回っているように見えると思いますが。。。

 

以下、前回からの変更箇所の説明。

初期化のパラメータに「blocking=False」を追加しています。
パラメータ blockingは、省略時Trueになるように設定されているので、前回のプログラムでは「blocking=True」を指定したのと同等です。

 

tw = tiny_twitter(consumer_key, consumer_secret, access_token, access_secret, CallbackFunc, blocking=False, debug=True)

 

メインループです。
定期的にtw.recieve()をコールして受信処理を行っています。
接続がクローズされると、tw.recieve()は-1を返します。それ以外(受信データがあってもなくても)は0を返します。
接続がクローズされたらループから抜けてプログラム終了です。
ループ内では'-', '\'(半角), '|', '/' を順に表示しています。
実際はここで本来やりたい処理を行います。

# プロンプト
prmpt = ['-', '\\', '|', '/']
# 表示するプロンプトのインデックス
idx = 0
# 受信ループ
while True :
    if tw.recieve() < 0 :
        # クローズされたら終了
        break
    # プロンプトをくるくる回す
    print(prmpt[idx], end="\r")
    idx = (idx + 1) & 0x03

 

明示的にクローズしたいときは、以下のように実行します。

tw.close()

 

micropython on ESP32 でTwitter シリーズはこれにて一旦終了です。

micropython on ESP32 でTwitter(その3)

今回はtweet内容を受信するプログラムを試してみます。
Espruino on ESP32 で Twitter(その4) のmicropython版です。

あまりたくさんのtweetを一度に受信してしまうと、メモリ不足で落ちてしまいます。
それを防ぐため、1つのtweetのデータが大きすぎる場合は、そのデータを破棄します。現在は16kByte以上のデータは破棄するように設定してあります(もっと小さい方が良いかも)。
(引用ツイートのリツイートで色々ユーザ情報が付加されているとデータが大きくなることがあります。サーバ側でフィルタをかけられれば良いのですが。。。)
また、不完全なデータ(一部が抜け落ちている等)が送られてきた場合は、その後のtweetを含めて破棄されることがあります。
(データの終わりが見つからないので、↑の大データリミットに引っかかって破棄されます。このとき、後続のデータも含めて破棄されます。データの先頭や最後をうまく見つける方法があれば良いのですが。。。)

以下のプログラムをコンソールでコピペ実行するか、pyboard.pyで実行します。
twitterに接続したあと、データ受信待ちになります。
以下の設定の場合、自分がtweetするか、「てすとつぃーと」を含むtweetがあった場合、データを表示します。

consumer_key    = '取得した Consumer key'
consumer_secret = '取得した Consumer secret'
access_token    = '取得した Access Token'
access_secret   = '取得した Access Token Secret'

import sys
platform = sys.platform
# platform :    ESP32 => 'esp32'
#               Linux => 'linux'

from tiny_twitter import tiny_twitter
import ujson as json

if platform == 'esp32' :
    # ESP32のときは時刻合わせする
    import ntptime
    import utime
    utime.set_time(ntptime.time())

# データ受信時のコールバック関数
def CallbackFunc(jsn) :
    # print("#### " + json.dumps(jsn))
    print("--------\n" + jsn.get("text", "NO_DATA") + "\n--------")

# 初期化
tw = tiny_twitter(consumer_key, consumer_secret, access_token, access_secret, CallbackFunc, debug=True)


# twitterに接続
param = {"with":"user", "track":"てすとつぃーと"};
tw.userstream(param);

 

ほとんど前回と同じなので、違う部分だけ説明。

コールバック関数は、基本的に前回と同じですが、今回はtweet内容だけ表示するようにしてあります。

日本語を正常に表示するには、コンソールの日本語モードをUTF-8に設定しておく必要があります。
が、それ以外の言語(韓国語やアラビア語など)が送られてくるとコンソールによっては正常に表示されません。

def CallbackFunc(jsn) :
    # print("#### " + json.dumps(jsn))
    print("--------\n" + jsn.get("text", "NO_DATA") + "\n--------")

たとえば、こんな感じでlangプロパティが"ja"(日本語)か"en"(英語)のときだけ表示するという処理にすると
日本語と英語のtweetのみを表示できるようになります。

def CallbackFunc(jsn) :
    lang = jsn.get("lang", "UNKNOWN")
    text = jsn.get("text", "NO_DATA")
    if lang == "ja" or lang == "en" :
        print("-------- lang: " + lang)
        print(text)
        print("--------")

しかし、langプロパティはサーバ側でtweet内容から判断して付加している情報ですので、完璧ではありません。
たまに韓国語や中国語が混じっている場合があります。
また、日本語や英語であっても絵文字は表示できません。

 

twitterに接続します。
paramに設定できるパラメータは GET user — Twitter DevelopersStreaming API request parameters — Twitter Developers を参照してください。

"language":"en,ja"を追加すれば、↑のようにcallbackで振り分けなくても英語と日本語だけ取得になるような気がします(試してないけど)。

正常に接続されれば、接続されたまま、データを受信するたびにコールバック関数が呼び出されます。
ソケット通信がブロッキング動作のため、tiny_twitter.userstream()はエラーが発生して接続がcloseされるまで戻ってきません。

param = {"with":"user", "track":"てすとつぃーと"};
tw.userstream(param);

 

次回に続く。。。

micropython on ESP32 でTwitter(その2)

さて、早速tweetしてみましょう。

ESP32はmicropython on ESP32 でWi-Fiルータ(or AP)に接続するにしたがってWi-Fiに接続されていて、micropython on ESP32 でTwitter(その1)の手順で各モジュールが改造/インストール済みであるものとします。

 

以下のプログラムをコンソールでコピペ実行するか、pyboard.pyで実行します。
日本語を使用する場合、エディタの文字コードUTF-8を選択しておいてください(これ重要!!)。

consumer_key    = '取得した Consumer key'
consumer_secret = '取得した Consumer secret'
access_token    = '取得した Access Token'
access_secret   = '取得した Access Token Secret'

import sys
platform = sys.platform
# platform :    ESP32 => 'esp32'
#               Linux => 'linux'

from tiny_twitter import tiny_twitter
import ujson as json

if platform == 'esp32' :
    # ESP32のときは時刻合わせする
    import ntptime
    import utime
    utime.set_time(ntptime.time())

# データ受信時のコールバック関数
def CallbackFunc(jsn) :
    print("#### " + json.dumps(jsn))
    # print("--------\n" + jsn.get("text", "NO_DATA") + "\n--------")

# 初期化
tw = tiny_twitter(consumer_key, consumer_secret, access_token, access_secret, CallbackFunc, debug=True)

# tweetする
query_params = {"trim_user":"true", "include_entities":"false"}
msg = "てすとつぃーと"
tw.tweet(msg, query_params)

成功すれば、「てすとつぃーと《改行》(2017/08/21[Mon] 09:11:18)」のようにtweetされるはずです。

 

 

以下解説。

実行環境のチェックのための値取得です。
ESP32のときは platform に 'esp32'が、Linuxのときは 'linux' が格納されます。

import sys
platform = sys.platform
# platform :    ESP32 => 'esp32'
#               Linux => 'linux'

 

必要なモジュールをインポートします。
tiny_twittertwitterアクセスのためのモジュールです。これがなければ始まりません。
ujson はJSONデータを処理するためのモジュールです。
twitterのデータはJSONデータで送られてきますが、tiny_twitter内でDICT型に変換されてくるので、DICT型のまま使うなら不要です。
今回はujson.dumps()で文字列に戻すためにインポートしています。

from tiny_twitter import tiny_twitter
import ujson as json

 

ESP32のときは時刻合わせを行います。
micropython on ESP32 で時刻設定を可能にするにあるutimeモジュールの改造と、micropython on ESP32 でNTPサーバから時刻取得にあるntptimeモジュールのインストールが終わっている必要があります。

if platform == 'esp32' :
    # ESP32のときは時刻合わせする
    import ntptime
    import utime
    utime.set_time(ntptime.time())

 

Twitterからのデータを受信したときのコールバック関数です。
パラメータjsnにはDICT型に格納されたJSONデータが入っています。
今回は受け取ったJSONデータをujson.dumps()で文字列に変換して表示しています。
tweet内容だけ表示したい場合はコメントアウトされている側を有効にしてください。

# データ受信時のコールバック関数
def CallbackFunc(jsn) :
    print("#### " + json.dumps(jsn))
    # print("--------\n" + jsn.get("text", "NO_DATA") + "\n--------")

 

tiny_twitterクラスの初期化です。
最初の4つのパラメータがTwitterアクセスのための4つのキーで、5つ目がデータ受信時のコールバック関数(↑で定義)です。
debugパラメータをTrueにすると、モジュール内部のdebug_printでの表示が有効になります。表示したくない場合は省略するかFalseを指定してください。

tw = tiny_twitter(consumer_key, consumer_secret, access_token, access_secret, CallbackFunc, debug=True)

 

query_paramsは APIリファレンス を参照してください。指定しても動くかどうかわかりませんが。。。

msgにはtweetする文字列を設定します。
Espruino版とは異なり、ソケット通信がブロッキング動作のため、tiny_twitter.tweet()はレスポンスを受信し、接続がcloseされるまで戻ってきません。

query_params = {"trim_user":"true", "include_entities":"false"}
msg = "てすとつぃーと"
tw.tweet(msg, query_params)

 

次回に続く。。。

micropython on ESP32 でTwitter(その1)

Espruino on ESP32 で Twitter(その1) - いっぺーちゃんの いろいろやってみよ~のmicropython版です

 

これの元ネタがmicropython版なので、戻ってきた感じですが、Espruino版で入れた種々の変更も取り込んで一から作り直しました。

 

まずは、micropythonのBug fixです(笑)。
以下のパッチをあててmicropythonを再buildしてください。
これは、ハッシュ値の計算を2つ以上同時に行うと2つ目以降の計算値が不正になる、というBugを修正するものです。
mbedtlsでのハッシュ値の計算は、1つだけだとハードウェアを使用しますが、2つ目以降はソフトウェアで行います。これはハッシュ値計算用ハードウェアが1つしかないためです。
このとき、mbedtls_sha*_starts()で計算データの初期値を設定していないと、計算データが不正になります。
ハードウェアを使用する場合は、初期値の設定はハードウェアで自動的に行われるため、ハードウェアを使用する1つ目の計算は正常に行われます。

 

diff --git a/esp32/moduhashlib.c b/esp32/moduhashlib.c
index 6f67aa7..5a6d20b 100644
--- a/esp32/moduhashlib.c
+++ b/esp32/moduhashlib.c
@@ -46,6 +46,7 @@ STATIC mp_obj_t sha256_make_new(const mp_obj_type_t *type,
     mp_obj_hash_t *o = m_new_obj_var(mp_obj_hash_t, char, sizeof(union sha_ctxs));
     o->base.type = type;
     mbedtls_sha256_init(&o->state.sha256);
+    mbedtls_sha256_starts(&o->state.sha256, 0);                // [#/] add
     if (n_args == 1) {
         sha256_update(MP_OBJ_FROM_PTR(o), args[0]);
     }
@@ -58,6 +59,7 @@ STATIC mp_obj_t sha1_make_new(const mp_obj_type_t *type,
     mp_obj_hash_t *o = m_new_obj_var(mp_obj_hash_t, char, sizeof(union sha_ctxs));
     o->base.type = type;
     mbedtls_sha1_init(&o->state.sha1);
+    mbedtls_sha1_starts(&o->state.sha1);               // [#/] add
     if (n_args == 1) {
         sha1_update(MP_OBJ_FROM_PTR(o), args[0]);
     }

 

次はhmacモジュールのインストールです。
参照元にある通り、ここから拝借します。
hmac/hmac.pyの右上の「View」ボタンをクリックしてファイル全体を表示します(その他のファイルは使いません)。
右上の「Raw」ボタンを右クリックして「名前を付けて保存」を選択し、ローカルディスクに保存します。
Ubuntuから直接なら、以下のように実行しても取得できます。

wget https://raw.githubusercontent.com/bynds/micropython-lib/ff74b8cb508199a33e1978d68bd7076f2a536ab0/hmac/hmac.py

 

ダウンロードしたら、以下のパッチを当てます(ESP32ではuhashlibでなくhashlibなので)。
これをupipmなどでインストールします。

--- hmac.py.org	2017-08-19 07:06:27.771787400 +0900
+++ hmac.py	2017-08-19 07:17:43.191131335 +0900
@@ -8,7 +8,11 @@
 #import hashlib as _hashlib
 #PendingDeprecationWarning = None
 #RuntimeWarning = None
-import uhashlib as _hashlib
+#import uhashlib as _hashlib
+try:
+    import uhashlib as _hashlib
+except:
+    import hashlib as _hashlib
 
 trans_5C = bytes((x ^ 0x5C) for x in range(256))
 trans_36 = bytes((x ^ 0x36) for x in range(256))

 

さらにに以下のファイルをダウンロード&解凍し、upipmなどでインストールします。

tiny_twitter_py_20170819.zip

また、Espruino on ESP32 で Twitter(その2) - いっぺーちゃんの いろいろやってみよ~にある通り、4つのキー(Consumer key、Consumer secret、Access Token、Access Token Secret)を用意しておいてください。

 

次回に続く。。。