diff --git a/roborock/code_mappings.py b/roborock/code_mappings.py index 9f0736e..8d1123b 100644 --- a/roborock/code_mappings.py +++ b/roborock/code_mappings.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +from collections import namedtuple from enum import Enum, IntEnum _LOGGER = logging.getLogger(__name__) @@ -50,6 +51,68 @@ def items(cls: type[RoborockEnum]): return cls.as_dict().items() +ProductInfo = namedtuple("ProductInfo", ["nickname", "short_models"]) + + +class RoborockProductNickname(Enum): + # Coral Series + CORAL = ProductInfo(nickname="Coral", short_models=("a20", "a21")) + CORALPRO = ProductInfo(nickname="CoralPro", short_models=("a143", "a144")) + + # Pearl Series + PEARL = ProductInfo(nickname="Pearl", short_models=("a74", "a75")) + PEARLC = ProductInfo(nickname="PearlC", short_models=("a103", "a104")) + PEARLE = ProductInfo(nickname="PearlE", short_models=("a167", "a168")) + PEARLELITE = ProductInfo(nickname="PearlELite", short_models=("a169", "a170")) + PEARLPLUS = ProductInfo(nickname="PearlPlus", short_models=("a86", "a87")) + PEARLPLUSS = ProductInfo(nickname="PearlPlusS", short_models=("a116", "a117", "a136")) + PEARLS = ProductInfo(nickname="PearlS", short_models=("a100", "a101")) + PEARLSLITE = ProductInfo(nickname="PearlSLite", short_models=("a122", "a123")) + + # Ruby Series + RUBYPLUS = ProductInfo(nickname="RubyPlus", short_models=("t4", "s4")) + RUBYSC = ProductInfo(nickname="RubySC", short_models=("p5", "a08")) + RUBYSE = ProductInfo(nickname="RubySE", short_models=("a19",)) + RUBYSLITE = ProductInfo(nickname="RubySLite", short_models=("p6", "s5e", "a05")) + + # Tanos Series + TANOS = ProductInfo(nickname="Tanos", short_models=("t6", "s6")) + TANOSE = ProductInfo(nickname="TanosE", short_models=("t7", "a11")) + TANOSS = ProductInfo(nickname="TanosS", short_models=("a14", "a15")) + TANOSSC = ProductInfo(nickname="TanosSC", short_models=("a39", "a40")) + TANOSSE = ProductInfo(nickname="TanosSE", short_models=("a33", "a34")) + TANOSSMAX = ProductInfo(nickname="TanosSMax", short_models=("a52",)) + TANOSSLITE = ProductInfo(nickname="TanosSLite", short_models=("a37", "a38")) + TANOSSPLUS = ProductInfo(nickname="TanosSPlus", short_models=("a23", "a24")) + TANOSV = ProductInfo(nickname="TanosV", short_models=("t7p", "a09", "a10")) + + # Topaz Series + TOPAZS = ProductInfo(nickname="TopazS", short_models=("a29", "a30", "a76")) + TOPAZSC = ProductInfo(nickname="TopazSC", short_models=("a64", "a65")) + TOPAZSPLUS = ProductInfo(nickname="TopazSPlus", short_models=("a46", "a47", "a66")) + TOPAZSPOWER = ProductInfo(nickname="TopazSPower", short_models=("a62",)) + TOPAZSV = ProductInfo(nickname="TopazSV", short_models=("a26", "a27")) + + # Ultron Series + ULTRON = ProductInfo(nickname="Ultron", short_models=("a50", "a51")) + ULTRONE = ProductInfo(nickname="UltronE", short_models=("a72", "a84")) + ULTRONLITE = ProductInfo(nickname="UltronLite", short_models=("a73", "a85")) + ULTRONSC = ProductInfo(nickname="UltronSC", short_models=("a94", "a95")) + ULTRONSE = ProductInfo(nickname="UltronSE", short_models=("a124", "a125", "a139", "a140")) + ULTRONSPLUS = ProductInfo(nickname="UltronSPlus", short_models=("a68", "a69", "a70")) + ULTRONSV = ProductInfo(nickname="UltronSV", short_models=("a96", "a97")) + + # Verdelite Series + VERDELITE = ProductInfo(nickname="Verdelite", short_models=("a146", "a147")) + + # Vivian Series + VIVIAN = ProductInfo(nickname="Vivian", short_models=("a134", "a135", "a155", "a156")) + VIVIANC = ProductInfo(nickname="VivianC", short_models=("a158", "a159")) + + +short_model_to_enum = {model: product for product in RoborockProduct for model in product.value.short_models} + + class RoborockStateCode(RoborockEnum): unknown = 0 starting = 1 diff --git a/roborock/containers.py b/roborock/containers.py index afbe14b..dfdc03f 100644 --- a/roborock/containers.py +++ b/roborock/containers.py @@ -4,9 +4,9 @@ import json import logging import re -from dataclasses import asdict, dataclass, field +from dataclasses import asdict, dataclass, field, fields from datetime import timezone -from enum import Enum +from enum import Enum, IntEnum from typing import Any, NamedTuple, get_args, get_origin from .code_mappings import ( @@ -43,8 +43,10 @@ RoborockMopModeS7, RoborockMopModeS8MaxVUltra, RoborockMopModeS8ProUltra, + RoborockProductNickname, RoborockStartType, RoborockStateCode, + short_model_to_enum, ) from .const import ( CLEANING_BRUSH_REPLACE_TIME, @@ -294,144 +296,238 @@ class HomeDataDevice(RoborockBase): silent_ota_switch: bool | None = None setting: Any | None = None f: bool | None = None - device_features: DeviceFeatures | None = None - # seemingly not just str like I thought - example: '0000000000002000' and '0000000000002F63' - # def __post_init__(self): - # if self.feature_set is not None and self.new_feature_set is not None and self.new_feature_set != "": - # self.device_features = build_device_features(self.feature_set, self.new_feature_set) +class NewFeatureStrBit(IntEnum): + TWO_KEY_REAL_TIME_VIDEO = 32 + TWO_KEY_RTV_IN_CHARGING = 33 + DIRTY_REPLENISH_CLEAN = 34 + AUTO_DELIVERY_FIELD_IN_GLOBAL_STATUS = 35 + AVOID_COLLISION_MODE = 36 + VOICE_CONTROL = 37 + NEW_ENDPOINT = 38 + PUMPING_WATER = 39 + CORNER_MOP_STRECH = 40 + HOT_WASH_TOWEL = 41 + FLOOR_DIR_CLEAN_ANY_TIME = 42 + PET_SUPPLIES_DEEP_CLEAN = 43 + MOP_SHAKE_WATER_MAX = 45 + EXACT_CUSTOM_MODE = 47 + CARPET_CUSTOM_CLEAN = 49 + PET_SNAPSHOT = 50 + CUSTOM_CLEAN_MODE_COUNT = 51 + NEW_AI_RECOGNITION = 52 + AUTO_COLLECTION_2 = 53 + RIGHT_BRUSH_STRETCH = 54 + SMART_CLEAN_MODE_SET = 55 + DIRTY_OBJECT_DETECT = 56 + NO_NEED_CARPET_PRESS_SET = 57 + VOICE_CONTROL_LED = 58 + WATER_LEAK_CHECK = 60 + MIN_BATTERY_15_TO_CLEAN_TASK = 62 + GAP_DEEP_CLEAN = 63 + OBJECT_DETECT_CHECK = 64 + IDENTIFY_ROOM = 66 + MATTER = 67 + WORKDAY_HOLIDAY = 69 + CLEAN_DIRECT_STATUS = 70 + MAP_ERASER = 71 + OPTIMIZE_BATTERY = 72 + ACTIVATE_VIDEO_CHARGING_AND_STANDBY = 73 + CARPET_LONG_HAIRED = 75 + CLEAN_HISTORY_TIME_LINE = 76 + MAX_ZONE_OPENED = 77 + EXHIBITION_FUNCTION = 78 + LDS_LIFTING = 79 + AUTO_TEAR_DOWN_MOP = 80 + SAMLL_SIDE_MOP = 81 + SUPPORT_SIDE_BRUSH_UP_DOWN = 82 + DRY_INTERVAL_TIMER = 83 + UVC_STERILIZE = 84 + MIDWAY_BACK_TO_DOCK = 85 + SUPPORT_MAIN_BRUSH_UP_DOWN = 86 + EGG_DANCE_MODE = 87 @dataclass class DeviceFeatures(RoborockBase): - map_carpet_add_supported: bool - show_clean_finish_reason_supported: bool - resegment_supported: bool - video_monitor_supported: bool - any_state_transit_goto_supported: bool - fw_filter_obstacle_supported: bool - video_setting_supported: bool - ignore_unknown_map_object_supported: bool - set_child_supported: bool - carpet_supported: bool - mop_path_supported: bool - multi_map_segment_timer_supported: bool - custom_water_box_distance_supported: bool - wash_then_charge_cmd_supported: bool - room_name_supported: bool - current_map_restore_enabled: bool - photo_upload_supported: bool - shake_mop_set_supported: bool - map_beautify_internal_debug_supported: bool - new_data_for_clean_history: bool - new_data_for_clean_history_detail: bool - flow_led_setting_supported: bool - dust_collection_setting_supported: bool - rpc_retry_supported: bool - avoid_collision_supported: bool - support_set_switch_map_mode: bool - support_smart_scene: bool - support_floor_edit: bool - support_furniture: bool - support_room_tag: bool - support_quick_map_builder: bool - support_smart_global_clean_with_custom_mode: bool - record_allowed: bool - careful_slow_map_supported: bool - egg_mode_supported: bool - unsave_map_reason_supported: bool - carpet_show_on_map: bool - supported_valley_electricity: bool - drying_supported: bool - download_test_voice_supported: bool - support_backup_map: bool - support_custom_mode_in_cleaning: bool - support_remote_control_in_call: bool - support_set_volume_in_call: bool - support_clean_estimate: bool - support_custom_dnd: bool - carpet_deep_clean_supported: bool - stuck_zone_supported: bool - custom_door_sill_supported: bool - clean_route_fast_mode_supported: bool - cliff_zone_supported: bool - smart_door_sill_supported: bool - support_floor_direction: bool - wifi_manage_supported: bool - back_charge_auto_wash_supported: bool - support_incremental_map: bool - offline_map_supported: bool - - -def build_device_features(feature_set: str, new_feature_set: str) -> DeviceFeatures: - new_feature_set_int = int(new_feature_set) - feature_set_int = int(feature_set) - new_feature_set_divided = int(new_feature_set_int / (2**32)) - # Convert last 8 digits of new feature set into hexadecimal number - converted_new_feature_set = int("0x" + new_feature_set[-8:], 16) - new_feature_set_mod_8: bool = len(new_feature_set) % 8 == 0 - return DeviceFeatures( - map_carpet_add_supported=bool(1073741824 & new_feature_set_int), - show_clean_finish_reason_supported=bool(1 & new_feature_set_int), - resegment_supported=bool(4 & new_feature_set_int), - video_monitor_supported=bool(8 & new_feature_set_int), - any_state_transit_goto_supported=bool(16 & new_feature_set_int), - fw_filter_obstacle_supported=bool(32 & new_feature_set_int), - video_setting_supported=bool(64 & new_feature_set_int), - ignore_unknown_map_object_supported=bool(128 & new_feature_set_int), - set_child_supported=bool(256 & new_feature_set_int), - carpet_supported=bool(512 & new_feature_set_int), - mop_path_supported=bool(2048 & new_feature_set_int), - multi_map_segment_timer_supported=bool(feature_set_int and 4096 & new_feature_set_int), - custom_water_box_distance_supported=bool(new_feature_set_int and 2147483648 & new_feature_set_int), - wash_then_charge_cmd_supported=bool((new_feature_set_divided >> 5) & 1), - room_name_supported=bool(16384 & new_feature_set_int), - current_map_restore_enabled=bool(8192 & new_feature_set_int), - photo_upload_supported=bool(65536 & new_feature_set_int), - shake_mop_set_supported=bool(262144 & new_feature_set_int), - map_beautify_internal_debug_supported=bool(2097152 & new_feature_set_int), - new_data_for_clean_history=bool(4194304 & new_feature_set_int), - new_data_for_clean_history_detail=bool(8388608 & new_feature_set_int), - flow_led_setting_supported=bool(16777216 & new_feature_set_int), - dust_collection_setting_supported=bool(33554432 & new_feature_set_int), - rpc_retry_supported=bool(67108864 & new_feature_set_int), - avoid_collision_supported=bool(134217728 & new_feature_set_int), - support_set_switch_map_mode=bool(268435456 & new_feature_set_int), - support_smart_scene=bool(new_feature_set_divided & 2), - support_floor_edit=bool(new_feature_set_divided & 8), - support_furniture=bool((new_feature_set_divided >> 4) & 1), - support_room_tag=bool((new_feature_set_divided >> 6) & 1), - support_quick_map_builder=bool((new_feature_set_divided >> 7) & 1), - support_smart_global_clean_with_custom_mode=bool((new_feature_set_divided >> 8) & 1), - record_allowed=bool(1024 & new_feature_set_int), - careful_slow_map_supported=bool((new_feature_set_divided >> 9) & 1), - egg_mode_supported=bool((new_feature_set_divided >> 10) & 1), - unsave_map_reason_supported=bool((new_feature_set_divided >> 14) & 1), - carpet_show_on_map=bool((new_feature_set_divided >> 12) & 1), - supported_valley_electricity=bool((new_feature_set_divided >> 13) & 1), - # This one could actually be incorrect - # ((t.robotNewFeatures / 2 ** 32) >> 15) & 1 && (module422.DMM.isTopazSV_CE || 'cn' == t.deviceLocation)); - drying_supported=bool((new_feature_set_divided >> 15) & 1), - download_test_voice_supported=bool((new_feature_set_divided >> 16) & 1), - support_backup_map=bool((new_feature_set_divided >> 17) & 1), - support_custom_mode_in_cleaning=bool((new_feature_set_divided >> 18) & 1), - support_remote_control_in_call=bool((new_feature_set_divided >> 19) & 1), - support_set_volume_in_call=new_feature_set_mod_8 and bool(1 & converted_new_feature_set), - support_clean_estimate=new_feature_set_mod_8 and bool(2 & converted_new_feature_set), - support_custom_dnd=new_feature_set_mod_8 and bool(4 & converted_new_feature_set), - carpet_deep_clean_supported=bool(8 & converted_new_feature_set), - stuck_zone_supported=new_feature_set_mod_8 and bool(16 & converted_new_feature_set), - custom_door_sill_supported=new_feature_set_mod_8 and bool(32 & converted_new_feature_set), - clean_route_fast_mode_supported=bool(256 & converted_new_feature_set), - cliff_zone_supported=new_feature_set_mod_8 and bool(512 & converted_new_feature_set), - smart_door_sill_supported=new_feature_set_mod_8 and bool(1024 & converted_new_feature_set), - support_floor_direction=new_feature_set_mod_8 and bool(2048 & converted_new_feature_set), - wifi_manage_supported=bool(128 & converted_new_feature_set), - back_charge_auto_wash_supported=bool(4096 & converted_new_feature_set), - support_incremental_map=bool(8192 & converted_new_feature_set), - offline_map_supported=bool(16384 & converted_new_feature_set), + """Represents the features supported by a Roborock device.""" + + # Features derived from robot_new_features + is_show_clean_finish_reason_supported: bool = field(metadata={"robot_new_features": 1}) + is_resegment_supported: bool = field(metadata={"robot_new_features": 4}) + is_video_monitor_supported: bool = field(metadata={"robot_new_features": 8}) + is_any_state_transit_goto_supported: bool = field(metadata={"robot_new_features": 16}) + is_fw_filter_obstacle_supported: bool = field(metadata={"robot_new_features": 32}) + is_video_settings_supported: bool = field(metadata={"robot_new_features": 64}) + is_ignore_unknown_map_object_supported: bool = field(metadata={"robot_new_features": 128}) + is_set_child_supported: bool = field(metadata={"robot_new_features": 256}) + is_carpet_supported: bool = field(metadata={"robot_new_features": 512}) + is_record_allowed: bool = field(metadata={"robot_new_features": 1024}) + is_mop_path_supported: bool = field(metadata={"robot_new_features": 2048}) + is_current_map_restore_enabled: bool = field(metadata={"robot_new_features": 8192}) + is_room_name_supported: bool = field(metadata={"robot_new_features": 16384}) + is_photo_upload_supported: bool = field(metadata={"robot_new_features": 65536}) + is_shake_mop_set_supported: bool = field(metadata={"robot_new_features": 262144}) + is_map_beautify_internal_debug_supported: bool = field(metadata={"robot_new_features": 2097152}) + is_new_data_for_clean_history_supported: bool = field(metadata={"robot_new_features": 4194304}) + is_new_data_for_clean_history_detail_supported: bool = field(metadata={"robot_new_features": 8388608}) + is_flow_led_setting_supported: bool = field(metadata={"robot_new_features": 16777216}) + is_dust_collection_setting_supported: bool = field(metadata={"robot_new_features": 33554432}) + is_rpc_retry_supported: bool = field(metadata={"robot_new_features": 67108864}) + is_avoid_collision_supported: bool = field(metadata={"robot_new_features": 134217728}) + is_support_set_switch_map_mode_supported: bool = field(metadata={"robot_new_features": 268435456}) + is_map_carpet_add_support: bool = field(metadata={"robot_new_features": 1073741824}) + is_custom_water_box_distance_supported: bool = field(metadata={"robot_new_features": 2147483648}) + + # Features derived from unhexed_feature_info + is_support_smart_scene_supported: bool = field(metadata={"upper_32_bits": 1}) + is_support_floor_edit_supported: bool = field(metadata={"upper_32_bits": 3}) + is_support_furniture_supported: bool = field(metadata={"upper_32_bits": 4}) + is_wash_then_charge_cmd_supported: bool = field(metadata={"upper_32_bits": 5}) + is_support_room_tag_supported: bool = field(metadata={"upper_32_bits": 6}) + is_support_quick_map_builder_supported: bool = field(metadata={"upper_32_bits": 7}) + is_support_smart_global_clean_with_custom_mode_supported: bool = field(metadata={"upper_32_bits": 8}) + is_careful_slow_mop_supported: bool = field(metadata={"upper_32_bits": 9}) + is_egg_mode_supported: bool = field(metadata={"upper_32_bits": 10}) + is_carpet_show_on_map_supported: bool = field(metadata={"upper_32_bits": 12}) + is_supported_valley_electricity_supported: bool = field(metadata={"upper_32_bits": 13}) + is_unsave_map_reason_supported: bool = field(metadata={"upper_32_bits": 14}) + is_supported_download_test_voice_supported: bool = field(metadata={"upper_32_bits": 16}) + is_support_backup_map_supported: bool = field(metadata={"upper_32_bits": 17}) + is_support_custom_mode_in_cleaning_supported: bool = field(metadata={"upper_32_bits": 18}) + is_support_remote_control_in_call_supported: bool = field(metadata={"upper_32_bits": 19}) + + is_support_set_volume_in_call: bool = field(metadata={"unhexed_feature_info": 1}) + is_support_clean_estimate: bool = field(metadata={"unhexed_feature_info": 2}) + is_support_custom_dnd: bool = field(metadata={"unhexed_feature_info": 4}) + is_carpet_deep_clean_supported: bool = field(metadata={"unhexed_feature_info": 8}) + is_support_stuck_zone: bool = field(metadata={"unhexed_feature_info": 16}) + is_support_custom_door_sill: bool = field(metadata={"unhexed_feature_info": 32}) + is_wifi_manage_supported: bool = field(metadata={"unhexed_feature_info": 128}) + is_clean_route_fast_mode_supported: bool = field(metadata={"unhexed_feature_info": 256}) + is_support_cliff_zone: bool = field(metadata={"unhexed_feature_info": 512}) + is_support_smart_door_sill: bool = field(metadata={"unhexed_feature_info": 1024}) + is_support_floor_direction: bool = field(metadata={"unhexed_feature_info": 2048}) + is_back_charge_auto_wash_supported: bool = field(metadata={"unhexed_feature_info": 4096}) + is_super_deep_wash_supported: bool = field(metadata={"unhexed_feature_info": 32768}) + is_ces2022_supported: bool = field(metadata={"unhexed_feature_info": 65536}) + is_dss_believable_supported: bool = field(metadata={"unhexed_feature_info": 131072}) + is_main_brush_up_down_supported: bool = field(metadata={"unhexed_feature_info": 262144}) + is_goto_pure_clean_path_supported: bool = field(metadata={"unhexed_feature_info": 524288}) + is_water_up_down_drain_supported: bool = field(metadata={"unhexed_feature_info": 1048576}) + is_setting_carpet_first_supported: bool = field(metadata={"unhexed_feature_info": 8388608}) + is_clean_route_deep_slow_plus_supported: bool = field(metadata={"unhexed_feature_info": 16777216}) + is_left_water_drain_supported: bool = field(metadata={"unhexed_feature_info": 134217728}) + is_clean_count_setting_supported: bool = field(metadata={"unhexed_feature_info": 1073741824}) + is_corner_clean_mode_supported: bool = field(metadata={"unhexed_feature_info": 2147483648}) + + # --- Features from new_feature_info_str --- + is_two_key_real_time_video_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.TWO_KEY_REAL_TIME_VIDEO} + ) + is_two_key_rtv_in_charging_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.TWO_KEY_RTV_IN_CHARGING} + ) + is_dirty_replenish_clean_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.DIRTY_REPLENISH_CLEAN} + ) + is_avoid_collision_mode_str_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.AVOID_COLLISION_MODE} + ) + is_voice_control_str_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.VOICE_CONTROL}) + is_new_endpoint_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.NEW_ENDPOINT}) + is_corner_mop_strech_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.CORNER_MOP_STRECH}) + is_hot_wash_towel_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.HOT_WASH_TOWEL}) + is_floor_dir_clean_any_time_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.FLOOR_DIR_CLEAN_ANY_TIME} + ) + is_pet_supplies_deep_clean_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.PET_SUPPLIES_DEEP_CLEAN} + ) + is_mop_shake_water_max_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.MOP_SHAKE_WATER_MAX} + ) + is_exact_custom_mode_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.EXACT_CUSTOM_MODE}) + is_carpet_custom_clean_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.CARPET_CUSTOM_CLEAN} + ) + is_pet_snapshot_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.PET_SNAPSHOT}) + is_custom_clean_mode_count_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.CUSTOM_CLEAN_MODE_COUNT} + ) + is_new_ai_recognition_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.NEW_AI_RECOGNITION}) + is_auto_collection_2_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.AUTO_COLLECTION_2}) + is_right_brush_stretch_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.RIGHT_BRUSH_STRETCH} + ) + is_smart_clean_mode_set_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.SMART_CLEAN_MODE_SET} + ) + is_dirty_object_detect_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.DIRTY_OBJECT_DETECT} + ) + is_no_need_carpet_press_set_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.NO_NEED_CARPET_PRESS_SET} + ) + is_voice_control_led_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.VOICE_CONTROL_LED}) + is_water_leak_check_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.WATER_LEAK_CHECK}) + is_min_battery_15_to_clean_task_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.MIN_BATTERY_15_TO_CLEAN_TASK} + ) + is_gap_deep_clean_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.GAP_DEEP_CLEAN}) + is_object_detect_check_supported: bool = field( + metadata={"new_feature_str_bit": NewFeatureStrBit.OBJECT_DETECT_CHECK} ) + is_identify_room_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.IDENTIFY_ROOM}) + is_matter_supported: bool = field(metadata={"new_feature_str_bit": NewFeatureStrBit.MATTER}) + + # is_multi_map_segment_timer_supported: bool = field(default=False) + # is_supported_drying_supported: bool = field(default=False) + + @classmethod + def _is_new_feature_str_support(cls, o: int, new_feature_info_str: str) -> bool: + """ + Checks feature 'o' in hex string 'new_feature_info_str'. + """ + try: + l = o % 4 + target_index = -((o // 4) + 1) + p = new_feature_info_str[target_index] + hex_char_value = int(p, 16) + is_set = (hex_char_value >> l) & 1 + return bool(is_set) + except (IndexError, ValueError): + return False + + @classmethod + def from_feature_flags( + cls, robot_new_features: int, new_feature_set: str, product_nickname: RoborockProductNickname + ) -> DeviceFeatures: + """Creates a DeviceFeatures instance from raw feature flags.""" + unhexed_feature_info = int(new_feature_set[-8:], 16) if new_feature_set and len(new_feature_set) >= 8 else 0 + upper_32_bits = robot_new_features // (2**32) + + kwargs: dict[str, Any] = {} + + for f in fields(cls): + if not f.metadata: + continue + + if "robot_new_features" in f.metadata: + mask = f.metadata["robot_new_features"] + kwargs[f.name] = bool(mask & robot_new_features) + elif "upper_32_bits" in f.metadata: + bit_index = f.metadata["upper_32_bits"] + kwargs[f.name] = bool(robot_new_features and ((upper_32_bits >> bit_index) & 1)) + elif "unhexed_feature_info" in f.metadata: + mask = f.metadata["unhexed_feature_info"] + kwargs[f.name] = bool(mask & unhexed_feature_info) + elif "new_feature_str_bit" in f.metadata: + bit = f.metadata["new_feature_str_bit"] + kwargs[f.name] = cls._is_new_feature_str_support(bit, new_feature_set) + + return cls(**kwargs) @dataclass @@ -840,6 +936,22 @@ class DeviceData(RoborockBase): device: HomeDataDevice model: str host: str | None = None + product_nickname: RoborockProductNickname | None = None + device_features: DeviceFeatures | None = None + + def __post_init__(self): + self.product_nickname = short_model_to_enum.get(self.model.split(".")[-1], RoborockProductNickname.PEARLPLUS) + robot_new_features = int(self.device.feature_set) if self.device.feature_set else 0 + self.device_features = DeviceFeatures.from_feature_flags( + robot_new_features, + self.device.new_feature_set if self.device.new_feature_set is not None else "00000000", + self.product_nickname, + ) + + @property + def duid(self) -> str: + """Get the duid of the device.""" + return self.device.duid @dataclass @@ -971,3 +1083,12 @@ class DyadSndState(RoborockBase): @dataclass class DyadOtaNfo(RoborockBase): mqttOtaData: dict + + +@dataclass +class DndActions(RoborockBase): + dry: int | None = None + dust: int | None = None + led: int | None = None + resume: int | None = None + vol: int | None = None diff --git a/roborock/device_trait.py b/roborock/device_trait.py new file mode 100644 index 0000000..a767072 --- /dev/null +++ b/roborock/device_trait.py @@ -0,0 +1,56 @@ +from abc import ABC, abstractmethod +from collections.abc import Awaitable, Callable +from dataclasses import dataclass + +from . import RoborockCommand +from .containers import DeviceFeatures + + +@dataclass +class DeviceTrait(ABC): + handle_command: RoborockCommand + + def __init__(self, send_command: Callable[..., Awaitable[None]]): + self.send_command = send_command + self.subscriptions = [] + + @classmethod + @abstractmethod + def supported(cls, features: DeviceFeatures) -> bool: + raise NotImplementedError + + @abstractmethod + def update(cls, data: dict) -> bool: + raise NotImplementedError + + def on_message(self, data: dict) -> None: + self.status = self.update(data) + for callback in self.subscriptions: + callback(self.status) + + def subscribe(self, callable: Callable): + # Maybe needs to handle async too? + self.subscriptions.append(callable) + + @abstractmethod + def get(self): + raise NotImplementedError + + +# class ConsumableTrait(DeviceTrait): +# handle_command = RoborockCommand.GET_CONSUMABLE +# _status_type: type[Consumable] = DnDTimer +# status: Consumable +# +# def __init__(self, send_command: Callable[..., Awaitable[None]]): +# super().__init__(send_command) +# +# @classmethod +# def supported(cls, features: DeviceFeatures) -> bool: +# return True +# +# async def reset_consumable(self, consumable: str) -> None: +# await self.send_command(RoborockCommand.RESET_CONSUMABLE, [consumable]) +# +# async def get(self) -> None: +# await self.send_command(RoborockCommand.GET_CONSUMABLE) diff --git a/roborock/device_traits/__init__.py b/roborock/device_traits/__init__.py new file mode 100644 index 0000000..7adf22e --- /dev/null +++ b/roborock/device_traits/__init__.py @@ -0,0 +1 @@ +from .dnd import Dnd diff --git a/roborock/device_traits/dnd.py b/roborock/device_traits/dnd.py new file mode 100644 index 0000000..0831778 --- /dev/null +++ b/roborock/device_traits/dnd.py @@ -0,0 +1,56 @@ +import datetime +from collections.abc import Awaitable, Callable + +from roborock import DeviceFeatures, DndActions, RoborockCommand +from roborock.device_trait import DeviceTrait + + +class Dnd(DeviceTrait): + handle_command: RoborockCommand = RoborockCommand.GET_DND_TIMER + + def __init__(self, send_command: Callable[..., Awaitable[None]]): + self.start_hour: int | None = None + self.start_minute: int | None = None + self.end_hour: int | None = None + self.end_minute: int | None = None + self.enabled: bool | None = None + self.start_time: datetime.time | None = None + self.end_time: datetime.time | None = None + self.actions: DndActions | None = None + super().__init__(send_command) + + def from_dict(self, dnd_dict: dict): + self.start_hour = dnd_dict.get("start_hour") + self.start_minute = dnd_dict.get("start_minute") + self.end_hour = dnd_dict.get("end_hour") + self.end_minute = dnd_dict.get("end_minute") + self.enabled = bool(dnd_dict.get("enabled")) + self.actions = DndActions.from_dict(dnd_dict.get("actions")) + self.start_time = ( + datetime.time(hour=self.start_hour, minute=self.start_minute) + if self.start_hour is not None and self.start_minute is not None + else None + ) + self.end_time = ( + datetime.time(hour=self.end_hour, minute=self.end_minute) + if self.end_hour is not None and self.end_minute is not None + else None + ) + + def to_dict(self) -> dict: + return {} + + @classmethod + def supported(cls, features: DeviceFeatures) -> bool: + return features.is_support_custom_dnd + + async def update_dnd(self, enabled: bool, start_time: datetime.time, end_time: datetime.time) -> None: + if self.enabled and not enabled: + await self.send_command(RoborockCommand.CLOSE_DND_TIMER) + else: + start = start_time if start_time is not None else self.start_time + end = end_time if end_time is not None else self.end_time + await self.send_command(RoborockCommand.SET_DND_TIMER, [start.hour, start.minute, end.hour, end.minute]) + + async def get(self) -> None: + await self.send_command(RoborockCommand.GET_DND_TIMER) diff --git a/roborock/roborock_device.py b/roborock/roborock_device.py new file mode 100644 index 0000000..7b1f530 --- /dev/null +++ b/roborock/roborock_device.py @@ -0,0 +1,155 @@ +import base64 +import json +import logging +import math +import secrets +import time +from urllib.parse import urlparse + +from . import RoborockCommand +from .containers import DeviceData, ModelStatus, S7MaxVStatus, Status, UserData +from .device_trait import DeviceTrait +from .device_traits import Dnd +from .mqtt.roborock_session import MqttParams, RoborockMqttSession +from .protocol import MessageParser, Utils, md5hex +from .roborock_message import RoborockMessage, RoborockMessageProtocol +from .util import RoborockLoggerAdapter, get_next_int + +_LOGGER = logging.getLogger(__name__) + + +class RoborockDevice: + _mqtt_sessions: dict[str, RoborockMqttSession] = {} + + def __init__(self, user_data: UserData, device_info: DeviceData): + self.user_data = user_data + self.device_info = device_info + self.data = None + self._logger = RoborockLoggerAdapter(device_info.device.name, _LOGGER) + self._mqtt_endpoint = base64.b64encode(Utils.md5(user_data.rriot.k.encode())[8:14]).decode() + rriot = user_data.rriot + hashed_user = md5hex(rriot.u + ":" + rriot.k)[2:10] + url = urlparse(rriot.r.m) + + self._local_endpoint = "abc" + self._nonce = secrets.token_bytes(16) + self._message_id_types: dict[int, DeviceTrait] = {} + self._command_to_trait = {} + self._all_supported_traits = [] + self._dnd_trait: Dnd | None = self.determine_supported_traits(Dnd) + # self._consumable_trait: ConsumableTrait | None = self.determine_supported_traits(ConsumableTrait) + self._status_type: type[Status] = ModelStatus.get(device_info.model, S7MaxVStatus) + # TODO: One per client EVER + self.session = RoborockMqttSession( + MqttParams( + host=str(url.hostname), + port=url.port, + tls=True, + username=hashed_user, + password=md5hex(rriot.s + ":" + rriot.k)[16:], + ) + ) + self.input_topic = f"rr/m/i/{rriot.u}/{hashed_user}/{device_info.duid}" + self.output_topic = f"rr/m/o/{rriot.u}/{hashed_user}/{device_info.duid}" + + def determine_supported_traits(self, trait: type[DeviceTrait]): + def _send_command( + method: RoborockCommand | str, params: list | dict | int | None = None, use_cloud: bool = True + ): + return self.send_message(method, params, use_cloud) + + if trait.supported(self.device_info.device_features): + trait_instance = trait(_send_command) + self._all_supported_traits.append(trait(_send_command)) + self._command_to_trait[trait.handle_command] = trait_instance + return trait_instance + return None + + async def connect(self): + """Connect via MQTT and Local if possible.""" + if not self.session.connected: + await self.session.start() + await self.session.subscribe(self.output_topic, callback=self.on_message) + + async def update(self): + for trait in self._all_supported_traits: + await trait.get() + + def _get_payload( + self, + method: RoborockCommand | str, + params: list | dict | int | None = None, + secured=False, + use_cloud: bool = False, + ): + timestamp = math.floor(time.time()) + request_id = get_next_int(10000, 32767) + inner = { + "id": request_id, + "method": method, + "params": params or [], + } + if secured: + inner["security"] = { + "endpoint": self._mqtt_endpoint if use_cloud else self._local_endpoint, + "nonce": self._nonce.hex().lower(), + } + payload = bytes( + json.dumps( + { + "dps": {"101": json.dumps(inner, separators=(",", ":"))}, + "t": timestamp, + }, + separators=(",", ":"), + ).encode() + ) + return request_id, timestamp, payload + + async def send_message( + self, method: RoborockCommand | str, params: list | dict | int | None = None, use_cloud: bool = True + ): + request_id, timestamp, payload = self._get_payload(method, params, True, use_cloud) + request_protocol = RoborockMessageProtocol.RPC_REQUEST + roborock_message = RoborockMessage(timestamp=timestamp, protocol=request_protocol, payload=payload) + if request_id in self._message_id_types: + raise Exception("Duplicate id!") + if method in self._command_to_trait: + self._message_id_types[request_id] = self._command_to_trait[method] + local_key = self.device_info.device.local_key + msg = MessageParser.build(roborock_message, local_key, False) + if use_cloud: + await self.session.publish(self.input_topic, msg) + else: + # Handle doing local commands + pass + + def on_message(self, message_bytes: bytes): + messages = MessageParser.parse(message_bytes, self.device_info.device.local_key)[0] + for message in messages: + message_payload = message.get_payload() + message_id = message.get_request_id() + for data_point_number, data_point in message_payload.get("dps").items(): + if data_point_number == "102": + data_point_response = json.loads(data_point) + result = data_point_response.get("result") + if isinstance(result, list) and len(result) == 1: + result = result[0] + if result and (trait := self._message_id_types.get(message_id)) is not None: + trait.on_message(result) + if (error := result.get("error")) is not None: + print(error) + print() + # If message is command not supported - remove from self.update_commands + + # If message is an error - log it? + + # If message is 'ok' - ignore it + + # If message is anything else - store ids, and map back to id to determine message type. + # Then update self.data + + # If we haven't received a message in X seconds, the device is likely offline. I think we can continue the connection, + # but we should have some way to mark ourselves as unavailable. + + # This should also probably be split with on_cloud_message and on_local_message. + print(message) diff --git a/roborock/roborock_message.py b/roborock/roborock_message.py index 1774c3b..5f46c49 100644 --- a/roborock/roborock_message.py +++ b/roborock/roborock_message.py @@ -4,6 +4,7 @@ import math import time from dataclasses import dataclass, field +from functools import cache from roborock import RoborockEnum from roborock.util import get_next_int @@ -161,10 +162,14 @@ class RoborockMessage: random: int = field(default_factory=lambda: get_next_int(10000, 99999)) timestamp: int = field(default_factory=lambda: math.floor(time.time())) message_retry: MessageRetry | None = None + _parsed_payload: dict | None = None + + @cache + def get_payload(self) -> dict | None: + return json.loads(self.payload.decode()) def get_request_id(self) -> int | None: - if self.payload: - payload = json.loads(self.payload.decode()) + if payload := self.get_payload(): for data_point_number, data_point in payload.get("dps").items(): if data_point_number in ["101", "102"]: data_point_response = json.loads(data_point) @@ -180,8 +185,7 @@ def get_method(self) -> str | None: if self.message_retry: return self.message_retry.method protocol = self.protocol - if self.payload and protocol in [4, 5, 101, 102]: - payload = json.loads(self.payload.decode()) + if payload := self.get_payload() and protocol in [4, 5, 101, 102]: for data_point_number, data_point in payload.get("dps").items(): if data_point_number in ["101", "102"]: data_point_response = json.loads(data_point) @@ -190,8 +194,7 @@ def get_method(self) -> str | None: def get_params(self) -> list | dict | None: protocol = self.protocol - if self.payload and protocol in [4, 101, 102]: - payload = json.loads(self.payload.decode()) + if payload := self.get_payload() and protocol in [4, 101, 102]: for data_point_number, data_point in payload.get("dps").items(): if data_point_number in ["101", "102"]: data_point_response = json.loads(data_point)