From 7be68228da97725eb3697003ecf0cc320d3b9e87 Mon Sep 17 00:00:00 2001 From: Philippe PRADOS Date: Wed, 8 May 2024 23:31:11 +0200 Subject: [PATCH] community[patch]: Make sql record manager fully compatible with async (#20735) The `_amake_session()` method does not allow modifying the `self.session_factory` with anything other than `async_sessionmaker`. This prohibits advanced uses of `index()`. In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the `index()` API. This API proposes to use an SQL-type record manager. In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import (problem with the network, ...) there is an inconsistency between the SQL database and the vector database. With the [PR](https://github.com/langchain-ai/langchain-postgres/pull/32) we are proposing for `langchain-postgres`, it is now possible to guarantee the consistency of the import of chunks into a vector database. It's possible only if the outer session is built with the connection. ```python def main(): db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/" engine = create_engine(db_url, echo=True) embeddings = FakeEmbeddings() pgvector:VectorStore = PGVector( embeddings=embeddings, connection=engine, ) record_manager = SQLRecordManager( namespace="namespace", engine=engine, ) record_manager.create_schema() with engine.connect() as connection: session_maker = scoped_session(sessionmaker(bind=connection)) # NOTE: Update session_factories record_manager.session_factory = session_maker pgvector.session_maker = session_maker with connection.begin(): loader = CSVLoader( "data/faq/faq.csv", source_column="source", autodetect_encoding=True, ) result = index( source_id_key="source", docs_source=loader.load()[:1], cleanup="incremental", vector_store=pgvector, record_manager=record_manager, ) print(result) ``` The same thing is possible asynchronously, but a bug in `sql_record_manager.py` in `_amake_session()` must first be fixed. ```python async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]: """Create a session and close it after use.""" # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~ if not isinstance(self.engine, AsyncEngine): raise AssertionError("This method is not supported for sync engines.") async with self.session_factory() as session: yield session ``` Then, it is possible to do the same thing asynchronously: ```python async def main(): db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/" engine = create_async_engine(db_url, echo=True) embeddings = FakeEmbeddings() pgvector:VectorStore = PGVector( embeddings=embeddings, connection=engine, ) record_manager = SQLRecordManager( namespace="namespace", engine=engine, async_mode=True, ) await record_manager.acreate_schema() async with engine.connect() as connection: session_maker = async_scoped_session( async_sessionmaker(bind=connection), scopefunc=current_task) record_manager.session_factory = session_maker pgvector.session_maker = session_maker async with connection.begin(): loader = CSVLoader( "data/faq/faq.csv", source_column="source", autodetect_encoding=True, ) result = await aindex( source_id_key="source", docs_source=loader.load()[:1], cleanup="incremental", vector_store=pgvector, record_manager=record_manager, ) print(result) asyncio.run(main()) ``` --------- Signed-off-by: Rahul Tripathi Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com> Co-authored-by: Sean Co-authored-by: JuHyung-Son Co-authored-by: Erick Friis Co-authored-by: YISH Co-authored-by: Bagatur Co-authored-by: Jason_Chen <820542443@qq.com> Co-authored-by: Joan Fontanals Co-authored-by: Pavlo Paliychuk Co-authored-by: fzowl <160063452+fzowl@users.noreply.github.com> Co-authored-by: samanhappy Co-authored-by: Lei Zhang Co-authored-by: Tomaz Bratanic Co-authored-by: merdan <48309329+merdan-9@users.noreply.github.com> Co-authored-by: ccurme Co-authored-by: Andres Algaba Co-authored-by: davidefantiniIntel <115252273+davidefantiniIntel@users.noreply.github.com> Co-authored-by: Jingpan Xiong <71321890+klaus-xiong@users.noreply.github.com> Co-authored-by: kaka Co-authored-by: jingsi Co-authored-by: Eugene Yurtsev Co-authored-by: Rahul Triptahi Co-authored-by: Rahul Tripathi Co-authored-by: Shengsheng Huang Co-authored-by: Michael Schock Co-authored-by: Anish Chakraborty Co-authored-by: am-kinetica <85610855+am-kinetica@users.noreply.github.com> Co-authored-by: Dristy Srivastava <58721149+dristysrivastava@users.noreply.github.com> Co-authored-by: Matt Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com> --- .../indexes/_sql_record_manager.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/libs/community/langchain_community/indexes/_sql_record_manager.py b/libs/community/langchain_community/indexes/_sql_record_manager.py index 544e828df2..f70a1dc4f4 100644 --- a/libs/community/langchain_community/indexes/_sql_record_manager.py +++ b/libs/community/langchain_community/indexes/_sql_record_manager.py @@ -16,7 +16,17 @@ allow it to work with a variety of SQL as a backend. import contextlib import decimal import uuid -from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Sequence, Union +from typing import ( + Any, + AsyncGenerator, + Dict, + Generator, + List, + Optional, + Sequence, + Union, + cast, +) from sqlalchemy import ( URL, @@ -175,10 +185,10 @@ class SQLRecordManager(RecordManager): async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]: """Create a session and close it after use.""" - if not isinstance(self.session_factory, async_sessionmaker): + if not isinstance(self.engine, AsyncEngine): raise AssertionError("This method is not supported for sync engines.") - async with self.session_factory() as session: + async with cast(AsyncSession, self.session_factory()) as session: yield session def get_time(self) -> float: