"""DBT question answering with retrieval plus a single-veto LLM "jury".

Pipeline: embed the user query, retrieve the most similar knowledge-base
chunks, generate a draft answer from that context, then have several judge
models review the draft. Any single veto rejects the draft.
"""

import json
import sys

import numpy as np
from openai import OpenAI


# --- Configuration Loading ---
def load_config(path='config.json'):
    """Load the JSON configuration file at *path* and return the parsed object.

    Raises:
        FileNotFoundError: if *path* does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


# --- Vector Store (Mock Implementation) ---
class VectorStore:
    """In-memory store of text chunks and their embedding vectors."""

    def __init__(self, file_path):
        """Load chunks and embeddings from the JSON file at *file_path*.

        The file is read as a list of objects with ``text`` and ``embedding``
        keys. If the file is missing, a two-entry dummy knowledge base with
        random 1536-dimensional embeddings is used instead.
        """
        print(f"Loading embeddings from {file_path}...")
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"Error: {file_path} not found. Creating a dummy knowledge base.")
            # Fallback dummy data for testing if file missing
            self.chunks = [
                "DBT teaches Distress Tolerance skills.",
                "DEAR MAN is a skill for interpersonal effectiveness.",
            ]
            # Dummy embeddings (normally these come from an embedding model).
            # NOTE(review): these are 1536-d; search() raises ValueError if the
            # query embedding model returns a different dimension.
            self.embeddings = [np.random.rand(1536), np.random.rand(1536)]
            return

        self.chunks = [item['text'] for item in data]
        # Convert lists back to numpy arrays
        self.embeddings = [np.array(item['embedding']) for item in data]
        print(f"Loaded {len(self.chunks)} chunks.")

    def search(self, query_embedding, top_k=3):
        """Return the *top_k* chunks most similar to *query_embedding*.

        Similarity is cosine similarity. A pair in which either vector has
        zero norm scores 0.0 instead of producing NaN.

        Raises:
            ValueError: if a stored embedding's shape differs from the query's.
        """
        if not self.embeddings:
            return []

        query_vec = np.asarray(query_embedding, dtype=float)
        query_norm = np.linalg.norm(query_vec)  # loop-invariant, compute once

        scores = []
        for i, doc_vec in enumerate(self.embeddings):
            if doc_vec.shape != query_vec.shape:
                raise ValueError(
                    f"Embedding dimension mismatch: query has shape "
                    f"{query_vec.shape}, stored chunk {i} has shape {doc_vec.shape}."
                )
            denom = query_norm * np.linalg.norm(doc_vec)
            # Guard the division: a zero vector has no direction to compare.
            score = float(np.dot(query_vec, doc_vec) / denom) if denom > 0 else 0.0
            scores.append((score, i))

        scores.sort(reverse=True)
        return [self.chunks[i] for _, i in scores[:top_k]]


# --- OpenRouter Client ---
class LLMClient:
    """Thin wrapper around the OpenAI SDK pointed at the OpenRouter endpoint."""

    def __init__(self, api_key):
        """Create the client using *api_key* against the OpenRouter base URL."""
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

    def get_embedding(self, text):
        """Return the embedding vector for *text*.

        Uses the ``qwen/qwen3-embedding-8b`` model. Exceptions from the API
        call are not caught here and propagate to the caller.
        """
        response = self.client.embeddings.create(
            model="qwen/qwen3-embedding-8b",
            input=text
        )
        return response.data[0].embedding

    def generate(self, model_id, system_prompt, user_prompt, temperature=0.7):
        """Run one chat completion and return the message content.

        On any exception this returns a string of the form
        ``"Error calling <model_id>: <message>"`` instead of raising, so
        callers must not assume the result is a well-formed model reply.
        """
        try:
            response = self.client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                temperature=temperature
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error calling {model_id}: {str(e)}"


# --- Jury Logic ---
class DBTJurySystem:
    """Retrieve context, draft an answer, and gate it behind a jury of judges."""

    def __init__(self, config):
        """Build the LLM client and vector store from *config*.

        Reads ``config['openrouter_api_key']`` and ``config['embedding_file']``
        here; ``config['models']`` and ``config['system_prompt']`` are read
        later by the generation and jury steps.
        """
        self.config = config
        self.llm = LLMClient(config['openrouter_api_key'])
        self.vector_store = VectorStore(config['embedding_file'])

    def retrieve_context(self, query):
        """Embed *query* and return the retrieved chunks joined by newlines."""
        print("\n[1. Retrieving Context...]")
        query_emb = self.llm.get_embedding(query)
        context_chunks = self.vector_store.search(query_emb)
        return "\n".join(context_chunks)

    def run_generator(self, query, context):
        """Step 1: Generate the initial draft answer for *query* from *context*."""
        print("[2. Generating Draft...]")
        prompt = f"""
        Context from DBT Manual:
        {context}

        User Query: {query}

        Instructions:
        Answer the user's query using ONLY the context provided.
        If the context is insufficient, state that clearly.
        """
        return self.llm.generate(
            self.config['models']['generator'],
            self.config['system_prompt'],
            prompt
        )

    @staticmethod
    def _parse_vote(response_text):
        """Turn a judge's raw reply into a ``{"verdict", "reason"}`` dict.

        Fails closed: the verdict is ``"APPROVE"`` only when the reply is a
        JSON object whose ``verdict`` is exactly APPROVE (case-insensitive).
        Missing text, unparsable text, non-object JSON, and any other verdict
        value all become ``"VETO"``. This matters because LLMClient.generate
        returns an error string, not an exception, when the API call fails.
        """
        if not isinstance(response_text, str):
            return {"verdict": "VETO",
                    "reason": "Judge returned no text, defaulting to Veto."}

        # Clean up potential markdown code blocks
        clean_response = response_text.strip().replace("```json", "").replace("```", "")
        try:
            parsed = json.loads(clean_response)
        except json.JSONDecodeError:
            return {"verdict": "VETO",
                    "reason": "Failed to parse response, defaulting to Veto."}

        if not isinstance(parsed, dict):
            return {"verdict": "VETO",
                    "reason": "Response was not a JSON object, defaulting to Veto."}

        vote = dict(parsed)
        raw_verdict = vote.get("verdict")
        verdict = str(raw_verdict).strip().upper() if raw_verdict is not None else ""
        reason = vote.get("reason", "No reason provided.")

        if verdict not in ("APPROVE", "VETO"):
            reason = f"Unrecognized verdict {raw_verdict!r}, defaulting to Veto. {reason}"
            verdict = "VETO"

        vote["verdict"] = verdict
        vote["reason"] = reason
        return vote

    def run_jury_deliberation(self, query, context, draft_answer):
        """Step 2: The Jury Votes.

        Queries each judge in order and stops at the first veto, so later
        judges are not called once one has vetoed.

        Returns:
            ``(approved, results)`` where *approved* is True only if every
            judge approved, and *results* is the list of vote dicts collected
            so far, each tagged with a ``member`` key.
        """
        print("[3. Jury Deliberating (Single Veto Logic)...]")

        jury_config = [
            {"role": "Clinical Accuracy", "model_key": "jury_clinical",
             "instruction": "Check if the advice strictly follows DBT protocol."},
            {"role": "Safety", "model_key": "jury_safety",
             "instruction": "Check for any harmful, unethical, or dangerous advice. Be strict."},
            {"role": "Empathy", "model_key": "jury_empathy",
             "instruction": "Check if the tone is supportive and non-judgmental."},
            {"role": "Hallucination", "model_key": "jury_hallucination",
             "instruction": "Verify every claim in the answer is supported by the context."},
        ]

        results = []
        for member in jury_config:
            print(f" - Querying {member['role']} ({member['model_key']})...")

            judge_prompt = f"""
            You are a Jury Member: {member['role']}.
            Your specific instruction: {member['instruction']}

            --- SOURCE CONTEXT ---
            {context}
            ----------------------

            --- PROPOSED ANSWER ---
            {draft_answer}
            ----------------------

            Task: Analyze the proposed answer.
            1. Does it violate your instruction?
            2. Is it factually grounded in the source context?

            Output format strictly as JSON:
            {{
                "verdict": "APPROVE" or "VETO",
                "reason": "Your reasoning..."
            }}
            """

            response_text = self.llm.generate(
                self.config['models'][member['model_key']],
                "You are a strict JSON validator. Output ONLY valid JSON.",
                judge_prompt,
                temperature=0.1  # Low temp for deterministic judging
            )

            vote = self._parse_vote(response_text)
            vote['member'] = member['role']
            results.append(vote)

            if vote['verdict'] != 'APPROVE':
                print(f" ❌ VETO by {member['role']}: {vote['reason']}")
                return False, results

        print(" ✅ Unanimous Approval.")
        return True, results

    def process_query(self, query):
        """Run retrieval, generation, and jury voting for *query*.

        Returns a dict with ``status`` (``"SUCCESS"`` or ``"REJECTED"``),
        ``answer`` (the draft, or a fixed refusal message), and ``votes``.
        """
        # 1. RAG Retrieval
        context = self.retrieve_context(query)

        # 2. Generation
        draft = self.run_generator(query, context)
        print(f"\n--- Draft Answer ---\n{draft}\n--------------------")

        # 3. Jury Voting
        approved, votes = self.run_jury_deliberation(query, context, draft)

        if approved:
            return {
                "status": "SUCCESS",
                "answer": draft,
                "votes": votes
            }
        return {
            "status": "REJECTED",
            "answer": "The Jury could not agree on a safe or accurate answer. Please consult a professional or try rephrasing.",
            "votes": votes
        }


# --- Main Execution ---
if __name__ == "__main__":
    config = load_config()

    # Interactive Loop
    print("\nDBT Quorum System Active (Type 'exit' to quit)")
    system = DBTJurySystem(config)

    while True:
        try:
            user_query = input("\nUser: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat Ctrl-D / Ctrl-C at the prompt as a normal exit.
            print()
            break

        if user_query.lower() in ['exit', 'quit']:
            break
        if not user_query:
            # Nothing to ask; avoid a pointless API round-trip.
            continue

        response = system.process_query(user_query)

        print("\n===== FINAL OUTPUT =====")
        print(f"Status: {response['status']}")
        print(f"Response: {response['answer']}")
        # Optionally print vote breakdown
        # print(f"Vote Details: {json.dumps(response['votes'], indent=2)}")
        print("========================")