|

A Coding Guide to Build a Hierarchical Supervisor Agent Framework with CrewAI and Google Gemini for Coordinated Multi-Agent Workflows

🚀

In this tutorial, we stroll you thru the design and implementation of a complicated Supervisor Agent Framework utilizing CrewAI with Google Gemini mannequin. We arrange specialised brokers, together with researchers, analysts, writers, and reviewers, and deliver them beneath a supervisor agent who coordinates and displays their work. By combining structured activity configurations, hierarchical workflows, and built-in instruments, we create a system the place every agent performs a outlined function. At the identical time, the supervisor ensures high quality and coherence throughout your complete venture lifecycle. Check out the FULL CODES here.

!pip set up crewai crewai-tools langchain-google-genai python-dotenv


import os
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum


from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, WebsiteSearchTool
from langchain_google_genai import ChatGoogleGenerativeAI
from dotenv import load_dotenv


class TaskPrecedence(Enum):
   LOW = 1
   MEDIUM = 2
   HIGH = 3
   CRITICAL = 4

We start by putting in the libraries and important modules to arrange our CrewAI framework. Here, we outline the TaskPrecedence enum, which helps us assign completely different ranges of urgency and significance to every activity we create. Check out the FULL CODES here.

@dataclass
class TaskConfig:
   description: str
   expected_output: str
   precedence: TaskPrecedence
   max_execution_time: int = 300 
   requires_human_input: bool = False


class SupervisorFramework:
   """
   Advanced Supervisor Agent Framework utilizing CrewAI
   Manages a number of specialised brokers with hierarchical coordination
   """
  
   def __init__(self, gemini_api_key: str, serper_api_key: str = None):
       """
       Initialize the supervisor framework
      
       Args:
           gemini_api_key: Google Gemini API key (free tier)
           serper_api_key: Serper API key for internet search (non-obligatory, has free tier)
       """
       os.environ["GOOGLE_API_KEY"] = gemini_api_key
       if serper_api_key:
           os.environ["SERPER_API_KEY"] = serper_api_key
      
       self.llm = ChatGoogleGenerativeAI(
           mannequin="gemini-1.5-flash",
           temperature=0.7,
           max_tokens=2048
       )
      
       self.instruments = []
       if serper_api_key:
           self.instruments.append(SerperDevTool())
      
       self.brokers = {}
       self.supervisor = None
       self.crew = None
      
       print("🚀 SupervisorFramework initialized efficiently!")
       print(f"📡 LLM Model: {self.llm.mannequin}")
       print(f"🛠  Available instruments: {len(self.instruments)}")


   def create_research_agent(self) -> Agent:
       """Create a specialised analysis agent"""
       return Agent(
           function="Senior Research Analyst",
           purpose="Conduct complete analysis and collect correct data on any given subject",
           backstory="""You are an skilled analysis analyst with years of expertise in
           data gathering, fact-checking, and synthesizing advanced information from a number of sources.
           You excel at discovering dependable sources and presenting well-structured analysis findings.""",
           verbose=True,
           allow_delegation=False,
           llm=self.llm,
           instruments=self.instruments,
           max_iter=3,
           reminiscence=True
       )


   def create_analyst_agent(self) -> Agent:
       """Create a specialised information analyst agent"""
       return Agent(
           function="Strategic Data Analyst",
           purpose="Analyze information, determine patterns, and present actionable insights",
           backstory="""You are a strategic information analyst with experience in statistical evaluation,
           sample recognition, and enterprise intelligence. You remodel uncooked information and analysis
           into significant insights that drive decision-making.""",
           verbose=True,
           allow_delegation=False,
           llm=self.llm,
           max_iter=3,
           reminiscence=True
       )


   def create_writer_agent(self) -> Agent:
       """Create a specialised content material author agent"""
       return Agent(
           function="Expert Technical Writer",
           purpose="Create clear, participating, and well-structured written content material",
           backstory="""You are an skilled technical author with a expertise for making advanced
           data accessible and participating. You concentrate on creating documentation,
           experiences, and content material that successfully communicates insights to numerous audiences.""",
           verbose=True,
           allow_delegation=False,
           llm=self.llm,
           max_iter=3,
           reminiscence=True
       )


   def create_reviewer_agent(self) -> Agent:
       """Create a high quality assurance reviewer agent"""
       return Agent(
           function="Quality Assurance Reviewer",
           purpose="Review, validate, and enhance the standard of all deliverables",
           backstory="""You are a meticulous high quality assurance skilled with an eye fixed for element
           and a dedication to excellence. You guarantee all work meets excessive requirements of accuracy,
           completeness, and readability earlier than remaining supply.""",
           verbose=True,
           allow_delegation=False,
           llm=self.llm,
           max_iter=2,
           reminiscence=True
       )


   def create_supervisor_agent(self) -> Agent:
       """Create the primary supervisor agent"""
       return Agent(
           function="Project Supervisor & Coordinator",
           purpose="Coordinate staff efforts, handle workflows, and guarantee venture success",
           backstory="""You are an skilled venture supervisor with experience in staff
           coordination, workflow optimization, and high quality administration. You be sure that all
           staff members work effectively in direction of widespread objectives and keep excessive requirements
           all through the venture lifecycle.""",
           verbose=True,
           allow_delegation=True,
           llm=self.llm,
           max_iter=2,
           reminiscence=True
       )


   def setup_agents(self):
       """Initialize all brokers within the framework"""
       print("🤖 Setting up specialised brokers...")
      
       self.brokers = {
           'researcher': self.create_research_agent(),
           'analyst': self.create_analyst_agent(),
           'author': self.create_writer_agent(),
           'reviewer': self.create_reviewer_agent()
       }
      
       self.supervisor = self.create_supervisor_agent()
      
       print(f"✅ Created {len(self.brokers)} specialised brokers + 1 supervisor")
      
       for function, agent in self.brokers.objects():
           print(f"   └── {function.title()}: {agent.function}")


   def create_task_workflow(self, subject: str, task_configs: Dict[str, TaskConfig]) -> List[Task]:
       """
       Create a complete activity workflow
      
       Args:
           subject: Main subject/venture focus
           task_configs: Dictionary of activity configurations
          
       Returns:
           List of CrewAI Task objects
       """
       duties = []
      
       if 'analysis' in task_configs:
           config = task_configs['research']
           research_task = Task(
               description=f"{config.description} Focus on: {subject}",
               expected_output=config.expected_output,
               agent=self.brokers['researcher']
           )
           duties.append(research_task)
      
       if 'evaluation' in task_configs:
           config = task_configs['analysis']
           analysis_task = Task(
               description=f"{config.description} Analyze the analysis findings about: {subject}",
               expected_output=config.expected_output,
               agent=self.brokers['analyst'],
               context=duties 
           )
           duties.append(analysis_task)
      
       if 'writing' in task_configs:
           config = task_configs['writing']
           writing_task = Task(
               description=f"{config.description} Create content material about: {subject}",
               expected_output=config.expected_output,
               agent=self.brokers['writer'],
               context=duties
           )
           duties.append(writing_task)
      
       if 'evaluate' in task_configs:
           config = task_configs['review']
           review_task = Task(
               description=f"{config.description} Review all work associated to: {subject}",
               expected_output=config.expected_output,
               agent=self.brokers['reviewer'],
               context=duties
           )
           duties.append(review_task)
      
       supervisor_task = Task(
           description=f"""As the venture supervisor, coordinate your complete workflow for: {subject}.
           Monitor progress, guarantee high quality requirements, resolve any conflicts between brokers,
           and present remaining venture oversight. Ensure all deliverables meet necessities.""",
           expected_output="""A complete venture abstract together with:
           - Executive abstract of all accomplished work
           - Quality evaluation of deliverables
           - Recommendations for enhancements or subsequent steps
           - Final venture standing report""",
           agent=self.supervisor,
           context=duties
       )
       duties.append(supervisor_task)
      
       return duties


   def execute_project(self,
                      subject: str,
                      task_configs: Dict[str, TaskConfig],
                      process_type: Process = Process.hierarchical) -> Dict[str, Any]:
       """
       Execute a full venture utilizing the supervisor framework
      
       Args:
           subject: Main venture subject
           task_configs: Task configurations
           process_type: CrewAI course of sort (hierarchical beneficial for supervisor)
          
       Returns:
           Dictionary containing execution outcomes
       """
       print(f"🚀 Starting venture execution: {subject}")
       print(f"📋 Process sort: {process_type.worth}")
      
       if not self.brokers or not self.supervisor:
           self.setup_agents()
      
       duties = self.create_task_workflow(subject, task_configs)
       print(f"📝 Created {len(duties)} duties in workflow")
      
       crew_agents = listing(self.brokers.values()) + [self.supervisor]
      
       self.crew = Crew(
           brokers=crew_agents,
           duties=duties,
           course of=process_type,
           manager_llm=self.llm, 
           verbose=True,
           reminiscence=True
       )
      
       print("🎯 Executing venture...")
       strive:
           consequence = self.crew.kickoff()
          
           return {
               'standing': 'success',
               'consequence': consequence,
               'subject': subject,
               'tasks_completed': len(duties),
               'agents_involved': len(crew_agents)
           }
          
       besides Exception as e:
           print(f"❌ Error throughout execution: {str(e)}")
           return {
               'standing': 'error',
               'error': str(e),
               'subject': subject
           }


   def get_crew_usage_metrics(self) -> Dict[str, Any]:
       """Get utilization metrics from the crew"""
       if not self.crew:
           return {'error': 'No crew execution discovered'}
      
       strive:
           return {
               'total_tokens_used': getattr(self.crew, 'total_tokens_used', 'Not out there'),
               'total_cost': getattr(self.crew, 'total_cost', 'Not out there'),
               'execution_time': getattr(self.crew, 'execution_time', 'Not out there')
           }
       besides:
           return {'be aware': 'Metrics not out there for this execution'}

We outline a versatile TaskConfig information class to seize every activity’s intent, anticipated output, precedence, and runtime necessities, thereby standardizing how work flows by means of the system. We then construct a SupervisorFramework that wires in Gemini, non-obligatory search instruments, and a coordinated crew of specialised brokers, so we orchestrate analysis → evaluation → writing → evaluate beneath a supervising agent in actual time. Check out the FULL CODES here.

def create_sample_task_configs() -> Dict[str, TaskConfig]:
   """Create pattern activity configurations for demonstration"""
   return {
       'analysis': TaskConfig(
           description="Conduct complete analysis on the given subject. Gather data from dependable sources and compile key findings.",
           expected_output="A detailed analysis report with key findings, statistics, developments, and supply references (minimal 500 phrases).",
           precedence=TaskPrecedence.HIGH
       ),
       'evaluation': TaskConfig(
           description="Analyze the analysis findings to determine patterns, insights, and implications.",
           expected_output="An analytical report highlighting key insights, developments, alternatives, and potential challenges (minimal 400 phrases).",
           precedence=TaskPrecedence.HIGH
       ),
       'writing': TaskConfig(
           description="Create a complete, well-structured doc based mostly on the analysis and evaluation.",
           expected_output="A skilled doc with clear construction, participating content material, and actionable suggestions (minimal 800 phrases).",
           precedence=TaskPrecedence.MEDIUM
       ),
       'evaluate': TaskConfig(
           description="Review all deliverables for high quality, accuracy, completeness, and coherence.",
           expected_output="A high quality evaluation report with suggestions for enhancements and remaining approval standing.",
           precedence=TaskPrecedence.CRITICAL
       )
   }

We now create a helper perform create_sample_task_configs() that defines default activity blueprints for analysis, evaluation, writing, and evaluate. By setting clear descriptions, anticipated outputs, and priorities, we be sure that our brokers know precisely what to ship and the criticality of every step within the workflow. Check out the FULL CODES here.

def demo_supervisor_framework():
   """
   Demo perform to showcase the supervisor framework
   Replace 'your_gemini_api_key' with your precise API key
   """
   print("🎬 CrewAI Supervisor Framework Demo")
   print("=" * 50)
  
   framework = SupervisorFramework(
       gemini_api_key="Use Your API Key Here", 
       serper_api_key=None 
   )
  
   task_configs = create_sample_task_configs()
  
   print(f"📊 Demo Topic: {subject}")
   print(f"📋 Task Configurations: {listing(task_configs.keys())}")
  
   outcomes = framework.execute_project(subject, task_configs)
  
   print("n" + "=" * 50)
   print("📈 EXECUTION RESULTS")
   print("=" * 50)
  
   if outcomes['status'] == 'success':
       print(f"✅ Status: {outcomes['status'].higher()}")
       print(f"📝 Tasks Completed: {outcomes['tasks_completed']}")
       print(f"🤖 Agents Involved: {outcomes['agents_involved']}")
       print(f"📄 Final Result Preview: {str(outcomes['result'])[:200]}...")
   else:
       print(f"❌ Status: {outcomes['status'].higher()}")
       print(f"🚫 Error: {outcomes['error']}")
  
   metrics = framework.get_crew_usage_metrics()
   print(f"n💰 Usage Metrics: {metrics}")


if __name__ == "__main__":
   demo_supervisor_framework()

We add a demo_supervisor_framework() perform to showcase the complete workflow in motion. Here, we initialize the framework with a Gemini API key, outline a pattern subject, load activity configurations, and execute the venture. We then show activity progress, execution outcomes, and utilization metrics so we are able to clearly see how the supervisor coordinates the brokers end-to-end.

In conclusion, we see how the Supervisor Framework permits us to systematically handle advanced initiatives by using a number of specialised brokers that work in unison. We can now execute analysis, evaluation, writing, and reviewing duties in a coordinated workflow, with the supervisor making certain high quality and alignment at each stage. With this setup, we’re outfitted to deal with real-world initiatives extra effectively, turning summary objectives into actionable, high-quality deliverables.


Check out the FULL CODES here. Feel free to take a look at our GitHub Page for Tutorials, Codes and Notebooks. Also, be at liberty to comply with us on Twitter and don’t neglect to be a part of our 100k+ ML SubReddit and Subscribe to our Newsletter.

The put up A Coding Guide to Build a Hierarchical Supervisor Agent Framework with CrewAI and Google Gemini for Coordinated Multi-Agent Workflows appeared first on MarkTechPost.

Similar Posts