232 lines
8.1 KiB
Python
Executable File
232 lines
8.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
NEEWER FS230B LED Light Control SDK
|
|
Based on reverse-engineered protocol from similar NEEWER models
|
|
"""
|
|
|
|
import asyncio
|
|
import struct
|
|
from bleak import BleakClient, BleakScanner
|
|
from typing import Optional, List, Tuple
|
|
import logging
|
|
|
|
# NEEWER BLE Service/Characteristic UUIDs (based on RGB660 PRO reverse engineering).
# Written in canonical 8-4-4-4-12 form: the last group is 12 hex digits with no
# internal hyphens, so the strings match the UUIDs a BLE stack reports.
SERVICE_UUID = "69400001-b5a3-f393-e0a9-e50e24dcca99"
CHARACTERISTIC_UUID = "69400002-b5a3-f393-e0a9-e50e24dcca99"
|
|
|
|
class NeewerFS230B:
    """Control class for NEEWER FS230B LED Light.

    Commands are framed as ``[0x78, command_id, payload_length, *payload]``
    followed by a one-byte checksum (sum of all preceding bytes, truncated to
    8 bits) and written to ``CHARACTERISTIC_UUID``.
    """

    # Device-name fragments used to recognise NEEWER lights while scanning.
    _NAME_KEYWORDS = ("NEEWER", "FS230", "NW")

    # Supported colour temperature range in Kelvin.
    MIN_TEMPERATURE = 3200
    MAX_TEMPERATURE = 5600
    # Temperature used by set_brightness() until a temperature has been sent.
    DEFAULT_TEMPERATURE = 5600

    def __init__(self, mac_address: Optional[str] = None):
        """
        Initialize the NEEWER light controller

        Args:
            mac_address: Bluetooth MAC address of the light. If None, will scan for devices.
        """
        self.mac_address = mac_address
        self.client = None
        self.is_connected = False
        self.logger = logging.getLogger(__name__)
        # Last colour temperature successfully sent to the light, so that
        # set_brightness() can keep it instead of resetting to daylight.
        self._last_temperature = self.DEFAULT_TEMPERATURE

    async def scan_for_devices(self, timeout: int = 10) -> List[Tuple[str, str]]:
        """
        Scan for NEEWER devices

        Args:
            timeout: How long to scan, in seconds.

        Returns:
            List of tuples containing (mac_address, device_name)
        """
        self.logger.info("Scanning for NEEWER devices for %s seconds...", timeout)
        # discover() is a classmethod; no scanner instance is needed.
        discovered_devices = await BleakScanner.discover(timeout=timeout)

        devices = []
        for device in discovered_devices:
            name = device.name
            # Look for NEEWER devices by name patterns (case-insensitive).
            if name and any(keyword in name.upper() for keyword in self._NAME_KEYWORDS):
                devices.append((device.address, name))
                self.logger.info("Found NEEWER device: %s (%s)", name, device.address)

        return devices

    async def connect(self) -> bool:
        """
        Connect to the NEEWER light

        If no MAC address was supplied, a scan is performed and the first
        matching device is used.

        Returns:
            True if connected successfully (or already connected), False otherwise
        """
        if self.client is not None and self.is_connected:
            # Avoid opening a second connection and leaking the first one.
            self.logger.debug("Already connected to %s", self.mac_address)
            return True

        if not self.mac_address:
            devices = await self.scan_for_devices()
            if not devices:
                self.logger.error("No NEEWER devices found")
                return False
            self.mac_address = devices[0][0]
            self.logger.info("Using first found device: %s", self.mac_address)

        try:
            client = BleakClient(self.mac_address)
            await client.connect()
        except Exception as e:
            self.logger.error("Failed to connect: %s", e)
            # Do not keep a half-initialised client around after a failure.
            self.client = None
            self.is_connected = False
            return False

        self.client = client
        self.is_connected = True
        self.logger.info("Connected to NEEWER light at %s", self.mac_address)
        return True

    async def disconnect(self):
        """Disconnect from the light.

        Does nothing when not connected. A failure while disconnecting is
        logged rather than raised, and the connection flag is always cleared.
        """
        if not (self.client and self.is_connected):
            return

        try:
            await self.client.disconnect()
            self.logger.info("Disconnected from NEEWER light")
        except Exception as e:
            self.logger.error("Failed to disconnect cleanly: %s", e)
        finally:
            # Whatever happened, this object no longer considers the link usable.
            self.is_connected = False

    def _calculate_checksum(self, data: List[int]) -> int:
        """Calculate checksum for command (sum truncated to last byte)"""
        return sum(data) & 0xFF

    @staticmethod
    def _kelvin_to_protocol(temperature: int) -> int:
        """Convert Kelvin to the protocol value (3200K = 0x20, 5600K = 0x38).

        The mapping is one step per 100K; integer division keeps it exact.
        """
        return temperature // 100

    async def _send_command(self, command: List[int]) -> bool:
        """
        Send a command to the light

        Args:
            command: List of integers representing the command bytes
                (without checksum; it is appended here)

        Returns:
            True if command sent successfully
        """
        if not self.is_connected or not self.client:
            self.logger.error("Not connected to device")
            return False

        try:
            # Append the checksum, then convert to bytes. bytes() raises
            # ValueError for values outside 0-255, which is reported below.
            full_command = command + [self._calculate_checksum(command)]
            command_bytes = bytes(full_command)

            self.logger.debug("Sending command: %s", command_bytes.hex())
            await self.client.write_gatt_char(CHARACTERISTIC_UUID, command_bytes)
            return True
        except Exception as e:
            self.logger.error("Failed to send command: %s", e)
            return False

    async def set_power(self, on: bool) -> bool:
        """
        Turn the light on or off

        Args:
            on: True to turn on, False to turn off

        Returns:
            True if command sent successfully
        """
        # Power command format: [0x78, 0x81, 0x01, on_off_byte]
        # On = 0x01, Off = 0x02
        command = [0x78, 0x81, 0x01, 0x01 if on else 0x02]
        return await self._send_command(command)

    async def set_brightness_and_temperature(self, brightness: int, temperature: int) -> bool:
        """
        Set brightness and color temperature

        Out-of-range values are clamped rather than rejected.

        Args:
            brightness: 0-100 (percentage)
            temperature: 3200-5600 (Kelvin) for bi-color models

        Returns:
            True if command sent successfully
        """
        # Clamp inputs; int() keeps float-like inputs from failing in bytes().
        brightness = max(0, min(100, int(brightness)))
        temperature = max(
            self.MIN_TEMPERATURE, min(self.MAX_TEMPERATURE, int(temperature))
        )

        temp_value = self._kelvin_to_protocol(temperature)

        # CCT command format: [0x78, 0x87, 0x02, brightness, temp_value]
        command = [0x78, 0x87, 0x02, brightness, temp_value]
        sent = await self._send_command(command)
        if sent:
            # Remember what the light was told so set_brightness() can keep it.
            self._last_temperature = temperature
        return sent

    async def set_brightness(self, brightness: int) -> bool:
        """
        Set brightness only (maintains current color temperature)

        The protocol command always carries a temperature, so the last
        temperature successfully sent by this object is re-sent. Before any
        temperature has been sent, 5600K (daylight) is used.

        Args:
            brightness: 0-100 (percentage)

        Returns:
            True if command sent successfully
        """
        return await self.set_brightness_and_temperature(
            brightness, self._last_temperature
        )
|
|
|
|
|
|
# CLI Interface
|
|
async def main():
    """Simple CLI for testing the SDK.

    Parses command-line options, optionally scans for devices, otherwise
    connects to the light, runs the requested commands in the order
    on -> off -> brightness/temperature, and always disconnects afterwards.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Control NEEWER FS230B LED Light")
    parser.add_argument("--mac", help="Bluetooth MAC address of the light")
    parser.add_argument("--scan", action="store_true", help="Scan for NEEWER devices")
    parser.add_argument("--on", action="store_true", help="Turn light on")
    parser.add_argument("--off", action="store_true", help="Turn light off")
    parser.add_argument("--brightness", type=int, help="Set brightness (0-100)")
    parser.add_argument("--temperature", type=int, help="Set color temperature (3200-5600K)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")

    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    light = NeewerFS230B(args.mac)

    # Scan-only mode: list devices and exit without connecting.
    if args.scan:
        devices = await light.scan_for_devices()
        if devices:
            print("Found NEEWER devices:")
            for mac, name in devices:
                print(f"  {name}: {mac}")
        else:
            print("No NEEWER devices found")
        return

    # The temperature is only sent together with a brightness value, so tell
    # the user instead of silently dropping the option.
    if args.temperature is not None and args.brightness is None:
        print("Warning: --temperature is ignored unless --brightness is also given")

    # Connect to the light
    if not await light.connect():
        print("Failed to connect to light")
        return

    try:
        # Execute commands
        if args.on:
            success = await light.set_power(True)
            print(f"Turn on: {'Success' if success else 'Failed'}")

        if args.off:
            success = await light.set_power(False)
            print(f"Turn off: {'Success' if success else 'Failed'}")

        if args.brightness is not None:
            # "is not None" (not truthiness) so an explicit value is never
            # mistaken for "option not given".
            if args.temperature is not None:
                success = await light.set_brightness_and_temperature(args.brightness, args.temperature)
                print(f"Set brightness {args.brightness}% and temperature {args.temperature}K: {'Success' if success else 'Failed'}")
            else:
                success = await light.set_brightness(args.brightness)
                print(f"Set brightness {args.brightness}%: {'Success' if success else 'Failed'}")
    finally:
        # Release the BLE connection even if a command raised.
        await light.disconnect()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: create the CLI coroutine and drive it to completion
    # on a fresh asyncio event loop.
    cli_coroutine = main()
    asyncio.run(cli_coroutine)
|