Raspberry Pi から IRKitを操作する。二歩め
まだまだ未完成だけど、何とか使える状態になりました
体調を崩したりしたこともあって、なかなか進まなかったIRKit操作プログラムですが、何とか基本的な事はできるようになりました。
先ずはIRKitをBonjourで見つけるプログラム "resolve_irkit.py" をモジュールとして使えるように少し変えます。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import select
import sys
import pybonjour
from socket import gethostbyname
regtype = '_irkit._tcp'
timeout = 5
resolved = []
irkits = []
def resolve_callback(sdRef, flags, interfaceIndex, errorCode, fullname,
hosttarget, port, txtRecord):
if errorCode == pybonjour.kDNSServiceErr_NoError:
irkits.append((hosttarget, port))
resolved.append(True)
def browse_callback(sdRef, flags, interfaceIndex, errorCode, serviceName,
regtype, replyDomain):
if errorCode != pybonjour.kDNSServiceErr_NoError:
return
if not (flags & pybonjour.kDNSServiceFlagsAdd):
print('Service removed', file=sys.stderr)
return
with pybonjour.DNSServiceResolve(0,
interfaceIndex,
serviceName,
regtype,
replyDomain,
resolve_callback) as resolve_sdRef:
while not resolved:
ready = select.select([resolve_sdRef], [], [], timeout)
if resolve_sdRef not in ready[0]:
print('Resolve timed out', file=sys.stderr)
break
pybonjour.DNSServiceProcessResult(resolve_sdRef)
else:
resolved.pop()
def resolve_irkits():
with pybonjour.DNSServiceBrowse(regtype = regtype,
callBack = browse_callback) as browse_sdRef:
while True:
ready = select.select([browse_sdRef], [], [], timeout)
if browse_sdRef in ready[0]:
pybonjour.DNSServiceProcessResult(browse_sdRef)
else:
return irkits
if __name__ == '__main__':
for irkit in resolve_irkits():
print('hosttarget={0}
IPaddress={1}
port={2}'.format(irkit[0], gethostbyname(irkit[0]), irkit[1]))
|
変えた所は赤字の部分だけ。resolve_irkits()関数で見つけたIRKitデバイスのリストを返却するようにしました。(__main__の方はこの変更に対応しただけ。)このプログラムをIRKitの操作プログラムからモジュールとしてimportし、resolve_irkits()関数を呼び出してIRKitデバイスを取得します。
次がIRKit操作プログラム "irkit.py" です。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import urllib.request
from resolve_irkit import resolve_irkits
from socket import gethostbyname
import json
import argparse
save_dir = os.environ['HOME'] + '/' + '.irkit.d' # Getした赤外線信号の保存先ディレクトリ名
parser = argparse.ArgumentParser(description='Control IRKit')
parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1.0')
parser.add_argument('-V', '--verbose', action='store_true', default=False, help='verbose mode')
subparsers = parser.add_subparsers(dest='command')
# create the parser for the "get" command
parser_get = subparsers.add_parser('get', aliases=['read'], help='GET command')
parser_get.add_argument('ir_signal_name', action='store', help='IR signal name')
parser_get.add_argument('-a', '--address', nargs=1, action='store', default=None, dest='host',
metavar='address', help='IP address of IRKit device')
# create the parser for the "post" command
parser_post = subparsers.add_parser('post', aliases=['send'], help='POST command')
parser_post.add_argument('ir_signal_name', action='store', help='IR signal name')
parser_post.add_argument('-a', '--address', nargs=1, action='store', default=None, dest='host',
metavar='address', help='IP address of IRKit device')
# create the parser for the "show" command
parser_show = subparsers.add_parser('show', help='Show IR signal data')
parser_show.add_argument('ir_signal_name', action='store', help='IR signal name')
# create the parser for the "list" command
parser_list = subparsers.add_parser('list', help='Print list saved IR signal')
#
args = parser.parse_args()
verbose = args.verbose
class IRKit:
'''Irkit class'''
def __init__(self, host, port):
self.url = 'http://' + host + ':' + port + '/messages'
def get(self):
'''最も新しい受信した赤外線信号を返す。'''
with urllib.request.urlopen(url=self.url) as self.f:
return self.f.read().decode('utf-8')
def post(self, json_data):
'''赤外線信号を送信する。'''
with urllib.request.urlopen(url=self.url, data=json_data.encode('utf-8')):
pass
def get header(self, name):
'''HTTPヘッダを返す。'''
with urllib.request.urlopen(url=self.url) as self.f:
return self.f.getheader(name)
def check_dir(path):
'''pathディレクトリの有無を調べ、無い場合は作成する'''
if os.path.exists(path):
assert os.path.isdir(path), path + ' is not directory.'
else:
os.mkdir(path)
def getheader(address, port, name):
'''return http header specified by name'''
irkit = IRKit(address, port)
return irkit.getheader(name)
def search_irkit():
'''BonjourでIRKitを探し、見つかったIRKitの情報を返却する'''
irkits = resolve_irkits()
assert irkits, 'No IRKit device'
irkit = irkits[0] # 最初に見つかったIRKit
address = gethostbyname(irkit[0])
port = str(irkit[1])
server = getheader(address, port, 'Server')
return {'IPaddress':address, 'Port':port, 'Server':server}
def get_irkit_info():
'''IRKit の情報辞書(IPアドレス,ポート番号,サーバ名)を返す'''
save_file = save_dir + '/irkit.save'
try:
with open(save_file, mode='rt', encoding='utf-8') as f:
irkit_info = json.load(f)
except:
irkit_info = search_irkit()
with open(save_file, mode='wt', encoding='utf-8') as f:
json.dump(irkit_info, f)
if verbose:
print('IRKit: {0}'.format(irkit_info))
return irkit_info
def get(address, port, file):
'''GET sub-command'''
irkit = IRKit(address, port)
json_data = irkit.get()
if json_data:
if verbose:
print('{0} <= {1}'.format(file, json_data))
with open(file, mode='wt', encoding='utf-8') as f:
f.write(json_data)
else:
print('No data', file=sys.stderr)
def post(address, port, file):
'''POST sub-command'''
with open(file, mode='rt', encoding='utf-8') as f:
irkit = IRKit(address, port)
json_data = f.readlines()
if verbose:
print('{0} => {1}'.format(file, json_data[0]))
irkit.post(json_data[0])
def show(file):
'''Show sub-command'''
with open(file, mode='rt', encoding='utf-8') as f:
print(f.readlines()[0])
def list(dir):
'''List sub-command'''
for file in os.listdir(dir):
print(file.rstrip('.json'))
def main():
check_dir(save_dir)
check_dir(save_dir + '/signals')
if args.command == 'get' or args.command == 'read':
if args.host:
address, port = args.host[0], '80'
else:
info = get_irkit_info()
address, port = info['IPaddress'], info['Port']
get(address, port, save_dir + '/signals/' + args.ir_signal_name + '.json')
elif args.command == 'post' or args.command == 'send':
if args.host:
address, port = args.host[0], '80'
else:
info = get_irkit_info()
address, port = info['IPaddress'], info['Port']
post(address, port, save_dir + '/signals/' + args.ir_signal_name + '.json')
elif args.command == 'show':
show(save_dir + '/signals/' + args.ir_signal_name + '.json')
elif args.command == 'list':
list(save_dir + '/signals')
else:
pass
if __name__ == '__main__':
main()
|
簡単な使用方法が --helpオプションを指定することで表示されます。
$ irkit.py --help
usage: irkit [-h] [-v] [-V] {get,read,post,send,show,list} ...
Control IRKit
positional arguments:
{get,read,post,send,show,list}
get (read) GET command
post (send) POST command
show Show IR signal data
list Print list saved IR signal
optional arguments:
-h, --help show this help message and exit
-v, --version show program's version number and exit
-V, --verbose verbose mode
|
サブコマンドに get, post, show, list があり、それぞれ
- get
- IRKitから赤外線信号を取得して保存する
- post
- 保存してある赤外線信号を送信する
- show
- 保存してある赤外線信号データの内容を表示する
- list
- 保存してある赤外線信号の一覧を表示する
です。各サブコマンドの使用方法を表示するには、例えば
$ irkit.py get --help
sage: irkit get [-h] [-a address] ir_signal_name
positional arguments:
ir_signal_name IR signal name
optional arguments:
-h, --help show this help message and exit
-a address, --address address
IP address of IRKit device
|
とします。getサブコマンドには取得した赤外線信号を保存する名前を指定する必要があります。取得した赤外線信号の jsonファイルは "~/.irkit.d/signals/" ディレクトリ配下に保存します。
また --address (-a)オプションでIRKitのIPアドレスを指定することができます。このオプションを指定しない場合は、先の resolve_irkits()関数でIRKitを探します。ただIRKitを探しだすのには若干時間がかかるため、見つけた IRKitの情報を "~/.irkit.d/irkit.save" ファイルに保存するようにしています。このファイルがある場合は、保存されている情報の方を使うようにして時間がかからないようにしています。
ただ IRKitのIPアドレスが変わってしまった場合には、このファイルを作り直す必要があるのですが、その処理は未作成です。(この処理を作り込んだところ、何故か赤外線信号を getできなくなるという原因不明のバグが発生したためです。(*1))
他にも赤外線信号を日本語でget, postできない不具合とか、IPアドレスの有効性などを全くチェックしていない等、色々と問題があります。まだまだ未完成でイイカゲンな代物ですが、それでも何とか使えてます。今後もボチボチと改良していこうと思っています。
(*1):バグの原因が判明しまして、類似バグもあり現在修正方法を検討中です。しかし、これは結構面倒臭いぞ・・・
|