11import json
2- import threading
3- import time
42import uuid
5- from typing import Any , ClassVar , Dict , List , Optional , Union
3+ from typing import Any , Dict , List , Optional , Union
64
75from pydantic import (
86 UUID4 ,
1917
2018from crewai .agent import Agent
2119from crewai .agents .cache import CacheHandler
22- from crewai .i18n import I18N
2320from crewai .process import Process
2421from crewai .task import Task
2522from crewai .tools .agent_tools import AgentTools
23+ from crewai .utilities import I18N , Logger , RPMController
2624
2725
2826class Crew (BaseModel ):
@@ -37,23 +35,26 @@ class Crew(BaseModel):
3735 config: Configuration settings for the crew.
3836 cache_handler: Handles caching for the crew's operations.
3937 max_rpm: Maximum number of requests per minute for the crew execution to be respected.
40- rpm: Current number of requests per minute for the crew execution.
4138 id: A unique identifier for the crew instance.
4239 """
4340
4441 __hash__ = object .__hash__
45- _timer : Optional [threading .Timer ] = PrivateAttr (default = None )
46- lock : ClassVar [threading .Lock ] = threading .Lock ()
47- rpm : ClassVar [int ] = 0
48- max_rpm : Optional [int ] = Field (default = None )
42+ _rpm_controller : RPMController = PrivateAttr ()
43+ _logger : Logger = PrivateAttr ()
44+ _cache_handler : Optional [InstanceOf [CacheHandler ]] = PrivateAttr (
45+ default = CacheHandler ()
46+ )
4947 model_config = ConfigDict (arbitrary_types_allowed = True )
5048 tasks : List [Task ] = Field (default_factory = list )
5149 agents : List [Agent ] = Field (default_factory = list )
5250 process : Process = Field (default = Process .sequential )
5351 verbose : Union [int , bool ] = Field (default = 0 )
5452 config : Optional [Union [Json , Dict [str , Any ]]] = Field (default = None )
55- cache_handler : Optional [InstanceOf [CacheHandler ]] = Field (default = CacheHandler ())
5653 id : UUID4 = Field (default_factory = uuid .uuid4 , frozen = True )
54+ max_rpm : Optional [int ] = Field (
55+ default = None ,
56+ description = "Maximum number of requests per minute for the crew execution to be respected." ,
57+ )
5758 language : str = Field (
5859 default = "en" ,
5960 description = "Language used for the crew, defaults to English." ,
@@ -74,9 +75,10 @@ def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
7475 return json .loads (v ) if isinstance (v , Json ) else v
7576
7677 @model_validator (mode = "after" )
77- def set_reset_counter (self ):
78- if self .max_rpm :
79- self ._reset_request_count ()
78+ def set_private_attrs (self ):
79+ self ._cache_handler = CacheHandler ()
80+ self ._logger = Logger (self .verbose )
81+ self ._rpm_controller = RPMController (max_rpm = self .max_rpm , logger = self ._logger )
8082 return self
8183
8284 @model_validator (mode = "after" )
@@ -94,8 +96,8 @@ def check_config(self):
9496
9597 if self .agents :
9698 for agent in self .agents :
97- agent .set_cache_handler (self .cache_handler )
98- agent .set_request_within_rpm_limit (self .ensure_request_within_rpm_limit )
99+ agent .set_cache_handler (self ._cache_handler )
100+ agent .set_rpm_controller (self ._rpm_controller )
99101 return self
100102
101103 def _setup_from_config (self ):
@@ -116,28 +118,9 @@ def _create_task(self, task_config):
116118 del task_config ["agent" ]
117119 return Task (** task_config , agent = task_agent )
118120
119- def ensure_request_within_rpm_limit (self ):
120- if not self .max_rpm :
121- return True
122-
123- with Crew .lock :
124- if Crew .rpm < self .max_rpm :
125- Crew .rpm += 1
126- return True
127- self ._log ("info" , "Max RPM reached, waiting for next minute to start." )
128-
129- return self ._wait_for_next_minute ()
130-
131- def _wait_for_next_minute (self ):
132- time .sleep (60 )
133- with Crew .lock :
134- Crew .rpm = 0
135- return True
136-
137121 def kickoff (self ) -> str :
138122 """Starts the crew to work on its assigned tasks."""
139123 for agent in self .agents :
140- agent .cache_handler = self .cache_handler
141124 agent .i18n = I18N (language = self .language )
142125
143126 if self .process == Process .sequential :
@@ -149,33 +132,18 @@ def _sequential_loop(self) -> str:
149132 for task in self .tasks :
150133 self ._prepare_and_execute_task (task )
151134 task_output = task .execute (task_output )
152- self ._log ("debug" , f"\n [{ task .agent .role } ] Task output: { task_output } \n \n " )
153- self ._stop_timer ()
135+ self ._logger .log (
136+ "debug" , f"[{ task .agent .role } ] Task output: { task_output } \n \n "
137+ )
138+
139+ if self .max_rpm :
140+ self ._rpm_controller .stop_rpm_counter ()
154141 return task_output
155142
156143 def _prepare_and_execute_task (self , task ):
157144 """Prepares and logs information about the task being executed."""
158145 if task .agent .allow_delegation :
159146 task .tools += AgentTools (agents = self .agents ).tools ()
160147
161- self ._log ("debug" , f"Working Agent: { task .agent .role } " )
162- self ._log ("info" , f"Starting Task: { task .description } " )
163-
164- def _log (self , level , message ):
165- """Logs a message at the specified verbosity level."""
166- level_map = {"debug" : 1 , "info" : 2 }
167- verbose_level = (
168- 2 if isinstance (self .verbose , bool ) and self .verbose else self .verbose
169- )
170- if verbose_level and level_map [level ] <= verbose_level :
171- print (f"\n { message } " )
172-
173- def _stop_timer (self ):
174- if self ._timer :
175- self ._timer .cancel ()
176-
177- def _reset_request_count (self ):
178- self ._stop_timer ()
179- self ._timer = threading .Timer (60.0 , self ._reset_request_count )
180- self ._timer .start ()
181- Crew .rpm = 0
148+ self ._logger .log ("debug" , f"Working Agent: { task .agent .role } " )
149+ self ._logger .log ("info" , f"Starting Task: { task .description } " )
0 commit comments