Imagine having a team of AI agents that can take a single research prompt and handle everything—from finding sources and scraping data to analyzing information and publishing findings across platforms like Google Docs or SharePoint.
This AI Researcher Swarm automates the entire research and publishing process, powered by SwarmZero and integrations like Firecrawl and SerpApi, making research easy, fast, and hands-free. Perfect for anyone looking to simplify and scale their research workflows with AI.
This project uses gpt-4o by default, but you can easily swap it out for any of our supported models. Be sure to update the model name in the swarmzero_config.toml file.
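For example, switching models only means changing the model entry in swarmzero_config.toml (gpt-4o-mini below is purely an illustration; use any model name SwarmZero supports):
[model]
model = "gpt-4o-mini"  # was "gpt-4o"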
swarmzero_config.toml: Configures the Swarm with agents and tools.
Main Scripts:
main.py: Runs the Swarm, initializing agents and coordinating tasks.
README.md: Documentation file explaining setup and usage.
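Based on the import paths used in the code that follows, the project is laid out roughly as shown below. The __init__.py files are assumed to re-export the tool and publisher functions so they can be imported from app.tools and app.tools.publishers directly:
app/
  tools/
    __init__.py        # assumed to re-export the tool functions used by main.py
    structures.py      # Pydantic output models
    tools.py           # search, mapping, scraping and PDF tools
    publishers/
      __init__.py      # assumed to re-export the publisher functions
      confluence.py
      google_docs.py
      sharepoint.py
main.py
swarmzero_config.toml
README.md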
Creating the App
In this section, we will create the essential tools the Swarm uses to research and publish autonomously.
1. Structured Outputs - structures.py
structures.py
from pydantic import BaseModel
from typing import List
class SearchResult(BaseModel):
"""Data model for each search result."""
title: str
link: str
snippet: str
class GoogleSearchResults(BaseModel):
"""Structured output for Google Search Agent."""
objective: str
results: List[SearchResult]
class MapURLResult(BaseModel):
"""Structured output for Map URL Agent."""
objective: str
results: List[str]
class ScrapedContent(BaseModel):
"""Structured output for Website Scraper Agent."""
objective: str
results: str
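These are plain Pydantic models, so any agent output that matches the schema can be validated and passed around as typed objects. A minimal sketch with made-up data:
from app.tools.structures import GoogleSearchResults

# hypothetical payload, e.g. parsed from an agent's JSON output
raw = {
    "objective": "Learn Python basics",
    "results": [
        {
            "title": "The Python Tutorial",
            "link": "https://docs.python.org/3/tutorial/",
            "snippet": "Python is an easy to learn, powerful programming language...",
        }
    ],
}

# nested dicts are coerced into SearchResult instances;
# a pydantic.ValidationError is raised on missing or mistyped fields
search_results = GoogleSearchResults(**raw)
print(search_results.results[0].link)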
2. Tools - tools.py
tools.py
import os
import logging
from dotenv import load_dotenv
from fpdf import FPDF
from firecrawl import FirecrawlApp
from serpapi.google_search import GoogleSearch
from app.tools.structures import (
GoogleSearchResults,
SearchResult,
MapURLResult,
ScrapedContent,
)
load_dotenv()
firecrawl_app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
logging.basicConfig(level=logging.INFO)
def search_google(query: str, objective: str) -> GoogleSearchResults:
"""
Perform a Google search using SerpAPI and return structured results.
This function executes a Google search query through SerpAPI and processes the results
into a structured format using the GoogleSearchResults data class.
Args:
query (str): The search query to be executed
objective (str): The purpose or goal of the search, used for context
Returns:
GoogleSearchResults: A structured object containing search results including titles,
links, and snippets
Raises:
Exception: If no organic search results are found in the API response
Example:
>>> results = search_google("python programming", "Learn Python basics")
>>> print(results.results[0].title)
"""
logging.info(f"Searching Google with query: '{query}' for objective: '{objective}'")
search_params = {
"engine": "google",
"q": query,
"api_key": os.getenv("SERP_API_KEY"),
}
search = GoogleSearch(search_params)
search_results = search.get_dict().get("organic_results", [])
if not search_results:
raise Exception("No organic results found from SerpAPI.")
structured_results = [
SearchResult(
title=result.get("title", ""),
link=result.get("link", ""),
snippet=result.get("snippet", ""),
)
for result in search_results
if result.get("link")
]
return GoogleSearchResults(objective=objective, results=structured_results)
def map_url_pages(url: str, objective: str, search_query: str) -> MapURLResult:
"""
Map all pages of a website that match a search query using Firecrawl.
This function crawls a website and identifies relevant pages based on a search query,
returning the results in a structured format.
Args:
url (str): The base URL of the website to map
objective (str): The purpose or goal of the mapping operation
search_query (str): Query string to filter relevant pages
Returns:
MapURLResult: A structured object containing the list of matching URLs found
Example:
>>> results = map_url_pages("https://example.com", "Find pricing pages", "pricing")
>>> for url in results.results:
... print(url)
"""
logging.info(
f"Mapping URLs for website: '{url}' with search query: '{search_query}' and objective: '{objective}'"
)
map_status = firecrawl_app.map_url(url, params={"search": search_query})
if map_status.get("status") == "success":
links = map_status.get("links", [])
top_links = [link for link in links if link]
return MapURLResult(objective=objective, results=top_links)
else:
return MapURLResult(objective=objective, results=[])
def scrape_url(url: str, objective: str) -> ScrapedContent:
"""
Scrape content from a specified URL using Firecrawl.
This function extracts content from a webpage and returns it in a structured format.
The content is converted to markdown format for better readability and processing.
Args:
url (str): The URL to scrape content from
objective (str): The purpose or goal of the scraping operation
Returns:
ScrapedContent: A structured object containing the scraped content
Raises:
Exception: If scraping fails or returns empty content
Example:
>>> content = scrape_url("https://example.com/about", "Extract company information")
>>> print(content.results)
"""
logging.info(f"Scraping URL: '{url}' with objective: '{objective}'")
scrape_result = firecrawl_app.scrape_url(url, params={"formats": ["markdown"]})
if not scrape_result:
raise Exception("Scraping failed or returned empty content.")
content = scrape_result["markdown"]
if not content:
raise Exception(f"No content retrieved from {url}")
return ScrapedContent(objective=objective, results=content)
def save_as_local_pdf(title: str, results_text: str, pdf_output_path: str) -> str:
"""
Generate a PDF file containing the provided text content.
This function creates a formatted PDF document with a header and the provided content.
The PDF is saved to the specified local path, creating directories if needed.
Args:
title (str): The title of the content to be included in the PDF
results_text (str): The text content to be included in the PDF
pdf_output_path (str): The full path where the PDF should be saved
Returns:
str: The absolute path to the created PDF file, or None if creation fails
Raises:
OSError: If there are permission issues or problems creating directories
Exception: For any other errors during PDF creation
Example:
>>> text = "Sample report content\\nWith multiple lines"
>>> pdf_path = save_as_local_pdf("Sample Report", text, "output/report.pdf")
>>> print(f"PDF saved to: {pdf_path}")
"""
try:
pdf = FPDF()
pdf.add_page()
# header
pdf.set_font("Arial", "B", 16)
pdf.cell(0, 10, title, ln=True, align="C")
pdf.ln(10) # line break
# content
pdf.set_font("Arial", size=12)
for line in results_text.strip().split("\n"):
pdf.multi_cell(0, 10, txt=line.strip())
pdf.ln(1)
# ensure the output directory exists
output_path = os.path.abspath(pdf_output_path)
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
pdf.output(output_path)
logging.info(f"PDF file created successfully at: {output_path}")
return output_path
except Exception as e:
logging.error(f"Error during PDF creation: {e}")
return None
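With the tools defined, you can exercise the pipeline by hand before handing it to the Swarm. A rough sketch (the topic and output path are placeholders, and FIRECRAWL_API_KEY and SERP_API_KEY must be set in your .env):
from app.tools import search_google, map_url_pages, scrape_url, save_as_local_pdf

objective = "Learn about the history of the internet"

# 1. find candidate websites
search = search_google("history of the internet", objective)
first_site = search.results[0].link

# 2. map subpages on the top result that mention "history"
mapped = map_url_pages(first_site, objective, search_query="history")

# 3. scrape the first mapped page, falling back to the site root if nothing was mapped
target = mapped.results[0] if mapped.results else first_site
content = scrape_url(target, objective)

# 4. save the scraped markdown as a local PDF
save_as_local_pdf("History of the Internet", content.results, "./swarmzero-data/output/history.pdf")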
2a. Publisher Tools - publishers/
confluence.py
import json
import os
import logging
import requests
from dotenv import load_dotenv
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
load_dotenv()
def publish_to_confluence(title: str, results_text: str) -> str:
"""
Publishes results to Confluence by creating or updating a page.
Args:
title (str): The title of the Confluence page.
results_text (str): The content to be published on the page.
Returns:
str: The URL of the created or updated Confluence page.
Raises:
Exception: If an error occurs during the publishing process.
"""
CONFLUENCE_BASE_URL = os.getenv("CONFLUENCE_BASE_URL").rstrip("/")
if not CONFLUENCE_BASE_URL.endswith("/wiki"):
CONFLUENCE_BASE_URL = f"{CONFLUENCE_BASE_URL}/wiki"
CONFLUENCE_API_ENDPOINT = f"{CONFLUENCE_BASE_URL}/rest/api/content/"
CONFLUENCE_USERNAME = os.getenv("CONFLUENCE_USERNAME")
CONFLUENCE_API_TOKEN = os.getenv("CONFLUENCE_API_TOKEN")
CONFLUENCE_SPACE_KEY = os.getenv("CONFLUENCE_SPACE_KEY")
try:
# search for existing page
search_params = {
"title": title,
"spaceKey": CONFLUENCE_SPACE_KEY,
"expand": "version",
}
search_response = requests.get(
CONFLUENCE_API_ENDPOINT,
params=search_params,
auth=(CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN),
)
if search_response.status_code == 200:
search_data = search_response.json()
# update existing page
if search_data.get("size", 0) > 0:
page = search_data["results"][0]
page_id = page["id"]
version = page["version"]["number"]
update_data = {
"id": page_id,
"type": "page",
"title": title,
"space": {"key": CONFLUENCE_SPACE_KEY},
"body": {
"storage": {"value": results_text, "representation": "storage"}
},
"version": {"number": version + 1},
}
update_response = requests.put(
f"{CONFLUENCE_API_ENDPOINT}{page_id}",
json=update_data,
auth=(CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN),
)
if update_response.status_code == 200:
return f"{CONFLUENCE_BASE_URL}/spaces/{CONFLUENCE_SPACE_KEY}/pages/{page_id}"
# create new page
else:
create_data = {
"type": "page",
"title": title,
"space": {"key": CONFLUENCE_SPACE_KEY},
"body": {
"storage": {"value": results_text, "representation": "storage"}
},
}
create_response = requests.post(
CONFLUENCE_API_ENDPOINT,
json=create_data,
auth=(CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN),
)
if create_response.status_code in [200, 201]:
page = create_response.json()
return f"{CONFLUENCE_BASE_URL}/spaces/{CONFLUENCE_SPACE_KEY}/pages/{page['id']}"
return None
except Exception as e:
logger.error("Failed to publish to Confluence", exc_info=True)
return None
google_docs.py
import os
import logging
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
import dotenv
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
dotenv.load_dotenv()
def publish_to_google_docs(title: str, results_text: str) -> str:
"""
Publishes results to a new Google Docs document.
Args:
title (str): The title of the Google Docs document.
results_text (str): The content to be published in the document.
Returns:
str: The URL of the created Google Docs document.
Raises:
FileNotFoundError: If the Google credentials file is missing.
Exception: If an error occurs during the publishing process.
"""
GOOGLE_SCOPES = os.getenv(
"GOOGLE_SCOPES",
"https://www.googleapis.com/auth/documents https://www.googleapis.com/auth/drive.file",
).split()
GOOGLE_APPLICATION_CREDENTIALS = os.getenv(
"GOOGLE_APPLICATION_CREDENTIALS", "credentials.json"
)
token_path = "token.json"
creds = None
if os.path.exists(token_path):
logger.debug("Loading existing credentials from token file")
creds = Credentials.from_authorized_user_file(token_path, GOOGLE_SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
logger.info("Refreshing expired credentials")
creds.refresh(Request())
else:
if not os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
logger.error(
f"Credentials file '{GOOGLE_APPLICATION_CREDENTIALS}' not found"
)
raise FileNotFoundError(
f"Missing '{GOOGLE_APPLICATION_CREDENTIALS}' file."
)
logger.info("Initiating OAuth2 flow for new credentials")
flow = InstalledAppFlow.from_client_secrets_file(
GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_SCOPES
)
creds = flow.run_local_server(port=0)
logger.debug("Saving new credentials to token file")
with open(token_path, "w") as token:
token.write(creds.to_json())
try:
logger.info("Initializing Google Docs service")
docs_service = build("docs", "v1", credentials=creds)
logger.info(f'Creating new document with title: "{title}"')
doc = docs_service.documents().create(body={"title": title}).execute()
document_id = doc.get("documentId")
document_url = f"https://docs.google.com/document/d/{document_id}/edit"
logger.info(f"Document created successfully with ID: {document_id}")
requests_batch = [
{
"insertText": {
"location": {"index": 1},
"text": f"{title}\n\n{results_text}",
}
},
{
"updateParagraphStyle": {
"range": {"startIndex": 1, "endIndex": len(title) + 1},
"paragraphStyle": {"namedStyleType": "HEADING_1"},
"fields": "namedStyleType",
}
},
]
logger.debug("Updating document with content")
docs_service.documents().batchUpdate(
documentId=document_id, body={"requests": requests_batch}
).execute()
logger.info("Content successfully published to Google Docs")
return document_url
except Exception as e:
logger.error(f"Failed to publish document: {str(e)}", exc_info=True)
return None
sharepoint.py
import msal
import os
import logging
import requests
from urllib.parse import quote_plus
from dotenv import load_dotenv
from docx import Document
from io import BytesIO
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
load_dotenv()
def publish_to_sharepoint(title: str, results_text: str) -> str:
"""
Publishes results to SharePoint by creating a new file in the specified drive.
Args:
title (str): The title of the file to be uploaded.
results_text (str): The content to be uploaded as a file.
Returns:
str: The URL of the uploaded SharePoint file.
Raises:
Exception: If an error occurs during authentication or file upload.
"""
SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID")
SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET")
SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID")
SHAREPOINT_SITE_ID = os.getenv("SHAREPOINT_SITE_ID")
SHAREPOINT_DRIVE_ID = os.getenv("SHAREPOINT_DRIVE_ID")
try:
logger.debug("Initiating SharePoint authentication")
authority = f"https://login.microsoftonline.com/{SHAREPOINT_TENANT_ID}"
logger.debug(f"Using authority URL: {authority}")
app = msal.ConfidentialClientApplication(
SHAREPOINT_CLIENT_ID,
authority=authority,
client_credential=SHAREPOINT_CLIENT_SECRET,
)
scopes = ["https://graph.microsoft.com/.default"]
logger.debug(f"Requesting scopes: {scopes}")
# get access token
logger.info("Requesting access token")
result = app.acquire_token_for_client(scopes=scopes)
if "access_token" not in result:
error_msg = result.get("error_description", "Unknown error")
error_code = result.get("error", "Unknown error code")
logger.error(
f"""Failed to obtain SharePoint access token:
Error Code: {error_code}
Description: {error_msg}
Full Result: {result}"""
)
return None
logger.info("Successfully acquired access token")
access_token = result["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
}
logger.debug("Request headers prepared (token hidden)")
file_name = f"{title}.docx"
# convert text content to basic Word document format
doc = Document()
doc.add_heading(title, 0)
doc.add_paragraph(results_text)
# save to bytes
doc_bytes = BytesIO()
doc.save(doc_bytes)
file_content = doc_bytes.getvalue()
logger.info(f"Preparing file upload: {file_name}")
logger.debug(f"File size: {len(file_content)} bytes")
upload_url = (
f"https://graph.microsoft.com/v1.0/sites/{SHAREPOINT_SITE_ID}"
f"/drives/{SHAREPOINT_DRIVE_ID}/root:/{quote_plus(file_name)}:/content"
)
logger.debug(f"Upload URL: {upload_url}")
logger.info(f"Uploading file to SharePoint: {file_name}")
response = requests.put(upload_url, headers=headers, data=file_content)
logger.debug(
f"""SharePoint Response Details:
Status Code: {response.status_code}
Headers: {dict(response.headers)}
Response Text: {response.text}"""
)
if response.status_code in [200, 201]:
try:
file_info = response.json()
web_url = file_info.get("webUrl")
logger.info(f"File uploaded successfully")
logger.info(f"Web URL: {web_url}")
return web_url
except Exception as e:
logger.error(
f"""Failed to parse successful upload response:
Error: {str(e)}
Response Text: {response.text}"""
)
return None
else:
logger.error(
f"""SharePoint upload failed:
Status Code: {response.status_code}
Response: {response.text}
URL: {upload_url}
File Name: {file_name}"""
)
return None
except Exception as e:
logger.error("Unexpected error during SharePoint upload", exc_info=True)
logger.debug(f"Error details: {str(e)}")
return None
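All of the tools and publishers above read their credentials from environment variables via python-dotenv, so a .env file at the project root should define the keys below. Values are placeholders, only the platforms you actually publish to need to be configured, and the model provider key (e.g. OPENAI_API_KEY for the default gpt-4o) is an assumption based on the chosen model:
# model provider (assumed for the default gpt-4o model)
OPENAI_API_KEY=...

# search and scraping
SERP_API_KEY=...
FIRECRAWL_API_KEY=...

# Confluence
CONFLUENCE_BASE_URL=https://your-domain.atlassian.net
CONFLUENCE_USERNAME=you@example.com
CONFLUENCE_API_TOKEN=...
CONFLUENCE_SPACE_KEY=...

# Google Docs (OAuth client secrets; GOOGLE_SCOPES is optional and defaults to Docs + Drive file access)
GOOGLE_APPLICATION_CREDENTIALS=credentials.json

# SharePoint (Microsoft Graph app registration)
SHAREPOINT_CLIENT_ID=...
SHAREPOINT_CLIENT_SECRET=...
SHAREPOINT_TENANT_ID=...
SHAREPOINT_SITE_ID=...
SHAREPOINT_DRIVE_ID=...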
Creating the Swarm
As the first step, set up the config file:
swarmzero_config.toml
[model]
model = "gpt-4o"
[environment]
type = "dev"
[sample_prompts]
prompts = [
"Research the history of the internet",
"Publish research about using crypto-economics to incentivize AI agents to collaborate and publish to Google Docs",
"Research the history of the stock market and the role of the Federal Reserve. Publish the findings to Confluence.",
"Can you help me research the feasibility of distributed model training for LLMs? When you're done, publish the findings to Sharepoint.",
]
Use the code below to set up the AI Researcher Swarm:
main.py
import asyncio
import logging
from swarmzero import Agent, Swarm
from swarmzero.sdk_context import SDKContext
from app.tools import (
search_google,
map_url_pages,
scrape_url,
save_as_local_pdf,
)
from app.tools.publishers import (
publish_to_google_docs,
publish_to_sharepoint,
publish_to_confluence,
)
import dotenv
dotenv.load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
config_path = "./swarmzero_config.toml"
sdk_context = SDKContext(config_path=config_path)
google_search_agent = Agent(
name="Google Search Agent",
instruction="""You are a Google Search Agent specialized in searching the web.
Perform searches based on the user's research topic and provide a list of relevant website URLs.
Output should be a JSON object with the following structure:
{
"objective": "<research_topic>",
"results": [
{
"title": "<title>",
"link": "<url>",
"snippet": "<snippet>"
},
...
]
}""",
functions=[search_google],
config_path=config_path,
swarm_mode=True,
)
map_url_agent = Agent(
name="Map URL Agent",
instruction="""You are a Map URL Agent specialized in mapping web pages from provided website URLs.
For each URL, identify and list relevant subpages that align with the user's research objective.
Output should be a JSON object with the following structure:
{
"objective": "<research_objective>",
"results": ["<subpage_url1>", "<subpage_url2>", ...]
}""",
functions=[map_url_pages],
config_path=config_path,
swarm_mode=True,
)
website_scraper_agent = Agent(
name="Website Scraper Agent",
instruction="""You are a Website Scraper Agent specialized in extracting content from mapped URLs.
Scrape the necessary information required for analysis and ensure the content is clean and structured.
Output should be a JSON object with the following structure:
{
"objective": "<research_objective>",
"results": "<scraped_content>"
}""",
functions=[scrape_url],
config_path=config_path,
swarm_mode=True,
)
analyst_agent = Agent(
name="Analyst Agent",
instruction="""You are an Analyst Agent that examines scraped website content and extracts structured data.
Analyze the content to identify key themes, entities, and insights relevant to the research objective.
Provide your analysis as a JSON object in the following format:
{
"objective": "<research_objective>",
"analysis": {
"key_themes": [...],
"entities": [...],
"insights": [...]
}
}""",
functions=[],
config_path=config_path,
swarm_mode=True,
)
publisher_agent = Agent(
name="Publisher Agent",
instruction="""You are a Publisher Agent that disseminates research findings to various platforms.
Use as much of the content provided to you as possible. The final output should be at least 750 words.
You will be told whether to publish the analyzed data to Google Docs, SharePoint, Confluence or save it as a local PDF.
If they do not specify, then always default to saving as a local PDF as `./swarmzero-data/output/<title>.pdf`.""",
functions=[
save_as_local_pdf,
publish_to_google_docs,
publish_to_sharepoint,
publish_to_confluence,
],
config_path=config_path,
swarm_mode=True,
)
research_swarm = Swarm(
name="Research Swarm",
description="A swarm of AI Agents that can research arbitrary topics.",
instruction="""You are the leader of a research team that produces new research for user-provided topics.
Upon receiving a research topic, execute the following steps in order:
1. **Search the Web:** Utilize the Google Search Agent to find relevant websites based on the user's research topic.
2. **Map Webpages:** Use the Map URL Agent to identify and list pertinent subpages from the search results. Provide it with the relevant objective.
If no subpages can be found in all of the URLs, return to step 1 and try a different query.
3. **Scrape Content:** Call the Website Scraper Agent to extract necessary information from the mapped URLs.
4. **Analyze Content:** Use the Analyst Agent to process the scraped content and generate structured JSON data.
5. **Publish Findings:** Finally, instruct the Publisher Agent to output the final analysis.
Provide a concise title for the publisher along with the content from the Analyst Agent.
Inform this agent about where the user would like to publish the research.
If the user does not specify how to publish the research, save the research as a local PDF.
If an agent is unable to properly execute its task, retry it with a different prompt and/or inputs.
Ensure each agent completes its task before proceeding to the next step.
Maintain clear and concise communication throughout the process.
You must publish the results of any research conducted.""",
agents=[
google_search_agent,
map_url_agent,
website_scraper_agent,
analyst_agent,
publisher_agent,
],
functions=[],
sdk_context=sdk_context,
max_iterations=99,
)
async def main():
print(
"\n\nWelcome to the Research Swarm!\nVisit https://SwarmZero.ai to learn more.\nType 'exit' to quit.\n"
)
while True:
prompt = input("\nWhat would you like to research? \n\n")
if prompt.lower() == "exit":
break
try:
logging.info(f"Research topic received: '{prompt}'")
response = await research_swarm.chat(prompt)
print("\nResearch Findings:\n")
print(response)
except Exception as e:
logging.error(f"An error occurred during the research process: {e}")
print(
"\nAn error occurred while processing your research topic. Please try again.\n"
)
if __name__ == "__main__":
asyncio.run(main())
Running the Swarm
Run the main application using:
poetry run python main.py
This will initialize the AI agents, perform web searches, map URLs, scrape content, and publish the results to the configured platforms.
Example Prompts
When prompted, you can enter research queries like:
Research the history of the internet
Research crypto-economics and AI agent collaboration, publish to Google Docs
Research the history of the stock market and Federal Reserve, publish to Confluence
Research distributed model training for LLMs and publish to SharePoint
Each prompt will trigger the swarm to:
Search for relevant information
Extract and analyze content
Generate a comprehensive research document
Publish to your specified platform (it defaults to saving a PDF locally if not specified)
Example Outputs
Local PDF
More generated PDFs can be found in the sample outputs folder.