Skip to content

Commit a8d8e71

Browse files
committed
Stateless schema.
1 parent e7a44a0 commit a8d8e71

5 files changed

Lines changed: 221 additions & 9 deletions

File tree

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ constexpr const char* k_tables_name = "tables";
2929
constexpr const char* k_columns_name = "columns";
3030
constexpr const char* k_indexes_name = "indexes";
3131
constexpr const char* k_meta_name = "meta";
32+
constexpr const char* k_schemas_name = "schemas";
3233
constexpr const char* k_databases_name = "databases";
3334

3435
// Shared (cluster-wide) path: {root}/__deeplake_catalog/{name}
@@ -184,6 +185,17 @@ deeplake_api::catalog_table_schema make_indexes_schema()
184185
return schema;
185186
}
186187

188+
/// Build the column layout for the per-database "schemas" catalog table.
/// One row per PostgreSQL schema: name (primary key), owning role,
/// lifecycle state ("ready" / "dropping" — see schema_meta), and an
/// int64 last-update timestamp.
deeplake_api::catalog_table_schema make_schemas_schema()
{
    deeplake_api::catalog_table_schema layout;
    layout.add("schema_name", deeplake_core::type::text(codecs::compression::null));
    layout.add("owner", deeplake_core::type::text(codecs::compression::null));
    layout.add("state", deeplake_core::type::text(codecs::compression::null));
    layout.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)));
    layout.set_primary_key("schema_name");
    return layout;
}
198+
187199
deeplake_api::catalog_table_schema make_meta_schema()
188200
{
189201
deeplake_api::catalog_table_schema schema;
@@ -272,32 +284,35 @@ int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_na
272284
const auto tables_path = join_db_path(root_path, db_name, k_tables_name);
273285
const auto columns_path = join_db_path(root_path, db_name, k_columns_name);
274286
const auto indexes_path = join_db_path(root_path, db_name, k_indexes_name);
287+
const auto schemas_path = join_db_path(root_path, db_name, k_schemas_name);
275288
const auto meta_path = join_db_path(root_path, db_name, k_meta_name);
276289

277290
try {
278-
// Launch all 4 per-database catalog table creation in parallel
291+
// Launch all 5 per-database catalog table creation in parallel
279292
icm::vector<async::promise<std::shared_ptr<deeplake_api::catalog_table>>> promises;
280-
promises.reserve(4);
293+
promises.reserve(5);
281294
promises.push_back(
282295
deeplake_api::open_or_create_catalog_table(tables_path, make_tables_schema(), icm::string_map<>(creds)));
283296
promises.push_back(
284297
deeplake_api::open_or_create_catalog_table(columns_path, make_columns_schema(), icm::string_map<>(creds)));
285298
promises.push_back(
286299
deeplake_api::open_or_create_catalog_table(indexes_path, make_indexes_schema(), icm::string_map<>(creds)));
300+
promises.push_back(
301+
deeplake_api::open_or_create_catalog_table(schemas_path, make_schemas_schema(), icm::string_map<>(creds)));
287302
promises.push_back(
288303
deeplake_api::open_or_create_catalog_table(meta_path, make_meta_schema(), icm::string_map<>(creds)));
289304

290305
auto results = async::combine(std::move(promises)).get_future().get();
291-
if (results.size() != 4) {
306+
if (results.size() != 5) {
292307
elog(ERROR,
293-
"Failed to initialize per-db catalog at %s/%s: expected 4 catalog tables, got %zu",
308+
"Failed to initialize per-db catalog at %s/%s: expected 5 catalog tables, got %zu",
294309
root_path.c_str(),
295310
db_name.c_str(),
296311
static_cast<size_t>(results.size()));
297312
}
298313

299-
// Initialize per-db meta table if empty (index 3 is meta)
300-
auto& meta_table = results[3];
314+
// Initialize per-db meta table if empty (index 4 is meta)
315+
auto& meta_table = results[4];
301316
if (meta_table) {
302317
auto snapshot = meta_table->read().get_future().get();
303318
if (snapshot.row_count() == 0) {
@@ -447,6 +462,73 @@ std::vector<index_meta> load_indexes(const std::string&, const std::string&, icm
447462
return {};
448463
}
449464

465+
std::vector<schema_meta> load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds)
466+
{
467+
std::vector<schema_meta> out;
468+
try {
469+
auto table = open_db_catalog_table(root_path, db_name, k_schemas_name, std::move(creds));
470+
if (!table) {
471+
return out;
472+
}
473+
auto snapshot = table->read().get_future().get();
474+
if (snapshot.row_count() == 0) {
475+
return out;
476+
}
477+
478+
std::unordered_map<std::string, schema_meta> latest;
479+
for (const auto& row : snapshot.rows()) {
480+
auto schema_name_it = row.find("schema_name");
481+
auto state_it = row.find("state");
482+
if (schema_name_it == row.end() || state_it == row.end()) {
483+
continue;
484+
}
485+
486+
schema_meta meta;
487+
meta.schema_name = deeplake_api::array_to_string(schema_name_it->second);
488+
meta.state = deeplake_api::array_to_string(state_it->second);
489+
auto owner_it = row.find("owner");
490+
if (owner_it != row.end()) {
491+
meta.owner = deeplake_api::array_to_string(owner_it->second);
492+
}
493+
auto updated_it = row.find("updated_at");
494+
if (updated_it != row.end()) {
495+
auto updated_vec = load_int64_vector(updated_it->second);
496+
meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front();
497+
}
498+
499+
auto it = latest.find(meta.schema_name);
500+
if (it == latest.end() || it->second.updated_at <= meta.updated_at) {
501+
latest[meta.schema_name] = std::move(meta);
502+
}
503+
}
504+
505+
out.reserve(latest.size());
506+
for (auto& [_, meta] : latest) {
507+
if (meta.state == "ready") {
508+
out.push_back(std::move(meta));
509+
}
510+
}
511+
return out;
512+
} catch (const std::exception& e) {
513+
elog(DEBUG1, "Failed to load catalog schemas for db '%s': %s (may be old catalog)", db_name.c_str(), e.what());
514+
return out;
515+
} catch (...) {
516+
elog(DEBUG1, "Failed to load catalog schemas for db '%s': unknown error (may be old catalog)", db_name.c_str());
517+
return out;
518+
}
519+
}
520+
521+
void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta)
522+
{
523+
auto table = open_db_catalog_table(root_path, db_name, k_schemas_name, std::move(creds));
524+
icm::string_map<nd::array> row;
525+
row["schema_name"] = nd::adapt(meta.schema_name);
526+
row["owner"] = nd::adapt(meta.owner);
527+
row["state"] = nd::adapt(meta.state);
528+
row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at);
529+
table->upsert(std::move(row)).get_future().get();
530+
}
531+
450532
std::pair<std::vector<table_meta>, std::vector<column_meta>>
451533
load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds)
452534
{

cpp/deeplake_pg/dl_catalog.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ struct index_meta
4040
int32_t order_type = 0;
4141
};
4242

43+
/// Catalog row describing one PostgreSQL schema within a database.
/// Persisted in the per-database "schemas" catalog table and used to
/// replicate CREATE SCHEMA / DROP SCHEMA across stateless instances.
struct schema_meta
{
    std::string schema_name; // PK — unique schema name within the database
    std::string owner;       // owning role name; may be empty if unknown
    std::string state;       // lifecycle: "ready" (usable) or "dropping"
    int64_t updated_at = 0;  // last-update timestamp (now_ms() units); 0 = unset
};
50+
4351
struct database_meta
4452
{
4553
std::string db_name; // PK
@@ -67,6 +75,10 @@ std::vector<index_meta> load_indexes(const std::string& root_path, const std::st
6775
std::pair<std::vector<table_meta>, std::vector<column_meta>>
6876
load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
6977

78+
// Per-database schema catalog
79+
std::vector<schema_meta> load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
80+
void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta);
81+
7082
// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/)
7183
void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta);
7284
void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<column_meta>& columns);

cpp/deeplake_pg/extension_init.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ extern "C" {
99
#include <postgres.h>
1010

1111
#include <catalog/namespace.h>
12+
#include <commands/dbcommands.h>
1213
#include <commands/defrem.h>
14+
#include <miscadmin.h>
1315
#include <commands/vacuum.h>
1416
#include <nodes/nodeFuncs.h>
1517
#include <optimizer/planner.h>
@@ -586,6 +588,35 @@ static void process_utility(PlannedStmt* pstmt,
586588
}
587589
}
588590
}
591+
592+
// Mark schema as "dropping" in the S3 catalog
593+
if (pg::stateless_enabled) {
594+
try {
595+
auto root_path = pg::session_credentials::get_root_path();
596+
if (root_path.empty()) {
597+
root_path = pg::utils::get_deeplake_root_directory();
598+
}
599+
if (!root_path.empty()) {
600+
auto creds = pg::session_credentials::get_credentials();
601+
const char* dbname = get_database_name(MyDatabaseId);
602+
std::string db_name = dbname ? dbname : "postgres";
603+
if (dbname) pfree(const_cast<char*>(dbname));
604+
605+
pg::dl_catalog::ensure_catalog(root_path, creds);
606+
pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds);
607+
608+
pg::dl_catalog::schema_meta s_meta;
609+
s_meta.schema_name = schema_name;
610+
s_meta.state = "dropping";
611+
pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta);
612+
613+
pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials());
614+
pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials());
615+
}
616+
} catch (const std::exception& e) {
617+
elog(WARNING, "pg_deeplake: failed to mark schema '%s' as dropping in catalog: %s", schema_name, e.what());
618+
}
619+
}
589620
}
590621
} else if (stmt->removeType == OBJECT_DATABASE) {
591622
const char* query = "SELECT nspname, relname "
@@ -760,6 +791,40 @@ static void process_utility(PlannedStmt* pstmt,
760791
}
761792
}
762793

794+
// Post-hook: record CREATE SCHEMA in S3 catalog for multi-instance sync
795+
if (IsA(pstmt->utilityStmt, CreateSchemaStmt) && pg::stateless_enabled) {
796+
CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt;
797+
try {
798+
auto root_path = pg::session_credentials::get_root_path();
799+
if (root_path.empty()) {
800+
root_path = pg::utils::get_deeplake_root_directory();
801+
}
802+
if (!root_path.empty() && schemastmt->schemaname != nullptr) {
803+
auto creds = pg::session_credentials::get_credentials();
804+
const char* dbname = get_database_name(MyDatabaseId);
805+
std::string db_name = dbname ? dbname : "postgres";
806+
if (dbname) pfree(const_cast<char*>(dbname));
807+
808+
pg::dl_catalog::ensure_catalog(root_path, creds);
809+
pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds);
810+
811+
pg::dl_catalog::schema_meta s_meta;
812+
s_meta.schema_name = schemastmt->schemaname;
813+
s_meta.state = "ready";
814+
if (schemastmt->authrole != nullptr) {
815+
s_meta.owner = schemastmt->authrole->rolename;
816+
}
817+
pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta);
818+
819+
pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials());
820+
pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials());
821+
elog(DEBUG1, "pg_deeplake: recorded CREATE SCHEMA '%s' in catalog", schemastmt->schemaname);
822+
}
823+
} catch (const std::exception& e) {
824+
elog(DEBUG1, "pg_deeplake: failed to record CREATE SCHEMA in catalog: %s", e.what());
825+
}
826+
}
827+
763828
// Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset
764829
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
765830
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;

cpp/deeplake_pg/sync_worker.cpp

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,39 @@ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::str
309309
}
310310
}
311311

312+
/**
313+
* Sync schemas for a specific database from pre-loaded catalog data via libpq.
314+
* Creates missing schemas in the target database.
315+
*/
316+
void deeplake_sync_schemas_for_db(const std::string& db_name,
317+
const std::vector<pg::dl_catalog::schema_meta>& schemas)
318+
{
319+
for (const auto& meta : schemas) {
320+
if (meta.state == "dropping") {
321+
continue;
322+
}
323+
324+
// Skip system schemas
325+
if (meta.schema_name == "public" || meta.schema_name == "pg_catalog" ||
326+
meta.schema_name == "information_schema" ||
327+
meta.schema_name.substr(0, 3) == "pg_") {
328+
continue;
329+
}
330+
331+
StringInfoData buf;
332+
initStringInfo(&buf);
333+
appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s",
334+
quote_identifier(meta.schema_name.c_str()));
335+
336+
if (execute_via_libpq(db_name.c_str(), buf.data)) {
337+
elog(LOG, "pg_deeplake sync: created schema '%s' in database '%s'",
338+
meta.schema_name.c_str(), db_name.c_str());
339+
}
340+
341+
pfree(buf.data);
342+
}
343+
}
344+
312345
/**
313346
* Sync tables for a specific database from pre-loaded catalog data via libpq.
314347
* Creates missing tables in the target database.
@@ -444,13 +477,19 @@ void sync_all_databases(
444477
return;
445478
}
446479

447-
// Step 5: For each changed database, load tables+columns and sync
480+
// Step 5: For each changed database, load schemas first, then tables+columns and sync
448481
for (const auto& db_name : changed_dbs) {
449482
try {
483+
// Sync schemas before tables so CREATE TABLE can find the target schema
484+
auto schemas = pg::dl_catalog::load_schemas(root_path, db_name, creds);
485+
if (!schemas.empty()) {
486+
deeplake_sync_schemas_for_db(db_name, schemas);
487+
}
488+
450489
auto [tables, columns] = pg::dl_catalog::load_tables_and_columns(root_path, db_name, creds);
451490
deeplake_sync_tables_for_db(db_name, tables, columns);
452-
elog(LOG, "pg_deeplake sync: synced %zu tables for database '%s'",
453-
tables.size(), db_name.c_str());
491+
elog(LOG, "pg_deeplake sync: synced %zu schemas, %zu tables for database '%s'",
492+
schemas.size(), tables.size(), db_name.c_str());
454493
} catch (const std::exception& e) {
455494
elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db_name.c_str(), e.what());
456495
} catch (...) {

cpp/deeplake_pg/table_storage.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,20 @@ void table_storage::save_table_metadata(const pg::table_data& table_data)
289289
}
290290
pg::dl_catalog::upsert_columns(root_dir, db_name, creds, columns);
291291

292+
// Belt-and-suspenders: ensure the schema is recorded even if
293+
// the CREATE SCHEMA hook was missed (e.g., schema created before
294+
// the extension was loaded, or via a different code path).
295+
if (schema_name != "public") {
296+
try {
297+
pg::dl_catalog::schema_meta s_meta;
298+
s_meta.schema_name = schema_name;
299+
s_meta.state = "ready";
300+
pg::dl_catalog::upsert_schema(root_dir, db_name, creds, s_meta);
301+
} catch (...) {
302+
elog(DEBUG1, "pg_deeplake: failed to upsert schema '%s' in catalog (non-fatal)", schema_name.c_str());
303+
}
304+
}
305+
292306
pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials());
293307
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
294308
catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials());

0 commit comments

Comments
 (0)