Build a Multi-Chain Crypto Trading Telegram Bot with SwarmZero, Bitquery and DEXRabbit

Telegram bot that helps you trade crypto using Bitquery indexed on-chain data and DEXRabbit.

This tutorial will guide you through building an AI-powered cryptocurrency trading bot that monitors multiple blockchain networks and provides trading insights through a Telegram interface. The bot leverages BitQuery for market data and SwarmZero's AI capabilities for analysis.

Prerequisites

Before starting, ensure you have:

  • Python 3.11 or higher installed

  • Git installed

  • Poetry for dependency management

  • A Telegram Bot Token (from BotFather)

  • A BitQuery OAuth Token

  • An OpenAI API Key

Step-by-Step Tutorial

1. Clone the Repository

First, clone the repository to your local machine:

git clone https://github.com/swarmzero/examples.git
cd examples/agents/crypto_trader_bot

2. Setting Up the Environment

Create and activate a Poetry environment:

# Install Poetry if you haven't already
curl -sSL https://install.python-poetry.org | python3 -

# Install dependencies
poetry install

3. Configure Environment Variables

Create a .env file in the project root:

touch .env

Add the following environment variables to your .env file:

OPENAI_API_KEY=your_openai_api_key
BOTFATHER_API_TOKEN=your_telegram_bot_token
BITQUERY_OAUTH_TOKEN=your_bitquery_oauth_token
LOG_LEVEL=INFO  # Options: DEBUG, INFO, WARNING, ERROR

4. Building the Components

Let's build each component of the bot step by step.

4.1 Configuration Management (config.py)

First, let's create the configuration management system:

import logging
from typing import Dict, NamedTuple

logger = logging.getLogger(__name__)

class ChainConfig(NamedTuple):
    name: str
    url_path: str
    native_token: str
    explorer: str

# Define supported blockchain networks
SUPPORTED_CHAINS: Dict[str, ChainConfig] = {
    "solana": ChainConfig("Solana", "solana", "SOL", "https://solscan.io"),
    "eth": ChainConfig("Ethereum", "eth", "ETH", "https://etherscan.io"),
    "bsc": ChainConfig("BSC", "bsc", "BNB", "https://bscscan.com"),
    # Add more chains as needed
}

# Chain name variations for user-friendly input
CHAIN_ALIASES = {
    "ethereum": "eth",
    "ether": "eth",
    "binance": "bsc",
    "binance smart chain": "bsc",
}

def normalize_chain_name(chain: str) -> str:
    """Convert various chain name formats to our standard chain ID"""
    chain = chain.lower().strip()
    return CHAIN_ALIASES.get(chain, chain)

def validate_chain(chain: str) -> bool:
    """Validate if a chain is supported"""
    normalized_chain = normalize_chain_name(chain)
    is_valid = normalized_chain in SUPPORTED_CHAINS
    if not is_valid:
        logger.warning(f"Attempted to use unsupported chain: {chain}")
    return is_valid

def get_chain_id(chain: str) -> str:
    """Get the standardized chain ID from any valid chain name variation"""
    normalized_chain = normalize_chain_name(chain)
    if normalized_chain in SUPPORTED_CHAINS:
        return normalized_chain
    raise ValueError(f"Unsupported chain: {chain}")

4.2 BitQuery Service Integration (bitquery_service.py)

Next, create the BitQuery service to fetch market data:

import logging
from datetime import datetime, timedelta, timezone
from typing import Dict

import aiohttp
from config import get_chain_id

logger = logging.getLogger(__name__)

class BitQueryService:
    def __init__(self, oauth_token: str):
        if not oauth_token:
            raise ValueError("BitQuery OAuth token is required")

        logger.debug("Initializing BitQueryService")
        self.oauth_token = oauth_token
        self.url = "https://streaming.bitquery.io/graphql"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {oauth_token}",
        }

    def _get_base_trade_fields(self, address_field: str = "SmartContract") -> str:
        """Get common trade fields structure for all chains"""
        return f"""
            Block {{
                Number
                Time
            }}
            Transaction {{
                Hash
            }}
            Trade {{
                Buy {{
                    Amount
                    Currency {{
                        Name
                        Symbol
                        {address_field}
                    }}
                    Price
                }}
                Sell {{
                    Amount
                    Currency {{
                        Name
                        Symbol
                        {address_field}
                    }}
                    Price
                }}
                Dex {{
                    ProtocolName
                }}
            }}
        """

    def _get_chain_query(self, chain: str) -> tuple[str, str]:
        """Get chain-specific query structure and namespace"""
        if chain == "solana":
            return "Solana", "MintAddress"
        elif chain == "tron":
            return "Tron", "Address"
        elif chain == "ton":
            return "TON", "Address"
        else:  # EVM chains
            return "EVM", "SmartContract"

    async def get_chain_activity(self, chain: str, time_window: int = 60) -> Dict:
        """Fetch trading activity for specified chain"""
        try:
            logger.debug(f"Fetching chain activity for {chain}")
            now = datetime.now(timezone.utc)
            time_ago = now - timedelta(minutes=time_window)

            chain = get_chain_id(chain)
            namespace, address_field = self._get_chain_query(chain)
            trade_fields = self._get_base_trade_fields(address_field)

            # Build query based on chain type
            if namespace == "EVM":
                query = f"""
                query ($network: evm_network!, $since: DateTime) {{
                  {namespace}(network: $network) {{
                    DEXTrades(
                      orderBy: {{descending: Block_Time}}
                      where: {{Block: {{Time: {{since: $since}}}}}}
                    ) {{
                      {trade_fields}
                    }}
                  }}
                }}
                """
                variables = {"network": chain.lower(), "since": time_ago.isoformat()}
            else:
                query = f"""
                query ($since: DateTime) {{
                  {namespace} {{
                    DEXTrades(
                      orderBy: {{descending: Block_Time}}
                      where: {{Block: {{Time: {{since: $since}}}}}}
                    ) {{
                      {trade_fields}
                    }}
                  }}
                }}
                """
                variables = {"since": time_ago.isoformat()}

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.url,
                    headers=self.headers,
                    json={"query": query, "variables": variables},
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"BitQuery API error: {error_text}")
                        raise aiohttp.ClientError(f"BitQuery API returned status {response.status}")

                    data = await response.json()
                    if "errors" in data:
                        logger.error(f"GraphQL errors: {data['errors']}")
                        raise ValueError(f"GraphQL query failed: {data['errors']}")

                    return data

        except Exception as e:
            logger.error(f"Error fetching chain activity: {str(e)}")
            raise

4.3 Trading Logic (dex_agent.py)

Now, create the DEX agent that handles trading logic and analysis:

import logging
from decimal import Decimal
from typing import Dict, List

from swarmzero import Agent
from bitquery_service import BitQueryService
from config import SUPPORTED_CHAINS, get_chain_id, validate_chain

logger = logging.getLogger(__name__)

class DexAgent(Agent):
    def __init__(self, bitquery_service: BitQueryService):
        if not bitquery_service:
            raise ValueError("BitQueryService instance is required")

        self.bitquery = bitquery_service

        super().__init__(
            name="Multi-Chain DEX Agent",
            instruction="""You are an AI agent that helps trade across multiple blockchains.
            I can analyze market data and provide insights on various chains including Ethereum, BSC, etc.
            
            You can ask me to:
            - Analyze market activity on any supported chain
            - Get trading suggestions based on price movements
            - Check which blockchain networks are supported
            """,
            functions=[self.analyze_market, self.suggest_trades, self.get_supported_chains],
            config_path="./swarmzero_config.toml",
        )

    async def analyze_market(self, chain: str) -> Dict:
        """Analyze market activity for a specific chain"""
        try:
            if not validate_chain(chain):
                raise ValueError(f"Unsupported chain: {chain}")

            chain_id = get_chain_id(chain)
            data = await self.bitquery.get_chain_activity(chain)

            namespace = "EVM" if chain_id not in ["solana", "tron", "ton"] else chain_id.capitalize()
            trades = data.get("data", {}).get(namespace, {}).get("DEXTrades", [])

            analysis = {
                "total_trades": len(trades),
                "volume_24h": Decimal(0),
                "active_pairs": set(),
                "active_dexes": set(),
                "price_changes": {},
            }

            # Process trades
            for trade in trades:
                try:
                    buy = trade.get("Trade", {}).get("Buy", {})
                    sell = trade.get("Trade", {}).get("Sell", {})
                    dex = trade.get("Trade", {}).get("Dex", {})

                    # Track volumes
                    buy_amount = Decimal(str(buy.get("Amount", 0)))
                    analysis["volume_24h"] += buy_amount

                    # Track trading pairs
                    buy_currency = buy.get("Currency", {})
                    sell_currency = sell.get("Currency", {})
                    pair = (buy_currency.get("Symbol"), sell_currency.get("Symbol"))
                    if all(pair):
                        analysis["active_pairs"].add(pair)

                    # Track DEXes
                    dex_name = dex.get("ProtocolName")
                    if dex_name:
                        analysis["active_dexes"].add(dex_name)

                    # Track price changes
                    price = Decimal(str(buy.get("Price", 0)))
                    symbol = buy_currency.get("Symbol")
                    if symbol and price > 0:
                        if symbol not in analysis["price_changes"]:
                            analysis["price_changes"][symbol] = {"prices": [], "change_24h": 0}
                        analysis["price_changes"][symbol]["prices"].append(price)

                except Exception as e:
                    logger.warning(f"Error processing trade: {str(e)}")
                    continue

            # Convert sets to lists for JSON serialization
            analysis["active_pairs"] = list(analysis["active_pairs"])
            analysis["active_dexes"] = list(analysis["active_dexes"])

            return analysis

        except Exception as e:
            logger.error(f"Failed to analyze market: {str(e)}")
            raise

    async def suggest_trades(self, chain: str) -> List[Dict]:
        """Generate trade suggestions based on market analysis"""
        try:
            if not validate_chain(chain):
                raise ValueError(f"Unsupported chain: {chain}")

            analysis = await self.analyze_market(chain)
            suggestions = []

            for symbol, price_data in analysis["price_changes"].items():
                try:
                    change = price_data["change_24h"]
                    volume = analysis["volume_24h"]

                    # Calculate confidence score
                    confidence = 0
                    if abs(change) > 5:  # Price movement
                        confidence += 20
                    if volume > 10000:  # Volume
                        confidence += 30
                    if analysis["total_trades"] > 50:  # Trading activity
                        confidence += 25
                    if len(analysis["active_pairs"]) > 5:  # Market depth
                        confidence += 25

                    if confidence >= 60:
                        action = "buy" if change < 0 else "sell"
                        suggestion = {
                            "token": symbol,
                            "action": action,
                            "confidence": confidence,
                            "signals": {
                                "price_momentum": change,
                                "volume_impact": volume > 10000,
                                "trade_frequency": analysis["total_trades"] > 50,
                                "market_depth": len(analysis["active_pairs"]) > 5,
                            }
                        }
                        suggestions.append(suggestion)

                except Exception as e:
                    logger.warning(f"Error processing suggestion: {str(e)}")
                    continue

            return sorted(suggestions, key=lambda x: x["confidence"], reverse=True)[:5]

        except Exception as e:
            logger.error(f"Failed to generate suggestions: {str(e)}")
            raise

4.4 Telegram Bot Implementation (dex_rabbit_bot.py)

Finally, create the Telegram bot interface:

import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters

from config import SUPPORTED_CHAINS
from dex_agent import DexAgent

logger = logging.getLogger(__name__)

class DexRabbitBot:
    def __init__(self, token: str, agent: DexAgent):
        self.app = Application.builder().token(token).build()
        self.agent = agent

    def setup_handlers(self):
        """Set up command and message handlers"""
        self.app.add_handler(CommandHandler("start", self._start_command))
        self.app.add_handler(CommandHandler("help", self._help_command))
        self.app.add_handler(CommandHandler("chains", self._chains_command))
        self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._handle_message))
        self.app.add_error_handler(self._error_handler)

    async def _start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /start command"""
        welcome_message = (
            "👋 Welcome to DexRabbit Trading Bot!\n\n"
            "I can help you analyze cryptocurrency markets and find trading opportunities "
            "across multiple blockchain networks.\n\n"
            "Try asking me things like:\n"
            "• 'How's the ETH market looking?'\n"
            "• 'Show me trading opportunities on BSC'\n"
            "• 'What's happening on Solana?'\n\n"
            "Use /help to see all available commands."
        )
        await update.message.reply_text(welcome_message)

    async def _help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /help command"""
        help_message = (
            "🤖 DexRabbit Bot Help\n\n"
            "Commands:\n"
            "/start - Start the bot\n"
            "/help - Show this help message\n"
            "/chains - List supported blockchains\n\n"
            "You can chat with me naturally! Try asking:\n"
            "• 'Analyze the ETH market'\n"
            "• 'What trades look good on BSC?'\n"
            "• 'Show me Solana trading activity'\n"
            "• 'Which chains do you support?'"
        )
        await update.message.reply_text(help_message)

    async def _chains_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /chains command"""
        chains_list = "\n".join(
            f"• {config.name} ({chain_id.upper()}) - Native token: {config.native_token}"
            for chain_id, config in SUPPORTED_CHAINS.items()
        )
        message = f"🌐 Supported Blockchains:\n\n{chains_list}"
        await update.message.reply_text(message)

    async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle incoming chat messages"""
        try:
            await context.bot.send_chat_action(
                chat_id=update.effective_chat.id,
                action="typing"
            )
            response = await self.agent.chat(update.message.text)
            await update.message.reply_text(
                response,
                parse_mode="HTML",
                disable_web_page_preview=True
            )
        except Exception as e:
            logger.error(f"Error handling message: {str(e)}")
            await update.message.reply_text(
                "Sorry, I encountered an error. Please try again later."
            )

    async def _error_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle errors"""
        logger.error(f"Update {update} caused error {context.error}")
        if update and update.effective_chat:
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text="Sorry, something went wrong. Please try again later."
            )

    def run(self):
        """Start the bot"""
        self.setup_handlers()
        logger.info("Bot started successfully")
        self.app.run_polling()

4.5 Main Entry Point (main.py)

Create the main entry point to tie everything together:

import logging
import os
from enum import Enum
from dotenv import load_dotenv

from bitquery_service import BitQueryService
from dex_agent import DexAgent
from dex_rabbit_bot import DexRabbitBot

class LogLevel(str, Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"

def setup_logging():
    """Configure logging based on environment variable"""
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    if log_level not in LogLevel.__members__:
        print(f"Invalid LOG_LEVEL: {log_level}. Defaulting to INFO")
        log_level = "INFO"

    logging.basicConfig(
        level=getattr(logging, log_level),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

def main():
    # Load environment variables
    load_dotenv()

    # Setup logging
    setup_logging()
    logger = logging.getLogger(__name__)

    try:
        # Get required tokens
        telegram_token = os.getenv("BOTFATHER_API_TOKEN")
        bitquery_token = os.getenv("BITQUERY_OAUTH_TOKEN")

        if not telegram_token or not bitquery_token:
            raise ValueError("Missing required environment variables")

        # Initialize services
        logger.info("Initializing services...")
        bitquery_service = BitQueryService(bitquery_token)
        agent = DexAgent(bitquery_service)

        # Create and run bot
        logger.info("Starting DexRabbit Bot...")
        bot = DexRabbitBot(telegram_token, agent)
        bot.run()

    except Exception as e:
        logger.error(f"Failed to start bot: {str(e)}")
        raise

if __name__ == "__main__":
    main()

5. Running the Bot

Activate the Poetry environment and run the bot:

poetry shell
poetry run python main.py

The bot will start and listen for commands through Telegram.

6. Interacting with the Bot

Once the bot is running, you can interact with it through Telegram:

  1. Start a chat with your bot using the Telegram handle

  2. Available commands:

    • /start - Initialize the bot

    • /help - Show help message

    • /chains - List supported blockchains

You can also chat naturally with the bot:

  • "Analyze the ETH market"

  • "What trades look good on BSC?"

  • "Show me Solana trading activity"

7. Supported Blockchains

The bot supports multiple networks including:

  • Ethereum (ETH)

  • Binance Smart Chain (BSC)

  • Polygon (MATIC)

  • Solana (SOL)

  • Optimism

  • Arbitrum

  • Base

  • Tron (TRX)

  • TON

  • opBNB

8. Features and Capabilities

The bot provides:

  1. Market Analysis

    • Trading volume tracking

    • Price change monitoring

    • Active trading pair analysis

    • DEX activity tracking

  2. Trading Suggestions

    • Price momentum analysis

    • Volume impact assessment

    • Trading frequency analysis

    • Market depth evaluation

    • Confidence scoring

  3. AI-Powered Insights

    • Natural language processing

    • Market trend analysis

    • Trading recommendations

    • Risk assessment

9. Customization

You can customize the bot's behavior by modifying:

  1. swarmzero_config.toml:

[model]
model = "gpt-4o"

[sample_prompts]
prompts = [
    "How's the Ethereum market looking?",
    "Show me trading opportunities on BSC",
    "What's happening on Solana?"
]

[environment]
type = "dev"

[timeout]
llm = 30
  1. Supported chains in config.py:

SUPPORTED_CHAINS: Dict[str, ChainConfig] = {
    "eth": ChainConfig("Ethereum", "eth", "ETH", "https://etherscan.io"),
    "bsc": ChainConfig("BSC", "bsc", "BNB", "https://bscscan.com"),
    # Add more chains as needed
}

Next Steps

After setting up the basic bot, consider:

  1. Adding custom trading strategies

  2. Implementing additional data sources

  3. Creating automated trading features

  4. Adding more blockchain networks

  5. Enhancing the AI analysis capabilities

Support and Resources

Last updated

Logo

© 2025 SwarmZero Technology Solutions Inc. All rights reserved.