Все мы проходили через это: скармливаешь RAG‑системе сложный PDF на 50 страниц, а она в ответ либо галлюцинирует, либо вываливает на LLM простыню нерелевантного текста, съедая ваш бюджет на токены быстрее, чем вы успеваете сказать «GPT-4o». Проблема в том, что классический подход со статическим top_k — это костыль, который либо не додает контекста, либо вызывает у модели информационное «ожирение» (заполняет контекст нерелевантным мусором). Нашему RAG нужно помочь адаптироваться к безжалостной среде разрозненных документов!
Я потратил выходные на то, чтобы решить эту проблему фундаментально. В итоге на свет появился DRAG with KNEE (Dynamic RAG with Knee‑point pruning) — алгоритм, который не просто ищет «похожее», а выстраивает иерархию документов и безжалостно отсекает лишнее с помощью геометрического анализа «колена». В этой статье я покажу, как с помощью Qdrant, Python и капли математики сделать ваш RAG адаптивным.
Лекарственная форма: Иерархическое дерево (Hierarchical Vector Tree). В отличие от «таблеток‑пустышек» обычного RAG, действующее вещество распределено по узлам: от широких корней (summary документов) до концентрированных листьев (конкретных страниц). Состав (Действующие вещества): Vector‑Dense Complex: для глубокого понимания семантики. Sparse‑Keyword Extract: для точно поиска по ключевым словам. RRF‑Enzymes: ферменты слияния, которые переваривают результаты разных поисков в единый ранжированный список. Механизм действия (Фармакодинамика): После введения запроса алгоритм запускает «дышащий луч» (Adaptive Beam). Он прогрызает путь от корней к листьям, попутно подавляя избыточные родительские узлы, если их «дети» оказались эффективнее. Способ применения (Алгоритм отсечения): В критической точке поиска применяется метод Колена. Система строит график релевантности и производит хирургический надрез в месте максимального изгиба кривой. Всё, что ниже колена — считается шумом и выводится из организма (контекста) естественным путем.
Форма выпуска: Попробовать лекарство можно на моей странице github.Перед применением ознакомьтесь с инструкцией!
Инструкция по применению DRAG with KNEE
Архитектура
Индексация
Для того, чтобы жить в доме, сначала его нужно построить (или купить, если вы очень богаты). В моем случае домом выступает векторная база данных Qdrant, в которой строится фундамент в виде иерархических деревьев документов. Так как система предназначалась для PDF документов, которые можно легко распарсить постранично, был выбран следующий алгоритм построения иерархических деревьев:
-
Документ разбивается на страницы, для каждой страницы LLM формирует сжатое смысловое представление и извлекает ключевые слова. Все конечно же через структурированный вывод, чтобы избежать галлюцинаций. Получившиеся описания с ключевыми словами формируют листья дерева.
-
Листья объединяются в узлы (количество узлов для объединения можно варьировать, но чтобы избежать сильного сжатия смысла и при этом не получать деревья большой глубины я бы выбирал этот параметр равным 3–5). На основе объединенных описаний и ключевых слов строится описание родительского узла (что‑то подобное можно увидеть в алгоритме RAPTOR), также происходит наследование ключевых слов — ключевые слова детей в полном составе передаются родителю.
-
Узлы объединяются до тех пор, пока не останется один корневой узел, который получает уникальный идентификатор в своих метаданных —
parent_id=-1. Так все корни легко и просто могут участвовать в стартовом поиске, который определит, контекст каких документов будет в дальнейшем уточняться нашей чудо‑таблеткой.
В конечном итоге в Qdrant коллекции у нас получится прекрасный плоский список точек, которым можно легко и просто манипулировать с помощью фильтров по метаданным.
Алгоритм поиска
Самое интересное это то, как все же нам использовать так удачно построенные деревья? Мне в голову пришло 3 алгоритма. Во всех методах точки сравниваются друг с другом с помощью гибридного поиска с объединением результатов через RRF — Reciprocal Rank Fusion, значение которого рассчитываются по формуле:
здесь — наши документы,
— алгоритмы ранжирования (dense и sparse, например),
— коэффициент, который обычно равен 60.
И так, к описанию алгоритмов поиска:
-
Находим корни деревьев и идём по каждому дереву отдельно: на каждой ветке сравниваем родителя с его детьми. Если ребёнок оказывается лучше родителя — спускаемся к нему. Если нет — фиксируем родителя как конечную точку ветки. Я бы сказал, что этот алгоритм может быть более полным, чем остальные три, но его проблема — куча мусора. Если вдруг в самом начале или даже в середине было найдено что‑то нерелевантное (будь проклят этот
top_k!), то данный алгоритм будет тащить этот мусор до самого конца, и в большинстве своем это будет целый полностью нерелевантный документ. Но в общем и целом данный алгоритм имеет право на существование, если вопрос стоит только в полноте найденной информации. Вы можете протестировать данный алгоритм через функциюbranch_search.
Код алгоритма branch_search
def parent_vs_children(query: str, parent: ScoredPoint) -> list[ScoredPoint]:
file_name = parent.payload["file_name"]
parent_id = parent.payload["id"]
child_ids = parent.payload["child_ids"]
all_ids = [parent_id] + child_ids
if len(child_ids) == 0:
console.print(
f"Parent with id {parent_id} is leave!", style="bold underline violet"
)
return []
console.print()
console.print("#" * 20, style="red")
console.print(
f"Children (ids: {child_ids}) [underline]VS[/underline] Parent (id: {parent_id}) [ FILE: {file_name} ]nFor query: {query[:20]}...",
style="bold violet",
)
with RAGalicClient() as client: # артефакты из предыдущей жизни :)
dense_model: str = client.client.embedding_model_name # type: ignore
sparse_model: str = client.client.sparse_embedding_model_name # type: ignore
children_and_parent_filter = Filter(
must=FieldCondition(key="id", match=MatchAny(any=all_ids))
)
sorted_points = client.client.query_points(
collection_name="ragalic",
prefetch=[
Prefetch(
query=Document(text=query, model=dense_model),
using="dense",
limit=len(all_ids),
filter=children_and_parent_filter,
),
Prefetch(
query=Document(text=query, model=sparse_model),
using="sparse",
limit=len(all_ids),
filter=children_and_parent_filter,
),
],
query=FusionQuery(fusion=Fusion.RRF),
query_filter=children_and_parent_filter,
limit=len(all_ids),
).points
children_better_then_parent = []
for point in sorted_points:
if point.payload["id"] == parent_id:
break
children_better_then_parent.append(point)
ids = [child.payload["id"] for child in children_better_then_parent]
console.print(
f"Child ids which is better than parent: {ids if ids else 'N/A'}",
style="italic purple",
)
console.print("#" * 20, style="red")
console.print()
return children_better_then_parent
def branch_search(query: str, num_roots: int = 3) -> list[Chunk]:
roots = find_roots(query=query, num_to_find=num_roots)
final_points = []
points_to_process = roots
while len(points_to_process) > 0:
new_point_to_process = []
for point in points_to_process:
new_points = parent_vs_children(query=query, parent=point)
if len(new_points) == 0:
console.print(
f"--- NEW FINAL POINT (id: {point.payload['id']} | file: {point.payload['file_name']} | pages: {point.payload['page_start']} - {point.payload['page_end']}) ---",
style="bold green",
)
final_points.append(point)
else:
new_point_to_process.extend(new_points)
points_to_process = new_point_to_process
console.print(
f"--- WAS FOUND {len(final_points)} POINTS FOR QUERY: '{query[:20]}...' ---",
style="bold green on white",
)
return prepare_chunks(final_points)
Применение branch_search
from rag_lib.search import branch_search
results = branch_search(query="ваш_запрос", num_roots=3)
for chunk in results:
print(f"{chunk.file_name} (страницы {chunk.page_start}-{chunk.page_end}):n{chunk.text[:200]}...")
-
Отличие от первого подхода в том, что мы делаем поиск внутри не изолированным, а сравниваем всех со всеми (поколение родителей с предыдущей итерации сражается со всеми своими и чужими детьми, ведь друг друга то они уже победили на прошлой итерации). Получаем алгоритм поиска по лучу (
beam_searchв режиме работыfixed). При этом используется фиксированная ширина луча, что является недостатком данного подхода. Сужение луча происходит по следующей логике:-
Если ребенок победил отца → отец исключается из рассмотрения на уточнение контекста (то есть более узкие домены информации оказались более релевантными к запросу).
-
Если отец победил всех детей → все его дети исключаются из рассмотрения на уточнение контекста (то есть более узкие домены информации оказались менее релевантными для запроса).
-
Все выжившие поборются друг с другом за место в луче.
Таким образом мы получаем то, что алгоритм в течении поиска может отбросить наиболее нерелевантную информацию. Это сэкономит вам большое количество токенов контекста. Казалось бы — ПРОФИТ! Но возникает высокая вероятность того, что все же наш лучик света принесет не только тепло, но и лишний контекст (мы же зафиксировали
top_k!) или наоборот — вполне релевантная информация может выпасть за границы нашего луча (мы же зафиксировалиtop_k!). Оба варианта плохи, поэтому была предложена следующая оптимизация, которая сломает все наши проблемы через «колено». -
-
Идея: «А давайте избавимся от
top_k! Пусть алгоритм сам решает, какую он хочет ширину луча, не маленький все же». Для реализации столь очевидной идеи нам на помощь приходит не столь очевидный алгоритм поиска «колена» (или «локтя», если так удобнее). Суть довольно проста — найти максимально удаленную точку графика от хорды этого графика. Но у этого метода есть важное ограничение — функция, к которой он применяется, обязательно должна быть выпукла. Неужели это камень преткновения, который мы не сможем преодолеть? Конечно, нет! Ведь нам повезло и гибридный поиск с RRF как раз имеет выпуклую вниз форму, что идеально подходит для того, чтобы оценивать важность узлов. На Рисунке 1 изображены RRF Score точек релевантных случайному запросу, под документами здесь подразумеваются «корни иерархических деревьев». Можно видеть, что мы легким движением колена смогли отбросить 62% нерелевантых документов. Протестировать этот алгоритм вы можете с помощью функцииbeam_searchв режимеadaptive_with_knee. Таким образом мы получаем поиск по лучу с динамической шириной, который сам решает, что релевантно, а что твоей LLM лучше не видеть. При реализации этого алгоритма я столкнулся с логической ошибкой, от которой потом проснулся ночью с тахикардией — очень важно при генерации луча на новой итерации сначала применять алгоритм «колена», а потом реализовывать логику сжатия! Почему? Потому что в данном алгоритме реализован глупый метод остановки — если луч на текущей и предыдущей итерациях не изменился, тогда поиск можно остановить. При такой постановке задачи, если сначала реализовывать логику сужения, а потом обрезать «хвосты» — мы получим то, что алгоритм будет почти всегда выдавать 2 финальные точки в луче (к которым уже нельзя применить алгоритм «колена»), что даже хуже чемtop_k.В целом уже неплохо, но этот алгоритм можно сделать еще более гибким — мы можем сделать алгоритм «колена» менее чувствительным с помощью параметра, который будет определять то, как агрессивно будут отрубаться «хвосты». Этот параметр может задаваться в пределах от 0 до 1 (при 0 вы всегда будете получать целые документы в количестве равном максимальному количеству релевантных корней
max_num_roots, при 1 мы получим стандартный алгоритм «колена»). Этот параметр можно настраивать под ваши файлы — если файлы схожих тематик, то скорее всего можно отсекать документы менее агрессивно, если файлы очень разнородны, то наоборот. Протестировать эту реализацию алгоритма вы можете с помощью функцииbeam_searchв режимеadaptive_with_sensitive_knee.
Реализация алгоритма beam_search с тремя режимами работы
def check_ids(old_ids: list, new_ids: list):
return set(old_ids) == set(new_ids)
def cut_knee(points: list[ScoredPoint]) -> list[ScoredPoint]:
"""
Отсекает 'хвост' результатов, находя точку максимального изгиба (колено)
на графике RRF-скоров. Нормированная версия, которая учитывает разные масштабы по осям.
"""
n_points = len(points)
if n_points <= 2:
return points
scores = np.array([p.score for p in points])
x = np.arange(n_points)
y = scores
x_norm = (x - x.min()) / (x.max() - x.min())
y_range = y.max() - y.min()
if y_range == 0:
return points
y_norm = (y - y.min()) / y_range
p1 = np.array([x_norm[0], y_norm[0]])
p2 = np.array([x_norm[-1], y_norm[-1]])
line_vec = p2 - p1
line_vec_norm = line_vec / np.linalg.norm(line_vec)
points_vec = np.vstack([x_norm - p1[0], y_norm - p1[1]]).T
scalar_product = (
points_vec[:, 0] * line_vec_norm[1] - points_vec[:, 1] * line_vec_norm[0]
)
distances = np.abs(scalar_product)
knee_idx = np.argmax(distances)
return points[: knee_idx + 1]
def cut_knee_flexible(
points: list[ScoredPoint], sensitivity: float = 0.5
) -> list[ScoredPoint]:
"""
Отсекает 'хвост' результатов, находя точку максимального изгиба (колено)
на графике RRF-скоров. Нормированная версия, которая учитывает разные масштабы по осям и позволяет более гибко настраивать чувствительность среза.
"""
n_points = len(points)
if n_points <= 2:
return points
scores = np.array([p.score for p in points])
x = np.arange(n_points)
y = scores
x_norm = (x - x.min()) / (x.max() - x.min())
y_range = y.max() - y.min()
if y_range == 0:
return points
y_norm = (y - y.min()) / y_range
p1 = np.array([x_norm[0], y_norm[0]])
p2 = np.array([x_norm[-1], y_norm[-1]])
line_vec = p2 - p1
line_vec_norm = line_vec / np.linalg.norm(line_vec)
points_vec = np.vstack([x_norm - p1[0], y_norm - p1[1]]).T
scalar_product = (
points_vec[:, 0] * line_vec_norm[1] - points_vec[:, 1] * line_vec_norm[0]
)
distances = np.abs(scalar_product)
knee_idx = np.argmax(distances)
max_dist = np.max(distances)
threshold = max_dist * sensitivity
indices_above_threshold = np.where(distances >= threshold)[0]
knee_idx = indices_above_threshold[-1]
return points[: knee_idx + 1]
def parents_vs_children(
query: str,
parents: list[ScoredPoint],
width: int = 3,
search_method: BEAM_SEARCH_METHODS = "fixed",
sensitivity: float = 0.85,
) -> list[ScoredPoint]:
task_meta = {
"file_names": [],
"parent_ids": [],
"child_ids": [],
}
for point in parents:
task_meta["file_names"].append(point.payload["file_name"])
task_meta["parent_ids"].append(point.payload["id"])
task_meta["child_ids"].extend(point.payload["child_ids"])
task_meta["file_names"] = list(set(task_meta["file_names"]))
task_meta["parent_ids"] = list(set(task_meta["parent_ids"]))
task_meta["child_ids"] = list(set(task_meta["child_ids"]))
if len(task_meta["child_ids"]) == 0:
console.print(
f"Parents with ids ({task_meta['parent_ids']}) is all leaves!",
style="bold underline violet",
)
return parents
console.print("")
console.print("#" * 20, style="red")
console.print(
f"Running [underline]ALL VS ALL[/underline] for:nParent IDs: {task_meta['parent_ids']} | Files: {task_meta['file_names']}",
style="bold violet",
)
all_ids = task_meta["parent_ids"] + task_meta["child_ids"]
with RAGalicClient() as client:
dense_model: str = client.client.embedding_model_name # type: ignore
sparse_model: str = client.client.sparse_embedding_model_name # type: ignore
all_vs_all_filter = Filter(
must=FieldCondition(key="id", match=MatchAny(any=all_ids))
)
sorted_points = client.client.query_points(
collection_name="ragalic",
prefetch=[
Prefetch(
query=Document(text=query, model=dense_model),
using="dense",
limit=len(all_ids),
filter=all_vs_all_filter,
),
Prefetch(
query=Document(text=query, model=sparse_model),
using="sparse",
limit=len(all_ids),
filter=all_vs_all_filter,
),
],
query=FusionQuery(fusion=Fusion.RRF),
query_filter=all_vs_all_filter,
limit=len(all_ids),
).points
children_to_eliminate = []
parents_to_eliminate = []
new_top_k = []
if search_method == "adaptive_with_knee":
console.print(
f"Applying KNEE method to cut the tail of results. Initial candidates: {len(sorted_points)}",
style="italic bright_black",
)
sorted_points = cut_knee(sorted_points)
console.print(
f"Candidates after KNEE cut: {len(sorted_points)}",
style="italic bright_black",
)
elif search_method == "adaptive_with_sensitive_knee":
console.print(
f"Applying SENSITIVE KNEE method to cut the tail of results with sensitivity {sensitivity}. Initial candidates: {len(sorted_points)}",
style="italic bright_black",
)
sorted_points = cut_knee_flexible(sorted_points, sensitivity=sensitivity)
console.print(
f"Candidates after SENSITIVE KNEE cut: {len(sorted_points)}",
style="italic bright_black",
)
for point in sorted_points:
p_id = point.payload["id"]
# Логика исключения (Suppression)
if p_id in task_meta["child_ids"] and p_id not in children_to_eliminate:
# Если ребенок лучше родителя — родителю тут не место
parents_to_eliminate.append(point.payload["parent_id"])
new_top_k.append(point)
elif p_id in task_meta["parent_ids"] and p_id not in parents_to_eliminate:
# Если родитель лучше детей — убираем его детей из пула
children_to_eliminate.extend(point.payload["child_ids"])
new_top_k.append(point)
if search_method == "fixed" and len(new_top_k) >= width:
break
new_top_k_meta = [
f"(ID: {point.payload['id']} | FILE: {point.payload['file_name']} | PAGES: {point.payload['page_start']} - {point.payload['page_end']})"
for point in new_top_k
]
console.print(
f"IDs: {task_meta['parent_ids']} | Files: {task_meta['file_names']} [OLD TOP]",
style="italic purple",
)
console.print("|nY", style="bold red")
console.print(f"{', '.join(new_top_k_meta)} [NEW TOP]", style="italic purple")
console.print("#" * 20, style="red")
console.print("")
return new_top_k
def beam_search(
query: str,
beam_width: int = 3,
search_method: BEAM_SEARCH_METHODS = "fixed",
max_num_roots: int = 20,
sensitivity: float = 0.5,
) -> list[Chunk]:
if search_method == "fixed":
console.print(
f"Running BEAM SEARCH with [underline]FIXED[/underline] width: {beam_width}",
style="bold cyan",
)
old_points = find_roots(query=query, num_to_find=beam_width)
elif (
search_method == "adaptive_with_knee"
or search_method == "adaptive_with_sensitive_knee"
):
if search_method == "adaptive_with_sensitive_knee":
console.print(
f"Running BEAM SEARCH with [underline]ADAPTIVE[/underline] width using [underline]SENSITIVE KNEE[/underline] method with sensitivity {sensitivity}",
style="bold cyan",
)
else:
console.print(
f"Running BEAM SEARCH with [underline]ADAPTIVE[/underline] width using [underline]KNEE[/underline] method",
style="bold cyan",
)
with RAGalicClient() as client:
root_filter = Filter(
must=[
FieldCondition(key="parent_id", match=MatchValue(value=-1)),
]
)
all_root_points_num = client.client.count(
collection_name="ragalic", count_filter=root_filter
).count
all_root_points_num = min(all_root_points_num, max_num_roots)
console.print(
f"Total root points available: {all_root_points_num}",
style="italic bright_black",
)
old_points = find_roots(query=query, num_to_find=all_root_points_num)
console.print(
f"Initial root points retrieved: {len(old_points)}",
style="italic bright_black",
)
if search_method == "adaptive_with_knee":
old_points = cut_knee(old_points)
console.print(
f"Root points after KNEE cut: {len(old_points)}",
style="italic bright_black",
)
elif search_method == "adaptive_with_sensitive_knee":
old_points = cut_knee_flexible(old_points, sensitivity=sensitivity)
console.print(
f"Root points after KNEE cut: {len(old_points)}",
style="italic bright_black",
)
new_points = []
old_ids = [point.payload["id"] for point in old_points]
new_ids = []
while not check_ids(old_ids=old_ids, new_ids=new_ids):
old_ids = [point.payload["id"] for point in old_points]
new_points = parents_vs_children(
query=query,
parents=old_points,
width=beam_width,
search_method=search_method,
sensitivity=sensitivity,
)
old_points = new_points
new_ids = [point.payload["id"] for point in new_points]
console.print(
f"--- WAS FOUND {len(new_points)} POINTS FOR QUERY: '{query[:20]}...' ---",
style="bold green on white",
)
return prepare_chunks(new_points)
Применение beam_search
from rag_lib.search import beam_search
# Фиксированная ширина луча
results = beam_search(query="ваш_запрос", beam_width=3, search_method="fixed")
# Адаптивная ширина с автоматическим определением через knee-метод
results = beam_search(
query="ваш_запрос",
search_method="adaptive_with_knee",
max_num_roots=20 # максимальное количество корневых узлов для анализа
)
# Адаптивная ширина с настраиваемой чувствительностью knee-метода
results = beam_search(
query="ваш_запрос",
search_method="adaptive_with_sensitive_knee",
sensitivity=0.85, # чувствительность knee-отсечения (0.0-1.0)
max_num_roots=20
)
Заключение
Небольшая серия экспериментов показала, что поиск по лучу с адаптивной шириной лучше справляется с поиском релевантной информации, чем луч с фиксированной шириной поиска. Он расширяется, когда это необходимо для контекста, и сужается, когда необходимо очистить мусор (ну прям как живой!).
В следующей части статьи будут приведены тесты на различных бенчмарках (предлагайте в комментариях). Также предлагайте идеи и делитесь своими тестами в комментариях (буду рад почитать!).
Открыт вашим PR на github, будем развивать адаптивный RAG вместе!
Автор: gptctrlc


