KosmoS Python Client

KosmoS-Client via Pip installieren

python3 -m pip install kosmos-client

KosmoS Test Client

Dies ist ein einfacher Test Client, dieser verbindet sich zu KosmoS, erstellt einen virtuellen Bewegungssensor samt Schema und ändert dessen Zustand alle 60 Sekunden.

import time
import logging

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
_LOGGER.addHandler(console)

from src.kosmos_client import KosmosClient, KosmosEvent, KosmosDevice, KosmosScope, KosmosLocation, \
    KosmosNotFoundError, KosmosError


def on_auth_ok(kosmos: KosmosClient):
    _LOGGER.info(f"auth okay  on {kosmos}...")


def on_device_created(kosmos: KosmosClient, device: KosmosDevice):
    _LOGGER.info(f"new device on {kosmos} {device}")


def on_device_attribute_changed(kosmos: KosmosClient, device: KosmosDevice, attribute: str, value):
    _LOGGER.info(f"change on {kosmos} {device} - set {attribute} to {value}")


if __name__ == '__main__':
    with KosmosClient("http://localhost:18080", "user", "pass",
                      subs={
                          KosmosEvent.auth_ok: on_auth_ok,
                          KosmosEvent.device_created: on_device_created,
                          KosmosEvent.device_attribute_updated: on_device_attribute_changed
                      }) as kosmos:
        _LOGGER.info(f"is connected:{kosmos.is_connected()}")
        try:
            kosmos.add_schema({
                "failures": [

                ],
                "$schema": "http://json-schema.org/draft-07/schema#",
                "examples": [

                ],
                "additionalProperties": False,
                "title": "TutorialOccupancy",
                "type": "object",
                "required": ["occupancy"],
                "properties": {
                    "occupancy": {
                        "description": "Defines whether the sensors detected motion",
                        "readOnly": False,
                        "title": "occupancy",
                        "type": "boolean"
                    }
                },
                "$id": "https://kosmos-lab.de/schema/TutorialOccupancy2.json"
            })

        except KosmosError as e:
            _LOGGER.info(e)
            pass

        try:
            kosmos.add_device("tutorial_occupancy_1", {"occupancy": False},
                              schema="https://kosmos-lab.de/schema/TutorialOccupancy2.json")
        except KosmosError as e:
            _LOGGER.info(e)
            pass

        while kosmos.is_connected():
            kosmos.set_attribute("tutorial_occupancy_1", "occupancy", True)
            time.sleep(60)
            kosmos.set_attribute("tutorial_occupancy_1", "occupancy", False)
            time.sleep(60)

        # kosmos.stop()