Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions cpp/deeplake_pg/sync_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string
Oid relid = RangeVarGetRelid(rel, NoLock, true);

if (!OidIsValid(relid)) {
// Table doesn't exist locally - create it
elog(LOG, "pg_deeplake sync: creating table %s from catalog", qualified_name.c_str());

// Gather columns for this table, sorted by position
std::vector<pg::dl_catalog::column_meta> table_columns;
for (const auto& col : catalog_columns) {
Expand All @@ -102,26 +99,12 @@ void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string
}

const char* qschema = quote_identifier(meta.schema_name.c_str());
const char* qtable = quote_identifier(meta.table_name.c_str());

// Build CREATE TABLE IF NOT EXISTS statement
StringInfoData buf;
initStringInfo(&buf);

// Create schema if needed
appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);

pg::utils::spi_connector connector;
if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) {
elog(WARNING, "pg_deeplake sync: failed to create schema %s", meta.schema_name.c_str());
pfree(buf.data);
continue;
}

// Build CREATE TABLE statement directly from catalog metadata
// This avoids calling the SQL function create_deeplake_table which may not exist
// in the postgres database (extension might not be installed there)
resetStringInfo(&buf);
const char* qtable = quote_identifier(meta.table_name.c_str());
appendStringInfo(&buf, "CREATE TABLE %s.%s (", qschema, qtable);
appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable);

bool first = true;
for (const auto& col : table_columns) {
Expand All @@ -131,18 +114,42 @@ void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string
first = false;
appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
}

// Table path is now derived from deeplake.root_path GUC set at database level
// Path: {root_path}/{schema}/{table_name}
appendStringInfo(&buf, ") USING deeplake");

if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) {
// Don't log as warning - the dataset might not be available yet
// The sync worker will retry on the next cycle
elog(DEBUG1, "pg_deeplake sync: table %s not ready yet, will retry", qualified_name.c_str());
} else {
elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str());
// Wrap in subtransaction so that if another backend concurrently
// creates the same table (race on composite type), the error is
// caught and we continue instead of aborting the sync cycle.
MemoryContext saved_context = CurrentMemoryContext;
ResourceOwner saved_owner = CurrentResourceOwner;

BeginInternalSubTransaction(NULL);
PG_TRY();
{
pg::utils::spi_connector connector;

// Create schema if needed
StringInfoData schema_buf;
initStringInfo(&schema_buf);
appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
SPI_execute(schema_buf.data, false, 0);
pfree(schema_buf.data);

if (SPI_execute(buf.data, false, 0) == SPI_OK_UTILITY) {
elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str());
}

ReleaseCurrentSubTransaction();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

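The success path never restores saved_context (or saved_owner) around ReleaseCurrentSubTransaction(). Note that ReleaseCurrentSubTransaction() itself switches CurrentMemoryContext to the parent transaction's context and resets CurrentResourceOwner, so a MemoryContextSwitchTo(saved_context) placed only before the call, as in the suggestion below, is overwritten by the release. The idiom used in PostgreSQL core (e.g. PL/pgSQL exception blocks) restores both the memory context and the resource owner immediately after the release; without that, later allocations land in the wrong memory context and resources are tracked under the wrong owner.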

Suggested change
ReleaseCurrentSubTransaction();
MemoryContextSwitchTo(saved_context);
ReleaseCurrentSubTransaction();

}
PG_CATCH();
{
// Another backend created this table concurrently — not an error.
MemoryContextSwitchTo(saved_context);
CurrentResourceOwner = saved_owner;
RollbackAndReleaseCurrentSubTransaction();
FlushErrorState();
elog(DEBUG1, "pg_deeplake sync: concurrent creation of %s, skipping", qualified_name.c_str());
}
PG_END_TRY();

pfree(buf.data);
}
Expand Down
72 changes: 45 additions & 27 deletions cpp/deeplake_pg/table_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern "C" {
#include <access/heapam.h>
#include <access/htup_details.h>
#include <access/parallel.h>
#include <access/xact.h>
#include <catalog/namespace.h>
#include <catalog/pg_type.h>
#include <executor/spi.h>
Expand Down Expand Up @@ -344,28 +345,15 @@ void table_storage::load_table_metadata()
continue;
}

// Not in DDL context (e.g., SET root_path) - safe to auto-create.
// Use catalog_only_guard to skip S3 dataset operations in create_table() —
// the dataset already exists on S3, we just need the pg_class entry.
catalog_only_guard co_guard;
pg::utils::memory_context_switcher context_switcher;
pg::utils::spi_connector connector;
bool pushed_snapshot = false;
if (!ActiveSnapshotSet()) {
PushActiveSnapshot(GetTransactionSnapshot());
pushed_snapshot = true;
}
// Build CREATE TABLE IF NOT EXISTS from catalog metadata.
// Wrap in a subtransaction so that if another backend concurrently
// creates the same table (race on composite type), the error is
// caught and we continue instead of aborting the session.
const char* qschema = quote_identifier(meta.schema_name.c_str());
const char* qtable = quote_identifier(meta.table_name.c_str());

StringInfoData buf;
initStringInfo(&buf);
appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
SPI_execute(buf.data, false, 0);

// Build CREATE TABLE statement directly from catalog metadata
// This avoids calling the SQL function create_deeplake_table which may not exist
resetStringInfo(&buf);
const char* qtable = quote_identifier(meta.table_name.c_str());
appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable);

bool first = true;
Expand All @@ -376,19 +364,49 @@ void table_storage::load_table_metadata()
first = false;
appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
}

// Table path is now derived from deeplake.root_path GUC set at database level
// Path: {root_path}/{schema}/{table_name}
appendStringInfo(&buf, ") USING deeplake");

if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) {
elog(WARNING, "Failed to auto-create deeplake table %s from catalog", qualified_name.c_str());
}
pfree(buf.data);
MemoryContext saved_context = CurrentMemoryContext;
ResourceOwner saved_owner = CurrentResourceOwner;

BeginInternalSubTransaction(NULL);
PG_TRY();
{
catalog_only_guard co_guard;
pg::utils::spi_connector connector;
bool pushed_snapshot = false;
if (!ActiveSnapshotSet()) {
PushActiveSnapshot(GetTransactionSnapshot());
pushed_snapshot = true;
}

if (pushed_snapshot) {
PopActiveSnapshot();
// Create schema if needed
StringInfoData schema_buf;
initStringInfo(&schema_buf);
appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
SPI_execute(schema_buf.data, false, 0);
pfree(schema_buf.data);

SPI_execute(buf.data, false, 0);

if (pushed_snapshot) {
PopActiveSnapshot();
}

ReleaseCurrentSubTransaction();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

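The success path never restores saved_context (or saved_owner) around ReleaseCurrentSubTransaction(). When the subtransaction commits successfully, ReleaseCurrentSubTransaction() leaves CurrentMemoryContext pointing at the parent transaction's context and resets CurrentResourceOwner, so a MemoryContextSwitchTo(saved_context) placed only before the call, as in the suggestion below, does not survive the release. The idiom used in PostgreSQL core restores both the memory context and the resource owner immediately after the release; otherwise the caller continues in a memory context and resource owner it did not expect.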

Suggested change
ReleaseCurrentSubTransaction();
MemoryContextSwitchTo(saved_context);
ReleaseCurrentSubTransaction();

}
PG_CATCH();
{
// Another backend created this table concurrently — not an error.
MemoryContextSwitchTo(saved_context);
CurrentResourceOwner = saved_owner;
RollbackAndReleaseCurrentSubTransaction();
FlushErrorState();
elog(DEBUG1, "Concurrent table creation for %s, skipping", qualified_name.c_str());
}
PG_END_TRY();

pfree(buf.data);

relid = RangeVarGetRelid(rel, NoLock, true);
}
Expand Down