Day After Day
tsurezure naru mamani...
ANOTHER DECADE

from 2022 when it's begining after/with CORONA Virus.

Pythonでちょっとrpi-GWの監視をしてみる

2月
25
2024
Back
Alt+HOME


D-STARリピータのrpi-GWで時々アプリが'NG'となって困っていました。rpi-xchange をリスタートすると回復するのですが、四六時中見ていられません。折角学んだPython、ちょっと工夫してみました。

発想は単純で、人と同じ処理を手順かして放っておけるようにするというものです。 つまりWEB(左図:ポート20201で出力される)で赤文字で示されている[NG]と言う文字列を、その出力ポートで監視して、 見つかればスクリプトからコマンドを実行して rpi-xchange を再起動すると言うものです。

最初は tcpdump を用いた方法で htmlを捕らえていたのですが、テスト運用している内に誰か(自分)がWEBを起ち上げていないと、 出力されないと言うことが分かりました。そこで改訂版として Request を発行してhtmlを取りに行くと言う方法に変更しました。


短いスクリプトなので全体を記述した上で説明を付け加えます。

Python スクリプト全体


$ nano /usr/local/bin/watch_xchange.py

  GNU nano 7.2                                 /usr/local/bin/watch_xchange.py
import subprocess
import requests
import time
import signal


# WEBサーバーにアクセスしてhtmlコードを取得する
def fetch_html_from_server(port):

    # 監視するURLを構成する
    url = 'http://127.0.0.1' + ':' + port

    try:
        # requestsを送信して200(リクエスト成功)が返ったらその内容を取得
        response = requests.get(url)
        if response.status_code == 200:
            html_content = response.text
            return html_content

        # もしエラーが返ったらその内容を表示(コンソールで起動している時のみ)
        else:
            print(f"サーバーからのレスポンスコードがエラーです: {response.status_code}")
            return None

    except:
        print("Get nothing..")
        return None


# 取得したhtmlコード内にターゲットの文字が有るかチェックする
def check_status(fetched_html, target_string):

    # もし文字列が存在したらrpi-xchangeをリスタート
    if target_string in fetched_html:
        print("rpi-xchange をリスタートします。")
        command = 'sudo systemctl restart rpi-xchange.service'
        subprocess.run(command, shell=True)
        print(command)


if __name__ == "__main__":

    port = '20201'                      # 監視するポート番号
    target_string = "NG"                # 検索したい文字列
 
    # 監視開始(シャットダウン・リブート又は[Ctrl] + [c]でのみ終了
    print("ポート:", port, "を監視しています。")

    while True:

        # 取得したhtmlを変数に代入
        fetched_html = fetch_html_from_server(port)

        # 変数が空で無ければ
        if fetched_html:

            # 内容を検査
            check_status(fetched_html, target_string)

        # リクエスト送信の間隔を設定する。(10sec)
        time.sleep(10)

  1. 上部2つの def は html の取得関数と、それを判定する関数です。
  2. 下部がメインプログラムで最初に監視するポート番号と見つける文字列を記述しました。
  3. while Trueで無限ループに入ります。つまりずっと監視し続けます。
  4. 上部2つの関数を順次呼び出します。
  5. 判定関数の中で、もし NG が見つかったら、sudo systemctl restart rpi-xchange.service を実行します。
  6. これらの処理の間隔は 10秒 に設定していますが、WEBを表示している時は 5秒 間隔なので、素早く修正したい場合はそれで良いと思います。

テストと自動運用


テストする時は直接ターミナルで、コマンドを打ってみてください。[Ctrl + c] を入力するまで動き続けます。

$ python /usr/local/bin/watch_xchange.py

この場合は、サービスとして起動した時と違って、上のコード中に print() で示されたコメントが表示され処理が分かります。 rpi-multi_forward 等をリスタートして、敢えてエラーを出させると、すぐに回復します。

テストが完了したら、自動起動を設定して本運用に移ります。

  1. 自動起動させるため、systemd用のユニットファイルを作成します。

  2. $ sudo nano /etc/systemd/system/watch_xchange.service

      GNU nano 7.2                        /etc/systemd/system/watch_xchange.service
    [Unit]
    Description=rpi-xchange restart daemon for Status NG
    After = rpi-xchange.service
    
    [Service]
    ExecStart=python /usr/local/bin/watch_xchange.py
    Restart=always
    Type=simple
    
    [Install]
    WantedBy=multi-user.target
    

  3. 保存したら、自動軌道ファイルに設定します。

  4. $ sudo systemctl enable watch_xchange.service
    $ sudo reboot

これでWEBを起ち上げること無く、'NG' が出るたびに自動で rpi-xchange をリスタートしてくれます。

Back
Alt+HOME