Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion src/common/query/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::Any;

use arrow::error::ArrowError;
use common_error::prelude::*;
use common_recordbatch::error::Error as RecordbatchError;
use datafusion_common::DataFusionError;
use datatypes::arrow;
use datatypes::arrow::datatypes::DataType as ArrowDatatype;
Expand All @@ -26,6 +27,22 @@ use statrs::StatsError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Fail to execute Python UDF, source: {}", msg))]
PyUdf {
// TODO(discord9): find a way to avoid the circular dependency (query <- script <- query) so that script's error type can be used here
msg: String,
Comment thread
discord9 marked this conversation as resolved.
backtrace: Backtrace,
},

#[snafu(display(
Comment thread
discord9 marked this conversation as resolved.
"Fail to create temporary recordbatch when eval Python UDF, source: {}",
source
))]
UdfTempRecordBatch {
#[snafu(backtrace)]
source: RecordbatchError,
},

#[snafu(display("Fail to execute function, source: {}", source))]
ExecuteFunction {
source: DataFusionError,
Expand Down Expand Up @@ -167,7 +184,9 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteFunction { .. }
Error::UdfTempRecordBatch { .. }
| Error::PyUdf { .. }
| Error::ExecuteFunction { .. }
| Error::GenerateFunction { .. }
| Error::CreateAccumulator { .. }
| Error::DowncastVector { .. }
Expand Down
16 changes: 8 additions & 8 deletions src/script/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ once_cell = "1.17.0"
paste = { workspace = true, optional = true }
query = { path = "../query" }
# TODO(discord9): This is a forked and tweaked version of RustPython; please update it to the newest upstream RustPython after updating the toolchain to 1.65
rustpython-ast = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" }
rustpython-codegen = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" }
rustpython-compiler = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" }
rustpython-compiler-core = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" }
rustpython-parser = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" }
rustpython-pylib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537", features = [
rustpython-ast = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345" }
Comment thread
killme2008 marked this conversation as resolved.
rustpython-codegen = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345" }
rustpython-compiler = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345" }
rustpython-compiler-core = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345" }
rustpython-parser = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345" }
rustpython-pylib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345", features = [
"freeze-stdlib",
] }
rustpython-stdlib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537" }
rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = true, rev = "f89b1537", features = [
rustpython-stdlib = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345" }
rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = true, rev = "2e126345", features = [
"default",
"codegen",
] }
Expand Down
4 changes: 4 additions & 0 deletions src/script/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl ScriptManager {

logging::info!("Compiled and cached script: {}", name);

script.as_ref().register_udf();

logging::info!("Script register as UDF: {}", name);

Ok(script)
}

Expand Down
21 changes: 12 additions & 9 deletions src/script/src/python/coprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use common_recordbatch::RecordBatch;
use common_telemetry::info;
use datatypes::arrow::array::Array;
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Helper, VectorRef};
Expand Down Expand Up @@ -55,7 +54,7 @@ thread_local!(static INTERPRETER: RefCell<Option<Arc<Interpreter>>> = RefCell::n
pub struct AnnotationInfo {
/// if None, use types inferred by PyVector
// TODO(yingwen): We should use our data type. i.e. ConcreteDataType.
pub datatype: Option<ArrowDataType>,
pub datatype: Option<ConcreteDataType>,
pub is_nullable: bool,
}

Expand Down Expand Up @@ -122,14 +121,12 @@ impl Coprocessor {
} = anno[idx].to_owned().unwrap_or_else(|| {
// default to be not nullable and use DataType inferred by PyVector itself
AnnotationInfo {
datatype: Some(real_ty.as_arrow_type()),
datatype: Some(real_ty.clone()),
is_nullable: false,
}
});
let column_type = match ty {
Some(arrow_type) => {
ConcreteDataType::try_from(&arrow_type).context(TypeCastSnafu)?
}
Some(anno_type) => anno_type,
// if type is like `_` or `_ | None`
None => real_ty,
};
Expand Down Expand Up @@ -165,9 +162,10 @@ impl Coprocessor {
{
let real_ty = col.data_type();
let anno_ty = datatype;
if real_ty.as_arrow_type() != *anno_ty {
if real_ty != *anno_ty {
let array = col.to_arrow_array();
let array = compute::cast(&array, anno_ty).context(ArrowSnafu)?;
let array =
compute::cast(&array, &anno_ty.as_arrow_type()).context(ArrowSnafu)?;
*col = Helper::try_into_vector(array).context(TypeCastSnafu)?;
}
}
Expand Down Expand Up @@ -223,6 +221,7 @@ fn check_args_anno_real_type(
for (idx, arg) in args.iter().enumerate() {
let anno_ty = copr.arg_types[idx].to_owned();
let real_ty = arg.to_arrow_array().data_type().to_owned();
let real_ty = ConcreteDataType::from_arrow_type(&real_ty);
let is_nullable: bool = rb.schema.column_schemas()[idx].is_nullable();
ensure!(
anno_ty
Expand Down Expand Up @@ -372,7 +371,11 @@ pub(crate) fn init_interpreter() -> Arc<Interpreter> {
let native_module_allow_list = HashSet::from([
"array", "cmath", "gc", "hashlib", "_json", "_random", "math",
]);
let interpreter = Arc::new(vm::Interpreter::with_init(Default::default(), |vm| {
// TODO(discord9): edge case: can't use `..Default::default()` here because `Settings` is `#[non_exhaustive]`;
// see more here: https://internals.rust-lang.org/t/allow-constructing-non-exhaustive-structs-using-default-default/13868
let mut settings = vm::Settings::default();
settings.no_sig_int = true;
Comment thread
killme2008 marked this conversation as resolved.
let interpreter = Arc::new(vm::Interpreter::with_init(settings, |vm| {
// not using the full stdlib, to prevent security issues; instead, pick out only a few simple utility modules
vm.add_native_modules(
rustpython_stdlib::get_module_inits()
Expand Down
28 changes: 14 additions & 14 deletions src/script/src/python/coprocessor/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashSet;

use datatypes::arrow::datatypes::DataType;
use datatypes::prelude::ConcreteDataType;
use rustpython_parser::ast::{Arguments, Location};
use rustpython_parser::{ast, parser};
#[cfg(test)]
Expand Down Expand Up @@ -81,20 +81,20 @@ fn pylist_to_vec(lst: &ast::Expr<()>) -> Result<Vec<String>> {
}
}

fn try_into_datatype(ty: &str, loc: &Location) -> Result<Option<DataType>> {
fn try_into_datatype(ty: &str, loc: &Location) -> Result<Option<ConcreteDataType>> {
match ty {
"bool" => Ok(Some(DataType::Boolean)),
"u8" => Ok(Some(DataType::UInt8)),
"u16" => Ok(Some(DataType::UInt16)),
"u32" => Ok(Some(DataType::UInt32)),
"u64" => Ok(Some(DataType::UInt64)),
"i8" => Ok(Some(DataType::Int8)),
"i16" => Ok(Some(DataType::Int16)),
"i32" => Ok(Some(DataType::Int32)),
"i64" => Ok(Some(DataType::Int64)),
"f16" => Ok(Some(DataType::Float16)),
"f32" => Ok(Some(DataType::Float32)),
"f64" => Ok(Some(DataType::Float64)),
"bool" => Ok(Some(ConcreteDataType::boolean_datatype())),
"u8" => Ok(Some(ConcreteDataType::uint8_datatype())),
"u16" => Ok(Some(ConcreteDataType::uint16_datatype())),
"u32" => Ok(Some(ConcreteDataType::uint32_datatype())),
"u64" => Ok(Some(ConcreteDataType::uint64_datatype())),
"i8" => Ok(Some(ConcreteDataType::int8_datatype())),
"i16" => Ok(Some(ConcreteDataType::int16_datatype())),
"i32" => Ok(Some(ConcreteDataType::int32_datatype())),
"i64" => Ok(Some(ConcreteDataType::int64_datatype())),
"f32" => Ok(Some(ConcreteDataType::float32_datatype())),
"f64" => Ok(Some(ConcreteDataType::float64_datatype())),
"str" => Ok(Some(ConcreteDataType::string_datatype())),
// for any datatype
"_" => Ok(None),
// note the difference between "_" and _
Expand Down
Loading