Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/api_connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,15 @@ def decorated(*args, **kwargs):


def requires_access_custom_view(
fab_action_name: str,
fab_resource_name: str,
method: ResourceMethod,
resource_name: str,
) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_custom_view(
fab_action_name=fab_action_name, fab_resource_name=fab_resource_name
method=method, resource_name=resource_name
),
func=func,
args=args,
Expand Down
17 changes: 7 additions & 10 deletions airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,24 +235,21 @@ def is_authorized_view(
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""

@abstractmethod
def is_authorized_custom_view(
self, *, fab_action_name: str, fab_resource_name: str, user: BaseUser | None = None
self, *, method: ResourceMethod, resource_name: str, user: BaseUser | None = None
):
"""
Return whether the user is authorized to perform a given action on a custom view.

A custom view is a view defined as part of the auth manager. This view is then only available when
the auth manager is used as part of the environment.

By default, it throws an exception because auth managers do not define custom views by default.
If an auth manager defines some custom views, it needs to override this method.
A custom view can be a view defined as part of the auth manager. This view is then only available when
the auth manager is used as part of the environment. It can also be a view defined as part of a
plugin defined by a user.

:param fab_action_name: the name of the FAB action defined in the view in ``base_permissions``
:param fab_resource_name: the name of the FAB resource defined in the view in
``class_permission_name``
:param method: the method to perform
:param resource_name: the name of the resource
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""
raise AirflowException(f"The resource `{fab_resource_name}` does not exist in the environment.")

def batch_is_authorized_connection(
self,
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/avp/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ class AvpEntities(Enum):
# Resource types
CONFIGURATION = "Configuration"
CONNECTION = "Connection"
CUSTOM = "Custom"
DAG = "Dag"
DATASET = "Dataset"
MENU = "Menu"
Comment thread
potiuk marked this conversation as resolved.
POOL = "Pool"
VARIABLE = "Variable"
VIEW = "View"
Expand Down
176 changes: 17 additions & 159 deletions airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from airflow.cli.cli_config import CLICommand, DefaultHelpParser, GroupCommand
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowOptionalProviderFeatureException
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.auth_manager.avp.facade import (
AwsAuthManagerAmazonVerifiedPermissionsFacade,
Expand All @@ -41,28 +41,6 @@
from airflow.providers.amazon.aws.auth_manager.security_manager.aws_security_manager_override import (
AwsSecurityManagerOverride,
)
from airflow.security.permissions import (
RESOURCE_AUDIT_LOG,
RESOURCE_CLUSTER_ACTIVITY,
RESOURCE_CONFIG,
RESOURCE_CONNECTION,
RESOURCE_DAG,
RESOURCE_DAG_CODE,
RESOURCE_DAG_DEPENDENCIES,
RESOURCE_DAG_RUN,
RESOURCE_DATASET,
RESOURCE_DOCS,
RESOURCE_JOB,
RESOURCE_PLUGIN,
RESOURCE_POOL,
RESOURCE_PROVIDER,
RESOURCE_SLA_MISS,
RESOURCE_TASK_INSTANCE,
RESOURCE_TASK_RESCHEDULE,
RESOURCE_TRIGGER,
RESOURCE_VARIABLE,
RESOURCE_XCOM,
)

try:
from airflow.auth.managers.base_auth_manager import BaseAuthManager, ResourceMethod
Expand Down Expand Up @@ -97,136 +75,6 @@
from airflow.www.extensions.init_appbuilder import AirflowAppBuilder


_MENU_ITEM_REQUESTS: dict[str, IsAuthorizedRequest] = {
RESOURCE_AUDIT_LOG: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.AUDIT_LOG.value,
},
},
},
RESOURCE_CLUSTER_ACTIVITY: {
"method": "GET",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.CLUSTER_ACTIVITY.value,
},
RESOURCE_CONFIG: {
"method": "GET",
"entity_type": AvpEntities.CONFIGURATION,
},
RESOURCE_CONNECTION: {
"method": "GET",
"entity_type": AvpEntities.CONNECTION,
},
RESOURCE_DAG: {
"method": "GET",
"entity_type": AvpEntities.DAG,
},
RESOURCE_DAG_CODE: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.CODE.value,
},
},
},
RESOURCE_DAG_DEPENDENCIES: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.DEPENDENCIES.value,
},
},
},
RESOURCE_DAG_RUN: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.RUN.value,
},
},
},
RESOURCE_DATASET: {
"method": "GET",
"entity_type": AvpEntities.DATASET,
},
RESOURCE_DOCS: {
"method": "GET",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.DOCS.value,
},
RESOURCE_PLUGIN: {
"method": "GET",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.PLUGINS.value,
},
RESOURCE_JOB: {
"method": "GET",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.JOBS.value,
},
RESOURCE_POOL: {
"method": "GET",
"entity_type": AvpEntities.POOL,
},
RESOURCE_PROVIDER: {
"method": "GET",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.PROVIDERS.value,
},
RESOURCE_SLA_MISS: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.SLA_MISS.value,
},
},
},
RESOURCE_TASK_INSTANCE: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.TASK_INSTANCE.value,
},
},
},
RESOURCE_TASK_RESCHEDULE: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.TASK_RESCHEDULE.value,
},
},
},
RESOURCE_TRIGGER: {
"method": "GET",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.TRIGGERS.value,
},
RESOURCE_VARIABLE: {
"method": "GET",
"entity_type": AvpEntities.VARIABLE,
},
RESOURCE_XCOM: {
"method": "GET",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
"string": DagAccessEntity.XCOM.value,
},
},
},
}


class AwsAuthManager(BaseAuthManager):
"""
AWS auth manager.
Expand Down Expand Up @@ -357,6 +205,16 @@ def is_authorized_view(
entity_id=access_view.value,
)

def is_authorized_custom_view(
self, *, method: ResourceMethod, resource_name: str, user: BaseUser | None = None
):
return self.avp_facade.is_authorized(
method=method,
entity_type=AvpEntities.CUSTOM,
user=user or self.get_user(),
entity_id=resource_name,
)

def batch_is_authorized_connection(
self,
requests: Sequence[IsAuthorizedConnectionRequest],
Expand Down Expand Up @@ -565,12 +423,12 @@ def get_cli_commands() -> list[CLICommand]:
]

@staticmethod
def _get_menu_item_request(fab_resource_name: str) -> IsAuthorizedRequest:
menu_item_request = _MENU_ITEM_REQUESTS.get(fab_resource_name)
if menu_item_request:
return menu_item_request
else:
raise AirflowException(f"Unknown resource name {fab_resource_name}")
def _get_menu_item_request(resource_name: str) -> IsAuthorizedRequest:
return {
"method": "MENU",
"entity_type": AvpEntities.MENU,
"entity_id": resource_name,
}


def get_parser() -> argparse.ArgumentParser:
Expand Down
32 changes: 32 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/cli/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,30 @@
"resourceTypes": ["Connection"]
}
},
"Custom.DELETE": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Custom.GET": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Custom.POST": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Custom.PUT": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Configuration.GET": {
"appliesTo": {
"principalTypes": ["User"],
Expand Down Expand Up @@ -97,6 +121,12 @@
"resourceTypes": ["Dataset"]
}
},
"Menu.MENU": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Menu"]
}
},
"Pool.DELETE": {
"appliesTo": {
"principalTypes": ["User"],
Expand Down Expand Up @@ -155,8 +185,10 @@
"entityTypes": {
"Configuration": {},
"Connection": {},
"Custom": {},
"Dag": {},
"Dataset": {},
"Menu": {},
"Pool": {},
"Role": {},
"User": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _check_action_and_resource(sm: FabAirflowSecurityManagerOverride, perms: lis
raise BadRequest(detail=f"The specified resource: {resource!r} was not found")


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_ROLE)
@requires_access_custom_view("GET", permissions.RESOURCE_ROLE)
def get_role(*, role_name: str) -> APIResponse:
"""Get role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand All @@ -66,7 +66,7 @@ def get_role(*, role_name: str) -> APIResponse:
return role_schema.dump(role)


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_ROLE)
@requires_access_custom_view("GET", permissions.RESOURCE_ROLE)
@format_parameters({"limit": check_limit})
def get_roles(*, order_by: str = "name", limit: int, offset: int | None = None) -> APIResponse:
"""Get roles."""
Expand Down Expand Up @@ -94,7 +94,7 @@ def get_roles(*, order_by: str = "name", limit: int, offset: int | None = None)
return role_collection_schema.dump(RoleCollection(roles=roles, total_entries=total_entries))


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_ACTION)
@requires_access_custom_view("GET", permissions.RESOURCE_ACTION)
@format_parameters({"limit": check_limit})
def get_permissions(*, limit: int, offset: int | None = None) -> APIResponse:
"""Get permissions."""
Expand All @@ -106,7 +106,7 @@ def get_permissions(*, limit: int, offset: int | None = None) -> APIResponse:
return action_collection_schema.dump(ActionCollection(actions=actions, total_entries=total_entries))


@requires_access_custom_view(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_ROLE)
@requires_access_custom_view("DELETE", permissions.RESOURCE_ROLE)
def delete_role(*, role_name: str) -> APIResponse:
"""Delete a role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand All @@ -118,7 +118,7 @@ def delete_role(*, role_name: str) -> APIResponse:
return NoContent, HTTPStatus.NO_CONTENT


@requires_access_custom_view(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
@requires_access_custom_view("PUT", permissions.RESOURCE_ROLE)
def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse:
"""Update a role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand Down Expand Up @@ -151,7 +151,7 @@ def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse
return role_schema.dump(role)


@requires_access_custom_view(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_ROLE)
@requires_access_custom_view("POST", permissions.RESOURCE_ROLE)
def post_role() -> APIResponse:
"""Create a new role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand Down
Loading