絖綛 N@i.jp  昨日:00036563
 今日:00028050
 総計:00078927
keywords
管理者専用
  Post   Add link   Control Panel 































新しいトピック
最新:04/16 19:55


新しいコメント
最新:07/28 16:47






管理人へMAIL

プライバシーポリシー

Raspberry Pi から IRKitを操作する。二歩め

まだまだ未完成だけど、何とか使える状態になりました


 体調を崩したりしたこともあって、なかなか進まなかったIRKit操作プログラムですが、何とか基本的な事はできるようになりました。

 先ずはIRKitをBonjourで見つけるプログラム "resolve_irkit.py" をモジュールとして使えるように少し変えます。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import select
import sys
import pybonjour
from socket import gethostbyname

regtype  = '_irkit._tcp'
timeout  = 5
resolved = []
irkits = []

def resolve_callback(sdRef, flags, interfaceIndex, errorCode, fullname,
                     hosttarget, port, txtRecord):
    if errorCode == pybonjour.kDNSServiceErr_NoError:
        irkits.append((hosttarget, port))
        resolved.append(True)

def browse_callback(sdRef, flags, interfaceIndex, errorCode, serviceName,
                    regtype, replyDomain):
    if errorCode != pybonjour.kDNSServiceErr_NoError:
        return
    if not (flags & pybonjour.kDNSServiceFlagsAdd):
        print('Service removed', file=sys.stderr)
        return

    with pybonjour.DNSServiceResolve(0,
                                     interfaceIndex,
                                     serviceName,
                                     regtype,
                                     replyDomain,
                                     resolve_callback) as resolve_sdRef:
        while not resolved:
            ready = select.select([resolve_sdRef], [], [], timeout)
            if resolve_sdRef not in ready[0]:
                print('Resolve timed out', file=sys.stderr)
                break
            pybonjour.DNSServiceProcessResult(resolve_sdRef)
        else:
            resolved.pop()

def resolve_irkits():
    with pybonjour.DNSServiceBrowse(regtype = regtype,
                                    callBack = browse_callback) as browse_sdRef:
        while True:
            ready = select.select([browse_sdRef], [], [], timeout)
            if browse_sdRef in ready[0]:
                pybonjour.DNSServiceProcessResult(browse_sdRef)
            else:
                return irkits

if __name__ == '__main__':
    for irkit in resolve_irkits():
        print('hosttarget={0}
IPaddress={1}
port={2}'.format(irkit[0], gethostbyname(irkit[0]), irkit[1]))

 変えた所は赤字の部分だけ。resolve_irkits()関数で見つけたIRKitデバイスのリストを返却するようにしました。(__main__の方はこの変更に対応しただけ。)このプログラムをIRKitの操作プログラムからモジュールとしてimportし、resolve_irkits()関数を呼び出してIRKitデバイスを取得します。

 次がIRKit操作プログラム "irkit.py" です。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import urllib.request
from resolve_irkit import resolve_irkits
from socket import gethostbyname
import json
import argparse

save_dir = os.environ['HOME'] + '/' + '.irkit.d'    # Getした赤外線信号の保存先ディレクトリ名

parser = argparse.ArgumentParser(description='Control IRKit')
parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1.0')
parser.add_argument('-V', '--verbose', action='store_true', default=False, help='verbose mode')
subparsers = parser.add_subparsers(dest='command')
# create the parser for the "get" command
parser_get = subparsers.add_parser('get', aliases=['read'], help='GET command')
parser_get.add_argument('ir_signal_name', action='store', help='IR signal name')
parser_get.add_argument('-a', '--address', nargs=1, action='store', default=None, dest='host',
                        metavar='address', help='IP address of IRKit device')
# create the parser for the "post" command
parser_post = subparsers.add_parser('post', aliases=['send'], help='POST command')
parser_post.add_argument('ir_signal_name', action='store', help='IR signal name')
parser_post.add_argument('-a', '--address', nargs=1, action='store', default=None, dest='host',
                         metavar='address', help='IP address of IRKit device')
# create the parser for the "show" command
parser_show = subparsers.add_parser('show', help='Show IR signal data')
parser_show.add_argument('ir_signal_name', action='store', help='IR signal name')
# create the parser for the "list" command
parser_list = subparsers.add_parser('list', help='Print list saved IR signal')
#
args = parser.parse_args()
verbose = args.verbose


class IRKit:
    '''Irkit class'''

    def __init__(self, host, port):
        self.url = 'http://' + host + ':' + port + '/messages'

    def get(self):
        '''最も新しい受信した赤外線信号を返す。'''
        with urllib.request.urlopen(url=self.url) as self.f:
            return self.f.read().decode('utf-8')

    def post(self, json_data):
        '''赤外線信号を送信する。'''
        with urllib.request.urlopen(url=self.url, data=json_data.encode('utf-8')):
            pass

    def get header(self, name):
        '''HTTPヘッダを返す。'''
        with urllib.request.urlopen(url=self.url) as self.f:
            return self.f.getheader(name)

def check_dir(path):
    '''pathディレクトリの有無を調べ、無い場合は作成する'''
    if os.path.exists(path):
        assert os.path.isdir(path), path + ' is not directory.'
    else:
        os.mkdir(path)

def getheader(address, port, name):
    '''return http header specified by name'''
    irkit = IRKit(address, port)
    return irkit.getheader(name)

def search_irkit():
    '''BonjourでIRKitを探し、見つかったIRKitの情報を返却する'''
    irkits = resolve_irkits()
    assert irkits, 'No IRKit device'
    irkit = irkits[0]    # 最初に見つかったIRKit
    address = gethostbyname(irkit[0])
    port    = str(irkit[1])
    server  = getheader(address, port, 'Server')
    return {'IPaddress':address, 'Port':port, 'Server':server}

def get_irkit_info():
    '''IRKit の情報辞書(IPアドレス,ポート番号,サーバ名)を返す'''
    save_file = save_dir + '/irkit.save'
    try:
        with open(save_file, mode='rt', encoding='utf-8') as f:
            irkit_info = json.load(f)
    except:
        irkit_info = search_irkit()
        with open(save_file, mode='wt', encoding='utf-8') as f:
            json.dump(irkit_info, f)
    if verbose:
        print('IRKit: {0}'.format(irkit_info))
    return irkit_info

def get(address, port, file):
    '''GET sub-command'''
    irkit = IRKit(address, port)
    json_data = irkit.get()
    if json_data:
        if verbose:
            print('{0} <= {1}'.format(file, json_data))
        with open(file, mode='wt', encoding='utf-8') as f:
            f.write(json_data)
    else:
        print('No data', file=sys.stderr)

def post(address, port, file):
    '''POST sub-command'''
    with open(file, mode='rt', encoding='utf-8') as f:
        irkit = IRKit(address, port)
        json_data = f.readlines()
        if verbose:
            print('{0} => {1}'.format(file, json_data[0]))
        irkit.post(json_data[0])

def show(file):
    '''Show sub-command'''
    with open(file, mode='rt', encoding='utf-8') as f:
        print(f.readlines()[0])

def list(dir):
    '''List sub-command'''
    for file in os.listdir(dir):
        print(file.rstrip('.json'))


def main():
    check_dir(save_dir)
    check_dir(save_dir + '/signals')

    if args.command == 'get' or args.command == 'read':
        if args.host:
            address, port = args.host[0], '80'
        else:
            info = get_irkit_info()
            address, port = info['IPaddress'], info['Port']
        get(address, port, save_dir + '/signals/' + args.ir_signal_name + '.json')
    elif args.command == 'post' or args.command == 'send':
        if args.host:
            address, port = args.host[0], '80'
        else:
            info = get_irkit_info()
            address, port = info['IPaddress'], info['Port']
        post(address, port, save_dir + '/signals/' + args.ir_signal_name + '.json')
    elif args.command == 'show':
        show(save_dir + '/signals/' + args.ir_signal_name + '.json')
    elif args.command == 'list':
        list(save_dir + '/signals')
    else:
        pass

if __name__ == '__main__':
    main()

 簡単な使用方法が --helpオプションを指定することで表示されます。

$ irkit.py --help
usage: irkit [-h] [-v] [-V] {get,read,post,send,show,list} ...

Control IRKit

positional arguments:
  {get,read,post,send,show,list}
    get (read)          GET command
    post (send)         POST command
    show                Show IR signal data
    list                Print list saved IR signal

optional arguments:
  -h, --help            show this help message and exit
  -v, --version         show program's version number and exit
  -V, --verbose         verbose mode

 サブコマンドに get, post, show, list があり、それぞれ

get
IRKitから赤外線信号を取得して保存する
post
保存してある赤外線信号を送信する
show
保存してある赤外線信号データの内容を表示する
list
保存してある赤外線信号の一覧を表示する

です。各サブコマンドの使用方法を表示するには、例えば

$ irkit.py get --help
sage: irkit get [-h] [-a address] ir_signal_name

positional arguments:
  ir_signal_name        IR signal name

optional arguments:
  -h, --help            show this help message and exit
  -a address, --address address
                        IP address of IRKit device

とします。getサブコマンドには取得した赤外線信号を保存する名前を指定する必要があります。取得した赤外線信号の jsonファイルは "~/.irkit.d/signals/" ディレクトリ配下に保存します。
 また --address (-a)オプションでIRKitのIPアドレスを指定することができます。このオプションを指定しない場合は、先の resolve_irkits()関数でIRKitを探します。ただIRKitを探しだすのには若干時間がかかるため、見つけた IRKitの情報を "~/.irkit.d/irkit.save" ファイルに保存するようにしています。このファイルがある場合は、保存されている情報の方を使うようにして時間がかからないようにしています。
 ただ IRKitのIPアドレスが変わってしまった場合には、このファイルを作り直す必要があるのですが、その処理は未作成です。(この処理を作り込んだところ、何故か赤外線信号を getできなくなるという原因不明のバグが発生したためです。(*1))

 他にも赤外線信号を日本語でget, postできない不具合とか、IPアドレスの有効性などを全くチェックしていない等、色々と問題があります。まだまだ未完成でイイカゲンな代物ですが、それでも何とか使えてます。今後もボチボチと改良していこうと思っています。


(*1):バグの原因が判明しまして、類似バグもあり現在修正方法を検討中です。しかし、これは結構面倒臭いぞ・・・


< 過去の記事 [ 8月の 全てのカテゴリ リスト ] 新しい記事 >

2015 calendar
8月
1
2345678
9101112131415
16171819202122
23242526272829
3031


掲示板
最新:08/15 17:19


GsBlog was developed by GUSTAV, Copyright(C) 2003, Web Application Factory All Rights Reserved.