You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
langchain/libs/text-splitters/langchain_text_splitters/markdown.py

224 lines
9.0 KiB
Python

from __future__ import annotations
from typing import Any, Dict, List, Tuple, TypedDict
from langchain_core.documents import Document
from langchain_text_splitters.base import Language
from langchain_text_splitters.character import RecursiveCharacterTextSplitter
class MarkdownTextSplitter(RecursiveCharacterTextSplitter):
    """Recursive character splitter preconfigured with Markdown separators.

    The separator list is obtained from
    ``get_separators_for_language(Language.MARKDOWN)`` and handed to the
    parent ``RecursiveCharacterTextSplitter``; everything else about the
    splitting behaviour comes from the parent class.
    """

    def __init__(self, **kwargs: Any) -> None:
        """Initialize a MarkdownTextSplitter.

        Args:
            **kwargs: Forwarded unchanged to the parent constructor. Because
                ``separators`` is always supplied here, passing it again in
                ``kwargs`` raises ``TypeError`` (duplicate keyword argument).
        """
        separators = self.get_separators_for_language(Language.MARKDOWN)
        super().__init__(separators=separators, **kwargs)
class MarkdownHeaderTextSplitter:
    """Split markdown text into chunks tagged with the headers above them.

    Each configured header marker (for example ``"#"`` or ``"##"``) is paired
    with a metadata key. The text is scanned line by line; the headers that
    are currently "open" are tracked on a stack and attached as metadata to
    the content that follows them.
    """

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        return_each_line: bool = False,
        strip_headers: bool = True,
    ) -> None:
        """Create a new MarkdownHeaderTextSplitter.

        Args:
            headers_to_split_on: ``(marker, metadata_key)`` pairs. ``marker``
                is the literal line prefix to look for and ``metadata_key``
                is the key under which the header text is stored.
            return_each_line: If True, ``split_text`` returns one document per
                collected block of lines instead of merging blocks that share
                the same metadata.
            strip_headers: If True, header lines are left out of the chunk
                content; if False they are kept as the first line of the
                content that follows them.
        """
        # Output line-by-line or aggregated into chunks w/ common headers
        self.return_each_line = return_each_line
        # Longest marker first, so the most specific marker is tried first
        # when matching a line. ``sorted`` also leaves the caller's list
        # untouched.
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda split: len(split[0]), reverse=True
        )
        # Whether header lines are removed from the chunk content
        self.strip_headers = strip_headers

    def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Document]:
        """Combine consecutive lines with common metadata into chunks.

        Args:
            lines: Blocks of text with their associated header metadata, in
                document order. The dicts passed in are not modified.

        Returns:
            One ``Document`` per aggregated chunk, in document order.
        """
        aggregated_chunks: List[LineType] = []

        for line in lines:
            previous = aggregated_chunks[-1] if aggregated_chunks else None

            if previous is not None and previous["metadata"] == line["metadata"]:
                # Same headers as the previous chunk: extend that chunk.
                previous["content"] += " \n" + line["content"]
            elif (
                previous is not None
                and not self.strip_headers
                # may be issues if other metadata is present
                and len(previous["metadata"]) < len(line["metadata"])
                # ``startswith`` instead of ``[0]`` so that an empty last
                # line cannot raise IndexError.
                and previous["content"].split("\n")[-1].startswith("#")
            ):
                # Headers are kept in the content, the previous chunk ends
                # with a header line and carries fewer metadata entries than
                # the current line: fold the current line into that chunk
                # and let the chunk adopt the current line's metadata.
                previous["content"] += " \n" + line["content"]
                previous["metadata"] = line["metadata"]
            else:
                # Start a new chunk. Work on a shallow copy so that the
                # in-place updates above never leak into the caller's dicts.
                aggregated_chunks.append(
                    {"content": line["content"], "metadata": line["metadata"]}
                )

        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in aggregated_chunks
        ]

    def split_text(self, text: str) -> List[Document]:
        """Split markdown text on the configured headers.

        Every line is stripped of surrounding whitespace and of non-printable
        characters before it is examined or stored. Lines inside a fenced
        block (opened by a line starting with three backticks or ``~~~``) are
        never treated as headers.

        Args:
            text: Markdown text.

        Returns:
            Documents whose metadata maps each configured metadata key to the
            text of the header that was active for that content. If
            ``return_each_line`` is False, blocks are first merged by
            ``aggregate_lines_to_chunks``.
        """
        # Split the input text by newline character ("\n").
        lines = text.split("\n")
        # Final output
        lines_with_metadata: List[LineType] = []
        # Content and metadata of the chunk currently being processed
        current_content: List[str] = []
        current_metadata: Dict[str, str] = {}
        # Keep track of the nested header structure
        header_stack: List[HeaderType] = []
        initial_metadata: Dict[str, str] = {}

        in_code_block = False
        opening_fence = ""

        for line in lines:
            stripped_line = line.strip()
            # Remove all non-printable characters from the string, keeping
            # only visible text.
            stripped_line = "".join(filter(str.isprintable, stripped_line))

            if in_code_block:
                # A line starting with the fence that opened the block
                # closes it; that closing line is then handled below like
                # any ordinary line.
                if stripped_line.startswith(opening_fence):
                    in_code_block = False
                    opening_fence = ""
            elif stripped_line.startswith("```") and stripped_line.count("```") == 1:
                # Exactly one fence marker on the line: excludes one-line
                # spans that open and close on the same line.
                in_code_block = True
                opening_fence = "```"
            elif stripped_line.startswith("~~~"):
                in_code_block = True
                opening_fence = "~~~"

            if in_code_block:
                # Inside a fenced block nothing is interpreted as a header.
                # NOTE: the stripped line is stored, so leading indentation
                # of the fenced content is not preserved.
                current_content.append(stripped_line)
                continue

            # Check each line against each of the header types (e.g., #, ##)
            for sep, name in self.headers_to_split_on:
                # The marker counts as a header when it is the whole line or
                # is immediately followed by a space.
                is_header = stripped_line.startswith(sep) and (
                    len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
                )
                if not is_header:
                    continue

                # Ensure we are tracking the header as metadata
                if name is not None:
                    # The header level is the number of "#" in the marker
                    current_header_level = sep.count("#")

                    # A new header closes every open header at the same or
                    # a deeper level: drop them from the stack and from the
                    # metadata that will apply to the following content.
                    while (
                        header_stack
                        and header_stack[-1]["level"] >= current_header_level
                    ):
                        popped_header = header_stack.pop()
                        if popped_header["name"] in initial_metadata:
                            initial_metadata.pop(popped_header["name"])

                    # Push the current header to the stack
                    header: HeaderType = {
                        "level": current_header_level,
                        "name": name,
                        "data": stripped_line[len(sep) :].strip(),
                    }
                    header_stack.append(header)
                    # Update initial_metadata with the current header
                    initial_metadata[name] = header["data"]

                # Flush the content collected so far; it still belongs to
                # the metadata that was active before this header.
                if current_content:
                    lines_with_metadata.append(
                        {
                            "content": "\n".join(current_content),
                            "metadata": current_metadata.copy(),
                        }
                    )
                    current_content.clear()

                if not self.strip_headers:
                    current_content.append(stripped_line)

                break
            else:
                # Not a header line (no marker matched).
                if stripped_line:
                    current_content.append(stripped_line)
                elif current_content:
                    # A blank line ends the block collected so far.
                    lines_with_metadata.append(
                        {
                            "content": "\n".join(current_content),
                            "metadata": current_metadata.copy(),
                        }
                    )
                    current_content.clear()

            # Content collected from the next line on is tagged with the
            # headers that are open after this line.
            current_metadata = initial_metadata.copy()

        if current_content:
            lines_with_metadata.append(
                {"content": "\n".join(current_content), "metadata": current_metadata}
            )

        # lines_with_metadata has each block with associated header metadata;
        # either return them one by one or aggregate them into chunks based
        # on common metadata.
        if self.return_each_line:
            return [
                Document(page_content=chunk["content"], metadata=chunk["metadata"])
                for chunk in lines_with_metadata
            ]
        return self.aggregate_lines_to_chunks(lines_with_metadata)
class LineType(TypedDict):
    """Line type as typed dict.

    One block of text produced while splitting, together with the header
    metadata that applies to it.
    """

    # Header metadata for this block: metadata key -> header text.
    metadata: Dict[str, str]
    # The text of the block; multiple lines are joined with newlines.
    content: str
class HeaderType(TypedDict):
    """Header type as typed dict.

    One entry of the stack of currently open headers kept while splitting.
    """

    # Nesting depth of the header (the number of "#" in its marker).
    level: int
    # Metadata key under which this header's text is stored.
    name: str
    # The header text that follows the marker, with whitespace stripped.
    data: str