import json
import os
import time
import asyncio

from openai import AsyncOpenAI
from ebooklib import epub, ITEM_DOCUMENT
from bs4 import BeautifulSoup

# --- Configuration ---
CONFIG_PATH = 'config.json'
INPUT_EPUB = 'dbt.epub'
OUTPUT_FILE = 'dbt_knowledge.json'
EMBEDDING_MODEL = "qwen/qwen3-embedding-8b"
CHUNK_MAX_CHARS = 800

# Parallelization Settings
BATCH_SIZE = 50
MAX_CONCURRENT_REQUESTS = 10


def load_config():
    """Load and return the JSON configuration stored at CONFIG_PATH."""
    with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
        return json.load(f)


def get_text_from_epub(epub_path):
    """Extract text from an EPUB file, one section per document item.

    Returns a list of {"title": str, "text": str} dicts. The text is the
    document's visible text with blank lines removed and each line stripped.
    The title is the first h1/h2/h3 heading with whitespace collapsed, or
    "Unknown Section" when there is no (non-empty) heading. Documents with
    100 characters of text or fewer are skipped.
    """
    print(f"Reading EPUB: {epub_path}...")
    book = epub.read_epub(epub_path)
    text_sections = []
    for item in book.get_items():
        if item.get_type() != ITEM_DOCUMENT:
            continue
        soup = BeautifulSoup(item.get_content(), 'html.parser')
        # Remove non-visible content before extracting text.
        for tag in soup(["script", "style"]):
            tag.decompose()
        text = soup.get_text(separator='\n')
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        clean_text = '\n'.join(lines)
        if len(clean_text) <= 100:
            continue
        title_tag = soup.find(['h1', 'h2', 'h3'])
        # Collapse whitespace: headings often span several source lines, and
        # the title ends up inside chunk ids.
        title = " ".join(title_tag.get_text().split()) if title_tag else ""
        text_sections.append({"title": title or "Unknown Section", "text": clean_text})
    print(f"Extracted {len(text_sections)} sections.")
    return text_sections


def split_text(text, max_chars=1000):
    """Split text into chunks on newline boundaries.

    Lines are packed greedily into a chunk while the chunk's length plus the
    next line's length stays below max_chars. A single line that is already
    at least max_chars long is kept whole, so such a chunk may exceed the
    limit. Chunks are stripped, and empty chunks are never returned.
    """
    chunks = []
    current_chunk = ""
    for para in text.split('\n'):
        if len(current_chunk) + len(para) < max_chars:
            current_chunk += para + "\n"
        else:
            # Only flush real content: current_chunk is still empty when the
            # very first line alone reaches max_chars.
            if current_chunk.strip():
                chunks.append(current_chunk.strip())
            current_chunk = para + "\n"
    if current_chunk.strip():
        chunks.append(current_chunk.strip())
    return chunks


def _prepare_chunks(sections, max_chars=CHUNK_MAX_CHARS):
    """Split sections into chunk dicts {'source', 'index', 'text'}.

    Sections whose title contains "copyright" or "contents" (case-insensitive)
    are skipped. The index keeps counting across sections that share a title
    (e.g. several "Unknown Section"s), so the "{source}_{index}" ids built
    from these chunks are unique.
    """
    all_chunks = []
    next_index = {}  # title -> next unused chunk index for that title
    for section in sections:
        title = section['title']
        lowered = title.lower()
        if "copyright" in lowered or "contents" in lowered:
            continue
        index = next_index.get(title, 0)
        for text_content in split_text(section['text'], max_chars=max_chars):
            all_chunks.append({"source": title, "index": index, "text": text_content})
            index += 1
        next_index[title] = index
    return all_chunks


async def process_batch(client, batch_data, semaphore, pbar_counter):
    """
    Embed one batch of chunks with a single embeddings request.

    batch_data is a list of dicts: {'source': ..., 'index': ..., 'text': ...}
    semaphore limits how many batch requests are in flight at once.
    pbar_counter is a one-element list shared by all batches; it is increased
    by the batch size on success and printed as progress.

    Returns a list of {'id', 'source', 'text', 'embedding'} records in
    batch_data order, or [] (after printing an error) when the request fails
    or the response does not hold exactly one embedding per input.
    """
    inputs = [item['text'] for item in batch_data]
    async with semaphore:
        try:
            response = await client.embeddings.create(
                model=EMBEDDING_MODEL,
                input=inputs,
            )
        except Exception as e:
            # Best-effort: report and skip this batch; the caller reports
            # how many chunks are missing overall.
            print(f"\nError processing batch: {e}")
            return []

    embeddings = getattr(response, 'data', None) or []
    # Place each vector by the input position the API reports in `index`,
    # falling back to response order if a provider omits it.
    vectors = {}
    for position, data in enumerate(embeddings):
        slot = getattr(data, 'index', None)
        vectors[slot if isinstance(slot, int) else position] = data.embedding
    if len(embeddings) != len(batch_data) or sorted(vectors) != list(range(len(batch_data))):
        print(
            f"\nError processing batch: response does not match inputs "
            f"({len(embeddings)} embeddings for {len(batch_data)} inputs)"
        )
        return []

    results = [
        {
            "id": f"{item['source']}_{item['index']}",
            "source": item['source'],
            "text": item['text'],
            "embedding": vectors[i],
        }
        for i, item in enumerate(batch_data)
    ]

    pbar_counter[0] += len(batch_data)
    print(f"\r Processed {pbar_counter[0]} chunks...", end='', flush=True)
    return results


async def main_async():
    """Extract the EPUB, chunk it, embed all chunks concurrently, save JSON.

    The output file is written only if at least one chunk was embedded (or
    there was nothing to embed), so a run where every request fails does not
    overwrite a previous result.
    """
    config = load_config()

    # 1. Extract Text
    sections = get_text_from_epub(INPUT_EPUB)

    # 2. Prepare all chunks
    print("Preparing chunks...")
    all_chunks = _prepare_chunks(sections)
    print(f"Total chunks to embed: {len(all_chunks)}")

    # 3. Create Batches
    batches = [all_chunks[i:i + BATCH_SIZE] for i in range(0, len(all_chunks), BATCH_SIZE)]

    # 4. Process Concurrently; the context manager closes the HTTP client.
    async with AsyncOpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=config['openrouter_api_key'],
    ) as client:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
        pbar_counter = [0]
        print("Generating embeddings in parallel...")
        results_nested = await asyncio.gather(
            *(process_batch(client, batch, semaphore, pbar_counter) for batch in batches)
        )

    # Flatten results
    final_data = [item for sublist in results_nested for item in sublist]

    missing = len(all_chunks) - len(final_data)
    if missing:
        print(f"\nWARNING: {missing} of {len(all_chunks)} chunks failed to embed and are missing from the output.")
    if all_chunks and not final_data:
        print(f"\nNo embeddings were generated; leaving {OUTPUT_FILE} unchanged.")
        return

    # 5. Save to JSON
    print(f"\nSaving {len(final_data)} chunks to {OUTPUT_FILE}...")
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(final_data, f)
    print("Done!")


if __name__ == "__main__":
    asyncio.run(main_async())