Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/151.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `infrahubctl schema export` command to export schemas from Infrahub.
20 changes: 20 additions & 0 deletions docs/docs/infrahubctl/infrahubctl-schema.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ $ infrahubctl schema [OPTIONS] COMMAND [ARGS]...
**Commands**:

* `check`: Check if schema files are valid and what...
* `export`: Export the schema from Infrahub as YAML...
* `load`: Load one or multiple schema files into...

## `infrahubctl schema check`
Expand All @@ -40,6 +41,25 @@ $ infrahubctl schema check [OPTIONS] SCHEMAS...
* `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml]
* `--help`: Show this message and exit.

## `infrahubctl schema export`

Export the schema from Infrahub as YAML files, one per namespace.

**Usage**:

```console
$ infrahubctl schema export [OPTIONS]
```

**Options**:

* `--directory PATH`: Directory path to store schema files [default: (dynamic)]
* `--branch TEXT`: Branch from which to export the schema
* `--namespace TEXT`: Namespace(s) to export (default: all user-defined)
* `--debug / --no-debug`: [default: no-debug]
* `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml]
* `--help`: Show this message and exit.

## `infrahubctl schema load`

Load one or multiple schema files into Infrahub.
Expand Down
59 changes: 9 additions & 50 deletions infrahub_sdk/ctl/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
from rich.console import Console

from ..async_typer import AsyncTyper
from ..constants import RESTRICTED_NAMESPACES
from ..ctl.client import initialize_client
from ..ctl.utils import catch_exception, init_logging
from ..queries import SCHEMA_HASH_SYNC_STATUS
from ..schema import GenericSchemaAPI, NodeSchemaAPI, ProfileSchemaAPI, SchemaWarning, TemplateSchemaAPI
from ..schema import SchemaWarning
from ..yaml import SchemaFile
from .parameters import CONFIG_PARAM
from .utils import load_yamlfile_from_disk_and_exit
Expand Down Expand Up @@ -215,36 +214,9 @@ def _display_schema_warnings(console: Console, warnings: list[SchemaWarning]) ->
)


def _default_export_directory() -> str:
def _default_export_directory() -> Path:
timestamp = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S")
return f"infrahub-schema-export-{timestamp}"


_SCHEMA_EXPORT_EXCLUDE: set[str] = {"hash", "hierarchy", "used_by", "id", "state"}
_FIELD_EXPORT_EXCLUDE: set[str] = {"inherited", "read_only", "allow_override", "hierarchical", "id", "state"}


def _schema_to_export_dict(schema: NodeSchemaAPI | GenericSchemaAPI) -> dict[str, Any]:
"""Convert an API schema object to an export-ready dict (omits API-internal fields)."""
data = schema.model_dump(exclude=_SCHEMA_EXPORT_EXCLUDE, exclude_none=True)

data["attributes"] = [
dict(attr.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True))
for attr in schema.attributes
if not attr.inherited
]
if not data["attributes"]:
data.pop("attributes")

data["relationships"] = [
dict(rel.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True))
for rel in schema.relationships
if not rel.inherited
]
if not data["relationships"]:
data.pop("relationships")

return data
return Path(f"infrahub-schema-export-{timestamp}")


@app.command()
Expand All @@ -260,23 +232,10 @@ async def export(
init_logging(debug=debug)

client = initialize_client()
schema_nodes = await client.schema.fetch(branch=branch or client.default_branch)

user_schemas: dict[str, dict[str, list[dict[str, Any]]]] = {}
for schema in schema_nodes.values():
if isinstance(schema, (ProfileSchemaAPI, TemplateSchemaAPI)):
continue
if schema.namespace in RESTRICTED_NAMESPACES:
continue
if namespace and schema.namespace not in namespace:
continue
ns = schema.namespace
user_schemas.setdefault(ns, {"nodes": [], "generics": []})
schema_dict = _schema_to_export_dict(schema)
if isinstance(schema, GenericSchemaAPI):
user_schemas[ns]["generics"].append(schema_dict)
else:
user_schemas[ns]["nodes"].append(schema_dict)
user_schemas = await client.schema.export(
branch=branch,
namespaces=namespace or None,
)

if not user_schemas:
console.print("[yellow]No user-defined schema found to export.")
Expand All @@ -286,10 +245,10 @@ async def export(

for ns, data in sorted(user_schemas.items()):
payload: dict[str, Any] = {"version": "1.0"}
if data["nodes"]:
payload["nodes"] = data["nodes"]
if data["generics"]:
payload["generics"] = data["generics"]
if data["nodes"]:
payload["nodes"] = data["nodes"]

output_file = directory / f"{ns.lower()}.yml"
output_file.write_text(
Expand Down
77 changes: 77 additions & 0 deletions infrahub_sdk/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import httpx
from pydantic import BaseModel, Field

from ..constants import RESTRICTED_NAMESPACES
from ..exceptions import (
BranchNotFoundError,
InvalidResponseError,
Expand All @@ -22,6 +23,7 @@
)
from ..graphql import Mutation
from ..queries import SCHEMA_HASH_SYNC_STATUS
from .export import schema_to_export_dict
from .main import (
AttributeSchema,
AttributeSchemaAPI,
Expand Down Expand Up @@ -64,6 +66,7 @@
"SchemaRoot",
"SchemaRootAPI",
"TemplateSchemaAPI",
"schema_to_export_dict",
]


Expand Down Expand Up @@ -118,6 +121,36 @@ def __init__(self, client: InfrahubClient | InfrahubClientSync) -> None:
self.client = client
self.cache = {}

@staticmethod
def _build_export_schemas(
    schema_nodes: MutableMapping[str, MainSchemaTypesAPI],
    namespaces: list[str] | None = None,
) -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Group fetched schemas by namespace into an export-ready structure.

    Profiles/templates (server-generated), restricted namespaces, and — when
    ``namespaces`` is provided — any namespace not in that list are skipped.

    Args:
        schema_nodes: Mapping of kind to schema object, as returned by ``fetch``.
        namespaces: Optional allow-list of namespaces to keep.

    Returns:
        Mapping of namespace to ``{"nodes": [...], "generics": [...]}``.
    """
    exported: dict[str, dict[str, list[dict[str, Any]]]] = {}
    for node_schema in schema_nodes.values():
        ns = node_schema.namespace
        excluded = (
            isinstance(node_schema, (ProfileSchemaAPI, TemplateSchemaAPI))
            or ns in RESTRICTED_NAMESPACES
            or (namespaces and ns not in namespaces)
        )
        if excluded:
            continue
        bucket = exported.setdefault(ns, {"nodes": [], "generics": []})
        target = "generics" if isinstance(node_schema, GenericSchemaAPI) else "nodes"
        bucket[target].append(schema_to_export_dict(node_schema))
    return exported

def validate(self, data: dict[str, Any]) -> None:
SchemaRoot(**data)

Expand Down Expand Up @@ -497,6 +530,28 @@ async def fetch(

return branch_schema.nodes

async def export(
    self,
    branch: str | None = None,
    namespaces: list[str] | None = None,
) -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Export user-defined schemas organized by namespace.

    Fetches every schema from the server, then filters out system types and
    restricted namespaces before grouping the remainder per namespace.

    Args:
        branch: Branch to export from. Defaults to the client's default_branch.
        namespaces: Optional list of namespaces to include. If empty/None, all user-defined namespaces are exported.

    Returns:
        Mapping of namespace to ``{"nodes": [...], "generics": [...]}``.
    """
    target_branch = branch or self.client.default_branch
    fetched = await self.fetch(branch=target_branch)
    return self._build_export_schemas(schema_nodes=fetched, namespaces=namespaces)

async def get_graphql_schema(self, branch: str | None = None) -> str:
"""Get the GraphQL schema as a string.

Expand Down Expand Up @@ -739,6 +794,28 @@ def fetch(

return branch_schema.nodes

def export(
    self,
    branch: str | None = None,
    namespaces: list[str] | None = None,
) -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Export user-defined schemas organized by namespace.

    Fetches every schema from the server, then filters out system types and
    restricted namespaces before grouping the remainder per namespace.

    Args:
        branch: Branch to export from. Defaults to the client's default_branch.
        namespaces: Optional list of namespaces to include. If empty/None, all user-defined namespaces are exported.

    Returns:
        Mapping of namespace to ``{"nodes": [...], "generics": [...]}``.
    """
    target_branch = branch or self.client.default_branch
    fetched = self.fetch(branch=target_branch)
    return self._build_export_schemas(schema_nodes=fetched, namespaces=namespaces)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def get_graphql_schema(self, branch: str | None = None) -> str:
"""Get the GraphQL schema as a string.

Expand Down
83 changes: 83 additions & 0 deletions infrahub_sdk/schema/export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

from typing import Any

from .main import GenericSchemaAPI, NodeSchemaAPI

# Top-level schema fields that are API/server-internal and never part of a
# user-authored schema file.
_SCHEMA_EXPORT_EXCLUDE: set[str] = {"hash", "hierarchy", "used_by", "id", "state"}
# branch is inherited from the node and need not be repeated on each field
_FIELD_EXPORT_EXCLUDE: set[str] = {"inherited", "allow_override", "hierarchical", "id", "state", "branch"}

# Attribute field values that match schema loading defaults — omitted for cleaner output
_ATTR_EXPORT_DEFAULTS: dict[str, Any] = {
    "read_only": False,
    "optional": False,
}

# Relationship field values that match schema loading defaults — omitted for cleaner output
_REL_EXPORT_DEFAULTS: dict[str, Any] = {
    "direction": "bidirectional",
    "on_delete": "no-action",
    "cardinality": "many",
    "optional": True,
    "min_count": 0,
    "max_count": 0,
    "read_only": False,
}

# Relationship kinds that Infrahub generates automatically — never user-defined
_AUTO_GENERATED_REL_KINDS: frozenset[str] = frozenset({"Group", "Profile", "Hierarchy"})


def schema_to_export_dict(schema: NodeSchemaAPI | GenericSchemaAPI) -> dict[str, Any]:
    """Convert an API schema object to an export-ready dict (omits API-internal fields).

    Strips server-internal fields (``_SCHEMA_EXPORT_EXCLUDE``), field values that
    match schema-loading defaults, inherited attributes/relationships, and
    auto-generated artifacts (Group/Profile/Hierarchy relationships,
    single-attribute uniqueness constraints) so the result round-trips cleanly
    when loaded back into Infrahub.

    Args:
        schema: Node or generic schema object fetched from the API.

    Returns:
        Dict ready for YAML serialization, with ``attributes`` and
        ``relationships`` re-inserted last (dict insertion order is what the
        YAML writer emits).
    """
    data = schema.model_dump(exclude=_SCHEMA_EXPORT_EXCLUDE, exclude_none=True)

    # Pop attrs/rels so they can be re-inserted last for better readability
    data.pop("attributes", None)
    data.pop("relationships", None)

    # Generics with Hierarchy relationships were defined with `hierarchical: true`.
    # Restore that flag and drop the auto-generated rels so the schema round-trips cleanly.
    if isinstance(schema, GenericSchemaAPI) and any(
        rel.kind == "Hierarchy" for rel in schema.relationships if not rel.inherited
    ):
        data["hierarchical"] = True

    # Strip uniqueness_constraints that are auto-generated from `unique: true` attributes
    # (single-field entries of the form ["<attr>__value"]). User-defined multi-field
    # constraints are preserved.
    unique_attr_suffixes = {f"{attr.name}__value" for attr in schema.attributes if attr.unique}
    user_constraints = [
        c
        for c in (data.pop("uniqueness_constraints", None) or [])
        if not (len(c) == 1 and c[0] in unique_attr_suffixes)
    ]
    if user_constraints:
        data["uniqueness_constraints"] = user_constraints

    # Locally-defined attributes only; values equal to loading defaults are omitted.
    attributes = [
        {
            k: v
            for k, v in attr.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True).items()
            if k not in _ATTR_EXPORT_DEFAULTS or v != _ATTR_EXPORT_DEFAULTS[k]
        }
        for attr in schema.attributes
        if not attr.inherited
    ]
    if attributes:
        data["attributes"] = attributes

    # Locally-defined relationships only, minus auto-generated kinds and default values.
    relationships = [
        {
            k: v
            for k, v in rel.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True).items()
            if k not in _REL_EXPORT_DEFAULTS or v != _REL_EXPORT_DEFAULTS[k]
        }
        for rel in schema.relationships
        if not rel.inherited and rel.kind not in _AUTO_GENERATED_REL_KINDS
    ]
    if relationships:
        data["relationships"] = relationships

    return data
Loading