g3pylib.recordings
1import asyncio 2import logging 3from collections.abc import Sequence 4from contextlib import asynccontextmanager 5from enum import Enum, auto 6from typing import Awaitable, Dict, List, Optional, Tuple, Union, cast, overload 7 8from g3pylib import _utils 9from g3pylib._utils import APIComponent, EndpointKind 10from g3pylib.g3typing import URI, SignalBody 11from g3pylib.recordings.recording import Recording 12from g3pylib.websocket import G3WebSocketClientProtocol 13 14 15class RecordingsEventKind(Enum): 16 """Defines event kinds for the `Recordings` class. These events are emitted to the `Recordings.events` queue in the context `Recordings.keep_updated_in_context`.""" 17 18 ADDED = auto() 19 """A recording was added.""" 20 REMOVED = auto() 21 """A recording was removed.""" 22 23 24class Recordings(APIComponent, Sequence[Recording]): 25 def __init__( 26 self, 27 connection: G3WebSocketClientProtocol, 28 api_uri: URI, 29 http_url: Optional[str], 30 ) -> None: 31 self._connection = connection 32 self._http_url = http_url 33 self._children = {} 34 self._handle_child_added_task = None 35 self._handle_child_removed_task = None 36 self._events: asyncio.Queue[ 37 Tuple[RecordingsEventKind, SignalBody] 38 ] = asyncio.Queue() 39 self.logger: logging.Logger = logging.getLogger(__name__) 40 super().__init__(api_uri) 41 42 async def get_name(self) -> str: 43 return cast( 44 str, 45 await self._connection.require_get( 46 self.generate_endpoint_uri(EndpointKind.PROPERTY, "name") 47 ), 48 ) 49 50 async def delete(self, uuid: str) -> bool: 51 return cast( 52 bool, 53 await self._connection.require_post( 54 self.generate_endpoint_uri(EndpointKind.ACTION, "delete"), body=[uuid] 55 ), 56 ) 57 58 async def subscribe_to_child_added( 59 self, 60 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 61 return await self._connection.subscribe_to_signal( 62 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added") 63 ) 64 65 async def subscribe_to_child_removed( 66 self, 67 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 68 return await self._connection.subscribe_to_signal( 69 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed") 70 ) 71 72 async def subscribe_to_deleted( 73 self, 74 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 75 return await self._connection.subscribe_to_signal( 76 self.generate_endpoint_uri(EndpointKind.SIGNAL, "deleted") 77 ) 78 79 async def subscribe_to_scan_done( 80 self, 81 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 82 return await self._connection.subscribe_to_signal( 83 self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-done") 84 ) 85 86 async def subscribe_to_scan_start( 87 self, 88 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 89 return await self._connection.subscribe_to_signal( 90 self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-start") 91 ) 92 93 async def _get_children(self) -> Dict[str, Recording]: 94 children = cast( 95 Dict[str, List[str]], await self._connection.require_get(self._api_uri) 96 )["children"] 97 return dict( 98 map( 99 lambda uuid: ( 100 uuid, 101 Recording(self._connection, self._api_uri, uuid, self._http_url), 102 ), 103 reversed(children), 104 ) 105 ) 106 107 async def start_children_handler_tasks(self) -> None: 108 async def handle_child_added_task( 109 added_children_queue: asyncio.Queue[SignalBody], 110 ) -> None: 111 while True: 112 body = await added_children_queue.get() 113 child_uuid = cast(List[str], body)[0] 114 self._children[child_uuid] = Recording( 115 self._connection, self._api_uri, child_uuid, self._http_url 116 ) 117 await self._events.put((RecordingsEventKind.ADDED, body)) 118 119 async def handle_child_removed_task( 120 removed_children_queue: asyncio.Queue[SignalBody], 121 ) -> None: 122 while True: 123 body = await removed_children_queue.get() 124 child_uuid = cast(List[str], body)[0] 125 del self._children[child_uuid] 126 await self._events.put((RecordingsEventKind.REMOVED, body)) 127 128 if ( 129 self._handle_child_added_task is None 130 and self._handle_child_removed_task is None 131 ): 132 self._children = await self._get_children() 133 ( 134 added_children_queue, 135 self._unsubscribe_to_child_added, 136 ) = await self._connection.subscribe_to_signal( 137 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added") 138 ) 139 ( 140 removed_children_queue, 141 self._unsubscribe_to_child_removed, 142 ) = await self._connection.subscribe_to_signal( 143 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed") 144 ) 145 self._handle_child_added_task = _utils.create_task( 146 handle_child_added_task(added_children_queue), 147 name="child_added_handler", 148 ) 149 self._handle_child_removed_task = _utils.create_task( 150 handle_child_removed_task(removed_children_queue), 151 name="child_removed_handler", 152 ) 153 else: 154 self.logger.warning( 155 "Attempted starting children handlers when already started." 156 ) # TODO: other type of warning? 157 158 async def stop_children_handler_tasks(self) -> None: 159 if ( 160 self._handle_child_added_task is not None 161 and self._handle_child_removed_task is not None 162 ): 163 await self._unsubscribe_to_child_added 164 await self._unsubscribe_to_child_removed 165 self._handle_child_added_task.cancel() 166 self._handle_child_removed_task.cancel() 167 try: 168 await self._handle_child_added_task 169 except asyncio.CancelledError: 170 self.logger.debug("handle_child_added_task cancelled") 171 try: 172 await self._handle_child_removed_task 173 except asyncio.CancelledError: 174 self.logger.debug("handle_child_removed_task cancelled") 175 self._handle_child_added_task = None 176 self._handle_child_removed_task = None 177 else: 178 self.logger.warning( 179 "Attempted stopping children handlers before starting them." 180 ) # TODO: other type of warning? 181 182 @property 183 def events(self) -> asyncio.Queue[Tuple[RecordingsEventKind, SignalBody]]: 184 """An event queue containing added and removed recording events. 185 186 Is kept updated in the context `keep_updated_in_context`.""" 187 return self._events 188 189 @property 190 def children(self) -> List[Recording]: 191 """A list of all current recordings. 192 193 This property is not recommended for use since the object itself has functionality of a 194 [`collections.abc.Sequence`](https://docs.python.org/3/library/collections.abc.html). 195 196 Is updated in the context `keep_updated_in_context`.""" 197 return list(reversed(self._children.values())) 198 199 def get_recording(self, uuid: str) -> Recording: 200 """Returns the recording specified by `uuid`.""" 201 return self._children[uuid] 202 203 def __len__(self) -> int: 204 return len(self._children) 205 206 @overload 207 def __getitem__(self, key: int) -> Recording: 208 ... 209 210 @overload 211 def __getitem__(self, key: slice) -> List[Recording]: 212 ... 213 214 def __getitem__(self, key: Union[int, slice]) -> Union[Recording, List[Recording]]: 215 return list(reversed(self._children.values()))[key] 216 217 @asynccontextmanager 218 async def keep_updated_in_context(self): 219 """Keep the `Recordings` state continuously updated in the context by listening for added and removed recordings. 220 221 Example usage: 222 ```python 223 async with g3.recordings.keep_updated_in_context(): 224 await g3.recorder.start() 225 await asyncio.sleep(3) 226 await g3.recorder.stop() 227 228 print(len(g3.recordings)) # current number of recordings on device 229 print(await g3.recordings.events.get()) # next event from the event queue 230 ``` 231 """ 232 await self.start_children_handler_tasks() 233 try: 234 yield 235 finally: 236 await self.stop_children_handler_tasks()
class
RecordingsEventKind(enum.Enum):
16class RecordingsEventKind(Enum): 17 """Defines event kinds for the `Recordings` class. These events are emitted to the `Recordings.events` queue in the context `Recordings.keep_updated_in_context`.""" 18 19 ADDED = auto() 20 """A recording was added.""" 21 REMOVED = auto() 22 """A recording was removed."""
Defines event kinds for the Recordings
class. These events are emitted to the Recordings.events
queue in the context Recordings.keep_updated_in_context
.
Inherited Members
- enum.Enum
- name
- value
class
Recordings(g3pylib._utils.APIComponent, collections.abc.Sequence[g3pylib.recordings.recording.Recording]):
25class Recordings(APIComponent, Sequence[Recording]): 26 def __init__( 27 self, 28 connection: G3WebSocketClientProtocol, 29 api_uri: URI, 30 http_url: Optional[str], 31 ) -> None: 32 self._connection = connection 33 self._http_url = http_url 34 self._children = {} 35 self._handle_child_added_task = None 36 self._handle_child_removed_task = None 37 self._events: asyncio.Queue[ 38 Tuple[RecordingsEventKind, SignalBody] 39 ] = asyncio.Queue() 40 self.logger: logging.Logger = logging.getLogger(__name__) 41 super().__init__(api_uri) 42 43 async def get_name(self) -> str: 44 return cast( 45 str, 46 await self._connection.require_get( 47 self.generate_endpoint_uri(EndpointKind.PROPERTY, "name") 48 ), 49 ) 50 51 async def delete(self, uuid: str) -> bool: 52 return cast( 53 bool, 54 await self._connection.require_post( 55 self.generate_endpoint_uri(EndpointKind.ACTION, "delete"), body=[uuid] 56 ), 57 ) 58 59 async def subscribe_to_child_added( 60 self, 61 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 62 return await self._connection.subscribe_to_signal( 63 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added") 64 ) 65 66 async def subscribe_to_child_removed( 67 self, 68 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 69 return await self._connection.subscribe_to_signal( 70 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed") 71 ) 72 73 async def subscribe_to_deleted( 74 self, 75 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 76 return await self._connection.subscribe_to_signal( 77 self.generate_endpoint_uri(EndpointKind.SIGNAL, "deleted") 78 ) 79 80 async def subscribe_to_scan_done( 81 self, 82 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 83 return await self._connection.subscribe_to_signal( 84 self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-done") 85 ) 86 87 async def subscribe_to_scan_start( 88 self, 89 ) -> Tuple[asyncio.Queue[SignalBody], Awaitable[None]]: 90 return await self._connection.subscribe_to_signal( 91 self.generate_endpoint_uri(EndpointKind.SIGNAL, "scan-start") 92 ) 93 94 async def _get_children(self) -> Dict[str, Recording]: 95 children = cast( 96 Dict[str, List[str]], await self._connection.require_get(self._api_uri) 97 )["children"] 98 return dict( 99 map( 100 lambda uuid: ( 101 uuid, 102 Recording(self._connection, self._api_uri, uuid, self._http_url), 103 ), 104 reversed(children), 105 ) 106 ) 107 108 async def start_children_handler_tasks(self) -> None: 109 async def handle_child_added_task( 110 added_children_queue: asyncio.Queue[SignalBody], 111 ) -> None: 112 while True: 113 body = await added_children_queue.get() 114 child_uuid = cast(List[str], body)[0] 115 self._children[child_uuid] = Recording( 116 self._connection, self._api_uri, child_uuid, self._http_url 117 ) 118 await self._events.put((RecordingsEventKind.ADDED, body)) 119 120 async def handle_child_removed_task( 121 removed_children_queue: asyncio.Queue[SignalBody], 122 ) -> None: 123 while True: 124 body = await removed_children_queue.get() 125 child_uuid = cast(List[str], body)[0] 126 del self._children[child_uuid] 127 await self._events.put((RecordingsEventKind.REMOVED, body)) 128 129 if ( 130 self._handle_child_added_task is None 131 and self._handle_child_removed_task is None 132 ): 133 self._children = await self._get_children() 134 ( 135 added_children_queue, 136 self._unsubscribe_to_child_added, 137 ) = await self._connection.subscribe_to_signal( 138 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added") 139 ) 140 ( 141 removed_children_queue, 142 self._unsubscribe_to_child_removed, 143 ) = await self._connection.subscribe_to_signal( 144 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed") 145 ) 146 self._handle_child_added_task = _utils.create_task( 147 handle_child_added_task(added_children_queue), 148 name="child_added_handler", 149 ) 150 self._handle_child_removed_task = _utils.create_task( 151 handle_child_removed_task(removed_children_queue), 152 name="child_removed_handler", 153 ) 154 else: 155 self.logger.warning( 156 "Attempted starting children handlers when already started." 157 ) # TODO: other type of warning? 158 159 async def stop_children_handler_tasks(self) -> None: 160 if ( 161 self._handle_child_added_task is not None 162 and self._handle_child_removed_task is not None 163 ): 164 await self._unsubscribe_to_child_added 165 await self._unsubscribe_to_child_removed 166 self._handle_child_added_task.cancel() 167 self._handle_child_removed_task.cancel() 168 try: 169 await self._handle_child_added_task 170 except asyncio.CancelledError: 171 self.logger.debug("handle_child_added_task cancelled") 172 try: 173 await self._handle_child_removed_task 174 except asyncio.CancelledError: 175 self.logger.debug("handle_child_removed_task cancelled") 176 self._handle_child_added_task = None 177 self._handle_child_removed_task = None 178 else: 179 self.logger.warning( 180 "Attempted stopping children handlers before starting them." 181 ) # TODO: other type of warning? 182 183 @property 184 def events(self) -> asyncio.Queue[Tuple[RecordingsEventKind, SignalBody]]: 185 """An event queue containing added and removed recording events. 186 187 Is kept updated in the context `keep_updated_in_context`.""" 188 return self._events 189 190 @property 191 def children(self) -> List[Recording]: 192 """A list of all current recordings. 193 194 This property is not recommended for use since the object itself has functionality of a 195 [`collections.abc.Sequence`](https://docs.python.org/3/library/collections.abc.html). 196 197 Is updated in the context `keep_updated_in_context`.""" 198 return list(reversed(self._children.values())) 199 200 def get_recording(self, uuid: str) -> Recording: 201 """Returns the recording specified by `uuid`.""" 202 return self._children[uuid] 203 204 def __len__(self) -> int: 205 return len(self._children) 206 207 @overload 208 def __getitem__(self, key: int) -> Recording: 209 ... 210 211 @overload 212 def __getitem__(self, key: slice) -> List[Recording]: 213 ... 214 215 def __getitem__(self, key: Union[int, slice]) -> Union[Recording, List[Recording]]: 216 return list(reversed(self._children.values()))[key] 217 218 @asynccontextmanager 219 async def keep_updated_in_context(self): 220 """Keep the `Recordings` state continuously updated in the context by listening for added and removed recordings. 221 222 Example usage: 223 ```python 224 async with g3.recordings.keep_updated_in_context(): 225 await g3.recorder.start() 226 await asyncio.sleep(3) 227 await g3.recorder.stop() 228 229 print(len(g3.recordings)) # current number of recordings on device 230 print(await g3.recordings.events.get()) # next event from the event queue 231 ``` 232 """ 233 await self.start_children_handler_tasks() 234 try: 235 yield 236 finally: 237 await self.stop_children_handler_tasks()
All the operations on a read-only sequence.
Concrete subclasses must override __new__ or __init__, __getitem__, and __len__.
Recordings( connection: g3pylib.websocket.G3WebSocketClientProtocol, api_uri: g3pylib.g3typing.URI, http_url: Optional[str])
26 def __init__( 27 self, 28 connection: G3WebSocketClientProtocol, 29 api_uri: URI, 30 http_url: Optional[str], 31 ) -> None: 32 self._connection = connection 33 self._http_url = http_url 34 self._children = {} 35 self._handle_child_added_task = None 36 self._handle_child_removed_task = None 37 self._events: asyncio.Queue[ 38 Tuple[RecordingsEventKind, SignalBody] 39 ] = asyncio.Queue() 40 self.logger: logging.Logger = logging.getLogger(__name__) 41 super().__init__(api_uri)
async def
subscribe_to_child_added( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
async def
subscribe_to_child_removed( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
async def
subscribe_to_deleted( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
async def
subscribe_to_scan_done( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
async def
subscribe_to_scan_start( self) -> Tuple[asyncio.queues.Queue[g3pylib.g3typing.SignalBody], Awaitable[NoneType]]:
async def
start_children_handler_tasks(self) -> None:
108 async def start_children_handler_tasks(self) -> None: 109 async def handle_child_added_task( 110 added_children_queue: asyncio.Queue[SignalBody], 111 ) -> None: 112 while True: 113 body = await added_children_queue.get() 114 child_uuid = cast(List[str], body)[0] 115 self._children[child_uuid] = Recording( 116 self._connection, self._api_uri, child_uuid, self._http_url 117 ) 118 await self._events.put((RecordingsEventKind.ADDED, body)) 119 120 async def handle_child_removed_task( 121 removed_children_queue: asyncio.Queue[SignalBody], 122 ) -> None: 123 while True: 124 body = await removed_children_queue.get() 125 child_uuid = cast(List[str], body)[0] 126 del self._children[child_uuid] 127 await self._events.put((RecordingsEventKind.REMOVED, body)) 128 129 if ( 130 self._handle_child_added_task is None 131 and self._handle_child_removed_task is None 132 ): 133 self._children = await self._get_children() 134 ( 135 added_children_queue, 136 self._unsubscribe_to_child_added, 137 ) = await self._connection.subscribe_to_signal( 138 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-added") 139 ) 140 ( 141 removed_children_queue, 142 self._unsubscribe_to_child_removed, 143 ) = await self._connection.subscribe_to_signal( 144 self.generate_endpoint_uri(EndpointKind.SIGNAL, "child-removed") 145 ) 146 self._handle_child_added_task = _utils.create_task( 147 handle_child_added_task(added_children_queue), 148 name="child_added_handler", 149 ) 150 self._handle_child_removed_task = _utils.create_task( 151 handle_child_removed_task(removed_children_queue), 152 name="child_removed_handler", 153 ) 154 else: 155 self.logger.warning( 156 "Attempted starting children handlers when already started." 157 ) # TODO: other type of warning?
async def
stop_children_handler_tasks(self) -> None:
159 async def stop_children_handler_tasks(self) -> None: 160 if ( 161 self._handle_child_added_task is not None 162 and self._handle_child_removed_task is not None 163 ): 164 await self._unsubscribe_to_child_added 165 await self._unsubscribe_to_child_removed 166 self._handle_child_added_task.cancel() 167 self._handle_child_removed_task.cancel() 168 try: 169 await self._handle_child_added_task 170 except asyncio.CancelledError: 171 self.logger.debug("handle_child_added_task cancelled") 172 try: 173 await self._handle_child_removed_task 174 except asyncio.CancelledError: 175 self.logger.debug("handle_child_removed_task cancelled") 176 self._handle_child_added_task = None 177 self._handle_child_removed_task = None 178 else: 179 self.logger.warning( 180 "Attempted stopping children handlers before starting them." 181 ) # TODO: other type of warning?
events: asyncio.queues.Queue[typing.Tuple[g3pylib.recordings.RecordingsEventKind, g3pylib.g3typing.SignalBody]]
An event queue containing added and removed recording events.
Is kept updated in the context keep_updated_in_context
.
children: List[g3pylib.recordings.recording.Recording]
A list of all current recordings.
This property is not recommended for use since the object itself has functionality of a
collections.abc.Sequence
.
Is updated in the context keep_updated_in_context
.
200 def get_recording(self, uuid: str) -> Recording: 201 """Returns the recording specified by `uuid`.""" 202 return self._children[uuid]
Returns the recording specified by uuid
.
@asynccontextmanager
def
keep_updated_in_context(self):
218 @asynccontextmanager 219 async def keep_updated_in_context(self): 220 """Keep the `Recordings` state continuously updated in the context by listening for added and removed recordings. 221 222 Example usage: 223 ```python 224 async with g3.recordings.keep_updated_in_context(): 225 await g3.recorder.start() 226 await asyncio.sleep(3) 227 await g3.recorder.stop() 228 229 print(len(g3.recordings)) # current number of recordings on device 230 print(await g3.recordings.events.get()) # next event from the event queue 231 ``` 232 """ 233 await self.start_children_handler_tasks() 234 try: 235 yield 236 finally: 237 await self.stop_children_handler_tasks()
Keep the Recordings
state continuously updated in the context by listening for added and removed recordings.
Example usage:
async with g3.recordings.keep_updated_in_context():
await g3.recorder.start()
await asyncio.sleep(3)
await g3.recorder.stop()
print(len(g3.recordings)) # current number of recordings on device
print(await g3.recordings.events.get()) # next event from the event queue
Inherited Members
- collections.abc.Sequence
- index
- count