diff --git a/src/ai_generators/ollama_md_generator.py b/src/ai_generators/ollama_md_generator.py
index 5c5dd0b..bb2b4cc 100644
--- a/src/ai_generators/ollama_md_generator.py
+++ b/src/ai_generators/ollama_md_generator.py
@@ -1,11 +1,17 @@
-import os, re, json, random, time, string
-from ollama import Client
+import json
+import os
+import random
+import re
+import string
+import time
+from concurrent.futures import ThreadPoolExecutor, TimeoutError
+
import chromadb
from langchain_ollama import ChatOllama
+from ollama import Client
class OllamaGenerator:
-
def __init__(self, title: str, content: str, inner_title: str):
self.title = title
self.inner_title = inner_title
@@ -14,16 +20,20 @@ class OllamaGenerator:
print("In Class")
print(os.environ["CONTENT_CREATOR_MODELS"])
try:
- chroma_port = int(os.environ['CHROMA_PORT'])
+ chroma_port = int(os.environ["CHROMA_PORT"])
except ValueError as e:
raise Exception(f"CHROMA_PORT is not an integer: {e}")
- self.chroma = chromadb.HttpClient(host=os.environ['CHROMA_HOST'], port=chroma_port)
- ollama_url = f"{os.environ["OLLAMA_PROTOCOL"]}://{os.environ["OLLAMA_HOST"]}:{os.environ["OLLAMA_PORT"]}"
+ self.chroma = chromadb.HttpClient(
+ host=os.environ["CHROMA_HOST"], port=chroma_port
+ )
+ ollama_url = f"{os.environ['OLLAMA_PROTOCOL']}://{os.environ['OLLAMA_HOST']}:{os.environ['OLLAMA_PORT']}"
self.ollama_client = Client(host=ollama_url)
self.ollama_model = os.environ["EDITOR_MODEL"]
self.embed_model = os.environ["EMBEDDING_MODEL"]
self.agent_models = json.loads(os.environ["CONTENT_CREATOR_MODELS"])
- self.llm = ChatOllama(model=self.ollama_model, temperature=0.6, top_p=0.5) #This is the level head in the room
+ self.llm = ChatOllama(
+ model=self.ollama_model, temperature=0.6, top_p=0.5
+ ) # This is the level head in the room
self.prompt_inject = f"""
You are a journalist, Software Developer and DevOps expert
writing a 5000 word draft blog article for other tech enthusiasts.
@@ -37,8 +47,8 @@ class OllamaGenerator:
"""
def split_into_chunks(self, text, chunk_size=100):
- '''Split text into chunks of size chunk_size'''
- words = re.findall(r'\S+', text)
+ """Split text into chunks of size chunk_size"""
+ words = re.findall(r"\S+", text)
chunks = []
current_chunk = []
@@ -49,18 +59,19 @@ class OllamaGenerator:
word_count += 1
if word_count >= chunk_size:
- chunks.append(' '.join(current_chunk))
+ chunks.append(" ".join(current_chunk))
current_chunk = []
word_count = 0
if current_chunk:
- chunks.append(' '.join(current_chunk))
+ chunks.append(" ".join(current_chunk))
return chunks
def generate_draft(self, model) -> str:
- '''Generate a draft blog post using the specified model'''
- try:
+ """Generate a draft blog post using the specified model"""
+
+ def _generate():
# the idea behind this is to make the "creativity" random amongst the content creators
# controlling temperature will cause the output to make more "random" connections in sentences
# Controlling top_p will tighten or loosen the embedding connections made
@@ -69,56 +80,141 @@ class OllamaGenerator:
temp = random.uniform(0.5, 1.0)
top_p = random.uniform(0.4, 0.8)
top_k = int(random.uniform(30, 80))
- agent_llm = ChatOllama(model=model, temperature=temp, top_p=top_p, top_k=top_k)
+ agent_llm = ChatOllama(
+ model=model, temperature=temp, top_p=top_p, top_k=top_k
+ )
messages = [
- ("system", "You are a creative writer specialising in writing about technology"),
- ("human", self.prompt_inject )
+ (
+ "system",
+ "You are a creative writer specialising in writing about technology",
+ ),
+ ("human", self.prompt_inject),
]
response = agent_llm.invoke(messages)
- # self.response = self.ollama_client.chat(model=model,
- # messages=[
- # {
- # 'role': 'user',
- # 'content': f'{self.prompt_inject}',
- # },
- # ])
- #print ("draft")
- #print (response)
- return response.text()#['message']['content']
+ # AIMessage.text() is a method in langchain-core; call it when present,
+ # otherwise fall back to the string form of the response
+ text_attr = getattr(response, "text", None)
+ return text_attr() if callable(text_attr) else str(response)
- except Exception as e:
- raise Exception(f"Failed to generate blog draft: {e}")
+ # Retry mechanism with 30-minute timeout
+ timeout_seconds = 30 * 60 # 30 minutes
+ max_retries = 3
+
+ for attempt in range(max_retries):
+ try:
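+ # NOTE: future.result() enforces the timeout, but the worker thread is not
+ # cancelled; the executor's with-block still waits for the underlying call
+ # to finish before exiting, so a timed-out attempt is discarded, not aborted.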
+ with ThreadPoolExecutor(max_workers=1) as executor:
+ future = executor.submit(_generate)
+ result = future.result(timeout=timeout_seconds)
+ return result
+ except TimeoutError:
+ print(
+ f"AI call timed out after {timeout_seconds} seconds on attempt {attempt + 1}"
+ )
+ if attempt < max_retries - 1:
+ print("Retrying...")
+ time.sleep(5) # Wait 5 seconds before retrying
+ continue
+ else:
+ raise Exception(
+ f"AI call failed to complete after {max_retries} attempts with {timeout_seconds} second timeouts"
+ )
+ except Exception as e:
+ if attempt < max_retries - 1:
+ print(f"Attempt {attempt + 1} failed with error: {e}. Retrying...")
+ time.sleep(5) # Wait 5 seconds before retrying
+ continue
+ else:
+ raise Exception(
+ f"Failed to generate blog draft after {max_retries} attempts: {e}"
+ )
def get_draft_embeddings(self, draft_chunks):
- '''Get embeddings for the draft chunks'''
- embeds = self.ollama_client.embed(model=self.embed_model, input=draft_chunks)
- return embeds.get('embeddings', [])
+ """Get embeddings for the draft chunks"""
+ try:
+ # Handle empty draft chunks
+ if not draft_chunks:
+ print("Warning: No draft chunks to embed")
+ return []
+
+ embeds = self.ollama_client.embed(
+ model=self.embed_model, input=draft_chunks
+ )
+ embeddings = embeds.get("embeddings", [])
+
+ # Check if embeddings were generated successfully
+ if not embeddings:
+ print("Warning: No embeddings generated")
+ return []
+
+ return embeddings
+ except Exception as e:
+ print(f"Error generating embeddings: {e}")
+ return []
def id_generator(self, size=6, chars=string.ascii_uppercase + string.digits):
- return ''.join(random.choice(chars) for _ in range(size))
+ return "".join(random.choice(chars) for _ in range(size))
def load_to_vector_db(self):
- '''Load the generated blog drafts into a vector database'''
- collection_name = f"blog_{self.title.lower().replace(" ", "_")}_{self.id_generator()}"
- collection = self.chroma.get_or_create_collection(name=collection_name)#, metadata={"hnsw:space": "cosine"})
- #if any(collection.name == collectionname for collectionname in self.chroma.list_collections()):
+ """Load the generated blog drafts into a vector database"""
+ collection_name = (
+ f"blog_{self.title.lower().replace(' ', '_')}_{self.id_generator()}"
+ )
+ collection = self.chroma.get_or_create_collection(
+ name=collection_name
+ ) # , metadata={"hnsw:space": "cosine"})
+ # if any(collection.name == collectionname for collectionname in self.chroma.list_collections()):
# self.chroma.delete_collection("blog_creator")
for model in self.agent_models:
- print (f"Generating draft from {model} for load into vector database")
- draft_chunks = self.split_into_chunks(self.generate_draft(model))
- print(f"generating embeds")
- embeds = self.get_draft_embeddings(draft_chunks)
- ids = [model + str(i) for i in range(len(draft_chunks))]
- chunknumber = list(range(len(draft_chunks)))
- metadata = [{"model_agent": model} for index in chunknumber]
- print(f'loading into collection')
- collection.add(documents=draft_chunks, embeddings=embeds, ids=ids, metadatas=metadata)
+ print(f"Generating draft from {model} for load into vector database")
+ try:
+ draft_content = self.generate_draft(model)
+ draft_chunks = self.split_into_chunks(draft_content)
+
+ # Skip if no content was generated
+ if not draft_chunks or all(
+ chunk.strip() == "" for chunk in draft_chunks
+ ):
+ print(f"Skipping {model} - no content generated")
+ continue
+
+ print(f"generating embeds for {model}")
+ embeds = self.get_draft_embeddings(draft_chunks)
+
+ # Skip if no embeddings were generated
+ if not embeds:
+ print(f"Skipping {model} - no embeddings generated")
+ continue
+
+ # Ensure we have the same number of embeddings as chunks
+ if len(embeds) != len(draft_chunks):
+ print(
+ f"Warning: Mismatch between chunks ({len(draft_chunks)}) and embeddings ({len(embeds)}) for {model}"
+ )
+ # Truncate both lists to the shorter length so chunks and embeddings stay aligned
+ min_length = min(len(embeds), len(draft_chunks))
+ draft_chunks = draft_chunks[:min_length]
+ embeds = embeds[:min_length]
+ if min_length == 0:
+ print(f"Skipping {model} - no valid content/embeddings pairs")
+ continue
+
+ ids = [model + str(i) for i in range(len(draft_chunks))]
+ chunknumber = list(range(len(draft_chunks)))
+ metadata = [{"model_agent": model} for _ in chunknumber]
+ print(f"loading into collection for {model}")
+ collection.add(
+ documents=draft_chunks,
+ embeddings=embeds,
+ ids=ids,
+ metadatas=metadata,
+ )
+ except Exception as e:
+ print(f"Error processing model {model}: {e}")
+ # Continue with other models rather than failing completely
+ continue
return collection
-
def generate_markdown(self) -> str:
-
prompt_human = f"""
You are an editor taking information from {len(self.agent_models)} Software
Developers and Data experts
@@ -133,29 +229,104 @@ class OllamaGenerator:
The basis for the content of the blog is:
{self.content}
"""
- try:
- query_embed = self.ollama_client.embed(model=self.embed_model, input=prompt_human)['embeddings']
+
+ def _generate_final_document():
+ try:
+ embed_result = self.ollama_client.embed(
+ model=self.embed_model, input=prompt_human
+ )
+ query_embed = embed_result.get("embeddings", [])
+ if not query_embed:
+ print(
+ "Warning: Failed to generate query embeddings, using empty list"
+ )
+ query_embed = [[]] # Use a single empty embedding as fallback
+ except Exception as e:
+ print(f"Error generating query embeddings: {e}")
+ # Generate empty embeddings as fallback
+ query_embed = [[]] # Use a single empty embedding as fallback
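+ # NOTE: Chroma will typically reject a zero-length query vector, so this
+ # fallback mainly ensures the query further below fails inside its own
+ # try/except rather than crashing the whole generation run.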
+
collection = self.load_to_vector_db()
- collection_query = collection.query(query_embeddings=query_embed, n_results=100)
- print("Showing pertinent info from drafts used in final edited edition")
- pertinent_draft_info = '\n\n'.join(collection.query(query_embeddings=query_embed, n_results=100)['documents'][0])
- #print(pertinent_draft_info)
+
+ # Try to query the collection, with fallback for empty collections
+ try:
+ query_result = collection.query(
+ query_embeddings=query_embed, n_results=100
+ )
+ print("Showing pertinent info from drafts used in final edited edition")
+
+ # Pull the documents out of the single query result
+ documents = query_result.get("documents", [])
+
+ if documents and len(documents) > 0 and len(documents[0]) > 0:
+ pertinent_draft_info = "\n\n".join(documents[0])
+ else:
+ print("Warning: No relevant documents found in collection")
+ pertinent_draft_info = "No relevant information found in drafts."
+
+ except Exception as query_error:
+ print(f"Error querying collection: {query_error}")
+ pertinent_draft_info = (
+ "No relevant information found in drafts due to query error."
+ )
+ # print(pertinent_draft_info)
prompt_system = f"""Generate the final, 5000 word, draft of the blog using this information from the drafts: {pertinent_draft_info}
- Only output in markdown, do not wrap in markdown tags, Only provide the draft not a commentary on the drafts in the context
"""
print("Generating final document")
- messages = [("system", prompt_system), ("human", prompt_human),]
- self.response = self.llm.invoke(messages).text()
+ messages = [
+ ("system", prompt_system),
+ ("human", prompt_human),
+ ]
+ response = self.llm.invoke(messages)
+ text_attr = getattr(response, "text", None)
+ return text_attr() if callable(text_attr) else str(response)
+
+ try:
+ # Retry mechanism with 30-minute timeout
+ timeout_seconds = 30 * 60 # 30 minutes
+ max_retries = 3
+
+ for attempt in range(max_retries):
+ try:
+ with ThreadPoolExecutor(max_workers=1) as executor:
+ future = executor.submit(_generate_final_document)
+ self.response = future.result(timeout=timeout_seconds)
+ break # Success, exit the retry loop
+ except TimeoutError:
+ print(
+ f"AI call timed out after {timeout_seconds} seconds on attempt {attempt + 1}"
+ )
+ if attempt < max_retries - 1:
+ print("Retrying...")
+ time.sleep(5) # Wait 5 seconds before retrying
+ continue
+ else:
+ raise Exception(
+ f"AI call failed to complete after {max_retries} attempts with {timeout_seconds} second timeouts"
+ )
+ except Exception as e:
+ if attempt < max_retries - 1:
+ print(
+ f"Attempt {attempt + 1} failed with error: {e}. Retrying..."
+ )
+ time.sleep(5) # Wait 5 seconds before retrying
+ continue
+ else:
+ raise Exception(
+ f"Failed to generate markdown after {max_retries} attempts: {e}"
+ )
+
# self.response = self.ollama_client.chat(model=self.ollama_model,
# messages=[
- # {
- # 'role': 'user',
# 'content': f'{prompt_enhanced}',
# },
# ])
- #print ("Markdown Generated")
- #print (self.response)
- return self.response#['message']['content']
+ # print ("Markdown Generated")
+ # print (self.response)
+ return self.response # ['message']['content']
except Exception as e:
raise Exception(f"Failed to generate markdown: {e}")
@@ -165,6 +336,43 @@ class OllamaGenerator:
f.write(self.generate_markdown())
def generate_system_message(self, prompt_system, prompt_human):
- messages = [("system", prompt_system), ("human", prompt_human),]
- ai_message = self.llm.invoke(messages).text()
- return ai_message
+ def _generate():
+ messages = [
+ ("system", prompt_system),
+ ("human", prompt_human),
+ ]
+ response = self.llm.invoke(messages)
+ text_attr = getattr(response, "text", None)
+ ai_message = text_attr() if callable(text_attr) else str(response)
+ return ai_message
+
+ # Retry mechanism with 30-minute timeout
+ timeout_seconds = 30 * 60 # 30 minutes
+ max_retries = 3
+
+ for attempt in range(max_retries):
+ try:
+ with ThreadPoolExecutor(max_workers=1) as executor:
+ future = executor.submit(_generate)
+ result = future.result(timeout=timeout_seconds)
+ return result
+ except TimeoutError:
+ print(
+ f"AI call timed out after {timeout_seconds} seconds on attempt {attempt + 1}"
+ )
+ if attempt < max_retries - 1:
+ print("Retrying...")
+ time.sleep(5) # Wait 5 seconds before retrying
+ continue
+ else:
+ raise Exception(
+ f"AI call failed to complete after {max_retries} attempts with {timeout_seconds} second timeouts"
+ )
+ except Exception as e:
+ if attempt < max_retries - 1:
+ print(f"Attempt {attempt + 1} failed with error: {e}. Retrying...")
+ time.sleep(5) # Wait 5 seconds before retrying
+ continue
+ else:
+ raise Exception(
+ f"Failed to generate system message after {max_retries} attempts: {e}"
+ )
diff --git a/src/repo_management/repo_manager.py b/src/repo_management/repo_manager.py
index 9465a33..14e40c2 100644
--- a/src/repo_management/repo_manager.py
+++ b/src/repo_management/repo_manager.py
@@ -1,8 +1,11 @@
-import os, shutil
+import os
+import shutil
from urllib.parse import quote
+
from git import Repo
from git.exc import GitCommandError
+
class GitRepository:
# This is designed to be transitory; it will destructively create the repo at repo_path.
# If you have uncommitted changes you can kiss them goodbye!
@@ -11,8 +14,8 @@ class GitRepository:
def __init__(self, repo_path, username=None, password=None):
git_protocol = os.environ["GIT_PROTOCOL"]
git_remote = os.environ["GIT_REMOTE"]
- #if username is not set we don't need parse to the url
- if username==None or password == None:
+ # if username is not set we don't need to add credentials to the url
+ if username is None or password is None:
remote = f"{git_protocol}://{git_remote}"
else:
# of course if it is we need to parse and escape it so that it
@@ -39,7 +42,7 @@ class GitRepository:
print(f"Cloning failed: {e}")
return False
- def fetch(self, remote_name='origin', ref_name='main'):
+ def fetch(self, remote_name="origin", ref_name="main"):
"""Fetch updates from a remote repository with authentication"""
try:
self.repo.remotes[remote_name].fetch(ref_name=ref_name)
@@ -48,7 +51,7 @@ class GitRepository:
print(f"Fetching failed: {e}")
return False
- def pull(self, remote_name='origin', ref_name='main'):
+ def pull(self, remote_name="origin", ref_name="main"):
"""Pull updates from a remote repository with authentication"""
print("Pulling Latest Updates (if any)")
try:
@@ -62,18 +65,6 @@ class GitRepository:
"""List all branches in the repository"""
return [branch.name for branch in self.repo.branches]
-
- def create_and_switch_branch(self, branch_name, remote_name='origin', ref_name='main'):
- """Create a new branch in the repository with authentication."""
- try:
- print(f"Creating Branch {branch_name}")
- # Use the same remote and ref as before
- self.repo.git.branch(branch_name)
- except GitCommandError:
- print("Branch already exists switching")
- # ensure remote commits are pulled into local
- self.repo.git.checkout(branch_name)
-
def add_and_commit(self, message=None):
"""Add and commit changes to the repository."""
try:
@@ -91,12 +82,27 @@ class GitRepository:
print(f"Commit failed: {e}")
return False
- def create_copy_commit_push(self, file_path, title, commit_messge):
- self.create_and_switch_branch(title)
+ def create_copy_commit_push(self, file_path, title, commit_message):
+ # Check if branch exists remotely
+ remote_branches = [
+ ref.name.split("/")[-1] for ref in self.repo.remotes.origin.refs
+ ]
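+ # (remote refs such as "origin/main" are reduced to their final path
+ # segment here, so branch names containing "/" will not match)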
- self.pull(ref_name=title)
- shutil.copy(f"{file_path}", f"{self.repo_path}src/content/")
+ if title in remote_branches:
+ # Branch exists remotely, checkout and pull
+ self.repo.git.checkout(title)
+ self.pull(ref_name=title)
+ else:
+ # New branch, create from main
+ self.repo.git.checkout("-b", title, "origin/main")
- self.add_and_commit(f"'{commit_messge}'")
+ # Ensure destination directory exists
+ dest_dir = f"{self.repo_path}src/content/"
+ os.makedirs(dest_dir, exist_ok=True)
- self.repo.git.push()
+ # Copy file
+ shutil.copy(file_path, dest_dir)
+
+ # Commit and push
+ self.add_and_commit(commit_message)
+ self.repo.git.push("--set-upstream", "origin", title)