Skip to content

Commit 6e05916

Browse files
srielaucloud-fan
authored andcommitted
[SPARK-55964] Cache coherence: clear function registry on DROP DATABASE
- Add dropFunctionsInDatabase(db) to FunctionRegistryBase and implementations - SessionCatalog.dropDatabase clears scalar and table function registry cache for the dropped database so resolution does not see stale entries - Add SessionCatalogSuite tests for cache coherence ### What changes were proposed in this pull request? We now delete functions from a a schema dropped within the session from the sesssion function registry [design-cache-coherence.md](https://github.com/user-attachments/files/25957395/design-cache-coherence.md) ### Why are the changes needed? Without this change the session coudl keep resolving fucntions from a schema it had dropped. ### Does this PR introduce _any_ user-facing change? It fixes a bug that can be observed. ### How was this patch tested? Added new testcases ### Was this patch authored or co-authored using generative AI tooling? Claude Opus4.6 Closes #54781 from srielau/SPARK-55982-cache-coherence. Lead-authored-by: Serge Rielau <[email protected]> Co-authored-by: Serge Rielau <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 9adf791 commit 6e05916

File tree

3 files changed

+113
-0
lines changed

3 files changed

+113
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ trait FunctionRegistryBase[T] {
108108
/** Drop a function and return whether the function existed. */
109109
def dropFunction(name: FunctionIdentifier): Boolean
110110

111+
/**
112+
* Remove all cached function entries matching the given namespace.
113+
* The namespace is a FunctionIdentifier with empty funcName used as a filter:
114+
* matches on database (case-insensitive) and catalog. The catalog must be specified
115+
* (e.g. when dropping a database, the database belongs to a catalog).
116+
*/
117+
def dropFunctionsInDatabase(namespace: FunctionIdentifier): Unit
118+
111119
/** Checks if a function with a given name exists. */
112120
def functionExists(name: FunctionIdentifier): Boolean = lookupFunction(name).isDefined
113121

@@ -263,6 +271,13 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
263271
functionBuilders.remove(normalizeFuncName(name)).isDefined
264272
}
265273

274+
override def dropFunctionsInDatabase(namespace: FunctionIdentifier): Unit = synchronized {
275+
val toRemove = listFunction().filter { f =>
276+
f.copy(funcName = "") == namespace
277+
}
278+
toRemove.foreach(n => functionBuilders.remove(n))
279+
}
280+
266281
override def clear(): Unit = synchronized {
267282
functionBuilders.clear()
268283
}
@@ -295,6 +310,8 @@ trait EmptyFunctionRegistryBase[T] extends FunctionRegistryBase[T] {
295310
throw SparkUnsupportedOperationException()
296311
}
297312

313+
override def dropFunctionsInDatabase(namespace: FunctionIdentifier): Unit = {}
314+
298315
override def clear(): Unit = {
299316
throw SparkUnsupportedOperationException()
300317
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,11 @@ class SessionCatalog(
490490
listTables(dbName).foreach { t =>
491491
invalidateCachedTable(QualifiedTableName(SESSION_CATALOG_NAME, dbName, t.table))
492492
}
493+
// Clear cached functions in this database so the cache stays coherent on drop.
494+
// normalizeFuncName stores entries with catalog=None, so the filter must match that.
495+
val namespace = FunctionIdentifier("", Some(dbName), None)
496+
functionRegistry.dropFunctionsInDatabase(namespace)
497+
tableFunctionRegistry.dropFunctionsInDatabase(namespace)
493498
}
494499
externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
495500
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,97 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
297297
}
298298
}
299299

300+
test("drop database clears function registry cache (cache coherence)") {
301+
val extCatalog = newEmptyCatalog()
302+
extCatalog.createDatabase(newDb("default"), ignoreIfExists = true)
303+
extCatalog.createDatabase(newDb("cache_coherence_db"), ignoreIfExists = false)
304+
extCatalog.createFunction(
305+
"cache_coherence_db", newFunc("cached_func", Some("cache_coherence_db")))
306+
val registry = new SimpleFunctionRegistry()
307+
val catalog = new SessionCatalog(extCatalog, registry)
308+
try {
309+
val ident = FunctionIdentifier("cached_func", Some("cache_coherence_db"))
310+
val info = new ExpressionInfo(
311+
"test.Example",
312+
"cache_coherence_db",
313+
"cached_func",
314+
"usage",
315+
"arguments",
316+
"\n Examples:\n",
317+
"\n \n ",
318+
"misc_funcs",
319+
"1.0.0",
320+
"",
321+
"sql_udf")
322+
val builder = (e: Seq[Expression]) => e.head
323+
registry.registerFunction(ident, info, builder)
324+
assert(registry.functionExists(ident))
325+
catalog.dropDatabase("cache_coherence_db", ignoreIfNotExists = false, cascade = true)
326+
assert(!registry.functionExists(ident))
327+
} finally {
328+
catalog.reset()
329+
}
330+
}
331+
332+
test("drop database clears table function registry cache (cache coherence)") {
333+
val extCatalog = newEmptyCatalog()
334+
extCatalog.createDatabase(newDb("default"), ignoreIfExists = true)
335+
extCatalog.createDatabase(newDb("cache_coherence_db2"), ignoreIfExists = false)
336+
val scalarRegistry = new SimpleFunctionRegistry()
337+
val tableRegistry = new SimpleTableFunctionRegistry()
338+
val catalog = new SessionCatalog(extCatalog, scalarRegistry, tableRegistry)
339+
try {
340+
val ident = FunctionIdentifier("cached_table_func", Some("cache_coherence_db2"))
341+
val info = new ExpressionInfo(
342+
"test.Example",
343+
"cache_coherence_db2",
344+
"cached_table_func",
345+
"usage",
346+
"arguments",
347+
"\n Examples:\n",
348+
"\n \n ",
349+
"table_funcs",
350+
"1.0.0",
351+
"",
352+
"sql_udf")
353+
val builder = (_: Seq[Expression]) => Range(1, 1, 1, 1)
354+
tableRegistry.registerFunction(ident, info, builder)
355+
assert(tableRegistry.functionExists(ident))
356+
catalog.dropDatabase("cache_coherence_db2", ignoreIfNotExists = false, cascade = true)
357+
assert(!tableRegistry.functionExists(ident))
358+
} finally {
359+
catalog.reset()
360+
}
361+
}
362+
363+
test("drop database preserves functions in other databases (cache coherence)") {
364+
val extCatalog = newEmptyCatalog()
365+
extCatalog.createDatabase(newDb("default"), ignoreIfExists = true)
366+
extCatalog.createDatabase(newDb("drop_me"), ignoreIfExists = false)
367+
extCatalog.createDatabase(newDb("keep_me"), ignoreIfExists = false)
368+
extCatalog.createFunction("drop_me", newFunc("func_drop", Some("drop_me")))
369+
extCatalog.createFunction("keep_me", newFunc("func_keep", Some("keep_me")))
370+
val registry = new SimpleFunctionRegistry()
371+
val catalog = new SessionCatalog(extCatalog, registry)
372+
try {
373+
val dropIdent = FunctionIdentifier("func_drop", Some("drop_me"))
374+
val keepIdent = FunctionIdentifier("func_keep", Some("keep_me"))
375+
val builder = (e: Seq[Expression]) => e.head
376+
val makeInfo = (db: String, name: String) => new ExpressionInfo(
377+
"test.Example", db, name, "usage", "arguments",
378+
"\n Examples:\n", "\n \n ", "misc_funcs", "1.0.0", "", "sql_udf")
379+
registry.registerFunction(dropIdent, makeInfo("drop_me", "func_drop"), builder)
380+
registry.registerFunction(keepIdent, makeInfo("keep_me", "func_keep"), builder)
381+
assert(registry.functionExists(dropIdent))
382+
assert(registry.functionExists(keepIdent))
383+
catalog.dropDatabase("drop_me", ignoreIfNotExists = false, cascade = true)
384+
assert(!registry.functionExists(dropIdent))
385+
assert(registry.functionExists(keepIdent))
386+
} finally {
387+
catalog.reset()
388+
}
389+
}
390+
300391
test("alter database") {
301392
withBasicCatalog { catalog =>
302393
val db1 = catalog.getDatabaseMetadata("db1")

0 commit comments

Comments
 (0)