Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose IR/RF conversion functions #788

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions broadlink/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,47 @@
from .device import Device


def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None:
"""Convert a microsecond duration sequence into a Broadlink IR packet."""
result = bytearray(4)
result[0x00] = 0x26

for pulse in pulses:
div, mod = divmod(int(pulse // tick), 256)
if div:
result.append(0)
result.append(div)
result.append(mod)

data_len = len(result) - 4
result[0x02] = data_len & 0xFF
result[0x03] = data_len >> 8

return result


def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]:
"""Parse a Broadlink packet into a microsecond duration sequence."""
result = []
index = 4
end = min(256 * data[0x03] + data[0x02] + 4, len(data))

while index < end:
chunk = data[index]
index += 1

if chunk == 0:
try:
chunk = 256 * data[index] + data[index + 1]
except IndexError:
raise ValueError("Malformed data.")
index += 2

result.append(int(chunk * tick))

return result


class rmmini(Device):
"""Controls a Broadlink RM mini 3."""

Expand Down
106 changes: 38 additions & 68 deletions cli/broadlink_cli
Original file line number Diff line number Diff line change
@@ -1,68 +1,32 @@
#!/usr/bin/env python3
import argparse
import base64
import codecs
import time
import typing as t

import broadlink
from broadlink.const import DEFAULT_PORT
from broadlink.exceptions import ReadError, StorageError
from broadlink.remote import data_to_pulses, pulses_to_data

TICK = 32.84
TIMEOUT = 30
IR_TOKEN = 0x26


def auto_int(x):
return int(x, 0)


def to_microseconds(bytes):
result = []
# print bytes[0] # 0x26 = 38for IR
index = 4
while index < len(bytes):
chunk = bytes[index]
index += 1
if chunk == 0:
chunk = bytes[index]
chunk = 256 * chunk + bytes[index + 1]
index += 2
result.append(int(round(chunk * TICK)))
if chunk == 0x0d05:
break
return result


def durations_to_broadlink(durations):
result = bytearray()
result.append(IR_TOKEN)
result.append(0)
result.append(len(durations) % 256)
result.append(len(durations) / 256)
for dur in durations:
num = int(round(dur / TICK))
if num > 255:
result.append(0)
result.append(num / 256)
result.append(num % 256)
return result
def format_pulses(pulses: t.List[int]) -> str:
"""Format pulses."""
return " ".join(
f"+{pulse}" if i % 2 == 0 else f"-{pulse}"
for i, pulse in enumerate(pulses)
)


def format_durations(data):
result = ''
for i in range(0, len(data)):
if len(result) > 0:
result += ' '
result += ('+' if i % 2 == 0 else '-') + str(data[i])
return result


def parse_durations(str):
result = []
for s in str.split():
result.append(abs(int(s)))
return result
def parse_pulses(data: t.List[str]) -> t.List[int]:
"""Parse pulses."""
return [abs(int(s)) for s in data]


parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
Expand Down Expand Up @@ -112,8 +76,8 @@ if args.joinwifi:

if args.convert:
data = bytearray.fromhex(''.join(args.data))
durations = to_microseconds(data)
print(format_durations(durations))
pulses = data_to_pulses(data)
print(format_pulses(pulses))
if args.temperature:
print(dev.check_temperature())
if args.humidity:
Expand All @@ -125,8 +89,11 @@ if args.sensors:
for key in data:
print("{} {}".format(key, data[key]))
if args.send:
data = durations_to_broadlink(parse_durations(' '.join(args.data))) \
if args.durations else bytearray.fromhex(''.join(args.data))
data = (
pulses_to_data(parse_pulses(args.data))
if args.durations
else bytes.fromhex(''.join(args.data))
)
dev.send_data(data)
if args.learn or (args.learnfile and not args.rflearn):
dev.enter_learning()
Expand All @@ -144,17 +111,19 @@ if args.learn or (args.learnfile and not args.rflearn):
print("No data received...")
exit(1)

learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learn:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: " + str(base64.b64encode(decode_hex(learned)[0])))
print("Packet found!")
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))

print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)

if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)
if args.check:
if dev.check_power():
print('* ON *')
Expand Down Expand Up @@ -238,14 +207,15 @@ if args.rflearn:
exit(1)

print("Packet found!")
learned = format_durations(to_microseconds(bytearray(data))) \
if args.durations \
else ''.join(format(x, '02x') for x in bytearray(data))
if args.learnfile is None:
print(learned)
decode_hex = codecs.getdecoder("hex_codec")
print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0]))))
if args.learnfile is not None:
raw_fmt = data.hex()
base64_fmt = base64.b64encode(data).decode('ascii')
pulse_fmt = format_pulses(data_to_pulses(data))

print("Raw:", raw_fmt)
print("Base64:", base64_fmt)
print("Pulses:", pulse_fmt)

if args.learnfile:
print("Saving to {}".format(args.learnfile))
with open(args.learnfile, "w") as text_file:
text_file.write(learned)
text_file.write(pulse_fmt if args.durations else raw_fmt)
Loading