Files
opro_demo/_qwen_xinference_demo/opro/session_state.py

294 lines
8.1 KiB
Python
Raw Normal View History

2025-12-05 07:11:25 +00:00
import uuid
from typing import List, Tuple, Dict, Any
2025-12-05 07:11:25 +00:00
# Legacy session storage (for query rewriting)
2025-12-05 07:11:25 +00:00
SESSIONS = {}
USER_FEEDBACK_LOG = []
# OPRO session storage (for system instruction optimization)
OPRO_RUNS = {}
OPRO_RUN_LOG = []
2025-12-05 07:11:25 +00:00
def create_session(query: str) -> str:
sid = uuid.uuid4().hex
SESSIONS[sid] = {
"original_query": query,
"round": 0,
"history_candidates": [],
"user_feedback": [],
"rejected": [],
"selected_prompt": None,
"chat_history": [],
"model_name": None
}
return sid
def get_session(sid: str):
return SESSIONS.get(sid)
def update_session_add_candidates(sid: str, candidates: list):
s = SESSIONS[sid]
s["round"] += 1
s["history_candidates"].extend(candidates)
def log_user_choice(sid: str, choice: str):
SESSIONS[sid]["user_feedback"].append(
{"round": SESSIONS[sid]["round"], "choice": choice}
)
USER_FEEDBACK_LOG.append({
"session_id": sid,
"round": SESSIONS[sid]["round"],
"choice": choice
})
def log_user_reject(sid: str, candidate: str, reason: str | None = None):
SESSIONS[sid]["rejected"].append(candidate)
USER_FEEDBACK_LOG.append({
"session_id": sid,
"round": SESSIONS[sid]["round"],
"reject": candidate,
"reason": reason or ""
})
def set_selected_prompt(sid: str, prompt: str):
SESSIONS[sid]["selected_prompt"] = prompt
def log_chat_message(sid: str, role: str, content: str):
SESSIONS[sid]["chat_history"].append({"role": role, "content": content})
def set_session_model(sid: str, model_name: str | None):
s = SESSIONS.get(sid)
if s is not None:
s["model_name"] = model_name
# ============================================================================
# TRUE OPRO SESSION MANAGEMENT
# ============================================================================
# Session storage (contains multiple runs)
OPRO_SESSIONS = {}
def create_opro_session(session_name: str = None) -> str:
"""
Create a new OPRO session that can contain multiple runs.
Args:
session_name: Optional name for the session
Returns:
session_id: Unique identifier for this session
"""
session_id = uuid.uuid4().hex
OPRO_SESSIONS[session_id] = {
"session_name": session_name or "新会话", # Will be updated with first task description
"created_at": uuid.uuid1().time,
"run_ids": [], # List of run IDs in this session
"chat_history": [] # Cross-run chat history
}
return session_id
def get_opro_session(session_id: str) -> Dict[str, Any]:
"""Get OPRO session by ID."""
return OPRO_SESSIONS.get(session_id)
def list_opro_sessions() -> List[Dict[str, Any]]:
"""
List all OPRO sessions with summary information.
Returns:
List of session summaries
"""
return [
{
"session_id": session_id,
"session_name": session["session_name"],
"num_runs": len(session["run_ids"]),
"created_at": session["created_at"]
}
for session_id, session in OPRO_SESSIONS.items()
]
def create_opro_run(
task_description: str,
test_cases: List[Tuple[str, str]] = None,
model_name: str = None,
session_id: str = None
) -> str:
"""
Create a new OPRO optimization run.
Args:
task_description: Description of the task to optimize for
test_cases: List of (input, expected_output) tuples for evaluation
model_name: Optional model name to use
session_id: Optional session ID to associate this run with
Returns:
run_id: Unique identifier for this OPRO run
"""
run_id = uuid.uuid4().hex
OPRO_RUNS[run_id] = {
"task_description": task_description,
"test_cases": test_cases or [],
"model_name": model_name,
"session_id": session_id, # Link to parent session
"iteration": 0,
"trajectory": [], # List of (instruction, score) tuples
"best_instruction": None,
"best_score": 0.0,
"current_candidates": [],
"created_at": uuid.uuid1().time,
"status": "active" # active, completed, failed
}
# Add run to session if session_id provided
if session_id and session_id in OPRO_SESSIONS:
OPRO_SESSIONS[session_id]["run_ids"].append(run_id)
# Update session name with first task description if it's still default
if OPRO_SESSIONS[session_id]["session_name"] == "新会话" and len(OPRO_SESSIONS[session_id]["run_ids"]) == 1:
OPRO_SESSIONS[session_id]["session_name"] = task_description
return run_id
def get_opro_run(run_id: str) -> Dict[str, Any]:
"""Get OPRO run by ID."""
return OPRO_RUNS.get(run_id)
def update_opro_iteration(
run_id: str,
candidates: List[str],
scores: List[float] = None
):
"""
Update OPRO run with new iteration results.
Args:
run_id: OPRO run identifier
candidates: List of system instruction candidates
scores: Optional list of scores (if evaluated)
"""
run = OPRO_RUNS.get(run_id)
if not run:
return
run["iteration"] += 1
run["current_candidates"] = candidates
# If scores provided, update trajectory
if scores and len(scores) == len(candidates):
for candidate, score in zip(candidates, scores):
run["trajectory"].append((candidate, score))
# Update best if this is better
if score > run["best_score"]:
run["best_score"] = score
run["best_instruction"] = candidate
# Log the iteration
OPRO_RUN_LOG.append({
"run_id": run_id,
"iteration": run["iteration"],
"num_candidates": len(candidates),
"best_score": run["best_score"]
})
def add_opro_evaluation(
run_id: str,
instruction: str,
score: float
):
"""
Add a single evaluation result to OPRO run.
Args:
run_id: OPRO run identifier
instruction: System instruction that was evaluated
score: Performance score
"""
run = OPRO_RUNS.get(run_id)
if not run:
return
# Add to trajectory
run["trajectory"].append((instruction, score))
# Update best if this is better
if score > run["best_score"]:
run["best_score"] = score
run["best_instruction"] = instruction
def get_opro_trajectory(run_id: str) -> List[Tuple[str, float]]:
"""
Get the performance trajectory for an OPRO run.
Returns:
List of (instruction, score) tuples sorted by score (highest first)
"""
run = OPRO_RUNS.get(run_id)
if not run:
return []
trajectory = run["trajectory"]
return sorted(trajectory, key=lambda x: x[1], reverse=True)
def set_opro_test_cases(
run_id: str,
test_cases: List[Tuple[str, str]]
):
"""
Set or update test cases for an OPRO run.
Args:
run_id: OPRO run identifier
test_cases: List of (input, expected_output) tuples
"""
run = OPRO_RUNS.get(run_id)
if run:
run["test_cases"] = test_cases
def complete_opro_run(run_id: str):
"""Mark an OPRO run as completed."""
run = OPRO_RUNS.get(run_id)
if run:
run["status"] = "completed"
def list_opro_runs(session_id: str = None) -> List[Dict[str, Any]]:
"""
List all OPRO runs with summary information.
Args:
session_id: Optional session ID to filter runs by session
Returns:
List of run summaries
"""
runs_to_list = OPRO_RUNS.items()
# Filter by session if provided
if session_id:
runs_to_list = [(rid, r) for rid, r in runs_to_list if r.get("session_id") == session_id]
return [
{
"run_id": run_id,
"task_description": run["task_description"][:100] + "..." if len(run["task_description"]) > 100 else run["task_description"],
"iteration": run["iteration"],
"best_score": run["best_score"],
"num_test_cases": len(run["test_cases"]),
"status": run["status"],
"session_id": run.get("session_id")
}
for run_id, run in runs_to_list
]