Imagine having a team of AI agents that can take a single research prompt and handle everything—from finding sources and scraping data to analyzing information and publishing findings across platforms like Google Docs or SharePoint.
This AI Researcher Swarm automates the entire research and publishing process, powered by SwarmZero and integrations like Firecrawl and SerpApi, making research easy, fast, and hands-free. Perfect for anyone looking to simplify and scale their research workflows with AI.
This project uses gpt-4o by default, but you can easily swap it out for any of our supported models. Be sure to update the model name in the swarmzero_config.toml file.
swarmzero_config.toml: Configures the Swarm with agents and tools.
Main Scripts:
main.py: Runs the Swarm, initializing agents and coordinating tasks.
README.md: Documentation file explaining setup and usage.
Creating the App
In this section we will create the essential tools that the Swarm uses to research and publish autonomously.
1. Structured Output - structures.py
structures.py
from pydantic import BaseModel
from typing import List


class SearchResult(BaseModel):
    """Data model for each search result."""

    title: str
    link: str
    snippet: str


class GoogleSearchResults(BaseModel):
    """Structured output for Google Search Agent."""

    objective: str
    results: List[SearchResult]


class MapURLResult(BaseModel):
    """Structured output for Map URL Agent."""

    objective: str
    results: List[str]


class ScrapedContent(BaseModel):
    """Structured output for Website Scraper Agent."""

    objective: str
    results: str
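These Pydantic models define the structured outputs each tool returns, which keeps the data passed between agents predictable. As a quick sanity check, you can construct them directly; the snippet below is a minimal sketch with made-up values, not real search output.

# quick sanity check with illustrative values (not real search output)
from app.tools.structures import GoogleSearchResults, SearchResult

sample = GoogleSearchResults(
    objective="Learn Python basics",
    results=[
        SearchResult(
            title="Python Tutorial",
            link="https://docs.python.org/3/tutorial/",
            snippet="An informal introduction to Python...",
        )
    ],
)
print(sample.results[0].link)  # validated, typed access to each field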
2. Tools - tools.py
tools.py
import os
import logging

from dotenv import load_dotenv
from fpdf import FPDF
from firecrawl import FirecrawlApp
from serpapi.google_search import GoogleSearch

from app.tools.structures import (
    GoogleSearchResults,
    SearchResult,
    MapURLResult,
    ScrapedContent,
)

load_dotenv()

firecrawl_app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
logging.basicConfig(level=logging.INFO)


def search_google(query: str, objective: str) -> GoogleSearchResults:
    """
    Perform a Google search using SerpAPI and return structured results.

    This function executes a Google search query through SerpAPI and processes
    the results into a structured format using the GoogleSearchResults data class.

    Args:
        query (str): The search query to be executed
        objective (str): The purpose or goal of the search, used for context

    Returns:
        GoogleSearchResults: A structured object containing search results
            including titles, links, and snippets

    Raises:
        Exception: If no organic search results are found in the API response

    Example:
        >>> results = search_google("python programming", "Learn Python basics")
        >>> print(results.results[0].title)
    """
    logging.info(f"Searching Google with query: '{query}' for objective: '{objective}'")
    search_params = {
        "engine": "google",
        "q": query,
        "api_key": os.getenv("SERP_API_KEY"),
    }
    search = GoogleSearch(search_params)
    search_results = search.get_dict().get("organic_results", [])

    if not search_results:
        raise Exception("No organic results found from SerpAPI.")

    structured_results = [
        SearchResult(
            title=result.get("title", ""),
            link=result.get("link", ""),
            snippet=result.get("snippet", ""),
        )
        for result in search_results
        if result.get("link")
    ]
    return GoogleSearchResults(objective=objective, results=structured_results)


def map_url_pages(url: str, objective: str, search_query: str) -> MapURLResult:
    """
    Map all pages of a website that match a search query using Firecrawl.

    This function crawls a website and identifies relevant pages based on
    a search query, returning the results in a structured format.

    Args:
        url (str): The base URL of the website to map
        objective (str): The purpose or goal of the mapping operation
        search_query (str): Query string to filter relevant pages

    Returns:
        MapURLResult: A structured object containing the list of matching URLs found

    Example:
        >>> results = map_url_pages("https://example.com", "Find pricing pages", "pricing")
        >>> for url in results.results:
        ...     print(url)
    """
    logging.info(
        f"Mapping URLs for website: '{url}' with search query: '{search_query}' and objective: '{objective}'"
    )
    map_status = firecrawl_app.map_url(url, params={"search": search_query})
    if map_status.get("status") == "success":
        links = map_status.get("links", [])
        top_links = [link for link in links if link]
        return MapURLResult(objective=objective, results=top_links)
    else:
        return MapURLResult(objective=objective, results=[])


def scrape_url(url: str, objective: str) -> ScrapedContent:
    """
    Scrape content from a specified URL using Firecrawl.

    This function extracts content from a webpage and returns it in a structured
    format. The content is converted to markdown format for better readability
    and processing.

    Args:
        url (str): The URL to scrape content from
        objective (str): The purpose or goal of the scraping operation

    Returns:
        ScrapedContent: A structured object containing the scraped content

    Raises:
        Exception: If scraping fails or returns empty content

    Example:
        >>> content = scrape_url("https://example.com/about", "Extract company information")
        >>> print(content.results)
    """
    logging.info(f"Scraping URL: '{url}' with objective: '{objective}'")
    scrape_result = firecrawl_app.scrape_url(url, params={"formats": ["markdown"]})
    if not scrape_result:
        raise Exception("Scraping failed or returned empty content.")

    content = scrape_result["markdown"]
    if not content:
        raise Exception(f"No content retrieved from {url}")
    return ScrapedContent(objective=objective, results=content)


def save_as_local_pdf(title: str, results_text: str, pdf_output_path: str) -> str:
    """
    Generate a PDF file containing the provided text content.

    This function creates a formatted PDF document with a header and the provided
    content. The PDF is saved to the specified local path, creating directories
    if needed.

    Args:
        title (str): The title of the content to be included in the PDF
        results_text (str): The text content to be included in the PDF
        pdf_output_path (str): The full path where the PDF should be saved

    Returns:
        str: The absolute path to the created PDF file, or None if creation fails

    Raises:
        OSError: If there are permission issues or problems creating directories
        Exception: For any other errors during PDF creation

    Example:
        >>> text = "Sample report content\\nWith multiple lines"
        >>> pdf_path = save_as_local_pdf("Sample Report", text, "output/report.pdf")
        >>> print(f"PDF saved to: {pdf_path}")
    """
    try:
        pdf = FPDF()
        pdf.add_page()

        # header
        pdf.set_font("Arial", "B", 16)
        pdf.cell(0, 10, title, ln=True, align="C")
        pdf.ln(10)  # line break

        # content
        pdf.set_font("Arial", size=12)
        for line in results_text.strip().split("\n"):
            pdf.multi_cell(0, 10, txt=line.strip())
            pdf.ln(1)

        # ensure the output directory exists
        output_path = os.path.abspath(pdf_output_path)
        output_dir = os.path.dirname(output_path)
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        pdf.output(output_path)
        logging.info(f"PDF file created successfully at: {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"Error during PDF creation: {e}")
        return None
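Before wiring these tools into agents, it can help to call them directly. The sketch below is one way to do that, assuming FIRECRAWL_API_KEY and SERP_API_KEY are set in your .env; the query, objective, and output path are purely illustrative.

# standalone smoke test for the research tools; assumes FIRECRAWL_API_KEY and
# SERP_API_KEY are set in .env (query and output path below are illustrative)
from app.tools import search_google, scrape_url, save_as_local_pdf

search = search_google("history of the internet", "Research the history of the internet")
first_link = search.results[0].link

scraped = scrape_url(first_link, "Collect background on the history of the internet")
pdf_path = save_as_local_pdf(
    "History of the Internet",
    scraped.results,
    "./swarmzero-data/output/history_of_the_internet.pdf",
)
print(f"Saved: {pdf_path}")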
2a. Publisher Tools - publishers/
confluence.py
import json
import os
import logging

import requests
from dotenv import load_dotenv

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

load_dotenv()


def publish_to_confluence(title: str, results_text: str) -> str:
    """
    Publishes results to Confluence by creating or updating a page.

    Args:
        title (str): The title of the Confluence page.
        results_text (str): The content to be published on the page.

    Returns:
        str: The URL of the created or updated Confluence page.

    Raises:
        Exception: If an error occurs during the publishing process.
    """
    CONFLUENCE_BASE_URL = os.getenv("CONFLUENCE_BASE_URL").rstrip("/")
    if not CONFLUENCE_BASE_URL.endswith("/wiki"):
        CONFLUENCE_BASE_URL = f"{CONFLUENCE_BASE_URL}/wiki"
    CONFLUENCE_API_ENDPOINT = f"{CONFLUENCE_BASE_URL}/rest/api/content/"
    CONFLUENCE_USERNAME = os.getenv("CONFLUENCE_USERNAME")
    CONFLUENCE_API_TOKEN = os.getenv("CONFLUENCE_API_TOKEN")
    CONFLUENCE_SPACE_KEY = os.getenv("CONFLUENCE_SPACE_KEY")

    try:
        # search for existing page
        search_params = {
            "title": title,
            "spaceKey": CONFLUENCE_SPACE_KEY,
            "expand": "version",
        }
        search_response = requests.get(
            CONFLUENCE_API_ENDPOINT,
            params=search_params,
            auth=(CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN),
        )

        if search_response.status_code == 200:
            search_data = search_response.json()

            # update existing page
            if search_data.get("size", 0) > 0:
                page = search_data["results"][0]
                page_id = page["id"]
                version = page["version"]["number"]

                update_data = {
                    "id": page_id,
                    "type": "page",
                    "title": title,
                    "space": {"key": CONFLUENCE_SPACE_KEY},
                    "body": {
                        "storage": {"value": results_text, "representation": "storage"}
                    },
                    "version": {"number": version + 1},
                }
                update_response = requests.put(
                    f"{CONFLUENCE_API_ENDPOINT}{page_id}",
                    json=update_data,
                    auth=(CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN),
                )
                if update_response.status_code == 200:
                    return f"{CONFLUENCE_BASE_URL}/spaces/{CONFLUENCE_SPACE_KEY}/pages/{page_id}"

            # create new page
            else:
                create_data = {
                    "type": "page",
                    "title": title,
                    "space": {"key": CONFLUENCE_SPACE_KEY},
                    "body": {
                        "storage": {"value": results_text, "representation": "storage"}
                    },
                }
                create_response = requests.post(
                    CONFLUENCE_API_ENDPOINT,
                    json=create_data,
                    auth=(CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN),
                )
                if create_response.status_code in [200, 201]:
                    page = create_response.json()
                    return f"{CONFLUENCE_BASE_URL}/spaces/{CONFLUENCE_SPACE_KEY}/pages/{page['id']}"

        return None
    except Exception as e:
        logger.error("Failed to publish to Confluence", exc_info=True)
        return None
google_docs.py
import os
import logging

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
import dotenv

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

dotenv.load_dotenv()


def publish_to_google_docs(title: str, results_text: str) -> str:
    """
    Publishes results to a new Google Docs document.

    Args:
        title (str): The title of the Google Docs document.
        results_text (str): The content to be published in the document.

    Returns:
        str: The URL of the created Google Docs document.

    Raises:
        FileNotFoundError: If the Google credentials file is missing.
        Exception: If an error occurs during the publishing process.
    """
    GOOGLE_SCOPES = os.getenv(
        "GOOGLE_SCOPES",
        "https://www.googleapis.com/auth/documents https://www.googleapis.com/auth/drive.file",
    ).split()
    GOOGLE_APPLICATION_CREDENTIALS = os.getenv(
        "GOOGLE_APPLICATION_CREDENTIALS", "credentials.json"
    )
    token_path = "token.json"

    creds = None
    if os.path.exists(token_path):
        logger.debug("Loading existing credentials from token file")
        creds = Credentials.from_authorized_user_file(token_path, GOOGLE_SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            logger.info("Refreshing expired credentials")
            creds.refresh(Request())
        else:
            if not os.path.exists(GOOGLE_APPLICATION_CREDENTIALS):
                logger.error(
                    f"Credentials file '{GOOGLE_APPLICATION_CREDENTIALS}' not found"
                )
                raise FileNotFoundError(
                    f"Missing '{GOOGLE_APPLICATION_CREDENTIALS}' file."
                )
            logger.info("Initiating OAuth2 flow for new credentials")
            flow = InstalledAppFlow.from_client_secrets_file(
                GOOGLE_APPLICATION_CREDENTIALS, GOOGLE_SCOPES
            )
            creds = flow.run_local_server(port=0)
        logger.debug("Saving new credentials to token file")
        with open(token_path, "w") as token:
            token.write(creds.to_json())

    try:
        logger.info("Initializing Google Docs service")
        docs_service = build("docs", "v1", credentials=creds)

        logger.info(f'Creating new document with title: "{title}"')
        doc = docs_service.documents().create(body={"title": title}).execute()
        document_id = doc.get("documentId")
        document_url = f"https://docs.google.com/document/d/{document_id}/edit"
        logger.info(f"Document created successfully with ID: {document_id}")

        requests_batch = [
            {
                "insertText": {
                    "location": {"index": 1},
                    "text": f"{title}\n\n{results_text}",
                }
            },
            {
                "updateParagraphStyle": {
                    "range": {"startIndex": 1, "endIndex": len(title) + 1},
                    "paragraphStyle": {"namedStyleType": "HEADING_1"},
                    "fields": "namedStyleType",
                }
            },
        ]

        logger.debug("Updating document with content")
        docs_service.documents().batchUpdate(
            documentId=document_id, body={"requests": requests_batch}
        ).execute()

        logger.info("Content successfully published to Google Docs")
        return document_url
    except Exception as e:
        logger.error(f"Failed to publish document: {str(e)}", exc_info=True)
        return None
sharepoint.py
import msal
import os
import logging

import requests
from urllib.parse import quote_plus
from dotenv import load_dotenv
from docx import Document
from io import BytesIO

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

load_dotenv()


def publish_to_sharepoint(title: str, results_text: str) -> str:
    """
    Publishes results to SharePoint by creating a new file in the specified drive.

    Args:
        title (str): The title of the file to be uploaded.
        results_text (str): The content to be uploaded as a file.

    Returns:
        str: The URL of the uploaded SharePoint file.

    Raises:
        Exception: If an error occurs during authentication or file upload.
    """
    SHAREPOINT_CLIENT_ID = os.getenv("SHAREPOINT_CLIENT_ID")
    SHAREPOINT_CLIENT_SECRET = os.getenv("SHAREPOINT_CLIENT_SECRET")
    SHAREPOINT_TENANT_ID = os.getenv("SHAREPOINT_TENANT_ID")
    SHAREPOINT_SITE_ID = os.getenv("SHAREPOINT_SITE_ID")
    SHAREPOINT_DRIVE_ID = os.getenv("SHAREPOINT_DRIVE_ID")

    try:
        logger.debug("Initiating SharePoint authentication")
        authority = f"https://login.microsoftonline.com/{SHAREPOINT_TENANT_ID}"
        logger.debug(f"Using authority URL: {authority}")

        app = msal.ConfidentialClientApplication(
            SHAREPOINT_CLIENT_ID,
            authority=authority,
            client_credential=SHAREPOINT_CLIENT_SECRET,
        )
        scopes = ["https://graph.microsoft.com/.default"]
        logger.debug(f"Requesting scopes: {scopes}")

        # get access token
        logger.info("Requesting access token")
        result = app.acquire_token_for_client(scopes=scopes)

        if "access_token" not in result:
            error_msg = result.get("error_description", "Unknown error")
            error_code = result.get("error", "Unknown error code")
            logger.error(
                f"""Failed to obtain SharePoint access token:
                Error Code: {error_code}
                Description: {error_msg}
                Full Result: {result}"""
            )
            return None

        logger.info("Successfully acquired access token")
        access_token = result["access_token"]
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        }
        logger.debug("Request headers prepared (token hidden)")

        file_name = f"{title}.docx"

        # convert text content to basic Word document format
        doc = Document()
        doc.add_heading(title, 0)
        doc.add_paragraph(results_text)

        # save to bytes
        doc_bytes = BytesIO()
        doc.save(doc_bytes)
        file_content = doc_bytes.getvalue()

        logger.info(f"Preparing file upload: {file_name}")
        logger.debug(f"File size: {len(file_content)} bytes")

        upload_url = (
            f"https://graph.microsoft.com/v1.0/sites/{SHAREPOINT_SITE_ID}"
            f"/drives/{SHAREPOINT_DRIVE_ID}/root:/{quote_plus(file_name)}:/content"
        )
        logger.debug(f"Upload URL: {upload_url}")

        logger.info(f"Uploading file to SharePoint: {file_name}")
        response = requests.put(upload_url, headers=headers, data=file_content)
        logger.debug(
            f"""SharePoint Response Details:
            Status Code: {response.status_code}
            Headers: {dict(response.headers)}
            Response Text: {response.text}"""
        )

        if response.status_code in [200, 201]:
            try:
                file_info = response.json()
                web_url = file_info.get("webUrl")
                logger.info(f"File uploaded successfully")
                logger.info(f"Web URL: {web_url}")
                return web_url
            except Exception as e:
                logger.error(
                    f"""Failed to parse successful upload response:
                    Error: {str(e)}
                    Response Text: {response.text}"""
                )
                return None
        else:
            logger.error(
                f"""SharePoint upload failed:
                Status Code: {response.status_code}
                Response: {response.text}
                URL: {upload_url}
                File Name: {file_name}"""
            )
            return None
    except Exception as e:
        logger.error("Unexpected error during SharePoint upload", exc_info=True)
        logger.debug(f"Error details: {str(e)}")
        return None
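Each publisher reads its credentials from environment variables and returns the URL of the created page or file (or None on failure), so you can smoke-test them outside the Swarm. Below is a minimal sketch for Confluence, assuming the CONFLUENCE_* variables above are set in your .env; the title and body are placeholders.

# smoke-test a publisher outside the Swarm; assumes the CONFLUENCE_* variables
# from above are set in .env (title and body below are placeholders)
from app.tools.publishers import publish_to_confluence

page_url = publish_to_confluence(
    "Swarm Publishing Test",
    "<p>Hello from the AI Researcher Swarm.</p>",  # Confluence storage-format HTML
)
print(page_url or "Publishing failed; check the logs above.")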
Creating the Swarm
As the first step, set up the config file:
swarmzero_config.toml
[model]
model = "gpt-4o"

[environment]
type = "dev"

[sample_prompts]
prompts = [
    "Research the history of the internet",
    "Publish research about using crypto-economics to incentivize AI agents to collaborate and publish to Google Docs",
    "Research the history of the stock market and the role of the Federal Reserve. Publish the findings to Confluence.",
    "Can you help me research the feasibility of distributed model training for LLMs? When you're done, publish the findings to Sharepoint.",
]
Follow the code below to set up the AI Researcher Swarm:
main.py
import asyncio
import logging

from swarmzero import Agent, Swarm
from swarmzero.sdk_context import SDKContext

from app.tools import (
    search_google,
    map_url_pages,
    scrape_url,
    save_as_local_pdf,
)
from app.tools.publishers import (
    publish_to_google_docs,
    publish_to_sharepoint,
    publish_to_confluence,
)
import dotenv

dotenv.load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)

config_path = "./swarmzero_config.toml"
sdk_context = SDKContext(config_path=config_path)

google_search_agent = Agent(
    name="Google Search Agent",
    instruction="""You are a Google Search Agent specialized in searching the web.
    Perform searches based on the user's research topic and provide a list of relevant website URLs.
    Output should be a JSON object with the following structure:
    {
        "objective": "<research_topic>",
        "results": [
            {
                "title": "<title>",
                "link": "<url>",
                "snippet": "<snippet>"
            },
            ...
        ]
    }""",
    functions=[search_google],
    config_path=config_path,
    swarm_mode=True,
)

map_url_agent = Agent(
    name="Map URL Agent",
    instruction="""You are a Map URL Agent specialized in mapping web pages from provided website URLs.
    For each URL, identify and list relevant subpages that align with the user's research objective.
    Output should be a JSON object with the following structure:
    {
        "objective": "<research_objective>",
        "results": ["<subpage_url1>", "<subpage_url2>", ...]
    }""",
    functions=[map_url_pages],
    config_path=config_path,
    swarm_mode=True,
)

website_scraper_agent = Agent(
    name="Website Scraper Agent",
    instruction="""You are a Website Scraper Agent specialized in extracting content from mapped URLs.
    Scrape the necessary information required for analysis and ensure the content is clean and structured.
    Output should be a JSON object with the following structure:
    {
        "objective": "<research_objective>",
        "results": "<scraped_content>"
    }""",
    functions=[scrape_url],
    config_path=config_path,
    swarm_mode=True,
)

analyst_agent = Agent(
    name="Analyst Agent",
    instruction="""You are an Analyst Agent that examines scraped website content and extracts structured data.
    Analyze the content to identify key themes, entities, and insights relevant to the research objective.
    Provide your analysis as a JSON object in the following format:
    {
        "objective": "<research_objective>",
        "analysis": {
            "key_themes": [...],
            "entities": [...],
            "insights": [...]
        }
    }""",
    functions=[],
    config_path=config_path,
    swarm_mode=True,
)

publisher_agent = Agent(
    name="Publisher Agent",
    instruction="""You are a Publisher Agent that disseminates research findings to various platforms.
    Use as much of the content provided to you as possible. The final output should be at least 750 words.
    You will be told whether to publish the analyzed data to Google Docs, SharePoint, Confluence or save it as a local PDF.
    If they do not specify, then always default to saving as a local PDF as `./swarmzero-data/output/<title>.pdf`.""",
    functions=[
        save_as_local_pdf,
        publish_to_google_docs,
        publish_to_sharepoint,
        publish_to_confluence,
    ],
    config_path=config_path,
    swarm_mode=True,
)

research_swarm = Swarm(
    name="Research Swarm",
    description="A swarm of AI Agents that can research arbitrary topics.",
    instruction="""You are the leader of a research team that produces new research for user-provided topics.
    Upon receiving a research topic, execute the following steps in order:

    1. **Search the Web:** Utilize the Google Search Agent to find relevant websites based on the user's research topic.
    2. **Map Webpages:** Use the Map URL Agent to identify and list pertinent subpages from the search results. Provide it with the relevant objective.
       If no subpages can be found in all of the URLs, return to step 1 and try a different query.
    3. **Scrape Content:** Call the Website Scraper Agent to extract necessary information from the mapped URLs.
    4. **Analyze Content:** Use the Analyst Agent to process the scraped content and generate structured JSON data.
    5. **Publish Findings:** Finally, instruct the Publisher Agent to output the final analysis.
       Provide a concise title for the publisher along with the content from the Analyst Agent.
       Inform this agent about where the user would like to publish the research.
       If the user does not specify how to publish the research, save the research as a local PDF.

    If an agent is unable to properly execute its task, retry it with a different prompt and/or inputs.
    Ensure each agent completes its task before proceeding to the next step.
    Maintain clear and concise communication throughout the process.
    You must publish the results of any research conducted.""",
    agents=[
        google_search_agent,
        map_url_agent,
        website_scraper_agent,
        analyst_agent,
        publisher_agent,
    ],
    functions=[],
    sdk_context=sdk_context,
    max_iterations=99,
)


async def main():
    print(
        "\n\nWelcome to the Research Swarm!\nVisit https://SwarmZero.ai to learn more.\nType 'exit' to quit.\n"
    )

    while True:
        prompt = input("\nWhat would you like to research? \n\n")
        if prompt.lower() == "exit":
            break

        try:
            logging.info(f"Research topic received: '{prompt}'")
            response = await research_swarm.chat(prompt)
            print("\nResearch Findings:\n")
            print(response)
        except Exception as e:
            logging.error(f"An error occurred during the research process: {e}")
            print(
                "\nAn error occurred while processing your research topic. Please try again.\n"
            )


if __name__ == "__main__":
    asyncio.run(main())
Running the Swarm
Run the main application using:
poetry run python main.py
This will initialize the AI agents, perform web searches, map URLs, scrape content, and publish the results to the configured platforms.
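If you prefer to drive the swarm from a script or notebook instead of the interactive loop, the sketch below runs a single prompt; it assumes you launch it from the project root so that main.py (and its research_swarm instance) is importable.

# single non-interactive run; assumes this is executed from the project root
# so that main.py and its research_swarm instance can be imported
import asyncio

from main import research_swarm


async def run_once():
    response = await research_swarm.chat("Research the history of the internet")
    print(response)


asyncio.run(run_once())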
Example Prompts
When prompted, you can enter research queries like:
Research the history of the internet
Research crypto-economics and AI agent collaboration, publish to Google Docs
Research the history of the stock market and Federal Reserve, publish to Confluence
Research distributed model training for LLMs and publish to SharePoint
Each prompt will trigger the swarm to:
Search for relevant information
Extract and analyze content
Generate a comprehensive research document
Publish to your specified platform (or save a local PDF if no platform is specified)
Example Outputs
Local PDF
More generated PDFs can be found in the sample outputs folder.