mirror of https://github.com/hwchase17/langchain
prompty: adding Microsoft langchain_prompty package (#21346)
Co-authored-by: Micky Liu <wayliu@microsoft.com>
Co-authored-by: wayliums <wayliums@users.noreply.github.com>
Co-authored-by: Erick Friis <erick@langchain.dev>
pull/19865/head^2 langchain-prompty==0.0.1
parent 56c6b5868b
commit af875cff57
@@ -0,0 +1 @@
__pycache__
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2023 LangChain, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -0,0 +1,59 @@
.PHONY: all format lint test tests integration_tests docker_tests help extended_tests

# Default target executed when no arguments are given to make.
all: help

# Define a variable for the test file path.
TEST_FILE ?= tests/unit_tests/

test:
	poetry run pytest $(TEST_FILE)

tests:
	poetry run pytest $(TEST_FILE)


######################
# LINTING AND FORMATTING
######################

# Define a variable for Python and notebook files.
PYTHON_FILES=.
MYPY_CACHE=.mypy_cache
lint format: PYTHON_FILES=.
lint_diff format_diff: PYTHON_FILES=$(shell git diff --relative=libs/partners/prompty --name-only --diff-filter=d master | grep -E '\.py$$|\.ipynb$$')
lint_package: PYTHON_FILES=langchain_prompty
lint_tests: PYTHON_FILES=tests
lint_tests: MYPY_CACHE=.mypy_cache_test

lint lint_diff lint_package lint_tests:
	poetry run ruff .
	poetry run ruff format $(PYTHON_FILES) --diff
	poetry run ruff --select I $(PYTHON_FILES)
	mkdir $(MYPY_CACHE); poetry run mypy $(PYTHON_FILES) --cache-dir $(MYPY_CACHE)

format format_diff:
	poetry run ruff format $(PYTHON_FILES)
	poetry run ruff --select I --fix $(PYTHON_FILES)

spell_check:
	poetry run codespell --toml pyproject.toml

spell_fix:
	poetry run codespell --toml pyproject.toml -w

check_imports: $(shell find langchain_prompty -name '*.py')
	poetry run python ./scripts/check_imports.py $^

######################
# HELP
######################

help:
	@echo '----'
	@echo 'check_imports - check imports'
	@echo 'format - run code formatters'
	@echo 'lint - run linters'
	@echo 'test - run unit tests'
	@echo 'tests - run unit tests'
	@echo 'test TEST_FILE=<test_file> - run all tests in file'
@@ -0,0 +1,54 @@
# langchain-prompty

This package contains the LangChain integration with Microsoft Prompty.

## Installation

```bash
pip install -U langchain-prompty
```

## Usage

Use the `create_chat_prompt` function to load a `.prompty` file as a prompt.

```python
from langchain_prompty import create_chat_prompt

prompt = create_chat_prompt('<your .prompty file path>')
```

You can then use the prompt in the next steps of your chain.

Here is an example `.prompty` file:

```prompty
---
name: Basic Prompt
description: A basic prompt that uses the GPT-3 chat API to answer questions
authors:
  - author_1
  - author_2
model:
  api: chat
  configuration:
    azure_deployment: gpt-35-turbo
sample:
  firstName: Jane
  lastName: Doe
  question: What is the meaning of life?
  chat_history: []
---
system:
You are an AI assistant who helps people find information.
As the assistant, you answer questions briefly, succinctly,
and in a personable manner using markdown and even add some personal flair with appropriate emojis.

{% for item in chat_history %}
{{item.role}}:
{{item.content}}
{% endfor %}

user:
{{input}}
```
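For a complete chain, the loaded prompt can be piped into any chat model. A minimal sketch, assuming `langchain-openai` is installed and `basic.prompty` is a hypothetical local file shaped like the example above:

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI  # assumed installed; any chat model works

from langchain_prompty import create_chat_prompt

# basic.prompty is a hypothetical local file like the README example
prompt = create_chat_prompt("basic.prompty")
chain = prompt | ChatOpenAI() | StrOutputParser()

print(chain.invoke({"input": "What is the meaning of life?", "chat_history": []}))
```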
@@ -0,0 +1,9 @@
from langchain_prompty.core import InvokerFactory
from langchain_prompty.langchain import create_chat_prompt
from langchain_prompty.parsers import PromptyChatParser
from langchain_prompty.renderers import MustacheRenderer

InvokerFactory().register_renderer("mustache", MustacheRenderer)
InvokerFactory().register_parser("prompty.chat", PromptyChatParser)

__all__ = ["create_chat_prompt"]
@@ -0,0 +1,297 @@
from __future__ import annotations

import abc
import json
import os
import re
from pathlib import Path
from typing import Any, Dict, Generic, List, Literal, Optional, Type, TypeVar, Union

import yaml
from pydantic import BaseModel, ConfigDict, Field, FilePath

T = TypeVar("T")


class SimpleModel(BaseModel, Generic[T]):
    item: T


class PropertySettings(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    type: Literal["string", "number", "array", "object", "boolean"]
    default: Union[str, int, float, List, Dict, bool] = Field(default=None)
    description: str = Field(default="")


class ModelSettings(BaseModel):
    api: str = Field(default="")
    configuration: dict = Field(default={})
    parameters: dict = Field(default={})
    response: dict = Field(default={})

    def model_dump_safe(self) -> dict:
        d = self.model_dump()
        d["configuration"] = {
            k: "*" * len(v) if "key" in k.lower() or "secret" in k.lower() else v
            for k, v in d["configuration"].items()
        }
        return d


class TemplateSettings(BaseModel):
    type: str = Field(default="mustache")
    parser: str = Field(default="")


class Prompty(BaseModel):
    # metadata
    name: str = Field(default="")
    description: str = Field(default="")
    authors: List[str] = Field(default=[])
    tags: List[str] = Field(default=[])
    version: str = Field(default="")
    base: str = Field(default="")
    basePrompty: Optional[Prompty] = Field(default=None)
    # model
    model: ModelSettings = Field(default_factory=ModelSettings)

    # sample
    sample: dict = Field(default={})

    # input / output
    inputs: Dict[str, PropertySettings] = Field(default={})
    outputs: Dict[str, PropertySettings] = Field(default={})

    # template
    template: TemplateSettings

    file: FilePath = Field(default="")
    content: str = Field(default="")

    def to_safe_dict(self) -> Dict[str, Any]:
        d = {}
        for k, v in self:
            if v != "" and v != {} and v != [] and v is not None:
                if k == "model":
                    d[k] = v.model_dump_safe()
                elif k == "template":
                    d[k] = v.model_dump()
                elif k == "inputs" or k == "outputs":
                    d[k] = {k: v.model_dump() for k, v in v.items()}
                elif k == "file":
                    d[k] = (
                        str(self.file.as_posix())
                        if isinstance(self.file, Path)
                        else self.file
                    )
                elif k == "basePrompty":
                    # no need to serialize basePrompty
                    continue
                else:
                    d[k] = v
        return d

    # generate json representation of the prompty
    def to_safe_json(self) -> str:
        d = self.to_safe_dict()
        return json.dumps(d)

    @staticmethod
    def normalize(attribute: Any, parent: Path, env_error: bool = True) -> Any:
        if isinstance(attribute, str):
            attribute = attribute.strip()
            if attribute.startswith("${") and attribute.endswith("}"):
                variable = attribute[2:-1].split(":")
                if variable[0] in os.environ.keys():
                    return os.environ[variable[0]]
                else:
                    if len(variable) > 1:
                        return variable[1]
                    else:
                        if env_error:
                            raise ValueError(
                                f"Variable {variable[0]} not found in environment"
                            )
                        else:
                            return ""
            elif (
                attribute.startswith("file:")
                and Path(parent / attribute.split(":")[1]).exists()
            ):
                with open(parent / attribute.split(":")[1], "r") as f:
                    items = json.load(f)
                    if isinstance(items, list):
                        return [Prompty.normalize(value, parent) for value in items]
                    elif isinstance(items, dict):
                        return {
                            key: Prompty.normalize(value, parent)
                            for key, value in items.items()
                        }
                    else:
                        return items
            else:
                return attribute
        elif isinstance(attribute, list):
            return [Prompty.normalize(value, parent) for value in attribute]
        elif isinstance(attribute, dict):
            return {
                key: Prompty.normalize(value, parent)
                for key, value in attribute.items()
            }
        else:
            return attribute


def param_hoisting(
    top: Dict[str, Any], bottom: Dict[str, Any], top_key: Any = None
) -> Dict[str, Any]:
    if top_key:
        new_dict = {**top[top_key]} if top_key in top else {}
    else:
        new_dict = {**top}
    for key, value in bottom.items():
        if key not in new_dict:
            new_dict[key] = value
    return new_dict


class Invoker(abc.ABC):
    def __init__(self, prompty: Prompty) -> None:
        self.prompty = prompty

    @abc.abstractmethod
    def invoke(self, data: BaseModel) -> BaseModel:
        pass

    def __call__(self, data: BaseModel) -> BaseModel:
        return self.invoke(data)


class NoOpParser(Invoker):
    def invoke(self, data: BaseModel) -> BaseModel:
        return data


class InvokerFactory(object):
    _instance = None
    _renderers: Dict[str, Type[Invoker]] = {}
    _parsers: Dict[str, Type[Invoker]] = {}
    _executors: Dict[str, Type[Invoker]] = {}
    _processors: Dict[str, Type[Invoker]] = {}

    def __new__(cls) -> InvokerFactory:
        if cls._instance is None:
            cls._instance = super(InvokerFactory, cls).__new__(cls)
            # Add NOOP invokers
            cls._renderers["NOOP"] = NoOpParser
            cls._parsers["NOOP"] = NoOpParser
            cls._executors["NOOP"] = NoOpParser
            cls._processors["NOOP"] = NoOpParser
        return cls._instance

    def register(
        self,
        type: Literal["renderer", "parser", "executor", "processor"],
        name: str,
        invoker: Type[Invoker],
    ) -> None:
        if type == "renderer":
            self._renderers[name] = invoker
        elif type == "parser":
            self._parsers[name] = invoker
        elif type == "executor":
            self._executors[name] = invoker
        elif type == "processor":
            self._processors[name] = invoker
        else:
            raise ValueError(f"Invalid type {type}")

    def register_renderer(self, name: str, renderer_class: Any) -> None:
        self.register("renderer", name, renderer_class)

    def register_parser(self, name: str, parser_class: Any) -> None:
        self.register("parser", name, parser_class)

    def register_executor(self, name: str, executor_class: Any) -> None:
        self.register("executor", name, executor_class)

    def register_processor(self, name: str, processor_class: Any) -> None:
        self.register("processor", name, processor_class)

    def __call__(
        self,
        type: Literal["renderer", "parser", "executor", "processor"],
        name: str,
        prompty: Prompty,
        data: BaseModel,
    ) -> Any:
        if type == "renderer":
            return self._renderers[name](prompty)(data)
        elif type == "parser":
            return self._parsers[name](prompty)(data)
        elif type == "executor":
            return self._executors[name](prompty)(data)
        elif type == "processor":
            return self._processors[name](prompty)(data)
        else:
            raise ValueError(f"Invalid type {type}")

    def to_dict(self) -> Dict[str, Any]:
        return {
            "renderers": {
                k: f"{v.__module__}.{v.__name__}" for k, v in self._renderers.items()
            },
            "parsers": {
                k: f"{v.__module__}.{v.__name__}" for k, v in self._parsers.items()
            },
            "executors": {
                k: f"{v.__module__}.{v.__name__}" for k, v in self._executors.items()
            },
            "processors": {
                k: f"{v.__module__}.{v.__name__}" for k, v in self._processors.items()
            },
        }

    def to_json(self) -> str:
        return json.dumps(self.to_dict())


class Frontmatter:
    _yaml_delim = r"(?:---|\+\+\+)"
    _yaml = r"(.*?)"
    _content = r"\s*(.+)$"
    _re_pattern = r"^\s*" + _yaml_delim + _yaml + _yaml_delim + _content
    _regex = re.compile(_re_pattern, re.S | re.M)

    @classmethod
    def read_file(cls, path: str) -> dict[str, Any]:
        """Reads file at path and returns dict with separated frontmatter.
        See read() for more info on dict return value.
        """
        with open(path, encoding="utf-8") as file:
            file_contents = file.read()
        return cls.read(file_contents)

    @classmethod
    def read(cls, string: str) -> dict[str, Any]:
        """Returns dict with separated frontmatter from string.

        Returned dict keys:
        attributes -- extracted YAML attributes in dict form.
        body -- string contents below the YAML separators
        frontmatter -- string representation of YAML
        """
        fmatter = ""
        body = ""
        result = cls._regex.search(string)

        if result:
            fmatter = result.group(1)
            body = result.group(2)
        return {
            "attributes": yaml.load(fmatter, Loader=yaml.FullLoader),
            "body": body,
            "frontmatter": fmatter,
        }
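A minimal sketch of how two of these helpers behave, assuming nothing beyond the code above (the `MY_FAKE_KEY` name is illustrative): `Prompty.normalize` resolves `${VAR}` references from the environment with an optional `:fallback`, and `Frontmatter.read` splits a prompty document into YAML attributes and a template body.

```python
import os
from pathlib import Path

from langchain_prompty.core import Frontmatter, Prompty

os.environ["MY_FAKE_KEY"] = "abc123"  # illustrative variable name

# ${VAR} resolves from the environment; ${VAR:fallback} falls back if unset.
assert Prompty.normalize("${MY_FAKE_KEY}", Path(".")) == "abc123"
assert Prompty.normalize("${MISSING_VAR:fallback}", Path(".")) == "fallback"

# Frontmatter.read splits a prompty document into attributes and body.
doc = "---\nname: demo\n---\nsystem:\nYou are helpful."
parts = Frontmatter.read(doc)
assert parts["attributes"] == {"name": "demo"}
assert parts["body"].startswith("system:")
```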
@@ -0,0 +1,29 @@
from typing import Any, Dict

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable, RunnableLambda

from .utils import load, prepare


def create_chat_prompt(
    path: str, input_name_agent_scratchpad: str = "agent_scratchpad"
) -> Runnable[Dict[str, Any], ChatPromptTemplate]:
    def runnable_chat_lambda(inputs: Dict[str, Any]) -> ChatPromptTemplate:
        p = load(path)
        parsed = prepare(p, inputs)
        lc_messages = []
        for message in parsed:
            lc_messages.append((message["role"], message["content"]))

        lc_messages.append(
            MessagesPlaceholder(
                variable_name=input_name_agent_scratchpad, optional=True
            )  # type: ignore[arg-type]
        )
        lc_p = ChatPromptTemplate.from_messages(lc_messages)
        lc_p = lc_p.partial(**p.inputs)

        return lc_p

    return RunnableLambda(runnable_chat_lambda)
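A hedged sketch of the runnable on its own, assuming a local `chat.prompty` similar to the test fixtures further below: invoking it loads, renders, and parses the file, then returns a `ChatPromptTemplate` whose final entry is the optional `agent_scratchpad` placeholder appended above.

```python
from langchain_prompty import create_chat_prompt

# chat.prompty stands in for any local prompty file (an assumption here)
prompt = create_chat_prompt("chat.prompty")

# The RunnableLambda re-loads and renders the file on every invocation and
# returns a ChatPromptTemplate built from the parsed role/content pairs.
template = prompt.invoke(
    {"firstName": "Jane", "lastName": "Doe", "input": "hi", "chat_history": []}
)
print(template.messages)
```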
@@ -0,0 +1,108 @@
import base64
import re
from typing import List, Union

from pydantic import BaseModel

from .core import Invoker, Prompty, SimpleModel


class PromptyChatParser(Invoker):
    def __init__(self, prompty: Prompty) -> None:
        self.prompty = prompty
        self.roles = ["assistant", "function", "system", "user", "human", "ai"]
        self.path = self.prompty.file.parent

    def inline_image(self, image_item: str) -> str:
        # pass through if it's a url or base64 encoded
        if image_item.startswith("http") or image_item.startswith("data"):
            return image_item
        # otherwise, it's a local file - need to base64 encode it
        else:
            image_path = self.path / image_item
            with open(image_path, "rb") as f:
                base64_image = base64.b64encode(f.read()).decode("utf-8")

            if image_path.suffix == ".png":
                return f"data:image/png;base64,{base64_image}"
            elif image_path.suffix == ".jpg":
                return f"data:image/jpeg;base64,{base64_image}"
            elif image_path.suffix == ".jpeg":
                return f"data:image/jpeg;base64,{base64_image}"
            else:
                raise ValueError(
                    f"Invalid image format {image_path.suffix} - currently only .png "
                    "and .jpg / .jpeg are supported."
                )

    def parse_content(self, content: str) -> Union[str, List]:
        """for parsing inline images"""
        # regular expression to parse markdown images
        image = r"(?P<alt>!\[[^\]]*\])\((?P<filename>.*?)(?=\"|\))\)"
        matches = re.findall(image, content, flags=re.MULTILINE)
        if len(matches) > 0:
            content_items = []
            content_chunks = re.split(image, content, flags=re.MULTILINE)
            current_chunk = 0
            for i in range(len(content_chunks)):
                # image entry
                if (
                    current_chunk < len(matches)
                    and content_chunks[i] == matches[current_chunk][0]
                ):
                    content_items.append(
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": self.inline_image(
                                    matches[current_chunk][1].split(" ")[0].strip()
                                )
                            },
                        }
                    )
                # second part of image entry
                elif (
                    current_chunk < len(matches)
                    and content_chunks[i] == matches[current_chunk][1]
                ):
                    current_chunk += 1
                # text entry
                else:
                    if len(content_chunks[i].strip()) > 0:
                        content_items.append(
                            {"type": "text", "text": content_chunks[i].strip()}
                        )
            return content_items
        else:
            return content

    def invoke(self, data: BaseModel) -> BaseModel:
        assert isinstance(data, SimpleModel)
        messages = []
        separator = r"(?i)^\s*#?\s*(" + "|".join(self.roles) + r")\s*:\s*\n"

        # get valid chunks - remove empty items
        chunks = [
            item
            for item in re.split(separator, data.item, flags=re.MULTILINE)
            if len(item.strip()) > 0
        ]

        # if no starter role, then inject system role
        if chunks[0].strip().lower() not in self.roles:
            chunks.insert(0, "system")

        # if last chunk is role entry, then remove (no content?)
        if chunks[-1].strip().lower() in self.roles:
            chunks.pop()

        if len(chunks) % 2 != 0:
            raise ValueError("Invalid prompt format")

        # create messages
        for i in range(0, len(chunks), 2):
            role = chunks[i].strip().lower()
            content = chunks[i + 1].strip()
            messages.append({"role": role, "content": self.parse_content(content)})

        return SimpleModel[list](item=messages)
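A hedged sketch of the role-separator parsing above. The temporary file is an assumption of this sketch: it exists only so that the `FilePath` field on `Prompty` validates, since the parser never reads it for plain-text prompts.

```python
import tempfile
from pathlib import Path

from langchain_prompty.core import Prompty, SimpleModel, TemplateSettings
from langchain_prompty.parsers import PromptyChatParser

# Throwaway file so Prompty's FilePath validation passes (sketch assumption).
tmp = Path(tempfile.mkdtemp()) / "example.prompty"
tmp.write_text("placeholder")

parser = PromptyChatParser(Prompty(template=TemplateSettings(), file=tmp))

# "role:" lines split the rendered text into alternating role/content chunks.
rendered = "system:\nYou are helpful.\nuser:\nWhat is the meaning of life?"
result = parser.invoke(SimpleModel[str](item=rendered))
# -> [{'role': 'system', 'content': 'You are helpful.'},
#     {'role': 'user', 'content': 'What is the meaning of life?'}]
print(result.item)
```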
@@ -0,0 +1,14 @@
from langchain_core.utils import mustache
from pydantic import BaseModel

from .core import Invoker, Prompty, SimpleModel


class MustacheRenderer(Invoker):
    def __init__(self, prompty: Prompty) -> None:
        self.prompty = prompty

    def invoke(self, data: BaseModel) -> BaseModel:
        assert isinstance(data, SimpleModel)
        generated = mustache.render(self.prompty.content, data.item)
        return SimpleModel[str](item=generated)
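For reference, a small sketch of the underlying `langchain_core.utils.mustache.render` call the renderer delegates to; the template and data here are illustrative.

```python
from langchain_core.utils import mustache

# {{#section}}...{{/section}} iterates over a list of dicts, mustache-style.
template = "Hello {{name}}!{{#history}} {{role}}: {{content}}{{/history}}"
data = {"name": "Jane", "history": [{"role": "user", "content": "hi"}]}

print(mustache.render(template, data))  # -> "Hello Jane! user: hi"
```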
@@ -0,0 +1,207 @@
import traceback
from pathlib import Path
from typing import Any, Dict, List, Union

from .core import (
    Frontmatter,
    InvokerFactory,
    ModelSettings,
    Prompty,
    PropertySettings,
    SimpleModel,
    TemplateSettings,
    param_hoisting,
)


def load(prompt_path: str, configuration: str = "default") -> Prompty:
    file_path = Path(prompt_path)
    if not file_path.is_absolute():
        # get caller's path (take into account trace frame)
        caller = Path(traceback.extract_stack()[-3].filename)
        file_path = Path(caller.parent / file_path).resolve().absolute()

    # load dictionary from prompty file
    matter = Frontmatter.read_file(file_path.__fspath__())
    attributes = matter["attributes"]
    content = matter["body"]

    # normalize attribute dictionary resolve keys and files
    attributes = Prompty.normalize(attributes, file_path.parent)

    # load global configuration
    if "model" not in attributes:
        attributes["model"] = {}

    # pull model settings out of attributes
    try:
        model = ModelSettings(**attributes.pop("model"))
    except Exception as e:
        raise ValueError(f"Error in model settings: {e}")

    # pull template settings
    try:
        if "template" in attributes:
            t = attributes.pop("template")
            if isinstance(t, dict):
                template = TemplateSettings(**t)
            # has to be a string denoting the type
            else:
                template = TemplateSettings(type=t, parser="prompty")
        else:
            template = TemplateSettings(type="mustache", parser="prompty")
    except Exception as e:
        raise ValueError(f"Error in template loader: {e}")

    # formalize inputs and outputs
    if "inputs" in attributes:
        try:
            inputs = {
                k: PropertySettings(**v) for (k, v) in attributes.pop("inputs").items()
            }
        except Exception as e:
            raise ValueError(f"Error in inputs: {e}")
    else:
        inputs = {}
    if "outputs" in attributes:
        try:
            outputs = {
                k: PropertySettings(**v) for (k, v) in attributes.pop("outputs").items()
            }
        except Exception as e:
            raise ValueError(f"Error in outputs: {e}")
    else:
        outputs = {}

    # recursive loading of base prompty
    if "base" in attributes:
        # load the base prompty from the same directory as the current prompty
        base = load(file_path.parent / attributes["base"])
        # hoist the base prompty's attributes to the current prompty
        model.api = base.model.api if model.api == "" else model.api
        model.configuration = param_hoisting(
            model.configuration, base.model.configuration
        )
        model.parameters = param_hoisting(model.parameters, base.model.parameters)
        model.response = param_hoisting(model.response, base.model.response)
        attributes["sample"] = param_hoisting(attributes, base.sample, "sample")

        p = Prompty(
            **attributes,
            model=model,
            inputs=inputs,
            outputs=outputs,
            template=template,
            content=content,
            file=file_path,
            basePrompty=base,
        )
    else:
        p = Prompty(
            **attributes,
            model=model,
            inputs=inputs,
            outputs=outputs,
            template=template,
            content=content,
            file=file_path,
        )
    return p


def prepare(
    prompt: Prompty,
    inputs: Dict[str, Any] = {},
) -> Any:
    invoker = InvokerFactory()

    inputs = param_hoisting(inputs, prompt.sample)

    if prompt.template.type == "NOOP":
        render = prompt.content
    else:
        # render
        result = invoker(
            "renderer",
            prompt.template.type,
            prompt,
            SimpleModel(item=inputs),
        )
        render = result.item

    if prompt.template.parser == "NOOP":
        result = render
    else:
        # parse
        result = invoker(
            "parser",
            f"{prompt.template.parser}.{prompt.model.api}",
            prompt,
            SimpleModel(item=render),  # `render` also covers the NOOP renderer path
        )

    if isinstance(result, SimpleModel):
        return result.item
    else:
        return result


def run(
    prompt: Prompty,
    content: Union[Dict, List, str],
    configuration: Dict[str, Any] = {},
    parameters: Dict[str, Any] = {},
    raw: bool = False,
) -> Any:
    invoker = InvokerFactory()

    if configuration != {}:
        prompt.model.configuration = param_hoisting(
            configuration, prompt.model.configuration
        )

    if parameters != {}:
        prompt.model.parameters = param_hoisting(parameters, prompt.model.parameters)

    # execute
    result = invoker(
        "executor",
        prompt.model.configuration["type"],
        prompt,
        SimpleModel(item=content),
    )

    # skip?
    if not raw:
        # process
        result = invoker(
            "processor",
            prompt.model.configuration["type"],
            prompt,
            result,
        )

    if isinstance(result, SimpleModel):
        return result.item
    else:
        return result


def execute(
    prompt: Union[str, Prompty],
    configuration: Dict[str, Any] = {},
    parameters: Dict[str, Any] = {},
    inputs: Dict[str, Any] = {},
    raw: bool = False,
    connection: str = "default",
) -> Any:
    if isinstance(prompt, str):
        prompt = load(prompt, connection)

    # prepare content
    content = prepare(prompt, inputs)

    # run LLM model
    result = run(prompt, content, configuration, parameters, raw)

    return result
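A hedged end-to-end sketch of `load` plus `prepare`, assuming a `chat.prompty` file like the test fixtures sits next to the calling script (note that `load` resolves relative paths against the caller's directory via the stack inspection above):

```python
# Importing the package first runs __init__.py, which registers the
# mustache renderer and the prompty.chat parser with the InvokerFactory.
from langchain_prompty.utils import load, prepare

p = load("chat.prompty")  # chat.prompty is assumed to sit next to this script
messages = prepare(
    p,
    {"firstName": "Jane", "lastName": "Doe",
     "input": "What is the meaning of life?", "chat_history": []},
)
# prepare() renders the mustache template, then the prompty.chat parser
# splits the text into [{'role': ..., 'content': ...}, ...] message dicts.
print(messages)
```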
File diff suppressed because it is too large
@@ -0,0 +1,95 @@
[tool.poetry]
name = "langchain-prompty"
version = "0.0.1"
description = "An integration package connecting Prompty and LangChain"
authors = []
readme = "README.md"
repository = "https://github.com/langchain-ai/langchain"
license = "MIT"

[tool.poetry.urls]
"Source Code" = "https://github.com/langchain-ai/langchain/tree/master/libs/partners/prompty"

[tool.poetry.dependencies]
python = ">=3.8.1,<4.0"
langchain-core = "^0.1.52"
pyyaml = "^6.0.1"
types-pyyaml = "^6.0.12.20240311"

[tool.poetry.group.test]
optional = true

[tool.poetry.group.test.dependencies]
pytest = "^7.3.0"
freezegun = "^1.2.2"
pytest-mock = "^3.10.0"
syrupy = "^4.0.2"
pytest-watcher = "^0.3.4"
pytest-asyncio = "^0.21.1"
langchain-core = { path = "../../core", develop = true }
langchain = { path = "../../langchain", develop = true }

[tool.poetry.group.codespell]
optional = true

[tool.poetry.group.codespell.dependencies]
codespell = "^2.2.0"

[tool.poetry.group.test_integration]
optional = true

[tool.poetry.group.test_integration.dependencies]

[tool.poetry.group.lint]
optional = true

[tool.poetry.group.lint.dependencies]
ruff = "^0.1.5"

[tool.poetry.group.typing.dependencies]
mypy = "^0.991"
langchain-core = { path = "../../core", develop = true }

[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
langchain-core = { path = "../../core", develop = true }
types-pyyaml = "^6.0.12.20240311"

[tool.ruff]
select = [
  "E",  # pycodestyle
  "F",  # pyflakes
  "I",  # isort
]

[tool.mypy]
disallow_untyped_defs = "True"

[tool.coverage.run]
omit = ["tests/*"]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
# --strict-markers will raise errors on unknown marks.
# https://docs.pytest.org/en/7.1.x/how-to/mark.html#raising-errors-on-unknown-marks
#
# https://docs.pytest.org/en/7.1.x/reference/reference.html
# --strict-config: any warnings encountered while parsing the `pytest`
# section of the configuration file raise errors.
#
# https://github.com/tophat/syrupy
# --snapshot-warn-unused: prints a warning on unused snapshots rather than fail the test suite.
addopts = "--snapshot-warn-unused --strict-markers --strict-config --durations=5"
# Registering custom markers.
# https://docs.pytest.org/en/7.1.x/example/markers.html#registering-markers
markers = [
  "requires: mark tests as requiring a specific library",
  "asyncio: mark tests as requiring asyncio",
  "compile: mark placeholder test used to compile integration tests without running them",
]
asyncio_mode = "auto"
@@ -0,0 +1,17 @@
import sys
import traceback
from importlib.machinery import SourceFileLoader

if __name__ == "__main__":
    files = sys.argv[1:]
    has_failure = False
    for file in files:
        try:
            SourceFileLoader("x", file).load_module()
        except Exception:
            # fixed typo: was `has_faillure`, which left the flag unset
            has_failure = True
            print(file)
            traceback.print_exc()
            print()

    sys.exit(1 if has_failure else 0)
@@ -0,0 +1,27 @@
#!/bin/bash
#
# This script searches for lines starting with "import pydantic" or "from pydantic"
# in tracked files within a Git repository.
#
# Usage: ./scripts/check_pydantic.sh /path/to/repository

# Check if a path argument is provided
if [ $# -ne 1 ]; then
  echo "Usage: $0 /path/to/repository"
  exit 1
fi

repository_path="$1"

# Search for lines matching the pattern within the specified repository
result=$(git -C "$repository_path" grep -E '^import pydantic|^from pydantic')

# Check if any matching lines were found
if [ -n "$result" ]; then
  echo "ERROR: The following lines need to be updated:"
  echo "$result"
  echo "Please replace the code with an import from langchain_core.pydantic_v1."
  echo "For example, replace 'from pydantic import BaseModel'"
  echo "with 'from langchain_core.pydantic_v1 import BaseModel'"
  exit 1
fi
@@ -0,0 +1,17 @@
#!/bin/bash

set -eu

# Initialize a variable to keep track of errors
errors=0

# make sure not importing from langchain or langchain_experimental
git --no-pager grep '^from langchain\.' . && errors=$((errors+1))
git --no-pager grep '^from langchain_experimental\.' . && errors=$((errors+1))

# Decide on an exit status based on the errors
if [ "$errors" -gt 0 ]; then
  exit 1
else
  exit 0
fi
@@ -0,0 +1,7 @@
import pytest


@pytest.mark.compile
def test_placeholder() -> None:
    """Used for compiling integration tests without running any real tests."""
    pass
@@ -0,0 +1,395 @@
"""A fake callback handler for testing purposes."""

from itertools import chain
from typing import Any, Dict, List, Optional, Union
from uuid import UUID

from langchain_core.callbacks import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.pydantic_v1 import BaseModel


class BaseFakeCallbackHandler(BaseModel):
    """Base fake callback handler for testing."""

    starts: int = 0
    ends: int = 0
    errors: int = 0
    text: int = 0
    ignore_llm_: bool = False
    ignore_chain_: bool = False
    ignore_agent_: bool = False
    ignore_retriever_: bool = False
    ignore_chat_model_: bool = False

    # to allow for similar callback handlers that are not technically equal
    fake_id: Union[str, None] = None

    # add finer-grained counters for easier debugging of failing tests
    chain_starts: int = 0
    chain_ends: int = 0
    llm_starts: int = 0
    llm_ends: int = 0
    llm_streams: int = 0
    tool_starts: int = 0
    tool_ends: int = 0
    agent_actions: int = 0
    agent_ends: int = 0
    chat_model_starts: int = 0
    retriever_starts: int = 0
    retriever_ends: int = 0
    retriever_errors: int = 0
    retries: int = 0

    input_prompts: List[str] = []


class BaseFakeCallbackHandlerMixin(BaseFakeCallbackHandler):
    """Base fake callback handler mixin for testing."""

    def on_llm_start_common(self) -> None:
        self.llm_starts += 1
        self.starts += 1

    def on_llm_end_common(self) -> None:
        self.llm_ends += 1
        self.ends += 1

    def on_llm_error_common(self) -> None:
        self.errors += 1

    def on_llm_new_token_common(self) -> None:
        self.llm_streams += 1

    def on_retry_common(self) -> None:
        self.retries += 1

    def on_chain_start_common(self) -> None:
        self.chain_starts += 1
        self.starts += 1

    def on_chain_end_common(self) -> None:
        self.chain_ends += 1
        self.ends += 1

    def on_chain_error_common(self) -> None:
        self.errors += 1

    def on_tool_start_common(self) -> None:
        self.tool_starts += 1
        self.starts += 1

    def on_tool_end_common(self) -> None:
        self.tool_ends += 1
        self.ends += 1

    def on_tool_error_common(self) -> None:
        self.errors += 1

    def on_agent_action_common(self) -> None:
        self.agent_actions += 1
        self.starts += 1

    def on_agent_finish_common(self) -> None:
        self.agent_ends += 1
        self.ends += 1

    def on_chat_model_start_common(self) -> None:
        self.chat_model_starts += 1
        self.starts += 1

    def on_text_common(self) -> None:
        self.text += 1

    def on_retriever_start_common(self) -> None:
        self.starts += 1
        self.retriever_starts += 1

    def on_retriever_end_common(self) -> None:
        self.ends += 1
        self.retriever_ends += 1

    def on_retriever_error_common(self) -> None:
        self.errors += 1
        self.retriever_errors += 1


class FakeCallbackHandler(BaseCallbackHandler, BaseFakeCallbackHandlerMixin):
    """Fake callback handler for testing."""

    def __init__(self) -> None:
        super().__init__()
        self.input_prompts = []

    @property
    def ignore_llm(self) -> bool:
        """Whether to ignore LLM callbacks."""
        return self.ignore_llm_

    @property
    def ignore_chain(self) -> bool:
        """Whether to ignore chain callbacks."""
        return self.ignore_chain_

    @property
    def ignore_agent(self) -> bool:
        """Whether to ignore agent callbacks."""
        return self.ignore_agent_

    @property
    def ignore_retriever(self) -> bool:
        """Whether to ignore retriever callbacks."""
        return self.ignore_retriever_

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        self.input_prompts = prompts
        self.on_llm_start_common()

    def on_llm_new_token(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_llm_new_token_common()

    def on_llm_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_llm_end_common()

    def on_llm_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_llm_error_common()

    def on_retry(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_retry_common()

    def on_chain_start(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_chain_start_common()

    def on_chain_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_chain_end_common()

    def on_chain_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_chain_error_common()

    def on_tool_start(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_tool_start_common()

    def on_tool_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_tool_end_common()

    def on_tool_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_tool_error_common()

    def on_agent_action(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_agent_action_common()

    def on_agent_finish(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_agent_finish_common()

    def on_text(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_text_common()

    def on_retriever_start(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_retriever_start_common()

    def on_retriever_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_retriever_end_common()

    def on_retriever_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_retriever_error_common()

    def __deepcopy__(self, memo: dict) -> "FakeCallbackHandler":
        return self


class FakeCallbackHandlerWithChatStart(FakeCallbackHandler):
    def on_chat_model_start(
        self,
        serialized: Dict[str, Any],
        messages: List[List[BaseMessage]],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        assert all(isinstance(m, BaseMessage) for m in chain(*messages))
        self.on_chat_model_start_common()


class FakeAsyncCallbackHandler(AsyncCallbackHandler, BaseFakeCallbackHandlerMixin):
    """Fake async callback handler for testing."""

    @property
    def ignore_llm(self) -> bool:
        """Whether to ignore LLM callbacks."""
        return self.ignore_llm_

    @property
    def ignore_chain(self) -> bool:
        """Whether to ignore chain callbacks."""
        return self.ignore_chain_

    @property
    def ignore_agent(self) -> bool:
        """Whether to ignore agent callbacks."""
        return self.ignore_agent_

    async def on_retry(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        self.on_retry_common()

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        self.on_llm_start_common()

    async def on_llm_new_token(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_llm_new_token_common()

    async def on_llm_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_llm_end_common()

    async def on_llm_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_llm_error_common()

    async def on_chain_start(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_chain_start_common()

    async def on_chain_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_chain_end_common()

    async def on_chain_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_chain_error_common()

    async def on_tool_start(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_tool_start_common()

    async def on_tool_end(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_tool_end_common()

    async def on_tool_error(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_tool_error_common()

    async def on_agent_action(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_agent_action_common()

    async def on_agent_finish(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_agent_finish_common()

    async def on_text(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        self.on_text_common()

    def __deepcopy__(self, memo: dict) -> "FakeAsyncCallbackHandler":
        return self
@@ -0,0 +1,44 @@
"""Fake Chat Model wrapper for testing purposes."""
import json
from typing import Any, Dict, List, Optional

from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import SimpleChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult


class FakeEchoPromptChatModel(SimpleChatModel):
    """Fake Chat Model wrapper for testing purposes."""

    def _call(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        return json.dumps([message.dict() for message in messages])

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        output_str = "fake response 2"
        message = AIMessage(content=output_str)
        generation = ChatGeneration(message=message)
        return ChatResult(generations=[generation])

    @property
    def _llm_type(self) -> str:
        return "fake-echo-prompt-chat-model"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        return {"key": "fake"}
@@ -0,0 +1,43 @@
from typing import Optional, Tuple, Union

from langchain.agents import AgentOutputParser
from langchain_core.agents import AgentAction, AgentFinish


def extract_action_details(text: str) -> Tuple[Optional[str], Optional[str]]:
    # Split the text into lines and strip whitespace
    lines = [line.strip() for line in text.strip().split("\n")]

    # Initialize variables to hold the extracted values
    action = None
    action_input = None

    # Iterate through the lines to find and extract the desired information
    for line in lines:
        if line.startswith("Action:"):
            action = line.split(":", 1)[1].strip()
        elif line.startswith("Action Input:"):
            action_input = line.split(":", 1)[1].strip()

    return action, action_input


class FakeOutputParser(AgentOutputParser):
    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        print("FakeOutputParser", text)
        action, input = extract_action_details(text)

        if action:
            log = f"\nInvoking: `{action}` with `{input}`"

            return AgentAction(tool=action, tool_input=(input or ""), log=log)
        elif "Final Answer" in text:
            return AgentFinish({"output": text}, text)

        return AgentAction(
            "Intermediate Answer", "after_colon", "Final Answer: This should end"
        )

    @property
    def _type(self) -> str:
        return "self_ask"
@@ -0,0 +1,28 @@
---
name: Basic Prompt
description: A basic prompt that uses the GPT-3 chat API to answer questions
authors:
  - author_1
  - author_2
model:
  api: chat
  configuration:
    azure_deployment: gpt-35-turbo
sample:
  firstName: Jane
  lastName: Doe
  question: What is the meaning of life?
  chat_history: []
---
system:
You are an AI assistant who helps people find information.
As the assistant, you answer questions briefly, succinctly,
and in a personable manner using markdown and even add some personal flair with appropriate emojis.

{{#chat_history}}
{{role}}:
{{content}}
{{/chat_history}}

user:
{{input}}
@@ -0,0 +1,32 @@
---
name: Basic Prompt
description: A basic prompt that uses the GPT-3 chat API to answer questions
authors:
  - author_1
  - author_2
model:
  api: chat
  configuration:
    azure_deployment: gpt-35-turbo
sample:
  firstName: Jane
  lastName: Doe
  input: What is the meaning of life?
  chat_history: []
---
system:
You are an AI assistant who helps people find information.
As the assistant, you answer questions briefly, succinctly,
and in a personable manner using markdown and even add some personal flair with appropriate emojis.

# Customer
You are helping {{firstName}} {{lastName}} to find answers to their questions.
Use their name to address them in your responses.

{{#chat_history}}
{{type}}:
{{content}}
{{/chat_history}}

user:
{{input}}
@@ -0,0 +1,7 @@
from langchain_prompty import __all__

EXPECTED_ALL = ["create_chat_prompt"]


def test_all_imports() -> None:
    assert sorted(EXPECTED_ALL) == sorted(__all__)
@@ -0,0 +1,165 @@
import json
import os
from typing import List, Tuple

from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.tools import tool
from langchain_core.language_models import FakeListLLM
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.utils.function_calling import convert_to_openai_function

import langchain_prompty

from .fake_callback_handler import FakeCallbackHandler
from .fake_chat_model import FakeEchoPromptChatModel
from .fake_output_parser import FakeOutputParser

prompty_folder_relative = "./prompts/"
# Get the directory of the current script
current_script_dir = os.path.dirname(__file__)

# Combine the current script directory with the relative path
prompty_folder = os.path.abspath(
    os.path.join(current_script_dir, prompty_folder_relative)
)


def test_prompty_basic_chain() -> None:
    prompt = langchain_prompty.create_chat_prompt(f"{prompty_folder}/chat.prompty")
    model = FakeEchoPromptChatModel()
    chain = prompt | model

    parsed_prompts = chain.invoke(
        {
            "firstName": "fakeFirstName",
            "lastName": "fakeLastName",
            "input": "fakeQuestion",
        }
    )

    if isinstance(parsed_prompts.content, str):
        msgs = json.loads(str(parsed_prompts.content))
    else:
        msgs = parsed_prompts.content

    print(msgs)

    assert len(msgs) == 2
    # Test for system and user entries
    system_message = msgs[0]
    user_message = msgs[1]

    # Check the types of the messages
    assert (
        system_message["type"] == "system"
    ), "The first message should be of type 'system'."
    assert (
        user_message["type"] == "human"
    ), "The second message should be of type 'human'."

    # Test for existence of fakeFirstName and fakeLastName in the system message
    assert (
        "fakeFirstName" in system_message["content"]
    ), "The string 'fakeFirstName' should be in the system message content."
    assert (
        "fakeLastName" in system_message["content"]
    ), "The string 'fakeLastName' should be in the system message content."

    # Test for existence of fakeQuestion in the user message
    assert (
        "fakeQuestion" in user_message["content"]
    ), "The string 'fakeQuestion' should be in the user message content."


def test_prompty_used_in_agent() -> None:
    prompt = langchain_prompty.create_chat_prompt(f"{prompty_folder}/chat.prompty")
    tool_name = "search"
    responses = [
        f"FooBarBaz\nAction: {tool_name}\nAction Input: fakeSearch",
        "Oh well\nFinal Answer: fakefinalresponse",
    ]
    callbackHandler = FakeCallbackHandler()
    llm = FakeListLLM(responses=responses, callbacks=[callbackHandler])

    @tool
    def search(query: str) -> str:
        """Look up things."""
        return "FakeSearchResponse"

    tools = [search]
    llm_with_tools = llm.bind(functions=[convert_to_openai_function(t) for t in tools])

    agent = (
        {  # type: ignore[var-annotated]
            "firstName": lambda x: x["firstName"],
            "lastName": lambda x: x["lastName"],
            "input": lambda x: x["input"],
            "chat_history": lambda x: x["chat_history"],
            "agent_scratchpad": lambda x: (
                format_to_openai_function_messages(x["intermediate_steps"])
                if "intermediate_steps" in x
                else []
            ),
        }
        | prompt
        | llm_with_tools
        | FakeOutputParser()
    )

    from langchain.agents import AgentExecutor

    class AgentInput(BaseModel):
        input: str
        chat_history: List[Tuple[str, str]] = Field(
            ...,
            extra={"widget": {"type": "chat", "input": "input", "output": "output"}},
        )

    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True).with_types(
        input_type=AgentInput  # type: ignore[arg-type]
    )

    agent_executor.invoke(
        {
            "firstName": "fakeFirstName",
            "lastName": "fakeLastName",
            "input": "fakeQuestion",
            "chat_history": [
                AIMessage(content="chat_history_1_ai"),
                HumanMessage(content="chat_history_1_human"),
            ],
        }
    )
    print(callbackHandler)
    input_prompt = callbackHandler.input_prompts[0]

    # Test for existence of fakeFirstName and fakeLastName in the system message
    assert "fakeFirstName" in input_prompt
    assert "fakeLastName" in input_prompt
    assert "chat_history_1_ai" in input_prompt
    assert "chat_history_1_human" in input_prompt
    assert "fakeQuestion" in input_prompt
    assert "fakeSearch" in input_prompt


def test_all_prompty_can_run() -> None:
    exclusions = ["embedding.prompty", "groundedness.prompty"]

    prompty_files = [
        f
        for f in os.listdir(prompty_folder)
        if os.path.isfile(os.path.join(prompty_folder, f))
        and f.endswith(".prompty")
        and f not in exclusions
    ]

    for file in prompty_files:
        file_path = os.path.join(prompty_folder, file)

        print(f"==========\nTesting Prompty file: {file_path}")
        prompt = langchain_prompty.create_chat_prompt(file_path)
        model = FakeEchoPromptChatModel()
        chain = prompt | model
        output = chain.invoke({})
        print(f"{file_path}, {output}")