|
|
|
@ -1220,6 +1220,108 @@ async def test_event_stream_on_chain_with_tool() -> None:
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.xfail(reason="Fix order of callback invocations in RunnableSequence")
|
|
|
|
|
async def test_chain_ordering() -> None:
|
|
|
|
|
"""Test the event stream with a tool."""
|
|
|
|
|
|
|
|
|
|
def foo(a: str) -> str:
|
|
|
|
|
return a
|
|
|
|
|
|
|
|
|
|
def bar(a: str) -> str:
|
|
|
|
|
return a
|
|
|
|
|
|
|
|
|
|
chain = RunnableLambda(foo) | RunnableLambda(bar)
|
|
|
|
|
iterable = chain.astream_events("q", version="v1")
|
|
|
|
|
|
|
|
|
|
events = []
|
|
|
|
|
|
|
|
|
|
for _ in range(10):
|
|
|
|
|
try:
|
|
|
|
|
next_chunk = await iterable.__anext__()
|
|
|
|
|
events.append(next_chunk)
|
|
|
|
|
except Exception:
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
events = _with_nulled_run_id(events)
|
|
|
|
|
for event in events:
|
|
|
|
|
event["tags"] = sorted(event["tags"])
|
|
|
|
|
|
|
|
|
|
assert events == [
|
|
|
|
|
{
|
|
|
|
|
"data": {"input": "q"},
|
|
|
|
|
"event": "on_chain_start",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "RunnableSequence",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": [],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {},
|
|
|
|
|
"event": "on_chain_start",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "foo",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": ["seq:step:1"],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {"chunk": "q"},
|
|
|
|
|
"event": "on_chain_stream",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "foo",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": ["seq:step:1"],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {"input": "q", "output": "q"},
|
|
|
|
|
"event": "on_chain_end",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "foo",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": ["seq:step:1"],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {},
|
|
|
|
|
"event": "on_chain_start",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "bar",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": ["seq:step:2"],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {"chunk": "q"},
|
|
|
|
|
"event": "on_chain_stream",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "bar",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": ["seq:step:2"],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {"chunk": "q"},
|
|
|
|
|
"event": "on_chain_stream",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "RunnableSequence",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": [],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {"input": "q", "output": "q"},
|
|
|
|
|
"event": "on_chain_end",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "bar",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": ["seq:step:2"],
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"data": {"output": "q"},
|
|
|
|
|
"event": "on_chain_end",
|
|
|
|
|
"metadata": {},
|
|
|
|
|
"name": "RunnableSequence",
|
|
|
|
|
"run_id": "",
|
|
|
|
|
"tags": [],
|
|
|
|
|
},
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def test_event_stream_with_retry() -> None:
|
|
|
|
|
"""Test the event stream with a tool."""
|
|
|
|
|
|
|
|
|
|