diff options
Diffstat (limited to 'third_party/python/broadlink')
-rw-r--r-- | third_party/python/broadlink/.gitignore | 1 | ||||
-rw-r--r-- | third_party/python/broadlink/LICENSE | 22 | ||||
-rw-r--r-- | third_party/python/broadlink/README.md | 112 | ||||
-rw-r--r-- | third_party/python/broadlink/broadlink/__init__.py | 1118 | ||||
-rw-r--r-- | third_party/python/broadlink/cli/README.md | 85 | ||||
-rwxr-xr-x | third_party/python/broadlink/cli/broadlink_cli | 239 | ||||
-rwxr-xr-x | third_party/python/broadlink/cli/broadlink_discovery | 27 | ||||
-rw-r--r-- | third_party/python/broadlink/default.nix | 15 | ||||
-rw-r--r-- | third_party/python/broadlink/protocol.md | 202 | ||||
-rw-r--r-- | third_party/python/broadlink/requirements.txt | 1 | ||||
-rw-r--r-- | third_party/python/broadlink/setup.py | 29 |
11 files changed, 1851 insertions, 0 deletions
diff --git a/third_party/python/broadlink/.gitignore b/third_party/python/broadlink/.gitignore new file mode 100644 index 000000000000..0d20b6487c61 --- /dev/null +++ b/third_party/python/broadlink/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/third_party/python/broadlink/LICENSE b/third_party/python/broadlink/LICENSE new file mode 100644 index 000000000000..d8c801656bdf --- /dev/null +++ b/third_party/python/broadlink/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2014 Mike Ryan +Copyright (c) 2016 Matthew Garrett + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/third_party/python/broadlink/README.md b/third_party/python/broadlink/README.md new file mode 100644 index 000000000000..8faba2be75fc --- /dev/null +++ b/third_party/python/broadlink/README.md @@ -0,0 +1,112 @@ +Python control for Broadlink RM2, RM3 and RM4 series controllers +=============================================== + +A simple Python API for controlling IR/RF controllers from [Broadlink](http://www.ibroadlink.com/rm/). At present, the following devices are currently supported: + +* RM Pro (referred to as RM2 in the codebase) +* A1 sensor platform devices are supported +* RM3 mini IR blaster +* RM4 and RM4C mini blasters + +There is currently no support for the cloud API. + +Example use +----------- + +Setup a new device on your local wireless network: + +1. Put the device into AP Mode + 1. Long press the reset button until the blue LED is blinking quickly. + 2. Long press again until blue LED is blinking slowly. + 3. Manually connect to the WiFi SSID named BroadlinkProv. +2. Run setup() and provide your ssid, network password (if secured), and set the security mode + 1. Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) +``` +import broadlink + +broadlink.setup('myssid', 'mynetworkpass', 3) +``` + +Discover available devices on the local network: +``` +import broadlink + +devices = broadlink.discover(timeout=5) +``` + +Obtain the authentication key required for further communication: +``` +devices[0].auth() +``` + +Enter learning mode: +``` +devices[0].enter_learning() +``` + +Sweep RF frequencies: +``` +devices[0].sweep_frequency() +``` + +Cancel sweep RF frequencies: +``` +devices[0].cancel_sweep_frequency() +``` +Check whether a frequency has been found: +``` +found = devices[0].check_frequency() +``` +(This will return True if the RM has locked onto a frequency, False otherwise) + +Attempt to learn an RF packet: +``` +found = devices[0].find_rf_packet() +``` +(This will return True if a packet has been found, False otherwise) + +Obtain an IR or RF packet while in learning mode: +``` +ir_packet = devices[0].check_data() +``` +(This will return None if the device does not have a packet to return) + +Send an IR or RF packet: +``` +devices[0].send_data(ir_packet) +``` + +Obtain temperature data from an RM2: +``` +devices[0].check_temperature() +``` + +Obtain sensor data from an A1: +``` +data = devices[0].check_sensors() +``` + +Set power state on a SmartPlug SP2/SP3: +``` +devices[0].set_power(True) +``` + +Check power state on a SmartPlug: +``` +state = devices[0].check_power() +``` + +Check energy consumption on a SmartPlug: +``` +state = devices[0].get_energy() +``` + +Set power state for S1 on a SmartPowerStrip MP1: +``` +devices[0].set_power(1, True) +``` + +Check power state on a SmartPowerStrip: +``` +state = devices[0].check_power() +``` diff --git a/third_party/python/broadlink/broadlink/__init__.py b/third_party/python/broadlink/broadlink/__init__.py new file mode 100644 index 000000000000..0c70c4beaed3 --- /dev/null +++ b/third_party/python/broadlink/broadlink/__init__.py @@ -0,0 +1,1118 @@ +#!/usr/bin/python + +import codecs +import json +import random +import socket +import struct +import threading +import time +from datetime import datetime + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +def gendevice(devtype, host, mac, name=None, cloud=None): + devices = { + sp1: [0], + sp2: [0x2711, # SP2 + 0x2719, 0x7919, 0x271a, 0x791a, # Honeywell SP2 + 0x2720, # SPMini + 0x753e, # SP3 + 0x7D00, # OEM branded SP3 + 0x947a, 0x9479, # SP3S + 0x2728, # SPMini2 + 0x2733, 0x273e, # OEM branded SPMini + 0x7530, 0x7546, 0x7918, # OEM branded SPMini2 + 0x7D0D, # TMall OEM SPMini3 + 0x2736 # SPMiniPlus + ], + rm: [0x2712, # RM2 + 0x2737, # RM Mini + 0x273d, # RM Pro Phicomm + 0x2783, # RM2 Home Plus + 0x277c, # RM2 Home Plus GDT + 0x272a, # RM2 Pro Plus + 0x2787, # RM2 Pro Plus2 + 0x279d, # RM2 Pro Plus3 + 0x27a9, # RM2 Pro Plus_300 + 0x278b, # RM2 Pro Plus BL + 0x2797, # RM2 Pro Plus HYC + 0x27a1, # RM2 Pro Plus R1 + 0x27a6, # RM2 Pro PP + 0x278f, # RM Mini Shate + 0x27c2, # RM Mini 3 + 0x27d1, # new RM Mini3 + 0x27de # RM Mini 3 (C) + ], + rm4: [0x51da, # RM4 Mini + 0x5f36, # RM Mini 3 + 0x6026, # RM4 Pro + 0x610e, # RM4 Mini + 0x610f, # RM4c + 0x62bc, # RM4 Mini + 0x62be # RM4c + ], + a1: [0x2714], # A1 + mp1: [0x4EB5, # MP1 + 0x4EF7 # Honyar oem mp1 + ], + hysen: [0x4EAD], # Hysen controller + S1C: [0x2722], # S1 (SmartOne Alarm Kit) + dooya: [0x4E4D], # Dooya DT360E (DOOYA_CURTAIN_V2) + bg1: [0x51E3], # BG Electrical Smart Power Socket + lb1 : [0x60c8] # RGB Smart Bulb + } + + # Look for the class associated to devtype in devices + [device_class] = [dev for dev in devices if devtype in devices[dev]] or [None] + if device_class is None: + return device(host, mac, devtype, name=name, cloud=cloud) + return device_class(host, mac, devtype, name=name, cloud=cloud) + + +def discover(timeout=None, local_ip_address=None, discover_ip_address='255.255.255.255', max_devices=100): + if local_ip_address is None: + local_ip_address = socket.gethostbyname(socket.gethostname()) + if local_ip_address.startswith('127.'): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(('8.8.8.8', 53)) # connecting to a UDP address doesn't send packets + local_ip_address = s.getsockname()[0] + address = local_ip_address.split('.') + cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + cs.bind((local_ip_address, 0)) + port = cs.getsockname()[1] + starttime = time.time() + + devices = [] + + timezone = int(time.timezone / -3600) + packet = bytearray(0x30) + + year = datetime.now().year + + if timezone < 0: + packet[0x08] = 0xff + timezone - 1 + packet[0x09] = 0xff + packet[0x0a] = 0xff + packet[0x0b] = 0xff + else: + packet[0x08] = timezone + packet[0x09] = 0 + packet[0x0a] = 0 + packet[0x0b] = 0 + packet[0x0c] = year & 0xff + packet[0x0d] = year >> 8 + packet[0x0e] = datetime.now().minute + packet[0x0f] = datetime.now().hour + subyear = str(year)[2:] + packet[0x10] = int(subyear) + packet[0x11] = datetime.now().isoweekday() + packet[0x12] = datetime.now().day + packet[0x13] = datetime.now().month + packet[0x18] = int(address[0]) + packet[0x19] = int(address[1]) + packet[0x1a] = int(address[2]) + packet[0x1b] = int(address[3]) + packet[0x1c] = port & 0xff + packet[0x1d] = port >> 8 + packet[0x26] = 6 + + checksum = 0xbeaf + for b in packet: + checksum = (checksum + b) & 0xffff + + packet[0x20] = checksum & 0xff + packet[0x21] = checksum >> 8 + + cs.sendto(packet, (discover_ip_address, 80)) + if timeout is None: + response = cs.recvfrom(1024) + responsepacket = bytearray(response[0]) + host = response[1] + devtype = responsepacket[0x34] | responsepacket[0x35] << 8 + mac = responsepacket[0x3a:0x40] + name = responsepacket[0x40:].split(b'\x00')[0].decode('utf-8') + cloud = bool(responsepacket[-1]) + device = gendevice(devtype, host, mac, name=name, cloud=cloud) + return device + + while ((time.time() - starttime) < timeout) and (len(devices) < max_devices): + cs.settimeout(timeout - (time.time() - starttime)) + try: + response = cs.recvfrom(1024) + except socket.timeout: + return devices + responsepacket = bytearray(response[0]) + host = response[1] + devtype = responsepacket[0x34] | responsepacket[0x35] << 8 + mac = responsepacket[0x3a:0x40] + name = responsepacket[0x40:].split(b'\x00')[0].decode('utf-8') + cloud = bool(responsepacket[-1]) + device = gendevice(devtype, host, mac, name=name, cloud=cloud) + devices.append(device) + return devices + + +class device: + def __init__(self, host, mac, devtype, timeout=10, name=None, cloud=None): + self.host = host + self.mac = mac.encode() if isinstance(mac, str) else mac + self.devtype = devtype if devtype is not None else 0x272a + self.name = name + self.cloud = cloud + self.timeout = timeout + self.count = random.randrange(0xffff) + self.iv = bytearray( + [0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) + self.id = bytearray([0, 0, 0, 0]) + self.cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.cs.bind(('', 0)) + self.type = "Unknown" + self.lock = threading.Lock() + + self.aes = None + key = bytearray( + [0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) + self.update_aes(key) + + def update_aes(self, key): + self.aes = Cipher(algorithms.AES(key), modes.CBC(self.iv), + backend=default_backend()) + + def encrypt(self, payload): + encryptor = self.aes.encryptor() + return encryptor.update(payload) + encryptor.finalize() + + def decrypt(self, payload): + decryptor = self.aes.decryptor() + return decryptor.update(payload) + decryptor.finalize() + + def auth(self): + payload = bytearray(0x50) + payload[0x04] = 0x31 + payload[0x05] = 0x31 + payload[0x06] = 0x31 + payload[0x07] = 0x31 + payload[0x08] = 0x31 + payload[0x09] = 0x31 + payload[0x0a] = 0x31 + payload[0x0b] = 0x31 + payload[0x0c] = 0x31 + payload[0x0d] = 0x31 + payload[0x0e] = 0x31 + payload[0x0f] = 0x31 + payload[0x10] = 0x31 + payload[0x11] = 0x31 + payload[0x12] = 0x31 + payload[0x1e] = 0x01 + payload[0x2d] = 0x01 + payload[0x30] = ord('T') + payload[0x31] = ord('e') + payload[0x32] = ord('s') + payload[0x33] = ord('t') + payload[0x34] = ord(' ') + payload[0x35] = ord(' ') + payload[0x36] = ord('1') + + response = self.send_packet(0x65, payload) + + if any(response[0x22:0x24]): + return False + + payload = self.decrypt(response[0x38:]) + + key = payload[0x04:0x14] + if len(key) % 16 != 0: + return False + + self.id = payload[0x00:0x04] + self.update_aes(key) + + return True + + def get_type(self): + return self.type + + def send_packet(self, command, payload): + self.count = (self.count + 1) & 0xffff + packet = bytearray(0x38) + packet[0x00] = 0x5a + packet[0x01] = 0xa5 + packet[0x02] = 0xaa + packet[0x03] = 0x55 + packet[0x04] = 0x5a + packet[0x05] = 0xa5 + packet[0x06] = 0xaa + packet[0x07] = 0x55 + packet[0x24] = self.devtype & 0xff + packet[0x25] = self.devtype >> 8 + packet[0x26] = command + packet[0x28] = self.count & 0xff + packet[0x29] = self.count >> 8 + packet[0x2a] = self.mac[0] + packet[0x2b] = self.mac[1] + packet[0x2c] = self.mac[2] + packet[0x2d] = self.mac[3] + packet[0x2e] = self.mac[4] + packet[0x2f] = self.mac[5] + packet[0x30] = self.id[0] + packet[0x31] = self.id[1] + packet[0x32] = self.id[2] + packet[0x33] = self.id[3] + + # pad the payload for AES encryption + if payload: + payload += bytearray((16 - len(payload)) % 16) + + checksum = 0xbeaf + for b in payload: + checksum = (checksum + b) & 0xffff + + packet[0x34] = checksum & 0xff + packet[0x35] = checksum >> 8 + + payload = self.encrypt(payload) + for i in range(len(payload)): + packet.append(payload[i]) + + checksum = 0xbeaf + for b in packet: + checksum = (checksum + b) & 0xffff + + packet[0x20] = checksum & 0xff + packet[0x21] = checksum >> 8 + + start_time = time.time() + with self.lock: + while True: + try: + self.cs.sendto(packet, self.host) + self.cs.settimeout(1) + response = self.cs.recvfrom(2048) + break + except socket.timeout: + if (time.time() - start_time) > self.timeout: + raise + return bytearray(response[0]) + + +class mp1(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "MP1" + + def set_power_mask(self, sid_mask, state): + """Sets the power state of the smart power strip.""" + + packet = bytearray(16) + packet[0x00] = 0x0d + packet[0x02] = 0xa5 + packet[0x03] = 0xa5 + packet[0x04] = 0x5a + packet[0x05] = 0x5a + packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) + packet[0x07] = 0xc0 + packet[0x08] = 0x02 + packet[0x0a] = 0x03 + packet[0x0d] = sid_mask + packet[0x0e] = sid_mask if state else 0 + + self.send_packet(0x6a, packet) + + def set_power(self, sid, state): + """Sets the power state of the smart power strip.""" + sid_mask = 0x01 << (sid - 1) + return self.set_power_mask(sid_mask, state) + + def check_power_raw(self): + """Returns the power state of the smart power strip in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0a + packet[0x02] = 0xa5 + packet[0x03] = 0xa5 + packet[0x04] = 0x5a + packet[0x05] = 0x5a + packet[0x06] = 0xae + packet[0x07] = 0xc0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + state = payload[0x0e] + else: + state = ord(payload[0x0e]) + return state + + def check_power(self): + """Returns the power state of the smart power strip.""" + state = self.check_power_raw() + if state is None: + return {'s1': None, 's2': None, 's3': None, 's4': None} + data = {} + data['s1'] = bool(state & 0x01) + data['s2'] = bool(state & 0x02) + data['s3'] = bool(state & 0x04) + data['s4'] = bool(state & 0x08) + return data + + +class bg1(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "BG1" + + def get_state(self): + """Get state of device. + + Returns: + dict: Dictionary of current state + eg. `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}`""" + packet = self._encode(1, b'{}') + response = self.send_packet(0x6a, packet) + return self._decode(response) + + def set_state(self, pwr=None, pwr1=None, pwr2=None, maxworktime=None, maxworktime1=None, maxworktime2=None, idcbrightness=None): + data = {} + if pwr is not None: + data['pwr'] = int(bool(pwr)) + if pwr1 is not None: + data['pwr1'] = int(bool(pwr1)) + if pwr2 is not None: + data['pwr2'] = int(bool(pwr2)) + if maxworktime is not None: + data['maxworktime'] = maxworktime + if maxworktime1 is not None: + data['maxworktime1'] = maxworktime1 + if maxworktime2 is not None: + data['maxworktime2'] = maxworktime2 + if idcbrightness is not None: + data['idcbrightness'] = idcbrightness + js = json.dumps(data).encode('utf8') + packet = self._encode(2, js) + response = self.send_packet(0x6a, packet) + return self._decode(response) + + def _encode(self, flag, js): + # packet format is: + # 0x00-0x01 length + # 0x02-0x05 header + # 0x06-0x07 00 + # 0x08 flag (1 for read or 2 write?) + # 0x09 unknown (0xb) + # 0x0a-0x0d length of json + # 0x0e- json data + packet = bytearray(14) + length = 4 + 2 + 2 + 4 + len(js) + struct.pack_into('<HHHHBBI', packet, 0, length, 0xa5a5, 0x5a5a, 0x0000, flag, 0x0b, len(js)) + for i in range(len(js)): + packet.append(js[i]) + + checksum = 0xc0ad + for b in packet[0x08:]: + checksum = (checksum + b) & 0xffff + + packet[0x06] = checksum & 0xff + packet[0x07] = checksum >> 8 + + return packet + + def _decode(self, response): + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + + payload = self.decrypt(bytes(response[0x38:])) + js_len = struct.unpack_from('<I', payload, 0x0a)[0] + state = json.loads(payload[0x0e:0x0e+js_len]) + return state + +class sp1(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "SP1" + + def set_power(self, state): + packet = bytearray(4) + packet[0] = state + self.send_packet(0x66, packet) + + +class sp2(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "SP2" + + def set_power(self, state): + """Sets the power state of the smart plug.""" + packet = bytearray(16) + packet[0] = 2 + if self.check_nightlight(): + packet[4] = 3 if state else 2 + else: + packet[4] = 1 if state else 0 + self.send_packet(0x6a, packet) + + def set_nightlight(self, state): + """Sets the night light state of the smart plug""" + packet = bytearray(16) + packet[0] = 2 + if self.check_power(): + packet[4] = 3 if state else 1 + else: + packet[4] = 2 if state else 0 + self.send_packet(0x6a, packet) + + def check_power(self): + """Returns the power state of the smart plug.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) + return bool(ord(payload[0x4]) == 1 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFD) + + def check_nightlight(self): + """Returns the power state of the smart plug.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) + return bool(ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF) + + def get_energy(self): + packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x7], int): + energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 + else: + energy = int(hex(ord(payload[0x07]) * 256 + ord(payload[0x06]))[2:]) + int( + hex(ord(payload[0x05]))[2:]) / 100.0 + return energy + + +class a1(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "A1" + + def check_sensors(self): + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + data = {} + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 + data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 + light = payload[0x8] + air_quality = payload[0x0a] + noise = payload[0xc] + else: + data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 + data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0 + light = ord(payload[0x8]) + air_quality = ord(payload[0x0a]) + noise = ord(payload[0xc]) + if light == 0: + data['light'] = 'dark' + elif light == 1: + data['light'] = 'dim' + elif light == 2: + data['light'] = 'normal' + elif light == 3: + data['light'] = 'bright' + else: + data['light'] = 'unknown' + if air_quality == 0: + data['air_quality'] = 'excellent' + elif air_quality == 1: + data['air_quality'] = 'good' + elif air_quality == 2: + data['air_quality'] = 'normal' + elif air_quality == 3: + data['air_quality'] = 'bad' + else: + data['air_quality'] = 'unknown' + if noise == 0: + data['noise'] = 'quiet' + elif noise == 1: + data['noise'] = 'normal' + elif noise == 2: + data['noise'] = 'noisy' + else: + data['noise'] = 'unknown' + return data + + def check_sensors_raw(self): + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + data = {} + payload = self.decrypt(bytes(response[0x38:])) + if isinstance(payload[0x4], int): + data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 + data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 + data['light'] = payload[0x8] + data['air_quality'] = payload[0x0a] + data['noise'] = payload[0xc] + else: + data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 + data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0 + data['light'] = ord(payload[0x8]) + data['air_quality'] = ord(payload[0x0a]) + data['noise'] = ord(payload[0xc]) + return data + + +class rm(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "RM2" + self._request_header = bytes() + self._code_sending_header = bytes() + + def check_data(self): + packet = bytearray(self._request_header) + packet.append(0x04) + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + return payload[len(self._request_header) + 4:] + + def send_data(self, data): + packet = bytearray(self._code_sending_header) + packet += bytes([0x02, 0x00, 0x00, 0x00]) + packet += data + self.send_packet(0x6a, packet) + + def enter_learning(self): + packet = bytearray(self._request_header) + packet.append(0x03) + self.send_packet(0x6a, packet) + + def sweep_frequency(self): + packet = bytearray(self._request_header) + packet.append(0x19) + self.send_packet(0x6a, packet) + + def cancel_sweep_frequency(self): + packet = bytearray(self._request_header) + packet.append(0x1e) + self.send_packet(0x6a, packet) + + def check_frequency(self): + packet = bytearray(self._request_header) + packet.append(0x1a) + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return False + payload = self.decrypt(bytes(response[0x38:])) + if payload[len(self._request_header) + 4] == 1: + return True + return False + + def find_rf_packet(self): + packet = bytearray(self._request_header) + packet.append(0x1b) + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return False + payload = self.decrypt(bytes(response[0x38:])) + if payload[len(self._request_header) + 4] == 1: + return True + return False + + def _read_sensor(self, type, offset, divider): + packet = bytearray(self._request_header) + packet.append(type) + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return False + payload = self.decrypt(bytes(response[0x38:])) + value_pos = len(self._request_header) + offset + if isinstance(payload[value_pos], int): + value = (payload[value_pos] + payload[value_pos+1] / divider) + else: + value = (ord(payload[value_pos]) + ord(payload[value_pos+1]) / divider) + return value + + def check_temperature(self): + return self._read_sensor( 0x01, 4, 10.0 ) + +class rm4(rm): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "RM4" + self._request_header = b'\x04\x00' + self._code_sending_header = b'\xd0\x00' + + def check_temperature(self): + return self._read_sensor( 0x24, 4, 100.0 ) + + def check_humidity(self): + return self._read_sensor( 0x24, 6, 100.0 ) + + def check_sensors(self): + return { + 'temperature': self.check_temperature(), + 'humidity': self.check_humidity() + } + +# For legacy compatibility - don't use this +class rm2(rm): + def __init__(self): + device.__init__(self, None, None, None) + + def discover(self): + dev = discover() + self.host = dev.host + self.mac = dev.mac + + +class hysen(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "Hysen heating controller" + + # Send a request + # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) + # Returns decrypted payload + # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails + # The function prepends length (2 bytes) and appends CRC + + def calculate_crc16(self, input_data): + from ctypes import c_ushort + crc16_tab = [] + crc16_constant = 0xA001 + + for i in range(0, 256): + crc = c_ushort(i).value + for j in range(0, 8): + if (crc & 0x0001): + crc = c_ushort(crc >> 1).value ^ crc16_constant + else: + crc = c_ushort(crc >> 1).value + crc16_tab.append(hex(crc)) + + try: + is_string = isinstance(input_data, str) + is_bytes = isinstance(input_data, bytes) + + if not is_string and not is_bytes: + raise Exception("Please provide a string or a byte sequence " + "as argument for calculation.") + + crcValue = 0xffff + + for c in input_data: + d = ord(c) if is_string else c + tmp = crcValue ^ d + rotated = c_ushort(crcValue >> 8).value + crcValue = rotated ^ int(crc16_tab[(tmp & 0x00ff)], 0) + + return crcValue + except Exception as e: + print("EXCEPTION(calculate): {}".format(e)) + + def send_request(self, input_payload): + + crc = self.calculate_crc16(bytes(input_payload)) + + # first byte is length, +2 for CRC16 + request_payload = bytearray([len(input_payload) + 2, 0x00]) + request_payload.extend(input_payload) + + # append CRC + request_payload.append(crc & 0xFF) + request_payload.append((crc >> 8) & 0xFF) + + # send to device + response = self.send_packet(0x6a, request_payload) + + # check for error + err = response[0x22] | (response[0x23] << 8) + if err: + raise ValueError('broadlink_response_error', err) + + response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) + + # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) + response_payload_len = response_payload[0] + if response_payload_len + 2 > len(response_payload): + raise ValueError('hysen_response_error', 'first byte of response is not length') + crc = self.calculate_crc16(bytes(response_payload[2:response_payload_len])) + if (response_payload[response_payload_len] == crc & 0xFF) and ( + response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): + return response_payload[2:response_payload_len] + raise ValueError('hysen_response_error', 'CRC check on response failed') + + # Get current room temperature in degrees celsius + def get_temp(self): + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) + return payload[0x05] / 2.0 + + # Get current external temperature in degrees celsius + def get_external_temp(self): + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) + return payload[18] / 2.0 + + # Get full status (including timer schedule) + def get_full_status(self): + payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) + data = {} + data['remote_lock'] = payload[3] & 1 + data['power'] = payload[4] & 1 + data['active'] = (payload[4] >> 4) & 1 + data['temp_manual'] = (payload[4] >> 6) & 1 + data['room_temp'] = (payload[5] & 255) / 2.0 + data['thermostat_temp'] = (payload[6] & 255) / 2.0 + data['auto_mode'] = payload[7] & 15 + data['loop_mode'] = (payload[7] >> 4) & 15 + data['sensor'] = payload[8] + data['osv'] = payload[9] + data['dif'] = payload[10] + data['svh'] = payload[11] + data['svl'] = payload[12] + data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 + if data['room_temp_adj'] > 32767: + data['room_temp_adj'] = 32767 - data['room_temp_adj'] + data['fre'] = payload[15] + data['poweron'] = payload[16] + data['unknown'] = payload[17] + data['external_temp'] = (payload[18] & 255) / 2.0 + data['hour'] = payload[19] + data['min'] = payload[20] + data['sec'] = payload[21] + data['dayofweek'] = payload[22] + + weekday = [] + for i in range(0, 6): + weekday.append( + {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) + + data['weekday'] = weekday + weekend = [] + for i in range(6, 8): + weekend.append( + {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) + + data['weekend'] = weekend + return data + + # Change controller mode + # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. + # Manual mode will activate last used temperature. + # In typical usage call set_temp to activate manual control and set temp. + # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule + # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule + # The sensor command is currently experimental + def set_mode(self, auto_mode, loop_mode, sensor=0): + mode_byte = ((loop_mode + 1) << 4) + auto_mode + self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) + + # Advanced settings + # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, + # 2 for internal control temperature, external limit temperature. Factory default: 0. + # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C + # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C + # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C + # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C + # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C + # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, + # 1 for anti-freezing function open. Factory default: 0 + # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 + def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron): + input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, + (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) + self.send_request(input_payload) + + # For backwards compatibility only. Prefer calling set_mode directly. + # Note this function invokes loop_mode=0 and sensor=0. + def switch_to_auto(self): + self.set_mode(auto_mode=1, loop_mode=0) + + def switch_to_manual(self): + self.set_mode(auto_mode=0, loop_mode=0) + + # Set temperature for manual mode (also activates manual mode if currently in automatic) + def set_temp(self, temp): + self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) + + # Set device on(1) or off(0), does not deactivate Wifi connectivity. + # Remote lock disables control by buttons on thermostat. + def set_power(self, power=1, remote_lock=0): + self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) + + # set time on device + # n.b. day=1 is Monday, ..., day=7 is Sunday + def set_time(self, hour, minute, second, day): + self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) + + # Set timer schedule + # Format is the same as you get from get_full_status. + # weekday is a list (ordered) of 6 dicts like: + # {'start_hour':17, 'start_minute':30, 'temp': 22 } + # Each one specifies the thermostat temp that will become effective at start_hour:start_minute + # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) + def set_schedule(self, weekday, weekend): + # Begin with some magic values ... + input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) + + # Now simply append times/temps + # weekday times + for i in range(0, 6): + input_payload.append(weekday[i]['start_hour']) + input_payload.append(weekday[i]['start_minute']) + + # weekend times + for i in range(0, 2): + input_payload.append(weekend[i]['start_hour']) + input_payload.append(weekend[i]['start_minute']) + + # weekday temperatures + for i in range(0, 6): + input_payload.append(int(weekday[i]['temp'] * 2)) + + # weekend temperatures + for i in range(0, 2): + input_payload.append(int(weekend[i]['temp'] * 2)) + + self.send_request(input_payload) + + +S1C_SENSORS_TYPES = { + 0x31: 'Door Sensor', # 49 as hex + 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse + 0x21: 'Motion Sensor' # 33 as hex +} + + +class S1C(device): + """ + Its VERY VERY VERY DIRTY IMPLEMENTATION of S1C + """ + + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = 'S1C' + + def get_sensors_status(self): + packet = bytearray(16) + packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + + payload = self.decrypt(bytes(response[0x38:])) + if not payload: + return None + count = payload[0x4] + sensors = payload[0x6:] + sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)] + + sens_res = [] + for sens in sensors_a: + status = ord(chr(sens[0])) + _name = str(bytes(sens[4:26]).decode()) + _order = ord(chr(sens[1])) + _type = ord(chr(sens[3])) + _serial = bytes(codecs.encode(sens[26:30], "hex")).decode() + + type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') + + r = { + 'status': status, + 'name': _name.strip('\x00'), + 'type': type_str, + 'order': _order, + 'serial': _serial, + } + if r['serial'] != '00000000': + sens_res.append(r) + result = { + 'count': count, + 'sensors': sens_res + } + return result + + +class dooya(device): + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "Dooya DT360E" + + def _send(self, magic1, magic2): + packet = bytearray(16) + packet[0] = 0x09 + packet[2] = 0xbb + packet[3] = magic1 + packet[4] = magic2 + packet[9] = 0xfa + packet[10] = 0x44 + response = self.send_packet(0x6a, packet) + err = response[0x22] | (response[0x23] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + return ord(payload[4]) + + def open(self): + return self._send(0x01, 0x00) + + def close(self): + return self._send(0x02, 0x00) + + def stop(self): + return self._send(0x03, 0x00) + + def get_percentage(self): + return self._send(0x06, 0x5d) + + def set_percentage_and_wait(self, new_percentage): + current = self.get_percentage() + if current > new_percentage: + self.close() + while current is not None and current > new_percentage: + time.sleep(0.2) + current = self.get_percentage() + + elif current < new_percentage: + self.open() + while current is not None and current < new_percentage: + time.sleep(0.2) + current = self.get_percentage() + self.stop() + +class lb1(device): + state_dict = [] + effect_map_dict = { 'lovely color' : 0, + 'flashlight' : 1, + 'lightning' : 2, + 'color fading' : 3, + 'color breathing' : 4, + 'multicolor breathing' : 5, + 'color jumping' : 6, + 'multicolor jumping' : 7 } + + def __init__(self, host, mac, devtype): + device.__init__(self, host, mac, devtype) + self.type = "SmartBulb" + + def send_command(self,command, type = 'set'): + packet = bytearray(16+(int(len(command)/16) + 1)*16) + packet[0x02] = 0xa5 + packet[0x03] = 0xa5 + packet[0x04] = 0x5a + packet[0x05] = 0x5a + packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set + packet[0x09] = 0x0b + packet[0x0a] = len(command) + packet[0x0e:] = map(ord, command) + + checksum = 0xbeaf + for b in packet: + checksum = (checksum + b) & 0xffff + + packet[0x00] = (0x0c + len(command)) & 0xff + packet[0x06] = checksum & 0xff # Checksum 1 position + packet[0x07] = checksum >> 8 # Checksum 2 position + + response = self.send_packet(0x6a, packet) + + err = response[0x36] | (response[0x37] << 8) + if err != 0: + return None + payload = self.decrypt(bytes(response[0x38:])) + + responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) + if responseLength > 0: + self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) + + def set_json(self, jsonstr): + reconvert = json.loads(jsonstr) + if 'bulb_sceneidx' in reconvert.keys(): + reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) + + self.send_command(json.dumps(reconvert)) + return json.dumps(self.state_dict) + + def set_state(self, state): + cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) + self.send_command(cmd) + + def get_state(self): + cmd = "{}" + self.send_command(cmd) + return self.state_dict + +# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. +# Only tested with Broadlink RM3 Mini (Blackbean) +def setup(ssid, password, security_mode): + # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) + payload = bytearray(0x88) + payload[0x26] = 0x14 # This seems to always be set to 14 + # Add the SSID to the payload + ssid_start = 68 + ssid_length = 0 + for letter in ssid: + payload[(ssid_start + ssid_length)] = ord(letter) + ssid_length += 1 + # Add the WiFi password to the payload + pass_start = 100 + pass_length = 0 + for letter in password: + payload[(pass_start + pass_length)] = ord(letter) + pass_length += 1 + + payload[0x84] = ssid_length # Character length of SSID + payload[0x85] = pass_length # Character length of password + payload[0x86] = security_mode # Type of encryption (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2) + + checksum = 0xbeaf + for b in payload: + checksum = (checksum + b) & 0xffff + + payload[0x20] = checksum & 0xff # Checksum 1 position + payload[0x21] = checksum >> 8 # Checksum 2 position + + sock = socket.socket(socket.AF_INET, # Internet + socket.SOCK_DGRAM) # UDP + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.sendto(payload, ('192.168.10.1', 80)) diff --git a/third_party/python/broadlink/cli/README.md b/third_party/python/broadlink/cli/README.md new file mode 100644 index 000000000000..7e229e3eb557 --- /dev/null +++ b/third_party/python/broadlink/cli/README.md @@ -0,0 +1,85 @@ +Command line interface for python-broadlink +=========================================== + +This is a command line interface for broadlink python library + +Tested with BroadLink RMPRO / RM2 + + +Requirements +------------ +You should have the broadlink python installed, this can be made in many linux distributions using : +``` +sudo pip install broadlink +``` + +Installation +----------- +Just copy this files + + +Programs +-------- + + +* broadlink_discovery +used to run the discovery in the network +this program withh show the command line parameters to be used with +broadlink_cli to select broadlink device + +* broadlink_cli +used to send commands and query the broadlink device + + +device specification formats +---------------------------- + +Using separate parameters for each information: +``` +broadlink_cli --type 0x2712 --host 1.1.1.1 --mac aaaaaaaaaa --temp +``` + +Using all parameters as a single argument: +``` +broadlink_cli --device "0x2712 1.1.1.1 aaaaaaaaaa" --temp +``` + +Using file with parameters: +``` +broadlink_cli --device @BEDROOM.device --temp +``` +This is prefered as the configuration is stored in file and you can change +just a file to point to a different hardware + +Sample usage +------------ + +Learn commands : +``` +# Learn and save to file +broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power +# LEard and show at console +broadlink_cli --device @BEDROOM.device --learn +``` + + +Send command : +``` +broadlink_cli --device @BEDROOM.device --send @LG-TV.power +broadlink_cli --device @BEDROOM.device --send ....datafromlearncommand... +``` + +Get Temperature : +``` +broadlink_cli --device @BEDROOM.device --temperature +``` + +Get Energy Consumption (For a SmartPlug) : +``` +broadlink_cli --device @BEDROOM.device --energy +``` + +Once joined to the Broadlink provisioning Wi-Fi, configure it with your Wi-Fi details: +``` +broadlink_cli --joinwifi MySSID MyWifiPassword +``` diff --git a/third_party/python/broadlink/cli/broadlink_cli b/third_party/python/broadlink/cli/broadlink_cli new file mode 100755 index 000000000000..5045c5c1082f --- /dev/null +++ b/third_party/python/broadlink/cli/broadlink_cli @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 + +import argparse +import base64 +import codecs +import time + +import broadlink + +TICK = 32.84 +IR_TOKEN = 0x26 + + +def auto_int(x): + return int(x, 0) + + +def to_microseconds(bytes): + result = [] + # print bytes[0] # 0x26 = 38for IR + index = 4 + while index < len(bytes): + chunk = bytes[index] + index += 1 + if chunk == 0: + chunk = bytes[index] + chunk = 256 * chunk + bytes[index + 1] + index += 2 + result.append(int(round(chunk * TICK))) + if chunk == 0x0d05: + break + return result + + +def durations_to_broadlink(durations): + result = bytearray() + result.append(IR_TOKEN) + result.append(0) + result.append(len(durations) % 256) + result.append(len(durations) / 256) + for dur in durations: + num = int(round(dur / TICK)) + if num > 255: + result.append(0) + result.append(num / 256) + result.append(num % 256) + return result + + +def format_durations(data): + result = '' + for i in range(0, len(data)): + if len(result) > 0: + result += ' ' + result += ('+' if i % 2 == 0 else '-') + str(data[i]) + return result + + +def parse_durations(str): + result = [] + for s in str.split(): + result.append(abs(int(s))) + return result + + +parser = argparse.ArgumentParser(fromfile_prefix_chars='@') +parser.add_argument("--device", help="device definition as 'type host mac'") +parser.add_argument("--type", type=auto_int, default=0x2712, help="type of device") +parser.add_argument("--host", help="host address") +parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library") +parser.add_argument("--temperature", action="store_true", help="request temperature from device") +parser.add_argument("--energy", action="store_true", help="request energy consumption from device") +parser.add_argument("--check", action="store_true", help="check current power state") +parser.add_argument("--checknl", action="store_true", help="check current nightlight state") +parser.add_argument("--turnon", action="store_true", help="turn on device") +parser.add_argument("--turnoff", action="store_true", help="turn off device") +parser.add_argument("--turnnlon", action="store_true", help="turn on nightlight on the device") +parser.add_argument("--turnnloff", action="store_true", help="turn off nightlight on the device") +parser.add_argument("--switch", action="store_true", help="switch state from on to off and off to on") +parser.add_argument("--send", action="store_true", help="send command") +parser.add_argument("--sensors", action="store_true", help="check all sensors") +parser.add_argument("--learn", action="store_true", help="learn command") +parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") +parser.add_argument("--learnfile", help="save learned command to a specified file") +parser.add_argument("--durations", action="store_true", + help="use durations in micro seconds instead of the Broadlink format") +parser.add_argument("--convert", action="store_true", help="convert input data to durations") +parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with"); +parser.add_argument("data", nargs='*', help="Data to send or convert") +args = parser.parse_args() + +if args.device: + values = args.device.split() + type = int(values[0], 0) + host = values[1] + mac = bytearray.fromhex(values[2]) +elif args.mac: + type = args.type + host = args.host + mac = bytearray.fromhex(args.mac) + +if args.host or args.device: + dev = broadlink.gendevice(type, (host, 80), mac) + dev.auth() + +if args.joinwifi: + broadlink.setup(args.joinwifi[0], args.joinwifi[1], 4) + +if args.convert: + data = bytearray.fromhex(''.join(args.data)) + durations = to_microseconds(data) + print(format_durations(durations)) +if args.temperature: + print(dev.check_temperature()) +if args.energy: + print(dev.get_energy()) +if args.sensors: + try: + data = dev.check_sensors() + except: + data = {} + data['temperature'] = dev.check_temperature() + for key in data: + print("{} {}".format(key, data[key])) +if args.send: + data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ + if args.durations else bytearray.fromhex(''.join(args.data)) + dev.send_data(data) +if args.learn or args.learnfile: + dev.enter_learning() + data = None + print("Learning...") + timeout = 30 + while (data is None) and (timeout > 0): + time.sleep(2) + timeout -= 2 + data = dev.check_data() + if data: + learned = format_durations(to_microseconds(bytearray(data))) \ + if args.durations \ + else ''.join(format(x, '02x') for x in bytearray(data)) + if args.learn: + print(learned) + decode_hex = codecs.getdecoder("hex_codec") + print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + if args.learnfile: + print("Saving to {}".format(args.learnfile)) + with open(args.learnfile, "w") as text_file: + text_file.write(learned) + else: + print("No data received...") +if args.check: + if dev.check_power(): + print('* ON *') + else: + print('* OFF *') +if args.checknl: + if dev.check_nightlight(): + print('* ON *') + else: + print('* OFF *') +if args.turnon: + dev.set_power(True) + if dev.check_power(): + print('== Turned * ON * ==') + else: + print('!! Still OFF !!') +if args.turnoff: + dev.set_power(False) + if dev.check_power(): + print('!! Still ON !!') + else: + print('== Turned * OFF * ==') +if args.turnnlon: + dev.set_nightlight(True) + if dev.check_nightlight(): + print('== Turned * ON * ==') + else: + print('!! Still OFF !!') +if args.turnnloff: + dev.set_nightlight(False) + if dev.check_nightlight(): + print('!! Still ON !!') + else: + print('== Turned * OFF * ==') +if args.switch: + if dev.check_power(): + dev.set_power(False) + print('* Switch to OFF *') + else: + dev.set_power(True) + print('* Switch to ON *') +if args.rfscanlearn: + dev.sweep_frequency() + print("Learning RF Frequency, press and hold the button to learn...") + + timeout = 20 + + while (not dev.check_frequency()) and (timeout > 0): + time.sleep(1) + timeout -= 1 + + if timeout <= 0: + print("RF Frequency not found") + dev.cancel_sweep_frequency() + exit(1) + + print("Found RF Frequency - 1 of 2!") + print("You can now let go of the button") + + input("Press enter to continue...") + + print("To complete learning, single press the button you want to learn") + + dev.find_rf_packet() + + data = None + timeout = 20 + + while (data is None) and (timeout > 0): + time.sleep(1) + timeout -= 1 + data = dev.check_data() + + if data: + print("Found RF Frequency - 2 of 2!") + learned = format_durations(to_microseconds(bytearray(data))) \ + if args.durations \ + else ''.join(format(x, '02x') for x in bytearray(data)) + if args.learnfile is None: + print(learned) + decode_hex = codecs.getdecoder("hex_codec") + print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) + if args.learnfile is not None: + print("Saving to {}".format(args.learnfile)) + with open(args.learnfile, "w") as text_file: + text_file.write(learned) + else: + print("No data received...") diff --git a/third_party/python/broadlink/cli/broadlink_discovery b/third_party/python/broadlink/cli/broadlink_discovery new file mode 100755 index 000000000000..1c6b80b1483e --- /dev/null +++ b/third_party/python/broadlink/cli/broadlink_discovery @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +import argparse + +import broadlink + +parser = argparse.ArgumentParser(fromfile_prefix_chars='@') +parser.add_argument("--timeout", type=int, default=5, help="timeout to wait for receiving discovery responses") +parser.add_argument("--ip", default=None, help="ip address to use in the discovery") +parser.add_argument("--dst-ip", default=None, help="destination ip address to use in the discovery") +args = parser.parse_args() + +print("Discovering...") +devices = broadlink.discover(timeout=args.timeout, local_ip_address=args.ip, discover_ip_address=args.dst_ip) +for device in devices: + if device.auth(): + print("###########################################") + print(device.type) + print("# broadlink_cli --type {} --host {} --mac {}".format(hex(device.devtype), device.host[0], + ''.join(format(x, '02x') for x in device.mac))) + print("Device file data (to be used with --device @filename in broadlink_cli) : ") + print("{} {} {}".format(hex(device.devtype), device.host[0], ''.join(format(x, '02x') for x in device.mac))) + if hasattr(device, 'check_temperature'): + print("temperature = {}".format(device.check_temperature())) + print("") + else: + print("Error authenticating with device : {}".format(device.host)) diff --git a/third_party/python/broadlink/default.nix b/third_party/python/broadlink/default.nix new file mode 100644 index 000000000000..e316d83d1d71 --- /dev/null +++ b/third_party/python/broadlink/default.nix @@ -0,0 +1,15 @@ +# Python package for controlling the Broadlink RM Pro Infrared +# controller. +# +# https://github.com/mjg59/python-broadlink +{ pkgs, lib, ... }: + +let + inherit (pkgs) fetchFromGitHub; + inherit (pkgs.python3Packages) buildPythonPackage cryptography; +in buildPythonPackage (lib.fix (self: { + pname = "python-broadlink"; + version = "0.13.2"; + src = ./.; + propagatedBuildInputs = [ cryptography ]; +})) diff --git a/third_party/python/broadlink/protocol.md b/third_party/python/broadlink/protocol.md new file mode 100644 index 000000000000..2e388d749963 --- /dev/null +++ b/third_party/python/broadlink/protocol.md @@ -0,0 +1,202 @@ +Broadlink RM2 network protocol +============================== + +Encryption +---------- + +Packets include AES-based encryption in CBC mode. The initial key is 0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02. The IV is 0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58. + +Checksum +-------- + +Construct the packet and set checksum bytes to zero. Add each byte to the starting value of 0xbeaf, wrapping after 0xffff. + +New device setup +---------------- + +To setup a new Broadlink device while in AP Mode a 136 byte packet needs to be sent to the device as follows: + +| Offset | Contents | +|---------|----------| +|0x00-0x19|00| +|0x20-0x21|Checksum as a little-endian 16 bit integer| +|0x26|14 (Always 14)| +|0x44-0x63|SSID Name (zero padding is appended)| +|0x64-0x83|Password (zero padding is appended)| +|0x84|Character length of SSID| +|0x85|Character length of password| +|0x86|Wireless security mode (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2)| +|0x87-88|00| + +Send this packet as a UDP broadcast to 255.255.255.255 on port 80. + +Network discovery +----------------- + +To discover Broadlink devices on the local network, send a 48 byte packet with the following contents: + +| Offset | Contents | +|---------|----------| +|0x00-0x07|00| +|0x08-0x0b|Current offset from GMT as a little-endian 32 bit integer| +|0x0c-0x0d|Current year as a little-endian 16 bit integer| +|0x0e|Current number of seconds past the minute| +|0x0f|Current number of minutes past the hour| +|0x10|Current number of hours past midnight| +|0x11|Current day of the week (Monday = 1, Tuesday = 2, etc)| +|0x12|Current day in month| +|0x13|Current month| +|0x14-0x17|00| +|0x18-0x1b|Local IP address| +|0x1c-0x1d|Source port as a little-endian 16 bit integer| +|0x1e-0x1f|00| +|0x20-0x21|Checksum as a little-endian 16 bit integer| +|0x22-0x25|00| +|0x26|06| +|0x27-0x2f|00| + +Send this packet as a UDP broadcast to 255.255.255.255 on port 80. + +Response (any unicast response): + +| Offset | Contents | +|---------|----------| +|0x34-0x35|Device type as a little-endian 16 bit integer (see device type mapping)| +|0x3a-0x3f|MAC address of the target device| + +Device type mapping: + +| Device type in response packet | Device type | Treat as | +|---------|----------|----------| +|0|SP1|SP1| +|0x2711|SP2|SP2| +|0x2719 or 0x7919 or 0x271a or 0x791a|Honeywell SP2|SP2| +|0x2720|SPMini|SP2| +|0x753e|SP3|SP2| +|0x2728|SPMini2|SP2 +|0x2733 or 0x273e|OEM branded SPMini|SP2| +|>= 0x7530 and <= 0x7918|OEM branded SPMini2|SP2| +|0x2736|SPMiniPlus|SP2| +|0x2712|RM2|RM| +|0x2737|RM Mini / RM3 Mini Blackbean|RM| +|0x273d|RM Pro Phicomm|RM| +|0x2783|RM2 Home Plus|RM| +|0x277c|RM2 Home Plus GDT|RM| +|0x272a|RM2 Pro Plus|RM| +|0x2787|RM2 Pro Plus2|RM| +|0x278b|RM2 Pro Plus BL|RM| +|0x278f|RM Mini Shate|RM| +|0x2714|A1|A1| +|0x4EB5|MP1|MP1| + + +Command packet format +--------------------- + +The command packet header is 56 bytes long with the following format: + +|Offset|Contents| +|------|--------| +|0x00|0x5a| +|0x01|0xa5| +|0x02|0xaa| +|0x03|0x55| +|0x04|0x5a| +|0x05|0xa5| +|0x06|0xaa| +|0x07|0x55| +|0x08-0x1f|00| +|0x20-0x21|Checksum of full packet as a little-endian 16 bit integer| +|0x22-0x23|00| +|0x24-0x25|Device type as a little-endian 16 bit integer| +|0x26-0x27|Command code as a little-endian 16 bit integer| +|0x28-0x29|Packet count as a little-endian 16 bit integer| +|0x2a-0x2f|Local MAC address| +|0x30-0x33|Local device ID (obtained during authentication, 00 before authentication)| +|0x34-0x35|Checksum of unencrypted payload as a little-endian 16 bit integer +|0x36-0x37|00| + +The payload is appended immediately after this. The checksum at 0x20 is calculated *after* the payload is appended, and covers the entire packet (including the checksum at 0x34). Therefore: + +1. Generate packet header with checksum values set to 0 +2. Set the checksum initialisation value to 0xbeaf and calculate the checksum of the unencrypted payload. Set 0x34-0x35 to this value. +3. Encrypt and append the payload +4. Set the checksum initialisation value to 0xbeaf and calculate the checksum of the entire packet. Set 0x20-0x21 to this value. + +Authorisation +------------- + +You must obtain an authorisation key from the device before you can communicate. To do so, generate an 80 byte packet with the following contents: + +|Offset|Contents| +|------|--------| +|0x00-0x03|00| +|0x04-0x12|A 15-digit value that represents this device. Broadlink's implementation uses the IMEI.| +|0x13|01| +|0x14-0x2c|00| +|0x2d|0x01| +|0x30-0x7f|NULL-terminated ASCII string containing the device name| + +Send this payload with a command value of 0x0065. The response packet will contain an encrypted payload from byte 0x38 onwards. Decrypt this using the default key and IV. The format of the decrypted payload is: + +|Offset|Contents| +|------|--------| +|0x00-0x03|Device ID| +|0x04-0x13|Device encryption key| + +All further command packets must use this encryption key and device ID. + +Entering learning mode +---------------------- + +Send the following 16 byte payload with a command value of 0x006a: + +|Offset|Contents| +|------|--------| +|0x00|0x03| +|0x01-0x0f|0x00| + +Reading back data from learning mode +------------------------------------ + +Send the following 16 byte payload with a command value of 0x006a: + +|Offset|Contents| +|------|--------| +|0x00|0x04| +|0x01-0x0f|0x00| + +Byte 0x22 of the response contains a little-endian 16 bit error code. If this is 0, a code has been obtained. Bytes 0x38 and onward of the response are encrypted. Decrypt them. Bytes 0x04 and onward of the decrypted payload contain the captured data. + +Sending data +------------ + +Send the following payload with a command byte of 0x006a + +|Offset|Contents| +|------|--------| +|0x00|0x02| +|0x01-0x03|0x00| +|0x04|0x26 = IR, 0xb2 for RF 433Mhz, 0xd7 for RF 315Mhz| +|0x05|repeat count, (0 = no repeat, 1 send twice, .....)| +|0x06-0x07|Length of the following data in little endian| +|0x08 ....|Pulse lengths in 2^-15 s units (µs * 269 / 8192 works very well)| +|....|0x0d 0x05 at the end for IR only| + +Each value is represented by one byte. If the length exceeds one byte +then it is stored big endian with a leading 0. + +Example: The header for my Optoma projector is 8920 4450 +8920 * 269 / 8192 = 0x124 +4450 * 269 / 8192 = 0x92 + +So the data starts with `0x00 0x1 0x24 0x92 ....` + + +Todo +---- + +* Support for other devices using the Broadlink protocol (various smart home devices) +* Figure out what the format of the data packets actually is. +* Deal with the response after AP Mode WiFi network setup. + diff --git a/third_party/python/broadlink/requirements.txt b/third_party/python/broadlink/requirements.txt new file mode 100644 index 000000000000..09f445bfd823 --- /dev/null +++ b/third_party/python/broadlink/requirements.txt @@ -0,0 +1 @@ +cryptography==2.6.1 diff --git a/third_party/python/broadlink/setup.py b/third_party/python/broadlink/setup.py new file mode 100644 index 000000000000..778f495fb58a --- /dev/null +++ b/third_party/python/broadlink/setup.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + + +from setuptools import setup, find_packages + + +version = '0.13.2' + +setup( + name='broadlink', + version=version, + author='Matthew Garrett', + author_email='mjg59@srcf.ucam.org', + url='http://github.com/mjg59/python-broadlink', + packages=find_packages(), + scripts=[], + install_requires=['cryptography>=2.1.1'], + description='Python API for controlling Broadlink IR controllers', + classifiers=[ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + ], + include_package_data=True, + zip_safe=False, +) |