Compare commits

master..pipeline_creation

No commits in common. "master" and "pipeline_creation" have entirely different histories.

3 changed files with 95 additions and 313 deletions

File 1 of 3: CI workflow for scheduled blog generation (filename not shown in the compare view)

@@ -1,7 +1,7 @@
 name: Create Blog Article if new notes exist
 on:
   schedule:
-    - cron: "15 18 * * *"
+    - cron: "15 3 * * *"
   push:
     branches:
       - master

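For reference, the five cron fields are minute, hour, day-of-month, month and day-of-week, and Actions schedules are evaluated in UTC, so this change moves the nightly run from 18:15 UTC to 03:15 UTC. A quick way to sanity-check a schedule edit like this, as a sketch that assumes the third-party croniter package is installed:

```python
from datetime import datetime, timezone

from croniter import croniter  # third-party: pip install croniter

# "15 3 * * *" fires daily at 03:15 UTC; the old "15 18 * * *" fired at 18:15 UTC
schedule = croniter("15 3 * * *", datetime.now(timezone.utc))
for _ in range(3):
    print(schedule.get_next(datetime))  # next three scheduled run times
```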
File 2 of 3: the OllamaGenerator class (filename not shown in the compare view)

@@ -1,17 +1,11 @@
-import json
-import os
-import random
-import re
-import string
-import time
-from concurrent.futures import ThreadPoolExecutor, TimeoutError
+import os, re, json, random, time, string
 
+from ollama import Client
 import chromadb
 from langchain_ollama import ChatOllama
-from ollama import Client
 
 class OllamaGenerator:
 
     def __init__(self, title: str, content: str, inner_title: str):
         self.title = title
         self.inner_title = inner_title
@@ -20,35 +14,31 @@ class OllamaGenerator:
         print("In Class")
         print(os.environ["CONTENT_CREATOR_MODELS"])
         try:
-            chroma_port = int(os.environ["CHROMA_PORT"])
+            chroma_port = int(os.environ['CHROMA_PORT'])
         except ValueError as e:
             raise Exception(f"CHROMA_PORT is not an integer: {e}")
-        self.chroma = chromadb.HttpClient(
-            host=os.environ["CHROMA_HOST"], port=chroma_port
-        )
-        ollama_url = f"{os.environ['OLLAMA_PROTOCOL']}://{os.environ['OLLAMA_HOST']}:{os.environ['OLLAMA_PORT']}"
+        self.chroma = chromadb.HttpClient(host=os.environ['CHROMA_HOST'], port=chroma_port)
+        ollama_url = f"{os.environ["OLLAMA_PROTOCOL"]}://{os.environ["OLLAMA_HOST"]}:{os.environ["OLLAMA_PORT"]}"
         self.ollama_client = Client(host=ollama_url)
         self.ollama_model = os.environ["EDITOR_MODEL"]
         self.embed_model = os.environ["EMBEDDING_MODEL"]
         self.agent_models = json.loads(os.environ["CONTENT_CREATOR_MODELS"])
-        self.llm = ChatOllama(
-            model=self.ollama_model, temperature=0.6, top_p=0.5
-        )  # This is the level head in the room
+        self.llm = ChatOllama(model=self.ollama_model, temperature=0.6, top_p=0.5) #This is the level head in the room
         self.prompt_inject = f"""
         You are a journalist, Software Developer and DevOps expert
-        writing a 5000 word draft blog article for other tech enthusiasts.
+        writing a 3000 word draft blog article for other tech enthusiasts.
         You like to use almost no code examples and prefer to talk
         in a light comedic tone. You are also Australian
         As this person write this blog as a markdown document.
         The title for the blog is {self.inner_title}.
         Do not output the title in the markdown.
         The basis for the content of the blog is:
-        <blog>{self.content}</blog>
+        {self.content}
         """
 
     def split_into_chunks(self, text, chunk_size=100):
-        """Split text into chunks of size chunk_size"""
-        words = re.findall(r"\S+", text)
+        '''Split text into chunks of size chunk_size'''
+        words = re.findall(r'\S+', text)
         chunks = []
         current_chunk = []
@@ -59,19 +49,18 @@ class OllamaGenerator:
             word_count += 1
             if word_count >= chunk_size:
-                chunks.append(" ".join(current_chunk))
+                chunks.append(' '.join(current_chunk))
                 current_chunk = []
                 word_count = 0
         if current_chunk:
-            chunks.append(" ".join(current_chunk))
+            chunks.append(' '.join(current_chunk))
         return chunks
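As a quick check of what the chunker produces, here is an equivalent condensed form with a tiny example (a sketch, not code from the repo):

```python
import re

def split_into_chunks(text, chunk_size=100):
    # same behaviour as the method above: whitespace-split, then fixed-size word groups
    words = re.findall(r"\S+", text)
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]

print(split_into_chunks("one two three four five", chunk_size=2))
# -> ['one two', 'three four', 'five']
```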
     def generate_draft(self, model) -> str:
-        """Generate a draft blog post using the specified model"""
-        def _generate():
+        '''Generate a draft blog post using the specified model'''
         try:
             # the idea behind this is to make the "creativity" random amongst the content creators
             # controlling temperature will allow the output to make more "random" connections in sentences
             # Controlling top_p will tighten or loosen the embedding connections made
@@ -80,253 +69,89 @@ class OllamaGenerator:
             temp = random.uniform(0.5, 1.0)
             top_p = random.uniform(0.4, 0.8)
             top_k = int(random.uniform(30, 80))
-            agent_llm = ChatOllama(
-                model=model, temperature=temp, top_p=top_p, top_k=top_k
-            )
+            agent_llm = ChatOllama(model=model, temperature=temp, top_p=top_p, top_k=top_k)
             messages = [
-                (
-                    "system",
-                    "You are a creative writer specialising in writing about technology",
-                ),
-                ("human", self.prompt_inject),
+                ("system", self.prompt_inject),
+                ("human", "make the blog post in a format to be edited easily" )
             ]
             response = agent_llm.invoke(messages)
-            return (
-                response.text if hasattr(response, "text") else str(response)
-            )  # ['message']['content']
-            # self.response = self.ollama_client.chat(model=model,
-            #     messages=[
-            #         {
-            #             'role': 'user',
-            #             'content': f'{self.prompt_inject}',
-            #         },
-            #     ])
+            #print ("draft")
+            #print (response)
+            return response.text()#['message']['content']
-        # Retry mechanism with 30-minute timeout
-        timeout_seconds = 30 * 60  # 30 minutes
-        max_retries = 3
-        for attempt in range(max_retries):
-            try:
-                with ThreadPoolExecutor(max_workers=1) as executor:
-                    future = executor.submit(_generate)
-                    result = future.result(timeout=timeout_seconds)
-                    return result
-            except TimeoutError:
-                print(
-                    f"AI call timed out after {timeout_seconds} seconds on attempt {attempt + 1}"
-                )
-                if attempt < max_retries - 1:
-                    print("Retrying...")
-                    time.sleep(5)  # Wait 5 seconds before retrying
-                    continue
-                else:
-                    raise Exception(
-                        f"AI call failed to complete after {max_retries} attempts with {timeout_seconds} second timeouts"
-                    )
         except Exception as e:
-            if attempt < max_retries - 1:
-                print(f"Attempt {attempt + 1} failed with error: {e}. Retrying...")
-                time.sleep(5)  # Wait 5 seconds before retrying
-                continue
-            else:
-                raise Exception(
-                    f"Failed to generate blog draft after {max_retries} attempts: {e}"
-                )
+            raise Exception(f"Failed to generate blog draft: {e}")
     def get_draft_embeddings(self, draft_chunks):
-        """Get embeddings for the draft chunks"""
-        try:
-            # Handle empty draft chunks
-            if not draft_chunks:
-                print("Warning: No draft chunks to embed")
-                return []
-            embeds = self.ollama_client.embed(
-                model=self.embed_model, input=draft_chunks
-            )
-            embeddings = embeds.get("embeddings", [])
-            # Check if embeddings were generated successfully
-            if not embeddings:
-                print("Warning: No embeddings generated")
-                return []
-            return embeddings
-        except Exception as e:
-            print(f"Error generating embeddings: {e}")
-            return []
+        '''Get embeddings for the draft chunks'''
+        embeds = self.ollama_client.embed(model=self.embed_model, input=draft_chunks)
+        return embeds.get('embeddings', [])
 
     def id_generator(self, size=6, chars=string.ascii_uppercase + string.digits):
-        return "".join(random.choice(chars) for _ in range(size))
+        return ''.join(random.choice(chars) for _ in range(size))
     def load_to_vector_db(self):
-        """Load the generated blog drafts into a vector database"""
-        collection_name = (
-            f"blog_{self.title.lower().replace(' ', '_')}_{self.id_generator()}"
-        )
-        collection = self.chroma.get_or_create_collection(
-            name=collection_name
-        )  # , metadata={"hnsw:space": "cosine"})
-        # if any(collection.name == collectionname for collectionname in self.chroma.list_collections()):
+        '''Load the generated blog drafts into a vector database'''
+        collection_name = f"blog_{self.title.lower().replace(" ", "_")}_{self.id_generator()}"
+        collection = self.chroma.get_or_create_collection(name=collection_name)#, metadata={"hnsw:space": "cosine"})
+        #if any(collection.name == collectionname for collectionname in self.chroma.list_collections()):
         #    self.chroma.delete_collection("blog_creator")
         for model in self.agent_models:
-            print(f"Generating draft from {model} for load into vector database")
-            try:
-                draft_content = self.generate_draft(model)
-                draft_chunks = self.split_into_chunks(draft_content)
-                # Skip if no content was generated
-                if not draft_chunks or all(
-                    chunk.strip() == "" for chunk in draft_chunks
-                ):
-                    print(f"Skipping {model} - no content generated")
-                    continue
-                print(f"generating embeds for {model}")
+            print (f"Generating draft from {model} for load into vector database")
+            draft_chunks = self.split_into_chunks(self.generate_draft(model))
+            print(f"generating embeds")
             embeds = self.get_draft_embeddings(draft_chunks)
-            # Skip if no embeddings were generated
-            if not embeds:
-                print(f"Skipping {model} - no embeddings generated")
-                continue
-            # Ensure we have the same number of embeddings as chunks
-            if len(embeds) != len(draft_chunks):
-                print(
-                    f"Warning: Mismatch between chunks ({len(draft_chunks)}) and embeddings ({len(embeds)}) for {model}"
-                )
-                # Truncate or pad to match
-                min_length = min(len(embeds), len(draft_chunks))
-                draft_chunks = draft_chunks[:min_length]
-                embeds = embeds[:min_length]
-                if min_length == 0:
-                    print(f"Skipping {model} - no valid content/embeddings pairs")
-                    continue
             ids = [model + str(i) for i in range(len(draft_chunks))]
             chunknumber = list(range(len(draft_chunks)))
             metadata = [{"model_agent": model} for index in chunknumber]
-            print(f"loading into collection for {model}")
-            collection.add(
-                documents=draft_chunks,
-                embeddings=embeds,
-                ids=ids,
-                metadatas=metadata,
-            )
-            except Exception as e:
-                print(f"Error processing model {model}: {e}")
-                # Continue with other models rather than failing completely
-                continue
+            print(f'loading into collection')
+            collection.add(documents=draft_chunks, embeddings=embeds, ids=ids, metadatas=metadata)
         return collection
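Once each agent's chunks are in Chroma, reading them back works the same way the editor pass below does: embed the query text, then ask the collection for the nearest chunks. A minimal standalone sketch (host, port, model and collection names are illustrative values, not from the repo):

```python
import chromadb
from ollama import Client

chroma = chromadb.HttpClient(host="localhost", port=8000)     # example values
ollama = Client(host="http://localhost:11434")                # example values

collection = chroma.get_or_create_collection(name="blog_example_ABC123")
query_embed = ollama.embed(model="mxbai-embed-large", input="ci pipelines")["embeddings"]
results = collection.query(query_embeddings=query_embed, n_results=5)
print(results["documents"][0])   # top matching chunks
print(results["metadatas"][0])   # which agent model produced each chunk
```

The per-chunk metadata ({"model_agent": model}) is what lets the editor pass see which draft each retrieved chunk came from.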
     def generate_markdown(self) -> str:
-        prompt_human = f"""
+        prompt_system = f"""
         You are an editor taking information from {len(self.agent_models)} Software
         Developers and Data experts
-        writing a 5000 word blog article. You like when they use almost no code examples.
+        writing a 3000 word blog article. You like when they use almost no code examples.
         You are also Australian. The content may have light comedic elements,
         you are more professional and will attempt to tone these down
-        As this person produce the final version of this blog as a markdown document
-        keeping in mind the context provided by the previous drafts.
-        You are to produce the content not placeholders for further editors
+        As this person produce an amalgamation of this blog as a markdown document.
         The title for the blog is {self.inner_title}.
         Do not output the title in the markdown. Avoid repeated sentences
         The basis for the content of the blog is:
-        <blog>{self.content}</blog>
+        {self.content}
         """
-        def _generate_final_document():
-            try:
-                embed_result = self.ollama_client.embed(
-                    model=self.embed_model, input=prompt_human
-                )
-                query_embed = embed_result.get("embeddings", [])
-                if not query_embed:
-                    print(
-                        "Warning: Failed to generate query embeddings, using empty list"
-                    )
-                    query_embed = [[]]  # Use a single empty embedding as fallback
-            except Exception as e:
-                print(f"Error generating query embeddings: {e}")
-                # Generate empty embeddings as fallback
-                query_embed = [[]]  # Use a single empty embedding as fallback
+        query_embed = self.ollama_client.embed(model=self.embed_model, input=prompt_system)['embeddings']
         collection = self.load_to_vector_db()
-        # Try to query the collection, with fallback for empty collections
-        try:
-            collection_query = collection.query(
-                query_embeddings=query_embed, n_results=100
-            )
+        collection_query = collection.query(query_embeddings=query_embed, n_results=100)
         print("Showing pertinent info from drafts used in final edited edition")
-            # Get documents with error handling
-            query_result = collection.query(
-                query_embeddings=query_embed, n_results=100
-            )
-            documents = query_result.get("documents", [])
-            if documents and len(documents) > 0 and len(documents[0]) > 0:
-                pertinent_draft_info = "\n\n".join(documents[0])
-            else:
-                print("Warning: No relevant documents found in collection")
-                pertinent_draft_info = "No relevant information found in drafts."
-        except Exception as query_error:
-            print(f"Error querying collection: {query_error}")
-            pertinent_draft_info = (
-                "No relevant information found in drafts due to query error."
-            )
-        # print(pertinent_draft_info)
-        prompt_system = f"""Generate the final, 5000 word, draft of the blog using this information from the drafts: <context>{pertinent_draft_info}</context>
-        - Only output in markdown, do not wrap in markdown tags, Only provide the draft not a commentary on the drafts in the context
-        """
+        pertinent_draft_info = '\n\n'.join(collection.query(query_embeddings=query_embed, n_results=100)['documents'][0])
+        #print(pertinent_draft_info)
+        prompt_human = f"Generate the final document using this information from the drafts: {pertinent_draft_info} - Only output in markdown, do not wrap in markdown tags"
         print("Generating final document")
-            messages = [
-                ("system", prompt_system),
-                ("human", prompt_human),
-            ]
-            response = self.llm.invoke(messages)
-            return response.text if hasattr(response, "text") else str(response)
         try:
-            # Retry mechanism with 30-minute timeout
-            timeout_seconds = 30 * 60  # 30 minutes
-            max_retries = 3
-            for attempt in range(max_retries):
-                try:
-                    with ThreadPoolExecutor(max_workers=1) as executor:
-                        future = executor.submit(_generate_final_document)
-                        self.response = future.result(timeout=timeout_seconds)
-                        break  # Success, exit the retry loop
-                except TimeoutError:
-                    print(
-                        f"AI call timed out after {timeout_seconds} seconds on attempt {attempt + 1}"
-                    )
-                    if attempt < max_retries - 1:
-                        print("Retrying...")
-                        time.sleep(5)  # Wait 5 seconds before retrying
-                        continue
-                    else:
-                        raise Exception(
-                            f"AI call failed to complete after {max_retries} attempts with {timeout_seconds} second timeouts"
-                        )
-                except Exception as e:
-                    if attempt < max_retries - 1:
-                        print(
-                            f"Attempt {attempt + 1} failed with error: {e}. Retrying..."
-                        )
-                        time.sleep(5)  # Wait 5 seconds before retrying
-                        continue
-                    else:
-                        raise Exception(
-                            f"Failed to generate markdown after {max_retries} attempts: {e}"
-                        )
+            messages = [("system", prompt_system), ("human", prompt_human),]
+            self.response = self.llm.invoke(messages).text()
-            # self.response = self.ollama_client.chat(model=self.ollama_model,
-            #     messages=[
-            #         {
-            #             'role': 'user',
-            #             'content': f'{prompt_enhanced}',
-            #         },
-            #     ])
-            # print ("Markdown Generated")
-            # print (self.response)
-            return self.response  # ['message']['content']
+            #print ("Markdown Generated")
+            #print (self.response)
+            return self.response#['message']['content']
         except Exception as e:
             raise Exception(f"Failed to generate markdown: {e}")
@@ -336,43 +161,6 @@ class OllamaGenerator:
             f.write(self.generate_markdown())
 
     def generate_system_message(self, prompt_system, prompt_human):
-        def _generate():
-            messages = [
-                ("system", prompt_system),
-                ("human", prompt_human),
-            ]
-            response = self.llm.invoke(messages)
-            ai_message = response.text if hasattr(response, "text") else str(response)
+        messages = [("system", prompt_system), ("human", prompt_human),]
+        ai_message = self.llm.invoke(messages).text()
         return ai_message
-
-        # Retry mechanism with 30-minute timeout
-        timeout_seconds = 30 * 60  # 30 minutes
-        max_retries = 3
-        for attempt in range(max_retries):
-            try:
-                with ThreadPoolExecutor(max_workers=1) as executor:
-                    future = executor.submit(_generate)
-                    result = future.result(timeout=timeout_seconds)
-                    return result
-            except TimeoutError:
-                print(
-                    f"AI call timed out after {timeout_seconds} seconds on attempt {attempt + 1}"
-                )
-                if attempt < max_retries - 1:
-                    print("Retrying...")
-                    time.sleep(5)  # Wait 5 seconds before retrying
-                    continue
-                else:
-                    raise Exception(
-                        f"AI call failed to complete after {max_retries} attempts with {timeout_seconds} second timeouts"
-                    )
-            except Exception as e:
-                if attempt < max_retries - 1:
-                    print(f"Attempt {attempt + 1} failed with error: {e}. Retrying...")
-                    time.sleep(5)  # Wait 5 seconds before retrying
-                    continue
-                else:
-                    raise Exception(
-                        f"Failed to generate system message after {max_retries} attempts: {e}"
-                    )

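Taken together, the class implements a draft-then-edit pipeline: several "creative" agent models write drafts with randomised temperature/top_p/top_k, the drafts are chunked and embedded into Chroma, and the level-headed editor model synthesises the final post from the retrieved chunks. In outline, usage looks something like this (a sketch; it assumes OllamaGenerator is importable and the CHROMA_*, OLLAMA_*, EDITOR_MODEL, EMBEDDING_MODEL and CONTENT_CREATOR_MODELS environment variables are set):

```python
# Illustrative driver code, not from the repo
generator = OllamaGenerator(
    title="pipeline_creation",            # used to name the Chroma collection
    content=open("notes.md").read(),      # raw notes the drafts are based on
    inner_title="Pipeline Creation",      # title injected into the prompts
)
markdown = generator.generate_markdown()  # drafts -> Chroma -> edited final post
```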
File 3 of 3: the GitRepository helper (filename not shown in the compare view)

@@ -1,11 +1,8 @@
-import os
-import shutil
+import os, shutil
 from urllib.parse import quote
 from git import Repo
 from git.exc import GitCommandError
 
 class GitRepository:
     # This is designed to be transitory; it will destructively create the repo at repo_path.
     # If you have uncommitted changes you can kiss them goodbye!
@@ -14,8 +11,8 @@ class GitRepository:
     def __init__(self, repo_path, username=None, password=None):
         git_protocol = os.environ["GIT_PROTOCOL"]
         git_remote = os.environ["GIT_REMOTE"]
-        # if username is not set we don't need to parse it into the url
-        if username == None or password == None:
+        #if username is not set we don't need to parse it into the url
+        if username==None or password == None:
             remote = f"{git_protocol}://{git_remote}"
         else:
             # of course if it is we need to parse and escape it so that it
@@ -42,7 +39,7 @@ class GitRepository:
             print(f"Cloning failed: {e}")
             return False
 
-    def fetch(self, remote_name="origin", ref_name="main"):
+    def fetch(self, remote_name='origin', ref_name='main'):
         """Fetch updates from a remote repository with authentication"""
         try:
             self.repo.remotes[remote_name].fetch(ref_name=ref_name)
@@ -51,7 +48,7 @@ class GitRepository:
             print(f"Fetching failed: {e}")
             return False
 
-    def pull(self, remote_name="origin", ref_name="main"):
+    def pull(self, remote_name='origin', ref_name='main'):
         """Pull updates from a remote repository with authentication"""
         print("Pulling Latest Updates (if any)")
         try:
@@ -65,6 +62,18 @@ class GitRepository:
         """List all branches in the repository"""
         return [branch.name for branch in self.repo.branches]
 
+    def create_and_switch_branch(self, branch_name, remote_name='origin', ref_name='main'):
+        """Create a new branch in the repository with authentication."""
+        try:
+            print(f"Creating Branch {branch_name}")
+            # Use the same remote and ref as before
+            self.repo.git.branch(branch_name)
+        except GitCommandError:
+            print("Branch already exists, switching")
+        # ensure remote commits are pulled into local
+        self.repo.git.checkout(branch_name)
+
     def add_and_commit(self, message=None):
         """Add and commit changes to the repository."""
         try:
@@ -82,27 +91,12 @@ class GitRepository:
             print(f"Commit failed: {e}")
             return False
 
-    def create_copy_commit_push(self, file_path, title, commit_message):
-        # Check if branch exists remotely
-        remote_branches = [
-            ref.name.split("/")[-1] for ref in self.repo.remotes.origin.refs
-        ]
+    def create_copy_commit_push(self, file_path, title, commit_messge):
+        self.create_and_switch_branch(title)
-        if title in remote_branches:
-            # Branch exists remotely, checkout and pull
-            self.repo.git.checkout(title)
-            self.pull(ref_name=title)
-        else:
-            # New branch, create from main
-            self.repo.git.checkout("-b", title, "origin/main")
+        shutil.copy(f"{file_path}", f"{self.repo_path}src/content/")
-        # Ensure destination directory exists
-        dest_dir = f"{self.repo_path}src/content/"
-        os.makedirs(dest_dir, exist_ok=True)
+        self.add_and_commit(f"'{commit_messge}'")
-        # Copy file
-        shutil.copy(f"{file_path}", dest_dir)
-        # Commit and push
-        self.add_and_commit(commit_message)
-        self.repo.git.push("--set-upstream", "origin", title)
+        self.repo.git.push()
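Stripped to its GitPython essentials, the new publish flow is: create (or reuse) a local branch, switch to it, copy the article in, commit, push. A standalone sketch with illustrative paths and names; note that the deleted version pushed with --set-upstream, which a brand-new branch still needs, so the sketch keeps that flag:

```python
import shutil

from git import Repo
from git.exc import GitCommandError

repo = Repo("/tmp/blog_repo")            # illustrative path to an existing clone
branch, post = "my_new_post", "/tmp/my_new_post.md"

try:
    repo.git.branch(branch)              # create the local branch
except GitCommandError:
    pass                                 # branch already exists; just switch
repo.git.checkout(branch)

shutil.copy(post, "/tmp/blog_repo/src/content/")
repo.git.add(all=True)                   # stage everything, like add_and_commit above
repo.index.commit("new blog post")
repo.git.push("--set-upstream", "origin", branch)
```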