Skip to content

Commit 43fa9d8

Browse files
authored
Merge pull request #3135 from activeloopai/fix-race
Fixed parallel ingestion.
2 parents 3bc282a + f8d3e8a commit 43fa9d8

2 files changed

Lines changed: 81 additions & 56 deletions

File tree

cpp/deeplake_pg/sync_worker.cpp

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string
8383
Oid relid = RangeVarGetRelid(rel, NoLock, true);
8484

8585
if (!OidIsValid(relid)) {
86-
// Table doesn't exist locally - create it
87-
elog(LOG, "pg_deeplake sync: creating table %s from catalog", qualified_name.c_str());
88-
8986
// Gather columns for this table, sorted by position
9087
std::vector<pg::dl_catalog::column_meta> table_columns;
9188
for (const auto& col : catalog_columns) {
@@ -102,26 +99,12 @@ void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string
10299
}
103100

104101
const char* qschema = quote_identifier(meta.schema_name.c_str());
102+
const char* qtable = quote_identifier(meta.table_name.c_str());
105103

104+
// Build CREATE TABLE IF NOT EXISTS statement
106105
StringInfoData buf;
107106
initStringInfo(&buf);
108-
109-
// Create schema if needed
110-
appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
111-
112-
pg::utils::spi_connector connector;
113-
if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) {
114-
elog(WARNING, "pg_deeplake sync: failed to create schema %s", meta.schema_name.c_str());
115-
pfree(buf.data);
116-
continue;
117-
}
118-
119-
// Build CREATE TABLE statement directly from catalog metadata
120-
// This avoids calling the SQL function create_deeplake_table which may not exist
121-
// in the postgres database (extension might not be installed there)
122-
resetStringInfo(&buf);
123-
const char* qtable = quote_identifier(meta.table_name.c_str());
124-
appendStringInfo(&buf, "CREATE TABLE %s.%s (", qschema, qtable);
107+
appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable);
125108

126109
bool first = true;
127110
for (const auto& col : table_columns) {
@@ -131,18 +114,42 @@ void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string
131114
first = false;
132115
appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
133116
}
134-
135-
// Table path is now derived from deeplake.root_path GUC set at database level
136-
// Path: {root_path}/{schema}/{table_name}
137117
appendStringInfo(&buf, ") USING deeplake");
138118

139-
if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) {
140-
// Don't log as warning - the dataset might not be available yet
141-
// The sync worker will retry on the next cycle
142-
elog(DEBUG1, "pg_deeplake sync: table %s not ready yet, will retry", qualified_name.c_str());
143-
} else {
144-
elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str());
119+
// Wrap in subtransaction so that if another backend concurrently
120+
// creates the same table (race on composite type), the error is
121+
// caught and we continue instead of aborting the sync cycle.
122+
MemoryContext saved_context = CurrentMemoryContext;
123+
ResourceOwner saved_owner = CurrentResourceOwner;
124+
125+
BeginInternalSubTransaction(NULL);
126+
PG_TRY();
127+
{
128+
pg::utils::spi_connector connector;
129+
130+
// Create schema if needed
131+
StringInfoData schema_buf;
132+
initStringInfo(&schema_buf);
133+
appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
134+
SPI_execute(schema_buf.data, false, 0);
135+
pfree(schema_buf.data);
136+
137+
if (SPI_execute(buf.data, false, 0) == SPI_OK_UTILITY) {
138+
elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str());
139+
}
140+
141+
ReleaseCurrentSubTransaction();
142+
}
143+
PG_CATCH();
144+
{
145+
// Another backend created this table concurrently — not an error.
146+
MemoryContextSwitchTo(saved_context);
147+
CurrentResourceOwner = saved_owner;
148+
RollbackAndReleaseCurrentSubTransaction();
149+
FlushErrorState();
150+
elog(DEBUG1, "pg_deeplake sync: concurrent creation of %s, skipping", qualified_name.c_str());
145151
}
152+
PG_END_TRY();
146153

147154
pfree(buf.data);
148155
}

cpp/deeplake_pg/table_storage.cpp

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ extern "C" {
99
#include <access/heapam.h>
1010
#include <access/htup_details.h>
1111
#include <access/parallel.h>
12+
#include <access/xact.h>
1213
#include <catalog/namespace.h>
1314
#include <catalog/pg_type.h>
1415
#include <executor/spi.h>
@@ -344,28 +345,15 @@ void table_storage::load_table_metadata()
344345
continue;
345346
}
346347

347-
// Not in DDL context (e.g., SET root_path) - safe to auto-create.
348-
// Use catalog_only_guard to skip S3 dataset operations in create_table() —
349-
// the dataset already exists on S3, we just need the pg_class entry.
350-
catalog_only_guard co_guard;
351-
pg::utils::memory_context_switcher context_switcher;
352-
pg::utils::spi_connector connector;
353-
bool pushed_snapshot = false;
354-
if (!ActiveSnapshotSet()) {
355-
PushActiveSnapshot(GetTransactionSnapshot());
356-
pushed_snapshot = true;
357-
}
348+
// Build CREATE TABLE IF NOT EXISTS from catalog metadata.
349+
// Wrap in a subtransaction so that if another backend concurrently
350+
// creates the same table (race on composite type), the error is
351+
// caught and we continue instead of aborting the session.
358352
const char* qschema = quote_identifier(meta.schema_name.c_str());
353+
const char* qtable = quote_identifier(meta.table_name.c_str());
359354

360355
StringInfoData buf;
361356
initStringInfo(&buf);
362-
appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
363-
SPI_execute(buf.data, false, 0);
364-
365-
// Build CREATE TABLE statement directly from catalog metadata
366-
// This avoids calling the SQL function create_deeplake_table which may not exist
367-
resetStringInfo(&buf);
368-
const char* qtable = quote_identifier(meta.table_name.c_str());
369357
appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable);
370358

371359
bool first = true;
@@ -376,19 +364,49 @@ void table_storage::load_table_metadata()
376364
first = false;
377365
appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str());
378366
}
379-
380-
// Table path is now derived from deeplake.root_path GUC set at database level
381-
// Path: {root_path}/{schema}/{table_name}
382367
appendStringInfo(&buf, ") USING deeplake");
383368

384-
if (SPI_execute(buf.data, false, 0) != SPI_OK_UTILITY) {
385-
elog(WARNING, "Failed to auto-create deeplake table %s from catalog", qualified_name.c_str());
386-
}
387-
pfree(buf.data);
369+
MemoryContext saved_context = CurrentMemoryContext;
370+
ResourceOwner saved_owner = CurrentResourceOwner;
371+
372+
BeginInternalSubTransaction(NULL);
373+
PG_TRY();
374+
{
375+
catalog_only_guard co_guard;
376+
pg::utils::spi_connector connector;
377+
bool pushed_snapshot = false;
378+
if (!ActiveSnapshotSet()) {
379+
PushActiveSnapshot(GetTransactionSnapshot());
380+
pushed_snapshot = true;
381+
}
388382

389-
if (pushed_snapshot) {
390-
PopActiveSnapshot();
383+
// Create schema if needed
384+
StringInfoData schema_buf;
385+
initStringInfo(&schema_buf);
386+
appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema);
387+
SPI_execute(schema_buf.data, false, 0);
388+
pfree(schema_buf.data);
389+
390+
SPI_execute(buf.data, false, 0);
391+
392+
if (pushed_snapshot) {
393+
PopActiveSnapshot();
394+
}
395+
396+
ReleaseCurrentSubTransaction();
391397
}
398+
PG_CATCH();
399+
{
400+
// Another backend created this table concurrently — not an error.
401+
MemoryContextSwitchTo(saved_context);
402+
CurrentResourceOwner = saved_owner;
403+
RollbackAndReleaseCurrentSubTransaction();
404+
FlushErrorState();
405+
elog(DEBUG1, "Concurrent table creation for %s, skipping", qualified_name.c_str());
406+
}
407+
PG_END_TRY();
408+
409+
pfree(buf.data);
392410

393411
relid = RangeVarGetRelid(rel, NoLock, true);
394412
}

0 commit comments

Comments
 (0)