• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python common.async_fire_mqtt_message函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.common.async_fire_mqtt_message函数的典型用法代码示例。如果您正苦于以下问题:Python async_fire_mqtt_message函数的具体用法?Python async_fire_mqtt_message怎么用?Python async_fire_mqtt_message使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了async_fire_mqtt_message函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_no_color_brightness_color_temp_white_val_if_no_topics

async def test_no_color_brightness_color_temp_white_val_if_no_topics(
        hass, mqtt_mock):
    """Verify color-related attributes stay unset with only basic topics.

    The light is configured with just a state and a command topic. The
    listed attributes are asserted to be None both before and after an ON
    state message is received.
    """
    unset_attributes = (
        'rgb_color',
        'brightness',
        'color_temp',
        'effect',
        'white_value',
        'xy_color',
        'hs_color',
    )
    platform_config = {
        'platform': 'mqtt_json',
        'name': 'test',
        'state_topic': 'test_light_rgb',
        'command_topic': 'test_light_rgb/set',
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: platform_config})

    before = hass.states.get('light.test')
    assert before.state == STATE_OFF
    assert before.attributes.get(ATTR_SUPPORTED_FEATURES) == 40
    for attribute in unset_attributes:
        assert before.attributes.get(attribute) is None

    async_fire_mqtt_message(hass, 'test_light_rgb', '{"state":"ON"}')
    await hass.async_block_till_done()

    after = hass.states.get('light.test')
    assert after.state == STATE_ON
    for attribute in unset_attributes:
        assert after.attributes.get(attribute) is None
开发者ID:ManHammer,项目名称:home-assistant,代码行数:35,代码来源:test_mqtt_json.py


示例2: test_default_availability_payload

async def test_default_availability_payload(hass, mqtt_mock):
    """Test availability by default payload with defined topic.

    The lock is configured with an availability topic. The test asserts
    that the entity starts out unavailable, leaves that state after an
    'online' message, and returns to it after an 'offline' message.
    """
    assert await async_setup_component(hass, lock.DOMAIN, {
        lock.DOMAIN: {
            'platform': 'mqtt',
            'name': 'test',
            'state_topic': 'state-topic',
            'command_topic': 'command-topic',
            'payload_lock': 'LOCK',
            'payload_unlock': 'UNLOCK',
            'availability_topic': 'availability-topic'
        }
    })

    # States are compared by value: an identity check (`is`) on strings
    # only succeeds when both objects happen to be interned.
    state = hass.states.get('lock.test')
    assert state.state == STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'online')
    await hass.async_block_till_done()

    state = hass.states.get('lock.test')
    assert state.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'offline')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('lock.test')
    assert state.state == STATE_UNAVAILABLE
开发者ID:Martwall,项目名称:home-assistant,代码行数:29,代码来源:test_lock.py


示例3: test_unique_id

async def test_unique_id(hass):
    """Ensure two lights sharing one unique_id produce a single entity."""
    await async_mock_mqtt_component(hass)

    first_light = {
        'platform': 'mqtt',
        'name': 'Test 1',
        'schema': 'template',
        'state_topic': 'test-topic',
        'command_topic': 'test_topic',
        'command_on_template': 'on,{{ transition }}',
        'command_off_template': 'off,{{ transition|d }}',
        'unique_id': 'TOTALLY_UNIQUE',
    }
    # Same unique_id as the first light, but without command templates.
    second_light = {
        'platform': 'mqtt',
        'name': 'Test 2',
        'schema': 'template',
        'state_topic': 'test-topic',
        'command_topic': 'test_topic',
        'unique_id': 'TOTALLY_UNIQUE',
    }
    setup_ok = await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: [first_light, second_light]})
    assert setup_ok

    async_fire_mqtt_message(hass, 'test-topic', 'payload')
    await hass.async_block_till_done()

    light_entity_ids = hass.states.async_entity_ids(light.DOMAIN)
    assert len(light_entity_ids) == 1
开发者ID:Martwall,项目名称:home-assistant,代码行数:25,代码来源:test_light_template.py


示例4: test_discovery_removal_alarm

async def test_discovery_removal_alarm(hass, mqtt_mock, caplog):
    """Discover an alarm_control_panel, then check it can be removed.

    A config payload is published on the discovery topic and the entity is
    asserted to exist; an empty payload is then published on the same topic
    and the entity is asserted to be gone.
    """
    config_entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, config_entry)

    discovery_topic = 'homeassistant/alarm_control_panel/bla/config'
    discovery_payload = (
        '{ "name": "Beer",'
        '  "state_topic": "test_topic",'
        '  "command_topic": "test_topic" }'
    )

    async_fire_mqtt_message(hass, discovery_topic, discovery_payload)
    await hass.async_block_till_done()

    panel = hass.states.get('alarm_control_panel.beer')
    assert panel is not None
    assert panel.name == 'Beer'

    # Publish an empty payload on the same discovery topic.
    async_fire_mqtt_message(hass, discovery_topic, '')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    panel = hass.states.get('alarm_control_panel.beer')
    assert panel is None
开发者ID:arsaboo,项目名称:home-assistant,代码行数:28,代码来源:test_alarm_control_panel.py


示例5: test_discovery_update_lock

async def test_discovery_update_lock(hass, mqtt_mock, caplog):
    """Check that a second discovery payload updates the existing lock.

    Two configs are published on the same discovery topic. The entity
    keeps the id 'lock.beer' but its name is expected to become 'Milk',
    and no 'lock.milk' entity is expected to appear.
    """
    config_entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, config_entry)

    discovery_topic = 'homeassistant/lock/bla/config'
    initial_payload = (
        '{ "name": "Beer",'
        '  "state_topic": "test_topic",'
        '  "command_topic": "command_topic",'
        '  "availability_topic": "availability_topic1" }'
    )
    updated_payload = (
        '{ "name": "Milk",'
        '  "state_topic": "test_topic2",'
        '  "command_topic": "command_topic",'
        '  "availability_topic": "availability_topic2" }'
    )

    async_fire_mqtt_message(hass, discovery_topic, initial_payload)
    await hass.async_block_till_done()

    beer_lock = hass.states.get('lock.beer')
    assert beer_lock is not None
    assert beer_lock.name == 'Beer'

    async_fire_mqtt_message(hass, discovery_topic, updated_payload)
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    beer_lock = hass.states.get('lock.beer')
    assert beer_lock is not None
    assert beer_lock.name == 'Milk'

    milk_lock = hass.states.get('lock.milk')
    assert milk_lock is None
开发者ID:Martwall,项目名称:home-assistant,代码行数:32,代码来源:test_lock.py


示例6: test_mqtt_ws_subscription

async def test_mqtt_ws_subscription(hass, hass_ws_client):
    """Subscribe to an MQTT topic over the websocket API and unsubscribe.

    After subscribing, two messages are fired on the topic and the client
    is expected to receive them as events in the same order.
    """
    await async_mock_mqtt_component(hass)

    ws = await hass_ws_client(hass)
    subscribe_request = {
        'id': 5,
        'type': 'mqtt/subscribe',
        'topic': 'test-topic',
    }
    await ws.send_json(subscribe_request)
    reply = await ws.receive_json()
    assert reply['success']

    payloads = ('test1', 'test2')
    for payload in payloads:
        async_fire_mqtt_message(hass, 'test-topic', payload)

    for expected_payload in payloads:
        reply = await ws.receive_json()
        event = reply['event']
        assert event['topic'] == 'test-topic'
        assert event['payload'] == expected_payload

    # Unsubscribe, referring to the id of the subscribe request.
    unsubscribe_request = {
        'id': 8,
        'type': 'unsubscribe_events',
        'subscription': 5,
    }
    await ws.send_json(unsubscribe_request)
    reply = await ws.receive_json()
    assert reply['success']
开发者ID:boced66,项目名称:home-assistant,代码行数:32,代码来源:test_init.py


示例7: test_default_availability_payload

async def test_default_availability_payload(hass, mqtt_mock):
    """Check availability tracking with the default payloads.

    The JSON-schema light starts unavailable, is expected to leave that
    state after 'online' and to return to it after 'offline'.
    """
    platform_config = {
        'platform': 'mqtt',
        'schema': 'json',
        'name': 'test',
        'state_topic': 'test_light_rgb',
        'command_topic': 'test_light_rgb/set',
        'availability_topic': 'availability-topic',
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: platform_config})

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'online')
    await hass.async_block_till_done()

    light_state = hass.states.get('light.test')
    assert light_state.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'offline')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_UNAVAILABLE
开发者ID:Martwall,项目名称:home-assistant,代码行数:28,代码来源:test_light_json.py


示例8: test_discovery_update_vacuum

async def test_discovery_update_vacuum(hass, mock_publish):
    """Check that a second discovery payload updates the existing vacuum.

    The entity id stays 'vacuum.beer' while its name is expected to change
    to 'Milk'; no 'vacuum.milk' entity is expected to be created.
    """
    config_entry = MockConfigEntry(domain=mqtt.DOMAIN)
    await async_start(hass, 'homeassistant', {}, config_entry)

    discovery_topic = 'homeassistant/vacuum/bla/config'
    initial_payload = (
        '{ "name": "Beer",'
        '  "command_topic": "test_topic" }'
    )
    updated_payload = (
        '{ "name": "Milk",'
        '  "command_topic": "test_topic" }'
    )

    async_fire_mqtt_message(hass, discovery_topic, initial_payload)
    await hass.async_block_till_done()

    beer_vacuum = hass.states.get('vacuum.beer')
    assert beer_vacuum is not None
    assert beer_vacuum.name == 'Beer'

    async_fire_mqtt_message(hass, discovery_topic, updated_payload)
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    beer_vacuum = hass.states.get('vacuum.beer')
    assert beer_vacuum is not None
    assert beer_vacuum.name == 'Milk'

    milk_vacuum = hass.states.get('vacuum.milk')
    assert milk_vacuum is None
开发者ID:Martwall,项目名称:home-assistant,代码行数:32,代码来源:test_vacuum.py


示例9: test_custom_availability_payload

async def test_custom_availability_payload(hass, mqtt_mock):
    """Check availability tracking with custom payloads.

    'good' and 'nogood' are configured as the available / not-available
    payloads on the availability topic.
    """
    platform_config = {
        'platform': 'mqtt',
        'name': 'test',
        'command_topic': 'test_light/set',
        'brightness_command_topic': 'test_light/bright',
        'rgb_command_topic': "test_light/rgb",
        'availability_topic': 'availability-topic',
        'payload_available': 'good',
        'payload_not_available': 'nogood',
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: platform_config})

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'good')
    await hass.async_block_till_done()

    light_state = hass.states.get('light.test')
    assert light_state.state != STATE_UNAVAILABLE

    async_fire_mqtt_message(hass, 'availability-topic', 'nogood')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_UNAVAILABLE
开发者ID:ManHammer,项目名称:home-assistant,代码行数:30,代码来源:test_mqtt.py


示例10: test_controlling_state_via_topic_and_json_message

async def test_controlling_state_via_topic_and_json_message(
        hass, mock_publish):
    """Drive the switch state with JSON messages on the state topic.

    The configured value_template reads the 'val' key of the JSON payload,
    and the on/off payloads are 'beer on' / 'beer off'.
    """
    platform_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'state-topic',
        'command_topic': 'command-topic',
        'payload_on': 'beer on',
        'payload_off': 'beer off',
        'value_template': '{{ value_json.val }}',
    }
    assert await async_setup_component(
        hass, switch.DOMAIN, {switch.DOMAIN: platform_config})

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_OFF

    async_fire_mqtt_message(hass, 'state-topic', '{"val":"beer on"}')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_ON

    async_fire_mqtt_message(hass, 'state-topic', '{"val":"beer off"}')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_OFF
开发者ID:Martwall,项目名称:home-assistant,代码行数:31,代码来源:test_switch.py


示例11: test_custom_availability_payload

async def test_custom_availability_payload(hass, mock_publish):
    """Test availability by custom payload with defined topic.

    'good' and 'nogood' are configured as the available / not-available
    payloads. The vacuum is asserted to be unavailable initially, to leave
    that state after 'good', and to return to it after 'nogood'.
    """
    # Build a local copy instead of updating default_config in place:
    # default_config is defined outside this test, and mutating it would
    # leak the availability settings into any other test that uses it.
    config = dict(default_config)
    config.update({
        'availability_topic': 'availability-topic',
        'payload_available': 'good',
        'payload_not_available': 'nogood'
    })

    assert await async_setup_component(hass, vacuum.DOMAIN, {
        vacuum.DOMAIN: config,
    })

    state = hass.states.get('vacuum.mqtttest')
    assert STATE_UNAVAILABLE == state.state

    async_fire_mqtt_message(hass, 'availability-topic', 'good')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('vacuum.mqtttest')
    assert STATE_UNAVAILABLE != state.state

    async_fire_mqtt_message(hass, 'availability-topic', 'nogood')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('vacuum.mqtttest')
    assert STATE_UNAVAILABLE == state.state
开发者ID:Martwall,项目名称:home-assistant,代码行数:28,代码来源:test_vacuum.py


示例12: test_controlling_state_via_topic

async def test_controlling_state_via_topic(hass, mock_publish):
    """Drive the switch state with plain messages on the state topic.

    The on/off payloads are configured as the integers 1 and 0, while the
    fired messages carry the strings '1' and '0'.
    """
    platform_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'state-topic',
        'command_topic': 'command-topic',
        'payload_on': 1,
        'payload_off': 0,
    }
    assert await async_setup_component(
        hass, switch.DOMAIN, {switch.DOMAIN: platform_config})

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_OFF
    assert not switch_state.attributes.get(ATTR_ASSUMED_STATE)

    async_fire_mqtt_message(hass, 'state-topic', '1')
    await hass.async_block_till_done()

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_ON

    async_fire_mqtt_message(hass, 'state-topic', '0')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_OFF
开发者ID:Martwall,项目名称:home-assistant,代码行数:29,代码来源:test_switch.py


示例13: test_custom_state_payload

async def test_custom_state_payload(hass, mock_publish):
    """Drive the switch state using custom state payloads.

    'state_on' / 'state_off' are set to "HIGH" / "LOW", separately from
    the 1 / 0 values used for 'payload_on' / 'payload_off'.
    """
    platform_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'state-topic',
        'command_topic': 'command-topic',
        'payload_on': 1,
        'payload_off': 0,
        'state_on': "HIGH",
        'state_off': "LOW",
    }
    assert await async_setup_component(
        hass, switch.DOMAIN, {switch.DOMAIN: platform_config})

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_OFF
    assert not switch_state.attributes.get(ATTR_ASSUMED_STATE)

    async_fire_mqtt_message(hass, 'state-topic', 'HIGH')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_ON

    async_fire_mqtt_message(hass, 'state-topic', 'LOW')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    switch_state = hass.states.get('switch.test')
    assert switch_state.state == STATE_OFF
开发者ID:Martwall,项目名称:home-assistant,代码行数:32,代码来源:test_switch.py


示例14: test_snips_call_action

async def test_snips_call_action(hass, mqtt_mock):
    """Test calling action via Snips.

    The 'Lights' intent is mapped to the mocked 'test.service' service with
    a templated 'color' field. After EXAMPLE_MSG is fired on the
    intentParsed topic, exactly one service call with color 'blue' is
    expected.

    Written as a native coroutine: a plain ``def`` containing ``yield from``
    is only a generator function, which is inconsistent with the other
    ``async def`` tests shown alongside it.
    """
    calls = async_mock_service(hass, 'test', 'service')

    result = await async_setup_component(hass, "snips", {
        "snips": {
            "intents": {
                "Lights": {
                    "action": {
                        "service": "test.service",
                        "data_template": {
                            "color": "{{ light_color }}"
                        }
                    }
                }
            }
        }
    })
    assert result

    async_fire_mqtt_message(hass, 'hermes/nlu/intentParsed',
                            EXAMPLE_MSG)
    await hass.async_block_till_done()
    assert len(calls) == 1
    call = calls[0]
    assert call.data.get('color') == 'blue'
开发者ID:Khabi,项目名称:home-assistant,代码行数:26,代码来源:test_snips.py


示例15: test_snips_intent_username

async def test_snips_intent_username(hass, mqtt_mock):
    """Handle an intentName written as 'username:intentName'.

    The intent is fired as 'username:Lights' and is expected to be
    recorded with intent type 'Lights'.
    """
    setup_ok = await async_setup_component(hass, "snips", {"snips": {}})
    assert setup_ok

    intent_message = """
    {
        "input": "what to do",
        "intent": {
            "intentName": "username:Lights",
            "probability": 1
        },
        "slots": []
    }
    """
    recorded_intents = async_mock_intent(hass, 'Lights')
    async_fire_mqtt_message(
        hass, 'hermes/intent/username:Lights', intent_message)
    await hass.async_block_till_done()

    assert len(recorded_intents) == 1
    received = recorded_intents[0]
    assert received.platform == 'snips'
    assert received.intent_type == 'Lights'
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:25,代码来源:test_snips.py


示例16: test_no_color_brightness_color_temp_hs_white_xy_if_no_topics

async def test_no_color_brightness_color_temp_hs_white_xy_if_no_topics(
        hass, mqtt_mock):
    """Verify color and brightness attributes stay unset without topics.

    Only a state and a command topic are configured. The listed attributes
    are asserted to be None before and after an 'ON' state message.
    """
    unset_attributes = (
        'rgb_color',
        'brightness',
        'color_temp',
        'hs_color',
        'white_value',
        'xy_color',
    )
    platform_config = {
        'platform': 'mqtt',
        'name': 'test',
        'state_topic': 'test_light_rgb/status',
        'command_topic': 'test_light_rgb/set',
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: platform_config})

    before = hass.states.get('light.test')
    assert before.state == STATE_OFF
    for attribute in unset_attributes:
        assert before.attributes.get(attribute) is None

    async_fire_mqtt_message(hass, 'test_light_rgb/status', 'ON')
    await hass.async_block_till_done()

    after = hass.states.get('light.test')
    assert after.state == STATE_ON
    for attribute in unset_attributes:
        assert after.attributes.get(attribute) is None
开发者ID:ManHammer,项目名称:home-assistant,代码行数:32,代码来源:test_mqtt.py


示例17: test_snips_low_probability

async def test_snips_low_probability(hass, mqtt_mock, caplog):
    """Check that a warning is logged for an intent below the threshold.

    The threshold is configured as 0.5 and the fired intent carries a
    probability of 0.49.
    """
    caplog.set_level(logging.WARNING)

    snips_config = {"probability_threshold": 0.5}
    setup_ok = await async_setup_component(
        hass, "snips", {"snips": snips_config})
    assert setup_ok

    intent_message = """
    {
        "input": "I am not sure what to say",
        "intent": {
            "intentName": "LightsMaybe",
            "probability": 0.49
        },
        "slots": []
    }
    """

    async_mock_intent(hass, 'LightsMaybe')
    async_fire_mqtt_message(
        hass, 'hermes/intent/LightsMaybe', intent_message)
    await hass.async_block_till_done()

    # The expected text is matched against the log output verbatim.
    expected_warning = 'Intent below probaility threshold 0.49 < 0.5'
    assert expected_warning in caplog.text
开发者ID:EarthlingRich,项目名称:home-assistant,代码行数:25,代码来源:test_snips.py


示例18: test_snips_intent_user

async def test_snips_intent_user(hass):
    """Handle an intentName written as 'user_XXX__intentName'.

    The intent is fired as 'user_ABCDEF123__Lights' and is expected to be
    recorded with intent type 'Lights'.
    """
    await async_mock_mqtt_component(hass)

    setup_ok = await async_setup_component(hass, "snips", {"snips": {}})
    assert setup_ok

    intent_message = """
    {
        "input": "what to do",
        "intent": {
            "intentName": "user_ABCDEF123__Lights",
            "confidenceScore": 1
        },
        "slots": []
    }
    """
    recorded_intents = async_mock_intent(hass, 'Lights')
    async_fire_mqtt_message(
        hass, 'hermes/intent/user_ABCDEF123__Lights', intent_message)
    await hass.async_block_till_done()

    assert len(recorded_intents) == 1
    received = recorded_intents[0]
    assert received.platform == 'snips'
    assert received.intent_type == 'Lights'
开发者ID:boced66,项目名称:home-assistant,代码行数:27,代码来源:test_init.py


示例19: test_brightness_scale

async def test_brightness_scale(hass, mqtt_mock):
    """Check brightness reporting with a brightness_scale of 99.

    A brightness of 255 is expected both after a bare ON message and
    after an ON message carrying brightness 99.
    """
    state_topic = 'test_light_bright_scale'
    platform_config = {
        'platform': 'mqtt',
        'schema': 'json',
        'name': 'test',
        'state_topic': 'test_light_bright_scale',
        'command_topic': 'test_light_bright_scale/set',
        'brightness': True,
        'brightness_scale': 99,
    }
    assert await async_setup_component(
        hass, light.DOMAIN, {light.DOMAIN: platform_config})

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_OFF
    assert light_state.attributes.get('brightness') is None
    assert not light_state.attributes.get(ATTR_ASSUMED_STATE)

    # Switch the light on without giving a brightness.
    async_fire_mqtt_message(hass, state_topic, '{"state":"ON"}')
    await hass.async_block_till_done()

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_ON
    assert light_state.attributes.get('brightness') == 255

    # Switch the light on with brightness 99.
    async_fire_mqtt_message(
        hass, state_topic, '{"state":"ON", "brightness": 99}')
    await hass.async_block_till_done()

    light_state = hass.states.get('light.test')
    assert light_state.state == STATE_ON
    assert light_state.attributes.get('brightness') == 255
开发者ID:Martwall,项目名称:home-assistant,代码行数:35,代码来源:test_light_json.py


示例20: test_discovery_expansion

async def test_discovery_expansion(hass, mqtt_mock, caplog):
    """Test expansion of abbreviated discovery payload.

    The discovery payload uses a '~' base topic together with the
    abbreviated keys 'stat_t' and 'cmd_t'. The switch is expected to be
    discovered, and to turn on when 'ON' is published on
    'test_topic/some/base/topic'.

    Written as a native coroutine: a plain ``def`` containing ``yield from``
    is only a generator function, which is inconsistent with the other
    ``async def`` tests shown alongside it.
    """
    entry = MockConfigEntry(domain=mqtt.DOMAIN)

    await async_start(hass, 'homeassistant', {}, entry)

    data = (
        '{ "~": "some/base/topic",'
        '  "name": "DiscoveryExpansionTest1",'
        '  "stat_t": "test_topic/~",'
        '  "cmd_t": "~/test_topic" }'
    )

    async_fire_mqtt_message(
        hass, 'homeassistant/switch/bla/config', data)
    await hass.async_block_till_done()

    state = hass.states.get('switch.DiscoveryExpansionTest1')
    assert state is not None
    assert state.name == 'DiscoveryExpansionTest1'
    assert ('switch', 'bla') in hass.data[ALREADY_DISCOVERED]
    assert state.state == STATE_OFF

    # The state topic is 'test_topic/~' with '~' replaced by the base topic.
    async_fire_mqtt_message(hass, 'test_topic/some/base/topic',
                            'ON')
    await hass.async_block_till_done()
    await hass.async_block_till_done()

    state = hass.states.get('switch.DiscoveryExpansionTest1')
    assert state.state == STATE_ON
开发者ID:ManHammer,项目名称:home-assistant,代码行数:30,代码来源:test_discovery.py



注:本文中的tests.common.async_fire_mqtt_message函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python common.async_fire_time_changed函数代码示例发布时间:2022-05-27
下一篇:
Python common.assert_setup_component函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap