g3pylib.system

  1from datetime import datetime
  2from typing import List, Optional, cast
  3
  4from g3pylib._utils import APIComponent, EndpointKind
  5from g3pylib.g3typing import URI
  6from g3pylib.system.battery import Battery
  7from g3pylib.websocket import G3WebSocketClientProtocol
  8
  9
 10class System(APIComponent):
 11    def __init__(self, connection: G3WebSocketClientProtocol, api_uri: URI) -> None:
 12        self._connection = connection
 13        self._battery: Optional[Battery] = None
 14        super().__init__(api_uri)
 15
 16    @property
 17    def battery(self) -> Battery:
 18        if self._battery is None:
 19            self._battery = Battery(self._connection, URI(self._api_uri + "/battery"))
 20        return self._battery
 21
 22    async def get_head_unit_serial(self) -> str:
 23        return cast(
 24            str,
 25            await self._connection.require_get(
 26                self.generate_endpoint_uri(EndpointKind.PROPERTY, "head-unit-serial")
 27            ),
 28        )
 29
 30    async def get_name(self) -> str:
 31        return cast(
 32            str,
 33            await self._connection.require_get(
 34                self.generate_endpoint_uri(EndpointKind.PROPERTY, "name")
 35            ),
 36        )
 37
 38    async def get_ntp_is_enabled(self) -> bool:
 39        return cast(
 40            bool,
 41            await self._connection.require_get(
 42                self.generate_endpoint_uri(EndpointKind.PROPERTY, "ntp-is-enabled")
 43            ),
 44        )
 45
 46    async def get_ntp_is_synchronized(self) -> bool:
 47        return cast(
 48            bool,
 49            await self._connection.require_get(
 50                self.generate_endpoint_uri(EndpointKind.PROPERTY, "ntp-is-synchronized")
 51            ),
 52        )
 53
 54    async def get_recording_unit_serial(self) -> str:
 55        return cast(
 56            str,
 57            await self._connection.require_get(
 58                self.generate_endpoint_uri(
 59                    EndpointKind.PROPERTY, "recording-unit-serial"
 60                )
 61            ),
 62        )
 63
 64    async def get_time(self) -> datetime:
 65        date_time = cast(
 66            str,
 67            await self._connection.require_get(
 68                self.generate_endpoint_uri(EndpointKind.PROPERTY, "time")
 69            ),
 70        )
 71        return datetime.fromisoformat(date_time.strip("Z"))
 72
 73    async def get_timezone(self) -> str:
 74        return cast(
 75            str,
 76            await self._connection.require_get(
 77                self.generate_endpoint_uri(EndpointKind.PROPERTY, "timezone")
 78            ),
 79        )
 80
 81    async def get_version(self) -> str:
 82        return cast(
 83            str,
 84            await self._connection.require_get(
 85                self.generate_endpoint_uri(EndpointKind.PROPERTY, "version")
 86            ),
 87        )
 88
 89    async def available_gaze_frequencies(self) -> List[int]:
 90        return cast(
 91            List[int],
 92            await self._connection.require_post(
 93                self.generate_endpoint_uri(
 94                    EndpointKind.ACTION, "available-gaze-frequencies"
 95                )
 96            ),
 97        )
 98
 99    async def set_time(self, value: datetime) -> bool:
100        return cast(
101            bool,
102            await self._connection.require_post(
103                self.generate_endpoint_uri(EndpointKind.ACTION, "set-time"),
104                body=[f"{datetime.isoformat(value)}Z"],
105            ),
106        )
107
108    async def set_timezone(self, value: str) -> bool:
109        return cast(
110            bool,
111            await self._connection.require_post(
112                self.generate_endpoint_uri(EndpointKind.ACTION, "set-timezone"),
113                body=[value],
114            ),
115        )
116
117    async def use_ntp(self, value: bool) -> bool:
118        return cast(
119            bool,
120            await self._connection.require_post(
121                self.generate_endpoint_uri(EndpointKind.ACTION, "use-ntp"), body=[value]
122            ),
123        )
class System(g3pylib._utils.APIComponent):
 11class System(APIComponent):
 12    def __init__(self, connection: G3WebSocketClientProtocol, api_uri: URI) -> None:
 13        self._connection = connection
 14        self._battery: Optional[Battery] = None
 15        super().__init__(api_uri)
 16
 17    @property
 18    def battery(self) -> Battery:
 19        if self._battery is None:
 20            self._battery = Battery(self._connection, URI(self._api_uri + "/battery"))
 21        return self._battery
 22
 23    async def get_head_unit_serial(self) -> str:
 24        return cast(
 25            str,
 26            await self._connection.require_get(
 27                self.generate_endpoint_uri(EndpointKind.PROPERTY, "head-unit-serial")
 28            ),
 29        )
 30
 31    async def get_name(self) -> str:
 32        return cast(
 33            str,
 34            await self._connection.require_get(
 35                self.generate_endpoint_uri(EndpointKind.PROPERTY, "name")
 36            ),
 37        )
 38
 39    async def get_ntp_is_enabled(self) -> bool:
 40        return cast(
 41            bool,
 42            await self._connection.require_get(
 43                self.generate_endpoint_uri(EndpointKind.PROPERTY, "ntp-is-enabled")
 44            ),
 45        )
 46
 47    async def get_ntp_is_synchronized(self) -> bool:
 48        return cast(
 49            bool,
 50            await self._connection.require_get(
 51                self.generate_endpoint_uri(EndpointKind.PROPERTY, "ntp-is-synchronized")
 52            ),
 53        )
 54
 55    async def get_recording_unit_serial(self) -> str:
 56        return cast(
 57            str,
 58            await self._connection.require_get(
 59                self.generate_endpoint_uri(
 60                    EndpointKind.PROPERTY, "recording-unit-serial"
 61                )
 62            ),
 63        )
 64
 65    async def get_time(self) -> datetime:
 66        date_time = cast(
 67            str,
 68            await self._connection.require_get(
 69                self.generate_endpoint_uri(EndpointKind.PROPERTY, "time")
 70            ),
 71        )
 72        return datetime.fromisoformat(date_time.strip("Z"))
 73
 74    async def get_timezone(self) -> str:
 75        return cast(
 76            str,
 77            await self._connection.require_get(
 78                self.generate_endpoint_uri(EndpointKind.PROPERTY, "timezone")
 79            ),
 80        )
 81
 82    async def get_version(self) -> str:
 83        return cast(
 84            str,
 85            await self._connection.require_get(
 86                self.generate_endpoint_uri(EndpointKind.PROPERTY, "version")
 87            ),
 88        )
 89
 90    async def available_gaze_frequencies(self) -> List[int]:
 91        return cast(
 92            List[int],
 93            await self._connection.require_post(
 94                self.generate_endpoint_uri(
 95                    EndpointKind.ACTION, "available-gaze-frequencies"
 96                )
 97            ),
 98        )
 99
100    async def set_time(self, value: datetime) -> bool:
101        return cast(
102            bool,
103            await self._connection.require_post(
104                self.generate_endpoint_uri(EndpointKind.ACTION, "set-time"),
105                body=[f"{datetime.isoformat(value)}Z"],
106            ),
107        )
108
109    async def set_timezone(self, value: str) -> bool:
110        return cast(
111            bool,
112            await self._connection.require_post(
113                self.generate_endpoint_uri(EndpointKind.ACTION, "set-timezone"),
114                body=[value],
115            ),
116        )
117
118    async def use_ntp(self, value: bool) -> bool:
119        return cast(
120            bool,
121            await self._connection.require_post(
122                self.generate_endpoint_uri(EndpointKind.ACTION, "use-ntp"), body=[value]
123            ),
124        )
System(connection: g3pylib.websocket.G3WebSocketClientProtocol, api_uri: g3pylib.g3typing.URI)
12    def __init__(self, connection: G3WebSocketClientProtocol, api_uri: URI) -> None:
13        self._connection = connection
14        self._battery: Optional[Battery] = None
15        super().__init__(api_uri)
async def get_head_unit_serial(self) -> str:
23    async def get_head_unit_serial(self) -> str:
24        return cast(
25            str,
26            await self._connection.require_get(
27                self.generate_endpoint_uri(EndpointKind.PROPERTY, "head-unit-serial")
28            ),
29        )
async def get_name(self) -> str:
31    async def get_name(self) -> str:
32        return cast(
33            str,
34            await self._connection.require_get(
35                self.generate_endpoint_uri(EndpointKind.PROPERTY, "name")
36            ),
37        )
async def get_ntp_is_enabled(self) -> bool:
39    async def get_ntp_is_enabled(self) -> bool:
40        return cast(
41            bool,
42            await self._connection.require_get(
43                self.generate_endpoint_uri(EndpointKind.PROPERTY, "ntp-is-enabled")
44            ),
45        )
async def get_ntp_is_synchronized(self) -> bool:
47    async def get_ntp_is_synchronized(self) -> bool:
48        return cast(
49            bool,
50            await self._connection.require_get(
51                self.generate_endpoint_uri(EndpointKind.PROPERTY, "ntp-is-synchronized")
52            ),
53        )
async def get_recording_unit_serial(self) -> str:
55    async def get_recording_unit_serial(self) -> str:
56        return cast(
57            str,
58            await self._connection.require_get(
59                self.generate_endpoint_uri(
60                    EndpointKind.PROPERTY, "recording-unit-serial"
61                )
62            ),
63        )
async def get_time(self) -> datetime.datetime:
65    async def get_time(self) -> datetime:
66        date_time = cast(
67            str,
68            await self._connection.require_get(
69                self.generate_endpoint_uri(EndpointKind.PROPERTY, "time")
70            ),
71        )
72        return datetime.fromisoformat(date_time.strip("Z"))
async def get_timezone(self) -> str:
74    async def get_timezone(self) -> str:
75        return cast(
76            str,
77            await self._connection.require_get(
78                self.generate_endpoint_uri(EndpointKind.PROPERTY, "timezone")
79            ),
80        )
async def get_version(self) -> str:
82    async def get_version(self) -> str:
83        return cast(
84            str,
85            await self._connection.require_get(
86                self.generate_endpoint_uri(EndpointKind.PROPERTY, "version")
87            ),
88        )
async def available_gaze_frequencies(self) -> List[int]:
90    async def available_gaze_frequencies(self) -> List[int]:
91        return cast(
92            List[int],
93            await self._connection.require_post(
94                self.generate_endpoint_uri(
95                    EndpointKind.ACTION, "available-gaze-frequencies"
96                )
97            ),
98        )
async def set_time(self, value: datetime.datetime) -> bool:
100    async def set_time(self, value: datetime) -> bool:
101        return cast(
102            bool,
103            await self._connection.require_post(
104                self.generate_endpoint_uri(EndpointKind.ACTION, "set-time"),
105                body=[f"{datetime.isoformat(value)}Z"],
106            ),
107        )
async def set_timezone(self, value: str) -> bool:
109    async def set_timezone(self, value: str) -> bool:
110        return cast(
111            bool,
112            await self._connection.require_post(
113                self.generate_endpoint_uri(EndpointKind.ACTION, "set-timezone"),
114                body=[value],
115            ),
116        )
async def use_ntp(self, value: bool) -> bool:
118    async def use_ntp(self, value: bool) -> bool:
119        return cast(
120            bool,
121            await self._connection.require_post(
122                self.generate_endpoint_uri(EndpointKind.ACTION, "use-ntp"), body=[value]
123            ),
124        )