1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- use std:: collections:: HashSet ;
15+ use std:: collections:: { HashMap , HashSet } ;
16+ use std:: sync:: Arc ;
1617use std:: { iter, mem} ;
1718
18- use api:: v1:: region:: region_request;
19- use api:: v1:: { DeleteRequests , RowDeleteRequest , RowDeleteRequests } ;
20- use catalog:: CatalogManager ;
21- use client:: region_handler:: RegionRequestHandler ;
19+ use api:: v1:: region:: { DeleteRequests as RegionDeleteRequests , RegionRequestHeader } ;
20+ use api:: v1:: { DeleteRequests , RowDeleteRequests } ;
21+ use catalog:: CatalogManagerRef ;
22+ use common_meta:: datanode_manager:: { AffectedRows , DatanodeManagerRef } ;
23+ use common_meta:: peer:: Peer ;
2224use common_query:: Output ;
25+ use futures_util:: future;
26+ use metrics:: counter;
27+ use partition:: manager:: PartitionRuleManagerRef ;
2328use session:: context:: QueryContextRef ;
2429use snafu:: { ensure, OptionExt , ResultExt } ;
30+ use table:: requests:: DeleteRequest as TableDeleteRequest ;
2531use table:: TableRef ;
2632
2733use crate :: error:: {
28- CatalogSnafu , InvalidDeleteRequestSnafu , MissingTimeIndexColumnSnafu , RequestDatanodeSnafu ,
29- Result , TableNotFoundSnafu ,
34+ CatalogSnafu , FindRegionLeaderSnafu , InvalidDeleteRequestSnafu , JoinTaskSnafu ,
35+ MissingTimeIndexColumnSnafu , RequestDeletesSnafu , Result , TableNotFoundSnafu ,
3036} ;
31- use crate :: req_convert:: delete:: { ColumnToRow , RowToRegion } ;
37+ use crate :: region_req_factory:: RegionRequestFactory ;
38+ use crate :: req_convert:: delete:: { ColumnToRow , RowToRegion , TableToRegion } ;
3239
33- pub ( crate ) struct Deleter < ' a > {
34- catalog_manager : & ' a dyn CatalogManager ,
35- region_request_handler : & ' a dyn RegionRequestHandler ,
40+ pub struct Deleter {
41+ catalog_manager : CatalogManagerRef ,
42+ partition_manager : PartitionRuleManagerRef ,
43+ datanode_manager : DatanodeManagerRef ,
3644}
3745
38- impl < ' a > Deleter < ' a > {
46+ pub type DeleterRef = Arc < Deleter > ;
47+
48+ impl Deleter {
3949 pub fn new (
40- catalog_manager : & ' a dyn CatalogManager ,
41- region_request_handler : & ' a dyn RegionRequestHandler ,
50+ catalog_manager : CatalogManagerRef ,
51+ partition_manager : PartitionRuleManagerRef ,
52+ datanode_manager : DatanodeManagerRef ,
4253 ) -> Self {
4354 Self {
4455 catalog_manager,
45- region_request_handler,
56+ partition_manager,
57+ datanode_manager,
4658 }
4759 }
4860
@@ -67,31 +79,99 @@ impl<'a> Deleter<'a> {
6779 . map ( |r| !r. rows . is_empty ( ) )
6880 . unwrap_or_default ( )
6981 } ) ;
70- validate_row_count_match ( & requests) ?;
82+ validate_column_count_match ( & requests) ?;
7183
7284 let requests = self . trim_columns ( requests, & ctx) . await ?;
73- let region_request = RowToRegion :: new ( self . catalog_manager , & ctx)
74- . convert ( requests)
75- . await ?;
76- let region_request = region_request:: Body :: Deletes ( region_request) ;
85+ let deletes = RowToRegion :: new (
86+ self . catalog_manager . as_ref ( ) ,
87+ self . partition_manager . as_ref ( ) ,
88+ & ctx,
89+ )
90+ . convert ( requests)
91+ . await ?;
7792
78- let affected_rows = self
79- . region_request_handler
80- . handle ( region_request, ctx)
81- . await
82- . context ( RequestDatanodeSnafu ) ?;
93+ let affected_rows = self . do_request ( deletes, ctx. trace_id ( ) , 0 ) . await ?;
8394 Ok ( Output :: AffectedRows ( affected_rows as _ ) )
8495 }
96+
97+ pub async fn handle_table_delete (
98+ & self ,
99+ request : TableDeleteRequest ,
100+ ctx : QueryContextRef ,
101+ ) -> Result < AffectedRows > {
102+ let catalog = request. catalog_name . as_str ( ) ;
103+ let schema = request. schema_name . as_str ( ) ;
104+ let table = request. table_name . as_str ( ) ;
105+ let table = self . get_table ( catalog, schema, table) . await ?;
106+ let table_info = table. table_info ( ) ;
107+
108+ let deletes = TableToRegion :: new ( & table_info, & self . partition_manager )
109+ . convert ( request)
110+ . await ?;
111+ self . do_request ( deletes, ctx. trace_id ( ) , 0 ) . await
112+ }
85113}
86114
87- impl < ' a > Deleter < ' a > {
115+ impl Deleter {
116+ async fn do_request (
117+ & self ,
118+ requests : RegionDeleteRequests ,
119+ trace_id : u64 ,
120+ span_id : u64 ,
121+ ) -> Result < AffectedRows > {
122+ let header = RegionRequestHeader { trace_id, span_id } ;
123+ let request_factory = RegionRequestFactory :: new ( header) ;
124+
125+ let tasks = self
126+ . group_requests_by_peer ( requests)
127+ . await ?
128+ . into_iter ( )
129+ . map ( |( peer, deletes) | {
130+ let request = request_factory. build_delete ( deletes) ;
131+ let datanode_manager = self . datanode_manager . clone ( ) ;
132+ common_runtime:: spawn_write ( async move {
133+ datanode_manager
134+ . datanode ( & peer)
135+ . await
136+ . handle ( request)
137+ . await
138+ . context ( RequestDeletesSnafu )
139+ } )
140+ } ) ;
141+ let results = future:: try_join_all ( tasks) . await . context ( JoinTaskSnafu ) ?;
142+
143+ let affected_rows = results. into_iter ( ) . sum :: < Result < u64 > > ( ) ?;
144+ counter ! ( crate :: metrics:: DIST_DELETE_ROW_COUNT , affected_rows) ;
145+ Ok ( affected_rows)
146+ }
147+
148+ async fn group_requests_by_peer (
149+ & self ,
150+ requests : RegionDeleteRequests ,
151+ ) -> Result < HashMap < Peer , RegionDeleteRequests > > {
152+ let mut deletes: HashMap < Peer , RegionDeleteRequests > = HashMap :: new ( ) ;
153+
154+ for req in requests. requests {
155+ let peer = self
156+ . partition_manager
157+ . find_region_leader ( req. region_id . into ( ) )
158+ . await
159+ . context ( FindRegionLeaderSnafu ) ?;
160+ deletes. entry ( peer) . or_default ( ) . requests . push ( req) ;
161+ }
162+
163+ Ok ( deletes)
164+ }
165+
88166 async fn trim_columns (
89167 & self ,
90168 mut requests : RowDeleteRequests ,
91169 ctx : & QueryContextRef ,
92170 ) -> Result < RowDeleteRequests > {
93171 for req in & mut requests. deletes {
94- let table = self . get_table ( req, ctx) . await ?;
172+ let catalog = ctx. current_catalog ( ) ;
173+ let schema = ctx. current_schema ( ) ;
174+ let table = self . get_table ( catalog, schema, & req. table_name ) . await ?;
95175 let key_column_names = self . key_column_names ( & table) ?;
96176
97177 let rows = req. rows . as_mut ( ) . unwrap ( ) ;
@@ -142,25 +222,25 @@ impl<'a> Deleter<'a> {
142222 Ok ( key_column_names)
143223 }
144224
145- async fn get_table ( & self , req : & RowDeleteRequest , ctx : & QueryContextRef ) -> Result < TableRef > {
225+ async fn get_table ( & self , catalog : & str , schema : & str , table : & str ) -> Result < TableRef > {
146226 self . catalog_manager
147- . table ( ctx . current_catalog ( ) , ctx . current_schema ( ) , & req . table_name )
227+ . table ( catalog , schema , table )
148228 . await
149229 . context ( CatalogSnafu ) ?
150230 . with_context ( || TableNotFoundSnafu {
151- table_name : req . table_name . clone ( ) ,
231+ table_name : common_catalog :: format_full_table_name ( catalog , schema , table ) ,
152232 } )
153233 }
154234}
155235
156- fn validate_row_count_match ( requests : & RowDeleteRequests ) -> Result < ( ) > {
236+ fn validate_column_count_match ( requests : & RowDeleteRequests ) -> Result < ( ) > {
157237 for request in & requests. deletes {
158238 let rows = request. rows . as_ref ( ) . unwrap ( ) ;
159239 let column_count = rows. schema . len ( ) ;
160240 ensure ! (
161241 rows. rows. iter( ) . all( |r| r. values. len( ) == column_count) ,
162242 InvalidDeleteRequestSnafu {
163- reason: "row count mismatch"
243+ reason: "column count mismatch"
164244 }
165245 )
166246 }
0 commit comments