Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ trait FunctionRegistryBase[T] {
/** Drop a function and return whether the function existed. */
def dropFunction(name: FunctionIdentifier): Boolean

/**
* Remove all cached function entries matching the given namespace.
* The namespace is a FunctionIdentifier with empty funcName used as a filter:
* matches on database (case-insensitive) and, when specified, catalog.
*/
def dropFunctionsInDatabase(namespace: FunctionIdentifier): Unit

/** Checks if a function with a given name exists. */
def functionExists(name: FunctionIdentifier): Boolean = lookupFunction(name).isDefined

Expand Down Expand Up @@ -263,6 +270,14 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
functionBuilders.remove(normalizeFuncName(name)).isDefined
}

override def dropFunctionsInDatabase(namespace: FunctionIdentifier): Unit = synchronized {
val toRemove = listFunction().filter { f =>
f.database.exists(d => namespace.database.exists(_.equalsIgnoreCase(d))) &&
(namespace.catalog.isEmpty || f.catalog == namespace.catalog)
}
toRemove.foreach(n => functionBuilders.remove(n))
}

override def clear(): Unit = synchronized {
functionBuilders.clear()
}
Expand Down Expand Up @@ -295,6 +310,8 @@ trait EmptyFunctionRegistryBase[T] extends FunctionRegistryBase[T] {
throw SparkUnsupportedOperationException()
}

override def dropFunctionsInDatabase(namespace: FunctionIdentifier): Unit = {}

override def clear(): Unit = {
throw SparkUnsupportedOperationException()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ class SessionCatalog(
listTables(dbName).foreach { t =>
invalidateCachedTable(QualifiedTableName(SESSION_CATALOG_NAME, dbName, t.table))
}
// Clear cached functions in this database so the cache stays coherent on drop
val namespace = FunctionIdentifier("", Some(dbName), None)
functionRegistry.dropFunctionsInDatabase(namespace)
tableFunctionRegistry.dropFunctionsInDatabase(namespace)
}
externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,97 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
}
}

test("drop database clears function registry cache (cache coherence)") {
val extCatalog = newEmptyCatalog()
extCatalog.createDatabase(newDb("default"), ignoreIfExists = true)
extCatalog.createDatabase(newDb("cache_coherence_db"), ignoreIfExists = false)
extCatalog.createFunction(
"cache_coherence_db", newFunc("cached_func", Some("cache_coherence_db")))
val registry = new SimpleFunctionRegistry()
val catalog = new SessionCatalog(extCatalog, registry)
try {
val ident = FunctionIdentifier("cached_func", Some("cache_coherence_db"))
val info = new ExpressionInfo(
"test.Example",
"cache_coherence_db",
"cached_func",
"usage",
"arguments",
"\n Examples:\n",
"\n \n ",
"misc_funcs",
"1.0.0",
"",
"sql_udf")
val builder = (e: Seq[Expression]) => e.head
registry.registerFunction(ident, info, builder)
assert(registry.functionExists(ident))
catalog.dropDatabase("cache_coherence_db", ignoreIfNotExists = false, cascade = true)
assert(!registry.functionExists(ident))
} finally {
catalog.reset()
}
}

test("drop database clears table function registry cache (cache coherence)") {
val extCatalog = newEmptyCatalog()
extCatalog.createDatabase(newDb("default"), ignoreIfExists = true)
extCatalog.createDatabase(newDb("cache_coherence_db2"), ignoreIfExists = false)
val scalarRegistry = new SimpleFunctionRegistry()
val tableRegistry = new SimpleTableFunctionRegistry()
val catalog = new SessionCatalog(extCatalog, scalarRegistry, tableRegistry)
try {
val ident = FunctionIdentifier("cached_table_func", Some("cache_coherence_db2"))
val info = new ExpressionInfo(
"test.Example",
"cache_coherence_db2",
"cached_table_func",
"usage",
"arguments",
"\n Examples:\n",
"\n \n ",
"table_funcs",
"1.0.0",
"",
"sql_udf")
val builder = (_: Seq[Expression]) => Range(1, 1, 1, 1)
tableRegistry.registerFunction(ident, info, builder)
assert(tableRegistry.functionExists(ident))
catalog.dropDatabase("cache_coherence_db2", ignoreIfNotExists = false, cascade = true)
assert(!tableRegistry.functionExists(ident))
} finally {
catalog.reset()
}
}

test("drop database preserves functions in other databases (cache coherence)") {
val extCatalog = newEmptyCatalog()
extCatalog.createDatabase(newDb("default"), ignoreIfExists = true)
extCatalog.createDatabase(newDb("drop_me"), ignoreIfExists = false)
extCatalog.createDatabase(newDb("keep_me"), ignoreIfExists = false)
extCatalog.createFunction("drop_me", newFunc("func_drop", Some("drop_me")))
extCatalog.createFunction("keep_me", newFunc("func_keep", Some("keep_me")))
val registry = new SimpleFunctionRegistry()
val catalog = new SessionCatalog(extCatalog, registry)
try {
val dropIdent = FunctionIdentifier("func_drop", Some("drop_me"))
val keepIdent = FunctionIdentifier("func_keep", Some("keep_me"))
val builder = (e: Seq[Expression]) => e.head
val makeInfo = (db: String, name: String) => new ExpressionInfo(
"test.Example", db, name, "usage", "arguments",
"\n Examples:\n", "\n \n ", "misc_funcs", "1.0.0", "", "sql_udf")
registry.registerFunction(dropIdent, makeInfo("drop_me", "func_drop"), builder)
registry.registerFunction(keepIdent, makeInfo("keep_me", "func_keep"), builder)
assert(registry.functionExists(dropIdent))
assert(registry.functionExists(keepIdent))
catalog.dropDatabase("drop_me", ignoreIfNotExists = false, cascade = true)
assert(!registry.functionExists(dropIdent))
assert(registry.functionExists(keepIdent))
} finally {
catalog.reset()
}
}

test("alter database") {
withBasicCatalog { catalog =>
val db1 = catalog.getDatabaseMetadata("db1")
Expand Down