g3pylib.recordings

  1import asyncio
  2import logging
  3from collections.abc import Sequence
  4from contextlib import asynccontextmanager
  5from enum import Enum, auto
  6from typing import Awaitable, Dict, List, Optional, Tuple, Union, cast, overload
  7
  8from g3pylib import _utils
  9from g3pylib._utils import APIComponent, EndpointKind
 10from g3pylib.g3typing import URI, SignalBody
 11from g3pylib.recordings.recording import Recording
 12from g3pylib.websocket import G3WebSocketClientProtocol
 13
 14
 15class RecordingsEventKind(Enum):
 16    """Defines event kinds for the `Recordings` class. These events are emitted to the `Recordings.events` queue in the context `Recordings.keep_updated_in_context`."""
 17
 18    ADDED = auto()
 19    """A recording was added."""
 20    REMOVED = auto()
 21    """A recording was removed."""
 22
 23
 24class Recordings(APIComponent, Sequence[Recording]):
 25    def __init__(
 26        self,
 27        connection: G3WebSocketClientProtocol,
 28        api_uri: URI,
 29        http_url: Optional[str],
 30    ) -> None:
 31        self._connection = connection
 32        self._http_url = http_url
 33        self._children = {}
 34        self._handle_child_added_task = None
 35        self._handle_child_removed_task = None
 36        self._events: asyncio.Queue[
 37            Tuple[RecordingsEventKind, SignalBody]
 38        ] = asyncio.Queue()
 39        self.logger: logging.Logger = logging.getLogger(__name__)
 40        super().__init__(api_uri)
 41
 42    async def get_name(self) -> str:
 43        return cast(
 44            str,
 45            await self._connection.require_get(
 46                self.generate_endpoint_uri(EndpointKind.PROPERTY, "name")
 47            ),
 48        )
 49
 50    async def delete(self, uuid: str) -> bool:
 51        return cast(
 52            bool,
 53            await self._connection.require_post(
 54                self.generate_endpoint_uri(EndpointKind.ACTION, "delete"), body=[uuid]
 55            ),
 56        )
 57
 58    async def subscribe_to_child_added(
 59        self,
 60    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 61        return await self._connection.subscribe_to_signal(
 62            self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added")
 63        )
 64
 65    async def subscribe_to_child_removed(
 66        self,
 67    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 68        return await self._connection.subscribe_to_signal(
 69            self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed")
 70        )
 71
 72    async def subscribe_to_deleted(
 73        self,
 74    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 75        return await self._connection.subscribe_to_signal(
 76            self.generate_endpoint_uri(EndpointKind.SIGNAL, "deleted")
 77        )
 78
 79    async def subscribe_to_scan_done(
 80        self,
 81    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 82        return await self._connection.subscribe_to_signal(
 83            self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-done")
 84        )
 85
 86    async def subscribe_to_scan_start(
 87        self,
 88    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 89        return await self._connection.subscribe_to_signal(
 90            self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-start")
 91        )
 92
 93    async def _get_children(self) -> Dict[str, Recording]:
 94        children = cast(
 95            Dict[str, List[str]], await self._connection.require_get(self._api_uri)
 96        )["children"]
 97        return dict(
 98            map(
 99                lambda uuid: (
100                    uuid,
101                    Recording(self._connection, self._api_uri, uuid, self._http_url),
102                ),
103                reversed(children),
104            )
105        )
106
107    async def start_children_handler_tasks(self) -> None:
108        async def handle_child_added_task(
109            added_children_queue: asyncio.Queue[SignalBody],
110        ) -> None:
111            while True:
112                body = await added_children_queue.get()
113                child_uuid = cast(List[str], body)[0]
114                self._children[child_uuid] = Recording(
115                    self._connection, self._api_uri, child_uuid, self._http_url
116                )
117                await self._events.put((RecordingsEventKind.ADDED, body))
118
119        async def handle_child_removed_task(
120            removed_children_queue: asyncio.Queue[SignalBody],
121        ) -> None:
122            while True:
123                body = await removed_children_queue.get()
124                child_uuid = cast(List[str], body)[0]
125                del self._children[child_uuid]
126                await self._events.put((RecordingsEventKind.REMOVED, body))
127
128        if (
129            self._handle_child_added_task is None
130            and self._handle_child_removed_task is None
131        ):
132            self._children = await self._get_children()
133            (
134                added_children_queue,
135                self._unsubscribe_to_child_added,
136            ) = await self._connection.subscribe_to_signal(
137                self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added")
138            )
139            (
140                removed_children_queue,
141                self._unsubscribe_to_child_removed,
142            ) = await self._connection.subscribe_to_signal(
143                self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed")
144            )
145            self._handle_child_added_task = _utils.create_task(
146                handle_child_added_task(added_children_queue),
147                name="child_added_handler",
148            )
149            self._handle_child_removed_task = _utils.create_task(
150                handle_child_removed_task(removed_children_queue),
151                name="child_removed_handler",
152            )
153        else:
154            self.logger.warning(
155                "Attempted starting children handlers when already started."
156            )  # TODO: other type of warning?
157
158    async def stop_children_handler_tasks(self) -> None:
159        if (
160            self._handle_child_added_task is not None
161            and self._handle_child_removed_task is not None
162        ):
163            await self._unsubscribe_to_child_added
164            await self._unsubscribe_to_child_removed
165            self._handle_child_added_task.cancel()
166            self._handle_child_removed_task.cancel()
167            try:
168                await self._handle_child_added_task
169            except asyncio.CancelledError:
170                self.logger.debug("handle_child_added_task cancelled")
171            try:
172                await self._handle_child_removed_task
173            except asyncio.CancelledError:
174                self.logger.debug("handle_child_removed_task cancelled")
175            self._handle_child_added_task = None
176            self._handle_child_removed_task = None
177        else:
178            self.logger.warning(
179                "Attempted stopping children handlers before starting them."
180            )  # TODO: other type of warning?
181
182    @property
183    def events(self) -> asyncio.Queue[Tuple[RecordingsEventKind, SignalBody]]:
184        """An event queue containing added and removed recording events.
185
186        Is kept updated in the context `keep_updated_in_context`."""
187        return self._events
188
189    @property
190    def children(self) -> List[Recording]:
191        """A list of all current recordings.
192
193        This property is not recommended for use since the object itself has functionality of a
194        [`collections.abc.Sequence`](https://docs.python.org/3/library/collections.abc.html).
195
196        Is updated in the context `keep_updated_in_context`."""
197        return list(reversed(self._children.values()))
198
199    def get_recording(self, uuid: str) -> Recording:
200        """Returns the recording specified by `uuid`."""
201        return self._children[uuid]
202
203    def __len__(self) -> int:
204        return len(self._children)
205
206    @overload
207    def __getitem__(self, key: int) -> Recording:
208        ...
209
210    @overload
211    def __getitem__(self, key: slice) -> List[Recording]:
212        ...
213
214    def __getitem__(self, key: Union[int, slice]) -> Union[Recording, List[Recording]]:
215        return list(reversed(self._children.values()))[key]
216
217    @asynccontextmanager
218    async def keep_updated_in_context(self):
219        """Keep the `Recordings` state continuously updated in the context by listening for added and removed recordings.
220
221        Example usage:
222        ```python
223        async with g3.recordings.keep_updated_in_context():
224            await g3.recorder.start()
225            await asyncio.sleep(3)
226            await g3.recorder.stop()
227
228            print(len(g3.recordings)) # current number of recordings on device
229            print(await g3.recordings.events.get()) # next event from the event queue
230        ```
231        """
232        await self.start_children_handler_tasks()
233        try:
234            yield
235        finally:
236            await self.stop_children_handler_tasks()
class RecordingsEventKind(enum.Enum):
class RecordingsEventKind(Enum):
    """Kinds of events that a `Recordings` object reports.

    Events of these kinds are put on the `Recordings.events` queue while inside the `Recordings.keep_updated_in_context` context.
    """

    ADDED = auto()
    """Emitted when a recording has been added."""
    REMOVED = auto()
    """Emitted when a recording has been removed."""

Defines event kinds for the Recordings class. These events are emitted to the Recordings.events queue in the context Recordings.keep_updated_in_context.

A recording was added.

A recording was removed.

Inherited Members
enum.Enum
name
value
class Recordings(g3pylib._utils.APIComponent, collections.abc.Sequence[g3pylib.recordings.recording.Recording]):
 25class Recordings(APIComponent, Sequence[Recording]):
 26    def __init__(
 27        self,
 28        connection: G3WebSocketClientProtocol,
 29        api_uri: URI,
 30        http_url: Optional[str],
 31    ) -> None:
 32        self._connection = connection
 33        self._http_url = http_url
 34        self._children = {}
 35        self._handle_child_added_task = None
 36        self._handle_child_removed_task = None
 37        self._events: asyncio.Queue[
 38            Tuple[RecordingsEventKind, SignalBody]
 39        ] = asyncio.Queue()
 40        self.logger: logging.Logger = logging.getLogger(__name__)
 41        super().__init__(api_uri)
 42
 43    async def get_name(self) -> str:
 44        return cast(
 45            str,
 46            await self._connection.require_get(
 47                self.generate_endpoint_uri(EndpointKind.PROPERTY, "name")
 48            ),
 49        )
 50
 51    async def delete(self, uuid: str) -> bool:
 52        return cast(
 53            bool,
 54            await self._connection.require_post(
 55                self.generate_endpoint_uri(EndpointKind.ACTION, "delete"), body=[uuid]
 56            ),
 57        )
 58
 59    async def subscribe_to_child_added(
 60        self,
 61    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 62        return await self._connection.subscribe_to_signal(
 63            self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added")
 64        )
 65
 66    async def subscribe_to_child_removed(
 67        self,
 68    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 69        return await self._connection.subscribe_to_signal(
 70            self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed")
 71        )
 72
 73    async def subscribe_to_deleted(
 74        self,
 75    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 76        return await self._connection.subscribe_to_signal(
 77            self.generate_endpoint_uri(EndpointKind.SIGNAL, "deleted")
 78        )
 79
 80    async def subscribe_to_scan_done(
 81        self,
 82    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 83        return await self._connection.subscribe_to_signal(
 84            self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-done")
 85        )
 86
 87    async def subscribe_to_scan_start(
 88        self,
 89    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
 90        return await self._connection.subscribe_to_signal(
 91            self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-start")
 92        )
 93
 94    async def _get_children(self) -> Dict[str, Recording]:
 95        children = cast(
 96            Dict[str, List[str]], await self._connection.require_get(self._api_uri)
 97        )["children"]
 98        return dict(
 99            map(
100                lambda uuid: (
101                    uuid,
102                    Recording(self._connection, self._api_uri, uuid, self._http_url),
103                ),
104                reversed(children),
105            )
106        )
107
108    async def start_children_handler_tasks(self) -> None:
109        async def handle_child_added_task(
110            added_children_queue: asyncio.Queue[SignalBody],
111        ) -> None:
112            while True:
113                body = await added_children_queue.get()
114                child_uuid = cast(List[str], body)[0]
115                self._children[child_uuid] = Recording(
116                    self._connection, self._api_uri, child_uuid, self._http_url
117                )
118                await self._events.put((RecordingsEventKind.ADDED, body))
119
120        async def handle_child_removed_task(
121            removed_children_queue: asyncio.Queue[SignalBody],
122        ) -> None:
123            while True:
124                body = await removed_children_queue.get()
125                child_uuid = cast(List[str], body)[0]
126                del self._children[child_uuid]
127                await self._events.put((RecordingsEventKind.REMOVED, body))
128
129        if (
130            self._handle_child_added_task is None
131            and self._handle_child_removed_task is None
132        ):
133            self._children = await self._get_children()
134            (
135                added_children_queue,
136                self._unsubscribe_to_child_added,
137            ) = await self._connection.subscribe_to_signal(
138                self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added")
139            )
140            (
141                removed_children_queue,
142                self._unsubscribe_to_child_removed,
143            ) = await self._connection.subscribe_to_signal(
144                self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed")
145            )
146            self._handle_child_added_task = _utils.create_task(
147                handle_child_added_task(added_children_queue),
148                name="child_added_handler",
149            )
150            self._handle_child_removed_task = _utils.create_task(
151                handle_child_removed_task(removed_children_queue),
152                name="child_removed_handler",
153            )
154        else:
155            self.logger.warning(
156                "Attempted starting children handlers when already started."
157            )  # TODO: other type of warning?
158
159    async def stop_children_handler_tasks(self) -> None:
160        if (
161            self._handle_child_added_task is not None
162            and self._handle_child_removed_task is not None
163        ):
164            await self._unsubscribe_to_child_added
165            await self._unsubscribe_to_child_removed
166            self._handle_child_added_task.cancel()
167            self._handle_child_removed_task.cancel()
168            try:
169                await self._handle_child_added_task
170            except asyncio.CancelledError:
171                self.logger.debug("handle_child_added_task cancelled")
172            try:
173                await self._handle_child_removed_task
174            except asyncio.CancelledError:
175                self.logger.debug("handle_child_removed_task cancelled")
176            self._handle_child_added_task = None
177            self._handle_child_removed_task = None
178        else:
179            self.logger.warning(
180                "Attempted stopping children handlers before starting them."
181            )  # TODO: other type of warning?
182
183    @property
184    def events(self) -> asyncio.Queue[Tuple[RecordingsEventKind, SignalBody]]:
185        """An event queue containing added and removed recording events.
186
187        Is kept updated in the context `keep_updated_in_context`."""
188        return self._events
189
190    @property
191    def children(self) -> List[Recording]:
192        """A list of all current recordings.
193
194        This property is not recommended for use since the object itself has functionality of a
195        [`collections.abc.Sequence`](https://docs.python.org/3/library/collections.abc.html).
196
197        Is updated in the context `keep_updated_in_context`."""
198        return list(reversed(self._children.values()))
199
200    def get_recording(self, uuid: str) -> Recording:
201        """Returns the recording specified by `uuid`."""
202        return self._children[uuid]
203
204    def __len__(self) -> int:
205        return len(self._children)
206
207    @overload
208    def __getitem__(self, key: int) -> Recording:
209        ...
210
211    @overload
212    def __getitem__(self, key: slice) -> List[Recording]:
213        ...
214
215    def __getitem__(self, key: Union[int, slice]) -> Union[Recording, List[Recording]]:
216        return list(reversed(self._children.values()))[key]
217
218    @asynccontextmanager
219    async def keep_updated_in_context(self):
220        """Keep the `Recordings` state continuously updated in the context by listening for added and removed recordings.
221
222        Example usage:
223        ```python
224        async with g3.recordings.keep_updated_in_context():
225            await g3.recorder.start()
226            await asyncio.sleep(3)
227            await g3.recorder.stop()
228
229            print(len(g3.recordings)) # current number of recordings on device
230            print(await g3.recordings.events.get()) # next event from the event queue
231        ```
232        """
233        await self.start_children_handler_tasks()
234        try:
235            yield
236        finally:
237            await self.stop_children_handler_tasks()

All the operations on a read-only sequence.

Concrete subclasses must override __new__ or __init__, __getitem__, and __len__.

Recordings( connection: g3pylib.websocket.G3WebSocketClientProtocol, api_uri: g3pylib.g3typing.URI, http_url: Optional[str])
26    def __init__(
27        self,
28        connection: G3WebSocketClientProtocol,
29        api_uri: URI,
30        http_url: Optional[str],
31    ) -> None:
32        self._connection = connection
33        self._http_url = http_url
34        self._children = {}
35        self._handle_child_added_task = None
36        self._handle_child_removed_task = None
37        self._events: asyncio.Queue[
38            Tuple[RecordingsEventKind, SignalBody]
39        ] = asyncio.Queue()
40        self.logger: logging.Logger = logging.getLogger(__name__)
41        super().__init__(api_uri)
async def get_name(self) -> str:
43    async def get_name(self) -> str:
44        return cast(
45            str,
46            await self._connection.require_get(
47                self.generate_endpoint_uri(EndpointKind.PROPERTY, "name")
48            ),
49        )
async def delete(self, uuid: str) -> bool:
51    async def delete(self, uuid: str) -> bool:
52        return cast(
53            bool,
54            await self._connection.require_post(
55                self.generate_endpoint_uri(EndpointKind.ACTION, "delete"), body=[uuid]
56            ),
57        )
async def subscribe_to_child_added( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
59    async def subscribe_to_child_added(
60        self,
61    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
62        return await self._connection.subscribe_to_signal(
63            self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added")
64        )
async def subscribe_to_child_removed( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
66    async def subscribe_to_child_removed(
67        self,
68    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
69        return await self._connection.subscribe_to_signal(
70            self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed")
71        )
async def subscribe_to_deleted( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
73    async def subscribe_to_deleted(
74        self,
75    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
76        return await self._connection.subscribe_to_signal(
77            self.generate_endpoint_uri(EndpointKind.SIGNAL, "deleted")
78        )
async def subscribe_to_scan_done( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
80    async def subscribe_to_scan_done(
81        self,
82    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
83        return await self._connection.subscribe_to_signal(
84            self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-done")
85        )
async def subscribe_to_scan_start( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
87    async def subscribe_to_scan_start(
88        self,
89    ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]:
90        return await self._connection.subscribe_to_signal(
91            self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-start")
92        )
async def start_children_handler_tasks(self) -> None:
108    async def start_children_handler_tasks(self) -> None:
109        async def handle_child_added_task(
110            added_children_queue: asyncio.Queue[SignalBody],
111        ) -> None:
112            while True:
113                body = await added_children_queue.get()
114                child_uuid = cast(List[str], body)[0]
115                self._children[child_uuid] = Recording(
116                    self._connection, self._api_uri, child_uuid, self._http_url
117                )
118                await self._events.put((RecordingsEventKind.ADDED, body))
119
120        async def handle_child_removed_task(
121            removed_children_queue: asyncio.Queue[SignalBody],
122        ) -> None:
123            while True:
124                body = await removed_children_queue.get()
125                child_uuid = cast(List[str], body)[0]
126                del self._children[child_uuid]
127                await self._events.put((RecordingsEventKind.REMOVED, body))
128
129        if (
130            self._handle_child_added_task is None
131            and self._handle_child_removed_task is None
132        ):
133            self._children = await self._get_children()
134            (
135                added_children_queue,
136                self._unsubscribe_to_child_added,
137            ) = await self._connection.subscribe_to_signal(
138                self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added")
139            )
140            (
141                removed_children_queue,
142                self._unsubscribe_to_child_removed,
143            ) = await self._connection.subscribe_to_signal(
144                self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed")
145            )
146            self._handle_child_added_task = _utils.create_task(
147                handle_child_added_task(added_children_queue),
148                name="child_added_handler",
149            )
150            self._handle_child_removed_task = _utils.create_task(
151                handle_child_removed_task(removed_children_queue),
152                name="child_removed_handler",
153            )
154        else:
155            self.logger.warning(
156                "Attempted starting children handlers when already started."
157            )  # TODO: other type of warning?
async def stop_children_handler_tasks(self) -> None:
159    async def stop_children_handler_tasks(self) -> None:
160        if (
161            self._handle_child_added_task is not None
162            and self._handle_child_removed_task is not None
163        ):
164            await self._unsubscribe_to_child_added
165            await self._unsubscribe_to_child_removed
166            self._handle_child_added_task.cancel()
167            self._handle_child_removed_task.cancel()
168            try:
169                await self._handle_child_added_task
170            except asyncio.CancelledError:
171                self.logger.debug("handle_child_added_task cancelled")
172            try:
173                await self._handle_child_removed_task
174            except asyncio.CancelledError:
175                self.logger.debug("handle_child_removed_task cancelled")
176            self._handle_child_added_task = None
177            self._handle_child_removed_task = None
178        else:
179            self.logger.warning(
180                "Attempted stopping children handlers before starting them."
181            )  # TODO: other type of warning?
events: asyncio.queues.Queue[typing.Tuple[g3pylib.recordings.RecordingsEventKind, g3pylib.g3typing.SignalBody]]

An event queue containing added and removed recording events.

It is kept updated while inside the keep_updated_in_context context.

A list of all current recordings.

This property is not recommended for use since the object itself has functionality of a collections.abc.Sequence.

It is updated while inside the keep_updated_in_context context.

def get_recording(self, uuid: str) -> g3pylib.recordings.recording.Recording:
200    def get_recording(self, uuid: str) -> Recording:
201        """Returns the recording specified by `uuid`."""
202        return self._children[uuid]

Returns the recording specified by uuid.

@asynccontextmanager
def keep_updated_in_context(self):
218    @asynccontextmanager
219    async def keep_updated_in_context(self):
220        """Keep the `Recordings` state continuously updated in the context by listening for added and removed recordings.
221
222        Example usage:
223        ```python
224        async with g3.recordings.keep_updated_in_context():
225            await g3.recorder.start()
226            await asyncio.sleep(3)
227            await g3.recorder.stop()
228
229            print(len(g3.recordings)) # current number of recordings on device
230            print(await g3.recordings.events.get()) # next event from the event queue
231        ```
232        """
233        await self.start_children_handler_tasks()
234        try:
235            yield
236        finally:
237            await self.stop_children_handler_tasks()

Keep the Recordings state continuously updated in the context by listening for added and removed recordings.

Example usage:

async with g3.recordings.keep_updated_in_context():
    await g3.recorder.start()
    await asyncio.sleep(3)
    await g3.recorder.stop()

    print(len(g3.recordings)) # current number of recordings on device
    print(await g3.recordings.events.get()) # next event from the event queue
Inherited Members
g3pylib._utils.APIComponent
generate_endpoint_uri
collections.abc.Sequence
index
count