@@ -42,21 +42,6 @@ def __init__(self, config: Dict[str, Any], loop: AbstractEventLoop):
42
42
self ._consumer = AIOKafkaConsumer (** conf , loop = loop )
43
43
loop .run_until_complete (self ._consumer .start ())
44
44
45
- def on_assign_offset_end (self , consumer : AIOKafkaConsumer , partitions : List [TopicPartition ]) -> None :
46
- for p in partitions :
47
- p .offset = consumer .last_stable_offset (p )
48
- self .on_assign_log (consumer , partitions )
49
- try :
50
- consumer .assign (partitions )
51
- except KafkaError as e :
52
- self ._error_callback (e )
53
-
54
- def on_coop_assign_offset_end (self , consumer : AIOKafkaConsumer , partitions : List [TopicPartition ]) -> None :
55
- for p in partitions :
56
- p .offset = consumer .last_stable_offset (p )
57
- self .on_assign_log (consumer , partitions )
58
- consumer .assign (consumer .assignment ().update (partitions ))
59
-
60
45
def on_assign_log (self , consumer : AIOKafkaConsumer , partitions : List [TopicPartition ]) -> None :
61
46
log_level = "WARNING"
62
47
params = {
@@ -77,19 +62,11 @@ def subscribe(self, topics: Optional[Iterable[str]] = None) -> None:
77
62
try :
78
63
self ._consumer .subscribe (topics , listener = CoreConsumerRebalanceListener (
79
64
consumer = self ._consumer ,
80
- on_assign_callback = (self .get_on_assign_callback () if self .assign_offset_end
81
- else self .on_assign_log )
65
+ on_assign_callback = self .on_assign_log
82
66
))
83
67
except KafkaError as e :
84
68
self ._error_callback (e )
85
69
86
- def get_on_assign_callback (self ) -> Callable [[AIOKafkaConsumer , List [TopicPartition ]], None ]:
87
- if "cooperative" in self ._config ["conf" ].get ("partition_assignment_strategy" , "" ):
88
- callback = self .on_coop_assign_offset_end
89
- else :
90
- callback = self .on_assign_offset_end
91
- return callback
92
-
93
70
def unsubscribe (self ) -> None :
94
71
self ._consumer .unsubscribe ()
95
72
0 commit comments