@@ -105,6 +105,97 @@ def indicate_duplicates(
105105 )
106106
107107
def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
    """Fill nulls in the Float64/Int64 value columns using linear interpolation.

    For each eligible column, the nearest non-null value before and after
    every row (together with the row offsets of those values) feeds the
    two-point linear formula in ``_interpolate``. Nulls are replaced by that
    prediction, and anything still null afterwards falls back to the previous
    non-null value. Columns of any other dtype are passed through unchanged.

    Args:
        block: Block whose value columns should be interpolated.
        method: Interpolation method; only ``"linear"`` is accepted.

    Returns:
        A block holding one output column per original value column, carrying
        the original column labels.

    Raises:
        NotImplementedError: If ``method`` is anything other than ``"linear"``.
    """
    if method != "linear":
        raise NotImplementedError(
            f"Only 'linear' interpolate method supported. { constants .FEEDBACK_LINK } "
        )

    # Presumably "everything up to the current row" and "the current row
    # onwards" respectively -- confirm against windows.WindowSpec.
    look_behind = windows.WindowSpec(following=0)
    look_ahead = windows.WindowSpec(preceding=0)
    interpolable_dtypes = (pd.Float64Dtype(), pd.Int64Dtype())

    result_ids = []

    # Capture the columns and labels before the offsets column is added.
    source_ids = block.value_columns
    source_labels = block.column_labels
    block, row_offset = block.promote_offsets()

    for col_id in source_ids:
        if block._column_type(col_id) not in interpolable_dtypes:
            # Only Float64/Int64 columns are interpolated; keep others as-is.
            result_ids.append(col_id)
            continue

        # Offsets masked so they are null in the same places the column is null.
        block, has_value = block.apply_unary_op(col_id, ops.notnull_op)
        block, known_offset = block.apply_binary_op(
            row_offset, has_value, ops.partial_arg3(ops.where_op, None)
        )

        # Nearest non-null neighbours on either side of each row...
        block, prior_value = block.apply_window_op(
            col_id, agg_ops.LastNonNullOp(), look_behind
        )
        block, later_value = block.apply_window_op(
            col_id, agg_ops.FirstNonNullOp(), look_ahead
        )
        # ...and the offsets at which those neighbours occur.
        block, prior_offset = block.apply_window_op(
            known_offset,
            agg_ops.LastNonNullOp(),
            look_behind,
            skip_reproject_unsafe=True,
        )
        block, later_offset = block.apply_window_op(
            known_offset,
            agg_ops.FirstNonNullOp(),
            look_ahead,
            skip_reproject_unsafe=True,
        )

        block, predicted = _interpolate(
            block,
            prior_offset,
            prior_value,
            later_offset,
            later_value,
            row_offset,
        )

        # Keep existing values; use the prediction only where the column is null.
        block, filled = block.apply_binary_op(col_id, predicted, ops.fillna_op)
        # Pandas performs ffill-like behavior to extrapolate forwards, so any
        # remaining null takes the previous non-null value.
        block, filled_and_extended = block.apply_binary_op(
            filled, prior_value, ops.fillna_op
        )

        result_ids.append(filled_and_extended)

    # Force a reproject since `skip_reproject_unsafe` was used previously.
    selected = block.select_columns(result_ids)
    reprojected = selected._force_reproject()
    return reprojected.with_column_labels(source_labels)
176+
177+
def _interpolate(
    block: blocks.Block,
    x0_id: str,
    y0_id: str,
    x1_id: str,
    y1_id: str,
    xpredict_id: str,
) -> typing.Tuple[blocks.Block, str]:
    """Evaluate the two-point linear interpolation formula over block columns.

    Computes ``y0 + (xpredict - x0) * ((y1 - y0) / (x1 - x0))`` using the
    columns identified by the given ids.

    Args:
        block: Block containing all referenced columns.
        x0_id: Column id of the x coordinate of the first known point.
        y0_id: Column id of the y coordinate of the first known point.
        x1_id: Column id of the x coordinate of the second known point.
        y1_id: Column id of the y coordinate of the second known point.
        xpredict_id: Column id of the x coordinate to predict y for.

    Returns:
        The block with the intermediate columns dropped, and the id of the
        column holding the predicted y values.
    """
    # Differences along each axis, and the distance of the target from x0.
    block, run = block.apply_binary_op(x1_id, x0_id, ops.sub_op)
    block, rise = block.apply_binary_op(y1_id, y0_id, ops.sub_op)
    block, distance = block.apply_binary_op(xpredict_id, x0_id, ops.sub_op)

    # Slope of the line through the two points, scaled by the distance from x0.
    block, slope = block.apply_binary_op(rise, run, ops.div_op)
    block, y_shift = block.apply_binary_op(distance, slope, ops.mul_op)

    block, prediction_id = block.apply_binary_op(y0_id, y_shift, ops.add_op)

    # Only the prediction column is kept; the intermediates are dropped.
    scratch_ids = [run, rise, distance, slope, y_shift]
    block = block.drop_columns(scratch_ids)
    return block, prediction_id
197+
198+
108199def drop_duplicates (
109200 block : blocks .Block , columns : typing .Sequence [str ], keep : str = "first"
110201) -> blocks .Block :
0 commit comments