core[patch]: Check if event loop is closed in memory stream (#21841)

Check if event stream is closed in memory loop.

Using try/except here to avoid race condition, but this may incur a
small overhead in versions prios to 3.11
pull/21804/merge
Eugene Yurtsev 2 weeks ago committed by GitHub
parent d8f89a5e9b
commit 67b6f6c82a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -37,7 +37,11 @@ class _SendStream(Generic[T]):
def send_nowait(self, item: T) -> None:
"""Schedule the item to be written to the queue using the original loop."""
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
try:
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
except RuntimeError:
if not self._reader_loop.is_closed():
raise # Raise the exception if the loop is not closed
async def aclose(self) -> None:
"""Schedule the done object write the queue using the original loop."""
@ -45,7 +49,11 @@ class _SendStream(Generic[T]):
def close(self) -> None:
"""Schedule the done object write the queue using the original loop."""
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done)
try:
self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done)
except RuntimeError:
if not self._reader_loop.is_closed():
raise # Raise the exception if the loop is not closed
class _ReceiveStream(Generic[T]):

@ -112,6 +112,24 @@ async def test_queue_for_streaming_via_sync_call() -> None:
), f"delta_time: {delta_time}"
def test_send_to_closed_stream() -> None:
"""Test that sending to a closed stream doesn't raise an error.
We may want to handle this in a better way in the future.
"""
event_loop = asyncio.get_event_loop()
channel = _MemoryStream[str](event_loop)
writer = channel.get_send_stream()
# send with an open even loop
writer.send_nowait("hello")
event_loop.close()
writer.send_nowait("hello")
# now close the loop
event_loop.close()
writer.close()
writer.send_nowait("hello")
async def test_closed_stream() -> None:
reader_loop = asyncio.get_event_loop()
channel = _MemoryStream[str](reader_loop)

Loading…
Cancel
Save