podcast-mindmap/scripts/match_answers.py
Dotty Dotter c5489eabaa #16/#17 match_answers.py und match_claims.py: Cross-Episode-Matching via Embeddings + Qwen
scripts/match_answers.py (#17):
- Laedt offene Fragen (genuine, follow_up; answered='no').
- Embedded jede Frage und sucht den besten Kandidat-Absatz aus einer anderen
  Episode (optional cross-podcast) per Cosinus-Aehnlichkeit ueber die paragraph-
  embeddings.
- Bei score >= 0.55: Qwen-Verifikation 'Beantwortet B die Frage in A?'
  (yes/partial/no), bei yes/partial wird answered + answered_by_* in der
  questions-Tabelle gesetzt.
- Hard-Budget 1,50 USD, --rerun setzt bestehende Matches neu.

scripts/match_claims.py (#16 Stufe 2):
- Analoge Mechanik fuer claims: Embedding, Cosinus-Suche, Qwen-Verifikation
  in der vier-stufigen Skala 'belegt' / 'widerspricht' / 'erweitert' / 'kein_bezug'.
- Schreibt Treffer (ohne 'kein_bezug') in neue Tabelle claim_matches.
- Default nur verifizierbare Claims (--include-non-verifiable kippt das),
  --cross-podcast erlaubt Cross-Podcast-Treffer.

Beide Skripte nutzen json_utils.parse_llm_json fuer robustes Parsing und sind
gegen NaN-Vektoren in den Embeddings abgesichert.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 02:21:49 +02:00

272 lines
9.1 KiB
Python

#!/usr/bin/env python3
"""#17 question/answer asymmetry: match unanswered questions against answers in other episodes.

Procedure:
1. Load questions with answered='no' and question_type in ('genuine', 'follow_up').
2. Embed each question (text-embedding-v3) and use cosine similarity to find the
   best candidate among the existing paragraph embeddings (cross-episode,
   optionally cross-podcast).
3. If score >= MIN_SCORE: Qwen verification "Does paragraph B answer the question in A?"
4. If yes/partial: set questions.answered + answered_by_podcast/episode/idx.

Usage:
    DASHSCOPE_API_KEY=... python3 match_answers.py [db-path] [limit]

Options:
    --cross-podcast  allow answers found in other podcasts
    --rerun          process questions again even if an answered_by value is already set
"""
import json
import os
import sqlite3
import sys
import time
from pathlib import Path
import numpy as np
from openai import OpenAI
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from json_utils import parse_llm_json
# Backend-Helfer wiederverwenden (Pfad hinzufuegen)
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT / "backend"))
from database import get_all_embeddings # noqa: E402
DB_PATH = sys.argv[1] if len(sys.argv) > 1 else "data/db.sqlite"
LIMIT = int(sys.argv[2]) if len(sys.argv) > 2 and not sys.argv[2].startswith("--") else 500
CROSS_PODCAST = "--cross-podcast" in sys.argv
RERUN = "--rerun" in sys.argv
# DashScope API key, read from the environment; main() exits when it is empty.
API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
# OpenAI-compatible endpoint handed to the OpenAI client in main().
BASE_URL = "https://dashscope-intl.aliyuncs.com/compatible-mode/v1"
# Model used to embed the question texts.
EMBED_MODEL = "text-embedding-v3"
# Chat model used for the yes/partial/no verification step.
LLM_MODEL = "qwen-plus"
# Number of question texts sent per embeddings request.
EMBED_BATCH = 6
# Minimum similarity score a candidate needs before the LLM is asked at all.
MIN_SCORE = 0.55
# Matching stops once the estimated chat-completion cost exceeds this amount.
HARD_BUDGET_USD = 1.50
# Per-token price estimates (0.0008 / 0.002 per 1000 tokens) used by Budget.cost().
# NOTE(review): presumably the qwen-plus list prices in USD — confirm against current pricing.
COST_IN = 0.0008 / 1000
COST_OUT = 0.002 / 1000
# System prompt for the verification call; it demands a JSON object with the
# keys "answers" (yes | partial | no) and "reason".
SYSTEM_PROMPT = """Du erhaeltst eine Frage aus einem Podcast und einen Kandidat-Absatz aus einer anderen Episode.
Pruefe: Beantwortet der Kandidat-Absatz die Frage konkret?
Antworte NUR mit JSON:
{"answers": "yes" | "partial" | "no", "reason": "1 Satz Begruendung"}
- "yes": der Absatz beantwortet die Frage direkt und vollstaendig.
- "partial": der Absatz nennt einen Teilaspekt oder eine verwandte Position, aber nicht die volle Antwort.
- "no": der Absatz beantwortet die Frage nicht (auch wenn er thematisch verwandt ist)."""
def setup_db(db):
    """Ensure the index on ``questions(podcast_id, answered)`` exists.

    Args:
        db: Open sqlite3 connection to the podcast database.
    """
    index_ddl = """
CREATE INDEX IF NOT EXISTS idx_questions_answered ON questions(podcast_id, answered);
"""
    # IF NOT EXISTS makes repeated runs harmless.
    db.executescript(index_ddl)
def fetch_open_questions(db, limit=None, rerun=None):
    """Load the questions that should be matched against other episodes.

    Only questions of type 'genuine' or 'follow_up' are considered. In normal
    mode a question qualifies when it is still unanswered (answered='no') and
    has no ``answered_by_episode`` value. In rerun mode, questions that already
    carry an ``answered_by_episode`` value are selected as well, whatever their
    ``answered`` state. Previously the rerun filter still required
    answered='no', which excludes every row whose match set ``answered`` to
    yes/partial, so the switch had no practical effect.

    Args:
        db: sqlite3 connection whose rows support access by column name.
        limit: Maximum number of rows; defaults to the module-level LIMIT.
        rerun: Include previously matched questions; defaults to the
            module-level RERUN.

    Returns:
        List of rows (ordered by question id) with the question columns plus
        ``para_text``, ``ep_title`` and ``ep_guest`` from the joined tables.
    """
    if limit is None:
        limit = LIMIT
    if rerun is None:
        rerun = RERUN

    if rerun:
        status_filter = (
            "(q.answered='no' OR "
            "(q.answered_by_episode IS NOT NULL AND q.answered_by_episode != ''))"
        )
    else:
        status_filter = (
            "q.answered='no' AND "
            "(q.answered_by_episode IS NULL OR q.answered_by_episode = '')"
        )
    where = f"WHERE q.question_type IN ('genuine','follow_up') AND {status_filter}"

    # Only constant SQL fragments are interpolated; the limit is bound as a parameter.
    rows = db.execute(f"""
        SELECT q.id, q.podcast_id, q.episode_id, q.paragraph_idx, q.question_text, q.question_type,
               p.text AS para_text, e.title AS ep_title, e.guest AS ep_guest
        FROM questions q
        JOIN paragraphs p ON q.podcast_id=p.podcast_id AND q.episode_id=p.episode_id AND q.paragraph_idx=p.idx
        JOIN episodes e ON q.podcast_id=e.podcast_id AND q.episode_id=e.id
        {where}
        ORDER BY q.id
        LIMIT ?
    """, (limit,)).fetchall()
    return rows
def embed_batch(client, texts):
    """Request 1024-dimensional embeddings for ``texts`` from EMBED_MODEL.

    Args:
        client: OpenAI-compatible client exposing ``embeddings.create``.
        texts: List of strings to embed in a single request.

    Returns:
        List with one embedding per item of the response's ``data``, in the
        order in which the response lists them.
    """
    response = client.embeddings.create(
        model=EMBED_MODEL,
        input=texts,
        dimensions=1024,
    )
    embeddings = []
    for entry in response.data:
        embeddings.append(entry.embedding)
    return embeddings
def best_candidate(q_vec, vectors, meta, exclude_podcast, exclude_episode,
                   allow_cross_podcast, top_k=30):
    """Pick the highest-scoring paragraph that is allowed as an answer source.

    Candidates are ranked by the dot product ``vectors @ q_vec``. This equals
    the cosine similarity only when both sides are unit-length; that is
    assumed here, so confirm it against the embedding loader. Paragraphs from
    the question's own episode are never returned. Paragraphs from other
    podcasts are only returned when ``allow_cross_podcast`` is true.

    Args:
        q_vec: 1-D question embedding.
        vectors: 2-D array with one paragraph embedding per row.
        meta: Sequence of mappings aligned with ``vectors``; each needs the
            keys ``podcast_id`` and ``episode_id``.
        exclude_podcast: Podcast id of the question.
        exclude_episode: Episode id of the question.
        allow_cross_podcast: Whether paragraphs of other podcasts qualify.
        top_k: How many of the best-ranked paragraphs are inspected before
            giving up (default 30, the previous fixed window). ``None``
            inspects every paragraph.

    Returns:
        ``(meta_entry, score)`` for the best admissible paragraph, or
        ``(None, 0.0)`` when none of the inspected paragraphs qualifies.
    """
    scores = vectors @ q_vec
    ranked = np.argsort(scores)[::-1]
    if top_k is not None:
        ranked = ranked[:top_k]
    for idx in ranked:
        m = meta[idx]
        same_podcast = m["podcast_id"] == exclude_podcast
        if same_podcast and m["episode_id"] == exclude_episode:
            # The answer has to come from a different episode.
            continue
        if not allow_cross_podcast and not same_podcast:
            continue
        return m, float(scores[idx])
    return None, 0.0
class Budget:
    """Running token tally with a hard spending cap.

    ``tokens_in`` and ``tokens_out`` accumulate the prompt and completion
    token counts reported by the API; ``cost()`` prices them with COST_IN and
    COST_OUT.
    """

    def __init__(self, hard_limit_usd):
        """Start an empty tally capped at ``hard_limit_usd``."""
        self.hard_limit = hard_limit_usd
        self.tokens_in = 0
        self.tokens_out = 0

    def add(self, usage):
        """Add the token counts of one response; a falsy ``usage`` is ignored."""
        if not usage:
            return
        # Missing or None counters count as zero.
        prompt_tokens = getattr(usage, "prompt_tokens", 0)
        self.tokens_in += prompt_tokens or 0
        completion_tokens = getattr(usage, "completion_tokens", 0)
        self.tokens_out += completion_tokens or 0

    def cost(self):
        """Return the estimated spend for all tokens counted so far."""
        input_cost = self.tokens_in * COST_IN
        output_cost = self.tokens_out * COST_OUT
        return input_cost + output_cost

    def over(self):
        """Return True once the estimated spend exceeds the hard limit."""
        return self.hard_limit < self.cost()
def verify(client, question_text, candidate_text, budget):
    """Ask the chat model whether ``candidate_text`` answers ``question_text``.

    The request is attempted at most twice. Any exception raised while
    calling the API or reading the response triggers one retry after a
    2 second pause. A reply that cannot be parsed as a JSON object (ValueError
    from parse_llm_json) is not retried.

    Args:
        client: OpenAI-compatible client exposing ``chat.completions.create``.
        question_text: The question to be answered.
        candidate_text: Paragraph text; only its first 1000 characters are sent.
        budget: Budget instance that receives the response's token usage.

    Returns:
        ``(parsed_object, None)`` on success, otherwise ``(None, error_text)``.
    """
    user_msg = (
        f"FRAGE:\n\"{question_text}\"\n\n"
        f"KANDIDAT-ABSATZ:\n\"{candidate_text[:1000]}\""
    )
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_msg},
    ]
    last_err = None
    for attempt in range(2):
        may_retry = attempt < 1
        try:
            resp = client.chat.completions.create(
                model=LLM_MODEL,
                messages=conversation,
                temperature=0.0,
                max_tokens=200,
            )
            # Usage is booked even if the reply later turns out to be unparsable.
            budget.add(getattr(resp, "usage", None))
            content = resp.choices[0].message.content
            try:
                parsed = parse_llm_json(content, expect="object")
            except ValueError as pe:
                # A malformed reply ends the attempts immediately.
                return None, f"parse: {pe}"
            return parsed, None
        except Exception as e:
            last_err = str(e)
            if may_retry:
                time.sleep(2)
    return None, last_err
def _embed_questions(client, questions):
    """Embed the question texts in batches and scale each vector to unit length.

    Returns a list aligned with ``questions``. Every question of a batch that
    failed, or whose response had the wrong number of embeddings, gets a
    ``None`` entry so that the alignment with ``questions`` cannot drift.
    """
    q_vecs = []
    for start in range(0, len(questions), EMBED_BATCH):
        batch = questions[start:start + EMBED_BATCH]
        texts = [q["question_text"][:500] for q in batch]
        try:
            embs = embed_batch(client, texts)
            if len(embs) != len(batch):
                raise ValueError(
                    f"{len(batch)} Embeddings erwartet, {len(embs)} erhalten"
                )
        except Exception as e:
            print(f" Embedding-Batch {start // EMBED_BATCH + 1} fehler: {e}")
            time.sleep(2)
            embs = [None] * len(batch)
        for emb in embs:
            if emb is None:
                q_vecs.append(None)
                continue
            v = np.array(emb, dtype=np.float32)
            # "or 1" avoids dividing an all-zero vector by zero.
            v /= np.linalg.norm(v) or 1
            q_vecs.append(v)
    return q_vecs


def _run_matching(db, client):
    """Run the full matching pass on an open connection and print the summary."""
    questions = fetch_open_questions(db)
    print(f"Fragen zu matchen: {len(questions)}")
    if not questions:
        return

    # Load all paragraph embeddings.
    print("Lade paragraph-embeddings…")
    vectors, meta = get_all_embeddings(None)
    if vectors is None:
        print("Keine Embeddings vorhanden — abbruch.")
        return
    nan_mask = np.isnan(vectors).any(axis=1)
    if nan_mask.any():
        # Zeroed rows score 0 against every question and so never pass MIN_SCORE.
        print(f" {nan_mask.sum()} NaN-Vektoren maskiert")
        vectors[nan_mask] = 0
    print(f" {len(meta)} Vektoren geladen")

    # Embed the questions in batches.
    print(f"Embedde {len(questions)} Fragen…")
    q_vecs = _embed_questions(client, questions)
    print(f" {sum(1 for v in q_vecs if v is not None)} Frage-Embeddings ok")

    budget = Budget(hard_limit_usd=HARD_BUDGET_USD)
    matched = 0
    no_answer = 0
    skipped_low_score = 0
    parse_failed = 0
    try:
        for i, (q, q_vec) in enumerate(zip(questions, q_vecs)):
            if budget.over():
                print(f"!! Budget ({budget.cost():.4f} USD) erreicht — Abbruch nach {i} Fragen")
                break
            if q_vec is None:
                continue
            cand, score = best_candidate(
                q_vec, vectors, meta, q["podcast_id"], q["episode_id"], CROSS_PODCAST
            )
            if cand is None or score < MIN_SCORE:
                skipped_low_score += 1
                continue
            row = db.execute(
                "SELECT text FROM paragraphs WHERE id=?", (cand["id"],)
            ).fetchone()
            if row is None or not row["text"]:
                # The embedding points at a paragraph that is gone or empty;
                # skip it instead of crashing or paying for a pointless call.
                print(f" Kandidat-Absatz id={cand['id']} fehlt oder ist leer — uebersprungen")
                continue
            result, err = verify(client, q["question_text"], row["text"], budget)
            if result is None:
                parse_failed += 1
                continue
            ans = (result.get("answers") or "").lower()
            if ans in ("yes", "partial"):
                db.execute(
                    "UPDATE questions SET answered=?, answered_by_podcast=?, "
                    "answered_by_episode=?, answered_by_idx=? WHERE id=?",
                    (ans, cand["podcast_id"], cand["episode_id"], cand["idx"], q["id"]),
                )
                matched += 1
            else:
                no_answer += 1
            if (i + 1) % 20 == 0:
                db.commit()
                print(f" [{i+1}/{len(questions)}] matched={matched} no_answer={no_answer} "
                      f"skipped={skipped_low_score} parse_err={parse_failed} cost=${budget.cost():.4f}")
            time.sleep(0.25)
    finally:
        # Keep matches that were already paid for, even if the loop is
        # interrupted or fails between two periodic commits.
        db.commit()

    print()
    print("=== Zusammenfassung ===")
    print(f" matched (yes/partial): {matched}")
    print(f" no_answer: {no_answer}")
    print(f" skipped (score<{MIN_SCORE}): {skipped_low_score}")
    print(f" parse-failures: {parse_failed}")
    print(f" Tokens in={budget.tokens_in} out={budget.tokens_out}")
    print(f" Kosten ~${budget.cost():.4f}")
    # Distribution of questions.answered after the run.
    stats = db.execute("SELECT answered, COUNT(*) FROM questions GROUP BY answered").fetchall()
    print(" questions.answered nach Lauf:")
    for s in stats:
        print(f" {s[0]}: {s[1]}")


def main():
    """Match open questions against paragraphs of other episodes.

    Exits with status 1 when DASHSCOPE_API_KEY is not set. Otherwise opens the
    database at DB_PATH, makes sure the helper index exists, runs the matching
    pass and prints a summary. The connection is closed on every exit path,
    including the early returns and exceptions.
    """
    if not API_KEY:
        print("DASHSCOPE_API_KEY nicht gesetzt.")
        sys.exit(1)
    client = OpenAI(api_key=API_KEY, base_url=BASE_URL, timeout=60.0, max_retries=1)
    db = sqlite3.connect(DB_PATH, timeout=30.0)
    try:
        db.execute("PRAGMA busy_timeout=30000")
        db.row_factory = sqlite3.Row
        setup_db(db)
        _run_matching(db, client)
    finally:
        db.close()
if __name__ == "__main__":
main()