Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/ffi/src/catalog_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl FFI_CatalogProvider {
/// defined on this struct must only use the stable functions provided in
/// FFI_CatalogProvider to interact with the foreign table provider.
#[derive(Debug)]
pub struct ForeignCatalogProvider(FFI_CatalogProvider);
pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider);

unsafe impl Send for ForeignCatalogProvider {}
unsafe impl Sync for ForeignCatalogProvider {}
Expand Down
283 changes: 283 additions & 0 deletions datafusion/ffi/src/catalog_provider_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, ffi::c_void, sync::Arc};

use abi_stable::{
std_types::{ROption, RString, RVec},
StableAbi,
};
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use tokio::runtime::Handle;

use crate::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};

/// A stable struct for sharing [`CatalogProviderList`] across FFI boundaries.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_CatalogProviderList {
/// Register a catalog
pub register_catalog: unsafe extern "C" fn(
Comment thread
timsaucer marked this conversation as resolved.
&Self,
name: RString,
catalog: &FFI_CatalogProvider,
Comment thread
timsaucer marked this conversation as resolved.
) -> ROption<FFI_CatalogProvider>,

/// List of existing catalogs
pub catalog_names: unsafe extern "C" fn(&Self) -> RVec<RString>,
Comment thread
timsaucer marked this conversation as resolved.

/// Access a catalog
pub catalog:
Comment thread
timsaucer marked this conversation as resolved.
unsafe extern "C" fn(&Self, name: RString) -> ROption<FFI_CatalogProvider>,

/// Used to create a clone on the provider of the execution plan. This should
Comment thread
timsaucer marked this conversation as resolved.
Outdated
/// only need to be called by the receiver of the plan.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

/// Release the memory of the private data when it is no longer being used.
pub release: unsafe extern "C" fn(arg: &mut Self),

/// Return the major DataFusion version number of this provider.
pub version: unsafe extern "C" fn() -> u64,

/// Internal data. This is only to be accessed by the provider of the plan.
/// A [`ForeignCatalogProviderList`] should never attempt to access this data.
pub private_data: *mut c_void,
}

// `FFI_CatalogProviderList` holds a raw `*mut c_void` (`private_data`), so the
// compiler will not derive `Send`/`Sync` for it automatically.
// NOTE(review): these impls assume the value behind `private_data` is safe to
// move to and share between threads — confirm against the wrapped provider's bounds.
unsafe impl Send for FFI_CatalogProviderList {}
unsafe impl Sync for FFI_CatalogProviderList {}

/// Provider-side state stored behind `FFI_CatalogProviderList::private_data`.
///
/// It is boxed and turned into a raw pointer when the FFI struct is created,
/// and turned back into a `Box` (and dropped) by the release callback.
struct ProviderPrivateData {
// The wrapped catalog provider list that the FFI callbacks delegate to.
provider: Arc<dyn CatalogProviderList + Send>,
// Optional tokio runtime handle; it is passed on to every
// `FFI_CatalogProvider` created by the callbacks.
runtime: Option<Handle>,
}

impl FFI_CatalogProviderList {
    /// Views `private_data` as the [`ProviderPrivateData`] it was created from.
    ///
    /// # Safety
    ///
    /// `private_data` must point to a live `ProviderPrivateData`, i.e. this must
    /// only be called on the side of the FFI boundary that created the struct.
    unsafe fn private_data_ref(&self) -> &ProviderPrivateData {
        &*(self.private_data as *const ProviderPrivateData)
    }

    /// Borrows the wrapped catalog provider list.
    ///
    /// # Safety
    ///
    /// `private_data` must point to a live `ProviderPrivateData`.
    unsafe fn inner(&self) -> &Arc<dyn CatalogProviderList + Send> {
        &self.private_data_ref().provider
    }

    /// Returns a clone of the optional runtime handle stored with the provider.
    ///
    /// # Safety
    ///
    /// `private_data` must point to a live `ProviderPrivateData`.
    unsafe fn runtime(&self) -> Option<Handle> {
        self.private_data_ref().runtime.clone()
    }
}

unsafe extern "C" fn catalog_names_fn_wrapper(
provider: &FFI_CatalogProviderList,
) -> RVec<RString> {
let names = provider.inner().catalog_names();
names.into_iter().map(|s| s.into()).collect()
}

unsafe extern "C" fn register_catalog_fn_wrapper(
Comment thread
timsaucer marked this conversation as resolved.
Comment thread
timsaucer marked this conversation as resolved.
provider: &FFI_CatalogProviderList,
name: RString,
catalog: &FFI_CatalogProvider,
) -> ROption<FFI_CatalogProvider> {
let runtime = provider.runtime();
let provider = provider.inner();
let catalog = Arc::new(ForeignCatalogProvider::from(catalog));

provider
.register_catalog(name.into(), catalog)
.map(|catalog| FFI_CatalogProvider::new(catalog, runtime))
.into()
}

unsafe extern "C" fn catalog_fn_wrapper(
provider: &FFI_CatalogProviderList,
name: RString,
) -> ROption<FFI_CatalogProvider> {
let runtime = provider.runtime();
let provider = provider.inner();
provider
.catalog(name.as_str())
.map(|catalog| FFI_CatalogProvider::new(catalog, runtime))
.into()
}

/// FFI entry point for `release`: frees the [`ProviderPrivateData`] owned by
/// `provider`.
///
/// A null `private_data` is ignored and the pointer is nulled once the data has
/// been reclaimed, so calling release a second time on the same struct is a
/// no-op instead of a double free.
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) {
    if provider.private_data.is_null() {
        return;
    }

    // SAFETY: a non-null `private_data` was produced by `Box::into_raw` on a
    // `ProviderPrivateData` when this struct was constructed, and it is nulled
    // below so ownership is reclaimed at most once.
    let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
    provider.private_data = std::ptr::null_mut();
    drop(private_data);
}

unsafe extern "C" fn clone_fn_wrapper(
provider: &FFI_CatalogProviderList,
) -> FFI_CatalogProviderList {
let old_private_data = provider.private_data as *const ProviderPrivateData;
let runtime = (*old_private_data).runtime.clone();

let private_data = Box::into_raw(Box::new(ProviderPrivateData {
provider: Arc::clone(&(*old_private_data).provider),
runtime,
})) as *mut c_void;

FFI_CatalogProviderList {
register_catalog: register_catalog_fn_wrapper,
catalog_names: catalog_names_fn_wrapper,
catalog: catalog_fn_wrapper,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data,
}
}

impl Drop for FFI_CatalogProviderList {
    /// Hands the struct back to its own `release` callback so the private data
    /// is freed by the code that allocated it.
    fn drop(&mut self) {
        let release = self.release;
        unsafe { release(self) }
    }
}

impl FFI_CatalogProviderList {
    /// Creates a new [`FFI_CatalogProviderList`].
    ///
    /// `provider` and `runtime` are boxed together and stored behind
    /// `private_data` as a raw pointer; the function-pointer fields are set to
    /// this module's wrapper functions.
    pub fn new(
        provider: Arc<dyn CatalogProviderList + Send>,
        runtime: Option<Handle>,
    ) -> Self {
        let boxed = Box::new(ProviderPrivateData { provider, runtime });
        let private_data = Box::into_raw(boxed) as *mut c_void;

        Self {
            private_data,
            version: super::version,
            release: release_fn_wrapper,
            clone: clone_fn_wrapper,
            catalog: catalog_fn_wrapper,
            catalog_names: catalog_names_fn_wrapper,
            register_catalog: register_catalog_fn_wrapper,
        }
    }
}

/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// FFI_CatalogProviderList to interact with the foreign table provider.
Comment thread
timsaucer marked this conversation as resolved.
Outdated
#[derive(Debug)]
pub struct ForeignCatalogProviderList(FFI_CatalogProviderList);

// `ForeignCatalogProviderList` wraps an `FFI_CatalogProviderList`, which itself
// is marked `Send`/`Sync` via `unsafe impl`.
// NOTE(review): these impls assume the foreign function pointers may be called
// from any thread — confirm against the providing library's guarantees.
unsafe impl Send for ForeignCatalogProviderList {}
unsafe impl Sync for ForeignCatalogProviderList {}

impl From<&FFI_CatalogProviderList> for ForeignCatalogProviderList {
fn from(provider: &FFI_CatalogProviderList) -> Self {
Self(provider.clone())
}
}

impl Clone for FFI_CatalogProviderList {
    /// Clones by calling the struct's `clone` function pointer, so the copy is
    /// made by the code that created the original.
    fn clone(&self) -> Self {
        let clone_fn = self.clone;
        unsafe { clone_fn(self) }
    }
}

impl CatalogProviderList for ForeignCatalogProviderList {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
unsafe {
let catalog = match catalog.as_any().downcast_ref::<ForeignCatalogProvider>()
{
Some(s) => &s.0,
None => &FFI_CatalogProvider::new(catalog, None),
};

(self.0.register_catalog)(&self.0, name.into(), catalog)
.map(|s| Arc::new(ForeignCatalogProvider(s)) as Arc<dyn CatalogProvider>)
.into()
}
}

fn catalog_names(&self) -> Vec<String> {
unsafe {
(self.0.catalog_names)(&self.0)
.into_iter()
.map(Into::into)
.collect()
}
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
unsafe {
(self.0.catalog)(&self.0, name.into())
.map(|catalog| {
Arc::new(ForeignCatalogProvider(catalog)) as Arc<dyn CatalogProvider>
})
.into()
}
}
}

#[cfg(test)]
mod tests {
    use datafusion::catalog::{MemoryCatalogProvider, MemoryCatalogProviderList};

    use super::*;

    /// Wraps an in-memory catalog list in the FFI struct, views it through
    /// [`ForeignCatalogProviderList`], and checks listing, replacing, adding
    /// and looking up catalogs.
    #[test]
    fn test_round_trip_ffi_catalog_provider_list() {
        let local_list = Arc::new(MemoryCatalogProviderList::new());
        let displaced = local_list.as_ref().register_catalog(
            "prior_catalog".to_owned(),
            Arc::new(MemoryCatalogProvider::new()),
        );
        assert!(displaced.is_none());

        let ffi_list = FFI_CatalogProviderList::new(local_list, None);
        let foreign_list = ForeignCatalogProviderList::from(&ffi_list);

        let names = foreign_list.catalog_names();
        assert_eq!(names.len(), 1);
        assert_eq!(names[0], "prior_catalog");

        // Replace an existing catalog with one of the same name
        let replaced = foreign_list.register_catalog(
            "prior_catalog".to_owned(),
            Arc::new(MemoryCatalogProvider::new()),
        );
        assert!(replaced.is_some());
        assert_eq!(foreign_list.catalog_names().len(), 1);

        // Add a new catalog
        let added = foreign_list.register_catalog(
            "second_catalog".to_owned(),
            Arc::new(MemoryCatalogProvider::new()),
        );
        assert!(added.is_none());
        assert_eq!(foreign_list.catalog_names().len(), 2);

        // Retrieve non-existent catalog
        assert!(foreign_list.catalog("non_existent_catalog").is_none());

        // Retrieve valid catalog
        assert!(foreign_list.catalog("second_catalog").is_some());
    }
}
1 change: 1 addition & 0 deletions datafusion/ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

pub mod arrow_wrappers;
pub mod catalog_provider;
pub mod catalog_provider_list;
pub mod execution_plan;
pub mod insert_op;
pub mod plan_properties;
Expand Down
54 changes: 54 additions & 0 deletions datafusion/ffi/src/tests/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
use std::{any::Any, fmt::Debug, sync::Arc};

use crate::catalog_provider::FFI_CatalogProvider;
use crate::catalog_provider_list::FFI_CatalogProviderList;
use arrow::datatypes::Schema;
use async_trait::async_trait;
use datafusion::catalog::{CatalogProviderList, MemoryCatalogProviderList};
Comment thread
timsaucer marked this conversation as resolved.
Outdated
use datafusion::{
catalog::{
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
Expand Down Expand Up @@ -181,3 +183,55 @@ pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider {
let catalog_provider = Arc::new(FixedCatalogProvider::default());
FFI_CatalogProvider::new(catalog_provider, None)
}

/// This catalog provider list is intended only for unit tests. Its `Default`
/// implementation prepopulates it with one catalog named "blue", and its
/// `register_catalog` only accepts catalogs named after four colors: blue, red,
/// green and yellow.
#[derive(Debug)]
pub struct FixedCatalogProviderList {
// In-memory list that stores the catalogs; all trait methods delegate to it.
inner: MemoryCatalogProviderList,
}

impl Default for FixedCatalogProviderList {
    /// Builds a list that already contains a default `FixedCatalogProvider`
    /// registered under the name "blue".
    fn default() -> Self {
        let list = Self {
            inner: MemoryCatalogProviderList::new(),
        };

        // Register directly on the inner list; the returned previous catalog
        // is not needed.
        let _ = list.inner.register_catalog(
            "blue".to_owned(),
            Arc::new(FixedCatalogProvider::default()),
        );

        list
    }
}

impl CatalogProviderList for FixedCatalogProviderList {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the names of the catalogs held by the inner in-memory list.
    fn catalog_names(&self) -> Vec<String> {
        self.inner.catalog_names()
    }

    /// Looks up a catalog by name in the inner in-memory list.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.inner.catalog(name)
    }

    /// Registers `catalog` under `name` if the name is one of the four allowed
    /// colors; any other name logs a warning and returns `None` without
    /// registering anything.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        if !["blue", "red", "green", "yellow"].contains(&name.as_str()) {
            // This type manages catalogs, not schemas, so the message names catalogs.
            log::warn!(
                "FixedCatalogProviderList only provides four catalogs: blue, red, green, yellow"
            );
            return None;
        }

        self.inner.register_catalog(name, catalog)
    }
}

/// Test-library entry point: exports a default [`FixedCatalogProviderList`] as
/// an [`FFI_CatalogProviderList`] with no runtime handle.
pub(crate) extern "C" fn create_catalog_provider_list() -> FFI_CatalogProviderList {
    FFI_CatalogProviderList::new(Arc::new(FixedCatalogProviderList::default()), None)
}
6 changes: 6 additions & 0 deletions datafusion/ffi/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::udaf::FFI_AggregateUDF;
use crate::udwf::FFI_WindowUDF;

use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
use crate::catalog_provider_list::FFI_CatalogProviderList;
use crate::tests::catalog::create_catalog_provider_list;
use arrow::array::RecordBatch;
use async_provider::create_async_table_provider;
use datafusion::{
Expand Down Expand Up @@ -62,6 +64,9 @@ pub struct ForeignLibraryModule {
/// Construct an opinionated catalog provider
pub create_catalog: extern "C" fn() -> FFI_CatalogProvider,

/// Construct an opinionated catalog provider list
pub create_catalog_list: extern "C" fn() -> FFI_CatalogProviderList,

/// Constructs the table provider
pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,

Expand Down Expand Up @@ -123,6 +128,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider {
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
ForeignLibraryModule {
create_catalog: create_catalog_provider,
create_catalog_list: create_catalog_provider_list,
create_table: construct_table_provider,
create_scalar_udf: create_ffi_abs_func,
create_nullary_udf: create_ffi_random_func,
Expand Down
Loading