2025-12-05 07:11:25 +00:00
|
|
|
import re
|
|
|
|
|
import numpy as np
|
2025-12-06 17:24:28 +08:00
|
|
|
from typing import List, Tuple
|
2025-12-05 07:11:25 +00:00
|
|
|
from sklearn.cluster import AgglomerativeClustering
|
|
|
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
|
|
import config
|
|
|
|
|
|
|
|
|
|
from .ollama_client import call_qwen
|
|
|
|
|
from .xinference_client import embed_texts
|
2025-12-06 17:24:28 +08:00
|
|
|
from .prompt_utils import (
|
|
|
|
|
refine_instruction,
|
|
|
|
|
refine_instruction_with_history,
|
|
|
|
|
generate_initial_system_instruction_candidates,
|
2025-12-08 09:43:20 +08:00
|
|
|
generate_optimized_system_instruction,
|
|
|
|
|
refine_based_on_selection
|
2025-12-06 17:24:28 +08:00
|
|
|
)
|
2025-12-05 07:11:25 +00:00
|
|
|
|
|
|
|
|
def parse_candidates(raw: str) -> List[str]:
    """Split raw LLM output into a de-duplicated list of candidate strings.

    Every non-blank line is treated as one candidate. Leading list markers
    (any run of dashes, asterisks, digits, dots, closing parentheses and
    whitespace) are stripped, candidates shorter than 6 characters are
    dropped, and duplicates are removed while keeping first-seen order.

    Args:
        raw: Text returned by the generator model. ``None`` or an empty
            string yields an empty list instead of raising.

    Returns:
        The cleaned candidates, in order of first appearance.
    """
    # Guard against a missing/empty model response; re.split(None) would raise.
    if not raw:
        return []

    cleaned = []
    for line in re.split(r'\r?\n', raw):
        # Remove the bullet / numbering prefix ("1.", "2)", "-", "*", ...).
        # Blank lines collapse to "" here and are rejected by the length check.
        text = re.sub(r'^[\-\*\d\.\)\s]+', '', line.strip()).strip()
        if len(text) >= 6:
            cleaned.append(text)

    # dict preserves insertion order, so this is a stable de-duplication.
    return list(dict.fromkeys(cleaned))
|
|
|
|
|
|
|
|
|
|
def cluster_and_select(candidates: list, top_k=config.TOP_K, distance_threshold=config.CLUSTER_DISTANCE_THRESHOLD):
    """Pick at most ``top_k`` candidates, one representative per embedding cluster.

    Candidates are embedded with ``embed_texts``, grouped with average-linkage
    agglomerative clustering on cosine distance, and the member with the
    highest mean cosine similarity to its own cluster is kept for each
    cluster. Representatives are returned in their original input order.

    Args:
        candidates: Candidate strings to choose from.
        top_k: Maximum number of candidates to return. A value <= 0 yields
            an empty list.
        distance_threshold: Passed straight to ``AgglomerativeClustering``
            as its ``distance_threshold``.

    Returns:
        A list with at most ``top_k`` candidates. The input list itself is
        returned unchanged when it already has ``top_k`` or fewer items, and
        the first ``top_k`` items are returned when embeddings are missing or
        do not line up with the candidates. The result may be shorter than
        ``top_k`` when clustering produces fewer clusters than ``top_k``.
    """
    # Nothing to pick (or nothing requested): skip embedding/clustering, which
    # would otherwise fail on degenerate inputs such as a single sample.
    if not candidates or top_k <= 0:
        return []
    if len(candidates) <= top_k:
        return candidates

    vecs = embed_texts(candidates)
    # Explicit None/length test instead of `not vecs`: truthiness of a
    # multi-element NumPy array raises ValueError. An empty result is still
    # caught because its length cannot match the non-empty candidate list.
    # NOTE(review): assumes embed_texts returns a sized sequence or None - confirm.
    if vecs is None or len(vecs) != len(candidates):
        return candidates[:top_k]

    X = np.array(vecs)

    clustering = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=distance_threshold,
        metric="cosine",
        linkage="average",
    )
    labels = clustering.fit_predict(X)

    selected_idx = []
    for label in sorted(set(labels)):
        # Indices of all members of this cluster.
        idxs = np.flatnonzero(labels == label)
        # Mean similarity of each member to every member of its cluster
        # (including itself); the arg-max is the most central member.
        sims = cosine_similarity(X[idxs]).mean(axis=1)
        selected_idx.append(int(idxs[int(np.argmax(sims))]))

    # Restore input order before truncating to top_k.
    selected = [candidates[i] for i in sorted(selected_idx)]
    return selected[:top_k]
|
|
|
|
|
|
|
|
|
|
def generate_candidates(query: str, rejected=None, top_k=config.TOP_K, model_name=None):
    """
    LEGACY: query rewriting helper (NOT true OPRO).

    Kept for backward compatibility with existing API endpoints.

    Builds a rewrite prompt for ``query`` (using the history-aware prompt
    when ``rejected`` is non-empty), asks the model for candidates, parses
    them and returns the clustered selection.

    Args:
        query: The text to rewrite.
        rejected: Previously rejected candidates; ``None`` or empty means
            no history is used.
        top_k: Forwarded to ``cluster_and_select`` as the selection size.
        model_name: Forwarded to ``call_qwen``.

    Returns:
        Whatever ``cluster_and_select`` returns for the parsed candidates.
    """
    history = rejected or []

    # Choose the prompt builder depending on whether there is any history.
    prompt = (
        refine_instruction_with_history(query, history)
        if history
        else refine_instruction(query)
    )

    llm_output = call_qwen(prompt, temperature=0.9, max_tokens=1024, model_name=model_name)
    parsed = parse_candidates(llm_output)
    return cluster_and_select(parsed, top_k=top_k)
|
2025-12-06 17:24:28 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# ============================================================================
|
|
|
|
|
# TRUE OPRO FUNCTIONS (System Instruction Optimization)
|
|
|
|
|
# ============================================================================
|
|
|
|
|
|
|
|
|
|
def generate_system_instruction_candidates(
    task_description: str,
    trajectory: List[Tuple[str, float]] = None,
    top_k: int = config.TOP_K,
    pool_size: int = None,
    model_name: str = None
) -> List[str]:
    """
    TRUE OPRO: produce system instruction candidates for a task.

    When a non-empty trajectory is supplied, it is sorted by score (best
    first) and used to build an optimization meta-prompt; otherwise an
    initial-candidates meta-prompt is built. The optimizer model's output is
    parsed into candidates and reduced with ``cluster_and_select``.

    Args:
        task_description: Description of the task the LLM should perform.
        trajectory: Optional list of (instruction, score) tuples from
            previous iterations.
        top_k: Number of candidates to keep (defaults to ``config.TOP_K``).
        pool_size: Candidate pool size forwarded to the meta-prompt builder;
            any falsy value (``None`` or 0) falls back to
            ``config.GENERATION_POOL_SIZE``.
        model_name: Optional model name forwarded to ``call_qwen``.

    Returns:
        The candidates selected by ``cluster_and_select``.
    """
    effective_pool = pool_size or config.GENERATION_POOL_SIZE

    if not trajectory or len(trajectory) == 0:
        # Fresh start: no scored history to learn from.
        meta_prompt = generate_initial_system_instruction_candidates(task_description, effective_pool)
    else:
        # Highest-scoring instructions first.
        ranked = sorted(trajectory, key=lambda pair: pair[1], reverse=True)
        meta_prompt = generate_optimized_system_instruction(task_description, ranked, effective_pool)

    # Ask the optimizer LLM for a pool of candidates.
    llm_output = call_qwen(meta_prompt, temperature=0.9, max_tokens=1024, model_name=model_name)

    # Parse, then keep diverse representatives.
    return cluster_and_select(parse_candidates(llm_output), top_k=top_k)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def evaluate_system_instruction(
    system_instruction: str,
    test_cases: List[Tuple[str, str]],
    model_name: str = None
) -> float:
    """
    TRUE OPRO: score a system instruction against a set of test cases.

    For each case the instruction and the input are joined with a blank
    line and sent to the model. A case counts as correct when the expected
    output, stripped and lower-cased, occurs as a substring of the stripped,
    lower-cased response.

    Args:
        system_instruction: The system instruction to evaluate.
        test_cases: List of (input, expected_output) tuples.
        model_name: Optional model name forwarded to ``call_qwen``.

    Returns:
        Fraction of cases counted as correct (0.0 to 1.0); 0.0 when there
        are no test cases.
    """
    if not test_cases:
        return 0.0

    case_count = len(test_cases)
    hits = 0

    for prompt_input, target in test_cases:
        # System instruction first, then the case input.
        reply = call_qwen(
            f"{system_instruction}\n\n{prompt_input}",
            temperature=0.2,
            max_tokens=512,
            model_name=model_name,
        )

        # Case-insensitive containment check (not strict equality).
        needle = target.strip().lower()
        haystack = reply.strip().lower()
        if needle in haystack:
            hits += 1

    return hits / case_count
|
2025-12-08 09:43:20 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def refine_instruction_candidates(
    task_description: str,
    selected_instruction: str,
    rejected_instructions: List[str],
    top_k: int = config.TOP_K,
    pool_size: int = None,
    model_name: str = None
) -> List[str]:
    """
    Simple refinement: generate new candidates from the user's selection.

    This is NOT OPRO - just straightforward iterative refinement. The user
    picks a favorite and a refinement prompt is built from it together with
    the instructions that were not chosen.

    Args:
        task_description: Description of the task.
        selected_instruction: The instruction the user selected.
        rejected_instructions: The instructions the user didn't select.
        top_k: Number of candidates to keep (defaults to ``config.TOP_K``).
        pool_size: Candidate pool size forwarded to the prompt builder; any
            falsy value (``None`` or 0) falls back to
            ``config.GENERATION_POOL_SIZE``.
        model_name: Optional model name forwarded to ``call_qwen``.

    Returns:
        The refined candidates selected by ``cluster_and_select``.
    """
    effective_pool = pool_size or config.GENERATION_POOL_SIZE

    # Build the refinement prompt from the selection and the rejects.
    meta_prompt = refine_based_on_selection(
        task_description, selected_instruction, rejected_instructions, effective_pool
    )

    # Generate, parse, then cluster down to the final selection.
    llm_output = call_qwen(meta_prompt, temperature=0.9, max_tokens=1024, model_name=model_name)
    parsed = parse_candidates(llm_output)
    return cluster_and_select(parsed, top_k=top_k)
|