This tutorial will guide you through building an AI-powered cryptocurrency trading bot that monitors multiple blockchain networks and provides trading insights through a Telegram interface. The bot leverages BitQuery for market data and SwarmZero's AI capabilities for analysis.
Prerequisites
Before starting, ensure you have:
Python 3.11 or higher installed
Poetry for dependency management
A Telegram Bot Token (from BotFather)
Step-by-Step Tutorial
1. Clone the Repository
First, clone the repository to your local machine:
Copy git clone https://github.com/swarmzero/examples.git
cd examples/agents/crypto_trader_bot
2. Setting Up the Environment
Create and activate a Poetry environment:
Copy # Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -
# Install dependencies
poetry install
3. Configure Environment Variables
Create a .env
file in the project root:
Add the following environment variables to your .env
file:
Copy OPENAI_API_KEY=your_openai_api_key
BOTFATHER_API_TOKEN=your_telegram_bot_token
BITQUERY_OAUTH_TOKEN=your_bitquery_oauth_token
LOG_LEVEL=INFO # Options: DEBUG, INFO, WARNING, ERROR
4. Building the Components
Let's build each component of the bot step by step.
4.1 Configuration Management (config.py)
First, let's create the configuration management system:
Copy import logging
from typing import Dict , NamedTuple
logger = logging . getLogger ( __name__ )
class ChainConfig ( NamedTuple ):
name : str
url_path : str
native_token : str
explorer : str
# Define supported blockchain networks
SUPPORTED_CHAINS : Dict [ str , ChainConfig ] = {
"solana" : ChainConfig ( "Solana" , "solana" , "SOL" , "https://solscan.io" ),
"eth" : ChainConfig ( "Ethereum" , "eth" , "ETH" , "https://etherscan.io" ),
"bsc" : ChainConfig ( "BSC" , "bsc" , "BNB" , "https://bscscan.com" ),
# Add more chains as needed
}
# Chain name variations for user-friendly input
CHAIN_ALIASES = {
"ethereum" : "eth" ,
"ether" : "eth" ,
"binance" : "bsc" ,
"binance smart chain" : "bsc" ,
}
def normalize_chain_name ( chain : str ) -> str :
"""Convert various chain name formats to our standard chain ID"""
chain = chain . lower (). strip ()
return CHAIN_ALIASES . get (chain, chain)
def validate_chain ( chain : str ) -> bool :
"""Validate if a chain is supported"""
normalized_chain = normalize_chain_name (chain)
is_valid = normalized_chain in SUPPORTED_CHAINS
if not is_valid :
logger . warning ( f "Attempted to use unsupported chain: { chain } " )
return is_valid
def get_chain_id ( chain : str ) -> str :
"""Get the standardized chain ID from any valid chain name variation"""
normalized_chain = normalize_chain_name (chain)
if normalized_chain in SUPPORTED_CHAINS :
return normalized_chain
raise ValueError ( f "Unsupported chain: { chain } " )
4.2 BitQuery Service Integration (bitquery_service.py)
Next, create the BitQuery service to fetch market data:
Copy import logging
from datetime import datetime , timedelta , timezone
from typing import Dict
import aiohttp
from config import get_chain_id
logger = logging . getLogger ( __name__ )
class BitQueryService :
def __init__ ( self , oauth_token : str ):
if not oauth_token :
raise ValueError ( "BitQuery OAuth token is required" )
logger . debug ( "Initializing BitQueryService" )
self . oauth_token = oauth_token
self . url = "https://streaming.bitquery.io/graphql"
self . headers = {
"Content-Type" : "application/json" ,
"Authorization" : f "Bearer { oauth_token } " ,
}
def _get_base_trade_fields ( self , address_field : str = "SmartContract" ) -> str :
"""Get common trade fields structure for all chains"""
return f """
Block {{
Number
Time
}}
Transaction {{
Hash
}}
Trade {{
Buy {{
Amount
Currency {{
Name
Symbol
{ address_field }
}}
Price
}}
Sell {{
Amount
Currency {{
Name
Symbol
{ address_field }
}}
Price
}}
Dex {{
ProtocolName
}}
}}
"""
def _get_chain_query ( self , chain : str ) -> tuple [ str , str ] :
"""Get chain-specific query structure and namespace"""
if chain == "solana" :
return "Solana" , "MintAddress"
elif chain == "tron" :
return "Tron" , "Address"
elif chain == "ton" :
return "TON" , "Address"
else : # EVM chains
return "EVM" , "SmartContract"
async def get_chain_activity ( self , chain : str , time_window : int = 60 ) -> Dict:
"""Fetch trading activity for specified chain"""
try :
logger . debug ( f "Fetching chain activity for { chain } " )
now = datetime . now (timezone.utc)
time_ago = now - timedelta (minutes = time_window)
chain = get_chain_id (chain)
namespace , address_field = self . _get_chain_query (chain)
trade_fields = self . _get_base_trade_fields (address_field)
# Build query based on chain type
if namespace == "EVM" :
query = f """
query ($network: evm_network!, $since: DateTime) {{
{ namespace } (network: $network) {{
DEXTrades(
orderBy: {{ descending: Block_Time }}
where: {{ Block: {{ Time: {{ since: $since }}}}}}
) {{
{ trade_fields }
}}
}}
}}
"""
variables = { "network" : chain . lower (), "since" : time_ago . isoformat ()}
else :
query = f """
query ($since: DateTime) {{
{ namespace } {{
DEXTrades(
orderBy: {{ descending: Block_Time }}
where: {{ Block: {{ Time: {{ since: $since }}}}}}
) {{
{ trade_fields }
}}
}}
}}
"""
variables = { "since" : time_ago . isoformat ()}
async with aiohttp . ClientSession () as session :
async with session . post (
self.url,
headers = self.headers,
json = { "query" : query, "variables" : variables},
) as response :
if response . status != 200 :
error_text = await response . text ()
logger . error ( f "BitQuery API error: { error_text } " )
raise aiohttp . ClientError ( f "BitQuery API returned status { response.status } " )
data = await response . json ()
if "errors" in data :
logger . error ( f "GraphQL errors: { data[ 'errors' ] } " )
raise ValueError ( f "GraphQL query failed: { data[ 'errors' ] } " )
return data
except Exception as e :
logger . error ( f "Error fetching chain activity: {str (e) } " )
raise
4.3 Trading Logic (dex_agent.py)
Now, create the DEX agent that handles trading logic and analysis:
Copy import logging
from decimal import Decimal
from typing import Dict , List
from swarmzero import Agent
from bitquery_service import BitQueryService
from config import SUPPORTED_CHAINS , get_chain_id , validate_chain
logger = logging . getLogger ( __name__ )
class DexAgent ( Agent ):
def __init__ ( self , bitquery_service : BitQueryService):
if not bitquery_service :
raise ValueError ( "BitQueryService instance is required" )
self . bitquery = bitquery_service
super (). __init__ (
name = "Multi-Chain DEX Agent" ,
instruction = """You are an AI agent that helps trade across multiple blockchains.
I can analyze market data and provide insights on various chains including Ethereum, BSC, etc.
You can ask me to:
- Analyze market activity on any supported chain
- Get trading suggestions based on price movements
- Check which blockchain networks are supported
""" ,
functions = [self.analyze_market, self.suggest_trades, self.get_supported_chains],
config_path = "./swarmzero_config.toml" ,
)
async def analyze_market ( self , chain : str ) -> Dict:
"""Analyze market activity for a specific chain"""
try :
if not validate_chain (chain):
raise ValueError ( f "Unsupported chain: { chain } " )
chain_id = get_chain_id (chain)
data = await self . bitquery . get_chain_activity (chain)
namespace = "EVM" if chain_id not in [ "solana" , "tron" , "ton" ] else chain_id . capitalize ()
trades = data . get ( "data" , {}). get (namespace, {}). get ( "DEXTrades" , [])
analysis = {
"total_trades" : len (trades),
"volume_24h" : Decimal ( 0 ),
"active_pairs" : set (),
"active_dexes" : set (),
"price_changes" : {},
}
# Process trades
for trade in trades :
try :
buy = trade . get ( "Trade" , {}). get ( "Buy" , {})
sell = trade . get ( "Trade" , {}). get ( "Sell" , {})
dex = trade . get ( "Trade" , {}). get ( "Dex" , {})
# Track volumes
buy_amount = Decimal ( str (buy. get ( "Amount" , 0 )))
analysis [ "volume_24h" ] += buy_amount
# Track trading pairs
buy_currency = buy . get ( "Currency" , {})
sell_currency = sell . get ( "Currency" , {})
pair = (buy_currency . get ( "Symbol" ), sell_currency . get ( "Symbol" ) )
if all (pair):
analysis [ "active_pairs" ]. add (pair)
# Track DEXes
dex_name = dex . get ( "ProtocolName" )
if dex_name :
analysis [ "active_dexes" ]. add (dex_name)
# Track price changes
price = Decimal ( str (buy. get ( "Price" , 0 )))
symbol = buy_currency . get ( "Symbol" )
if symbol and price > 0 :
if symbol not in analysis [ "price_changes" ]:
analysis [ "price_changes" ] [symbol] = { "prices" : [] , "change_24h" : 0 }
analysis [ "price_changes" ] [symbol][ "prices" ] . append (price)
except Exception as e :
logger . warning ( f "Error processing trade: {str (e) } " )
continue
# Convert sets to lists for JSON serialization
analysis [ "active_pairs" ] = list (analysis[ "active_pairs" ])
analysis [ "active_dexes" ] = list (analysis[ "active_dexes" ])
return analysis
except Exception as e :
logger . error ( f "Failed to analyze market: {str (e) } " )
raise
async def suggest_trades ( self , chain : str ) -> List [ Dict ] :
"""Generate trade suggestions based on market analysis"""
try :
if not validate_chain (chain):
raise ValueError ( f "Unsupported chain: { chain } " )
analysis = await self . analyze_market (chain)
suggestions = []
for symbol , price_data in analysis [ "price_changes" ]. items ():
try :
change = price_data [ "change_24h" ]
volume = analysis [ "volume_24h" ]
# Calculate confidence score
confidence = 0
if abs (change) > 5 : # Price movement
confidence += 20
if volume > 10000 : # Volume
confidence += 30
if analysis [ "total_trades" ] > 50 : # Trading activity
confidence += 25
if len (analysis[ "active_pairs" ]) > 5 : # Market depth
confidence += 25
if confidence >= 60 :
action = "buy" if change < 0 else "sell"
suggestion = {
"token" : symbol ,
"action" : action ,
"confidence" : confidence ,
"signals" : {
"price_momentum" : change ,
"volume_impact" : volume > 10000 ,
"trade_frequency" : analysis [ "total_trades" ] > 50 ,
"market_depth" : len (analysis[ "active_pairs" ]) > 5 ,
}
}
suggestions . append (suggestion)
except Exception as e :
logger . warning ( f "Error processing suggestion: {str (e) } " )
continue
return sorted (suggestions, key =lambda x : x[ "confidence" ], reverse = True ) [ : 5 ]
except Exception as e :
logger . error ( f "Failed to generate suggestions: {str (e) } " )
raise
4.4 Telegram Bot Implementation (dex_rabbit_bot.py)
Finally, create the Telegram bot interface:
Copy import logging
from telegram import Update
from telegram . ext import Application , CommandHandler , ContextTypes , MessageHandler , filters
from config import SUPPORTED_CHAINS
from dex_agent import DexAgent
logger = logging . getLogger ( __name__ )
class DexRabbitBot :
def __init__ ( self , token : str , agent : DexAgent):
self . app = Application . builder (). token (token). build ()
self . agent = agent
def setup_handlers ( self ):
"""Set up command and message handlers"""
self . app . add_handler ( CommandHandler ( "start" , self._start_command))
self . app . add_handler ( CommandHandler ( "help" , self._help_command))
self . app . add_handler ( CommandHandler ( "chains" , self._chains_command))
self . app . add_handler ( MessageHandler (filters.TEXT & ~ filters.COMMAND, self._handle_message))
self . app . add_error_handler (self._error_handler)
async def _start_command ( self , update : Update , context : ContextTypes . DEFAULT_TYPE):
"""Handle /start command"""
welcome_message = (
"👋 Welcome to DexRabbit Trading Bot!\n\n"
"I can help you analyze cryptocurrency markets and find trading opportunities "
"across multiple blockchain networks.\n\n"
"Try asking me things like:\n"
"• 'How's the ETH market looking?'\n"
"• 'Show me trading opportunities on BSC'\n"
"• 'What's happening on Solana?'\n\n"
"Use /help to see all available commands."
)
await update . message . reply_text (welcome_message)
async def _help_command ( self , update : Update , context : ContextTypes . DEFAULT_TYPE):
"""Handle /help command"""
help_message = (
"🤖 DexRabbit Bot Help\n\n"
"Commands:\n"
"/start - Start the bot\n"
"/help - Show this help message\n"
"/chains - List supported blockchains\n\n"
"You can chat with me naturally! Try asking:\n"
"• 'Analyze the ETH market'\n"
"• 'What trades look good on BSC?'\n"
"• 'Show me Solana trading activity'\n"
"• 'Which chains do you support?'"
)
await update . message . reply_text (help_message)
async def _chains_command ( self , update : Update , context : ContextTypes . DEFAULT_TYPE):
"""Handle /chains command"""
chains_list = "\n" . join (
f "• { config.name } ( { chain_id. upper () } ) - Native token: { config.native_token } "
for chain_id, config in SUPPORTED_CHAINS. items ()
)
message = f "🌐 Supported Blockchains: \n\n { chains_list } "
await update . message . reply_text (message)
async def _handle_message ( self , update : Update , context : ContextTypes . DEFAULT_TYPE):
"""Handle incoming chat messages"""
try :
await context . bot . send_chat_action (
chat_id = update.effective_chat.id,
action = "typing"
)
response = await self . agent . chat (update.message.text)
await update . message . reply_text (
response,
parse_mode = "HTML" ,
disable_web_page_preview = True
)
except Exception as e :
logger . error ( f "Error handling message: {str (e) } " )
await update . message . reply_text (
"Sorry, I encountered an error. Please try again later."
)
async def _error_handler ( self , update : Update , context : ContextTypes . DEFAULT_TYPE):
"""Handle errors"""
logger . error ( f "Update { update } caused error { context.error } " )
if update and update . effective_chat :
await context . bot . send_message (
chat_id = update.effective_chat.id,
text = "Sorry, something went wrong. Please try again later."
)
def run ( self ):
"""Start the bot"""
self . setup_handlers ()
logger . info ( "Bot started successfully" )
self . app . run_polling ()
4.5 Main Entry Point (main.py)
Create the main entry point to tie everything together:
Copy import logging
import os
from enum import Enum
from dotenv import load_dotenv
from bitquery_service import BitQueryService
from dex_agent import DexAgent
from dex_rabbit_bot import DexRabbitBot
class LogLevel ( str , Enum ):
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
def setup_logging ():
"""Configure logging based on environment variable"""
log_level = os . getenv ( "LOG_LEVEL" , "INFO" ). upper ()
if log_level not in LogLevel . __members__ :
print ( f "Invalid LOG_LEVEL: { log_level } . Defaulting to INFO" )
log_level = "INFO"
logging . basicConfig (
level = getattr (logging, log_level),
format = " %(asctime)s - %(name)s - %(levelname)s - %(message)s " ,
)
def main ():
# Load environment variables
load_dotenv ()
# Setup logging
setup_logging ()
logger = logging . getLogger ( __name__ )
try :
# Get required tokens
telegram_token = os . getenv ( "BOTFATHER_API_TOKEN" )
bitquery_token = os . getenv ( "BITQUERY_OAUTH_TOKEN" )
if not telegram_token or not bitquery_token :
raise ValueError ( "Missing required environment variables" )
# Initialize services
logger . info ( "Initializing services..." )
bitquery_service = BitQueryService (bitquery_token)
agent = DexAgent (bitquery_service)
# Create and run bot
logger . info ( "Starting DexRabbit Bot..." )
bot = DexRabbitBot (telegram_token, agent)
bot . run ()
except Exception as e :
logger . error ( f "Failed to start bot: {str (e) } " )
raise
if __name__ == "__main__" :
main ()
5. Running the Bot
Activate the Poetry environment and run the bot:
Copy poetry shell
poetry run python main.py
The bot will start and listen for commands through Telegram.
6. Interacting with the Bot
Once the bot is running, you can interact with it through Telegram:
Start a chat with your bot using the Telegram handle
Available commands:
/start
- Initialize the bot
/help
- Show help message
/chains
- List supported blockchains
You can also chat naturally with the bot:
"What trades look good on BSC?"
"Show me Solana trading activity"
7. Supported Blockchains
The bot supports multiple networks including:
Binance Smart Chain (BSC)
8. Features and Capabilities
The bot provides:
Market Analysis
Active trading pair analysis
Trading Suggestions
Trading frequency analysis
AI-Powered Insights
Natural language processing
9. Customization
You can customize the bot's behavior by modifying:
Copy [model]
model = "gpt-4o"
[sample_prompts]
prompts = [
"How's the Ethereum market looking?" ,
"Show me trading opportunities on BSC" ,
"What's happening on Solana?"
]
[environment]
type = "dev"
[timeout]
llm = 30
Supported chains in config.py
:
Copy SUPPORTED_CHAINS : Dict [ str , ChainConfig ] = {
"eth" : ChainConfig ( "Ethereum" , "eth" , "ETH" , "https://etherscan.io" ),
"bsc" : ChainConfig ( "BSC" , "bsc" , "BNB" , "https://bscscan.com" ),
# Add more chains as needed
}
Next Steps
After setting up the basic bot, consider:
Adding custom trading strategies
Implementing additional data sources
Creating automated trading features
Adding more blockchain networks
Enhancing the AI analysis capabilities
Support and Resources