Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/api_connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,15 @@ def decorated(*args, **kwargs):


def requires_access_custom_view(
fab_action_name: str,
fab_resource_name: str,
method: ResourceMethod,
resource_name: str,
) -> Callable[[T], T]:
def requires_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
return _requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_custom_view(
fab_action_name=fab_action_name, fab_resource_name=fab_resource_name
method=method, resource_name=resource_name
),
func=func,
args=args,
Expand Down
17 changes: 7 additions & 10 deletions airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,24 +235,21 @@ def is_authorized_view(
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""

@abstractmethod
def is_authorized_custom_view(
self, *, fab_action_name: str, fab_resource_name: str, user: BaseUser | None = None
self, *, method: ResourceMethod, resource_name: str, user: BaseUser | None = None
):
"""
Return whether the user is authorized to perform a given action on a custom view.

A custom view is a view defined as part of the auth manager. This view is then only available when
the auth manager is used as part of the environment.

By default, it throws an exception because auth managers do not define custom views by default.
If an auth manager defines some custom views, it needs to override this method.
A custom view can be a view defined as part of the auth manager. This view is then only available when
the auth manager is used as part of the environment. It can also be a view defined as part of a
plugin defined by a user.

:param fab_action_name: the name of the FAB action defined in the view in ``base_permissions``
:param fab_resource_name: the name of the FAB resource defined in the view in
``class_permission_name``
:param method: the method to perform
:param resource_name: the name of the resource
:param user: the user to perform the action on. If not provided (or None), it uses the current user
"""
raise AirflowException(f"The resource `{fab_resource_name}` does not exist in the environment.")

def batch_is_authorized_connection(
self,
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/avp/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ class AvpEntities(Enum):
# Resource types
CONFIGURATION = "Configuration"
CONNECTION = "Connection"
CUSTOM = "Custom"
DAG = "Dag"
DATASET = "Dataset"
MENU = "Menu"
Comment thread
potiuk marked this conversation as resolved.
POOL = "Pool"
VARIABLE = "Variable"
VIEW = "View"
Expand Down
64 changes: 37 additions & 27 deletions airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.cli.cli_config import CLICommand, DefaultHelpParser, GroupCommand
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowOptionalProviderFeatureException
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.auth_manager.avp.facade import (
AwsAuthManagerAmazonVerifiedPermissionsFacade,
Expand Down Expand Up @@ -98,7 +98,7 @@

_MENU_ITEM_REQUESTS: dict[str, IsAuthorizedRequest] = {
RESOURCE_AUDIT_LOG: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -107,24 +107,24 @@
},
},
RESOURCE_CLUSTER_ACTIVITY: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.CLUSTER_ACTIVITY.value,
},
RESOURCE_CONFIG: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.CONFIGURATION,
},
RESOURCE_CONNECTION: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.CONNECTION,
},
RESOURCE_DAG: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
},
RESOURCE_DAG_CODE: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -133,7 +133,7 @@
},
},
RESOURCE_DAG_DEPENDENCIES: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -142,7 +142,7 @@
},
},
RESOURCE_DAG_RUN: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -151,35 +151,35 @@
},
},
RESOURCE_DATASET: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DATASET,
},
RESOURCE_DOCS: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.DOCS.value,
},
RESOURCE_PLUGIN: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.PLUGINS.value,
},
RESOURCE_JOB: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.JOBS.value,
},
RESOURCE_POOL: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.POOL,
},
RESOURCE_PROVIDER: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.PROVIDERS.value,
},
RESOURCE_SLA_MISS: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -188,7 +188,7 @@
},
},
RESOURCE_TASK_INSTANCE: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -197,7 +197,7 @@
},
},
RESOURCE_TASK_RESCHEDULE: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand All @@ -206,16 +206,16 @@
},
},
RESOURCE_TRIGGER: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VIEW,
"entity_id": AccessView.TRIGGERS.value,
},
RESOURCE_VARIABLE: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.VARIABLE,
},
RESOURCE_XCOM: {
"method": "GET",
"method": "MENU",
"entity_type": AvpEntities.DAG,
"context": {
"dag_entity": {
Expand Down Expand Up @@ -356,6 +356,16 @@ def is_authorized_view(
entity_id=access_view.value,
)

def is_authorized_custom_view(
self, *, method: ResourceMethod, resource_name: str, user: BaseUser | None = None
):
return self.avp_facade.is_authorized(
method=method,
entity_type=AvpEntities.CUSTOM,
user=user or self.get_user(),
entity_id=resource_name,
)

def batch_is_authorized_connection(
self,
requests: Sequence[IsAuthorizedConnectionRequest],
Expand Down Expand Up @@ -504,12 +514,12 @@ def get_cli_commands() -> list[CLICommand]:
]

@staticmethod
def _get_menu_item_request(fab_resource_name: str) -> IsAuthorizedRequest:
menu_item_request = _MENU_ITEM_REQUESTS.get(fab_resource_name)
if menu_item_request:
return menu_item_request
else:
raise AirflowException(f"Unknown resource name {fab_resource_name}")
def _get_menu_item_request(resource_name: str) -> IsAuthorizedRequest:
return {
"method": "MENU",
"entity_type": AvpEntities.MENU,
"entity_id": resource_name,
}

def _has_access_to_menu_item(
self, batch_is_authorized_results: list[dict], request: IsAuthorizedRequest, user: AwsAuthManagerUser
Expand Down
32 changes: 32 additions & 0 deletions airflow/providers/amazon/aws/auth_manager/cli/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,30 @@
"resourceTypes": ["Connection"]
}
},
"Custom.DELETE": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Custom.GET": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Custom.POST": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Custom.PUT": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Custom"]
}
},
"Configuration.GET": {
"appliesTo": {
"principalTypes": ["User"],
Expand Down Expand Up @@ -97,6 +121,12 @@
"resourceTypes": ["Dataset"]
}
},
"Menu.MENU": {
"appliesTo": {
"principalTypes": ["User"],
"resourceTypes": ["Menu"]
}
},
"Pool.DELETE": {
"appliesTo": {
"principalTypes": ["User"],
Expand Down Expand Up @@ -155,8 +185,10 @@
"entityTypes": {
"Configuration": {},
"Connection": {},
"Custom": {},
"Dag": {},
"Dataset": {},
"Menu": {},
"Pool": {},
"Role": {},
"User": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _check_action_and_resource(sm: FabAirflowSecurityManagerOverride, perms: lis
raise BadRequest(detail=f"The specified resource: {resource!r} was not found")


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_ROLE)
@requires_access_custom_view("GET", permissions.RESOURCE_ROLE)
def get_role(*, role_name: str) -> APIResponse:
"""Get role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand All @@ -66,7 +66,7 @@ def get_role(*, role_name: str) -> APIResponse:
return role_schema.dump(role)


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_ROLE)
@requires_access_custom_view("GET", permissions.RESOURCE_ROLE)
@format_parameters({"limit": check_limit})
def get_roles(*, order_by: str = "name", limit: int, offset: int | None = None) -> APIResponse:
"""Get roles."""
Expand Down Expand Up @@ -94,7 +94,7 @@ def get_roles(*, order_by: str = "name", limit: int, offset: int | None = None)
return role_collection_schema.dump(RoleCollection(roles=roles, total_entries=total_entries))


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_ACTION)
@requires_access_custom_view("GET", permissions.RESOURCE_ACTION)
@format_parameters({"limit": check_limit})
def get_permissions(*, limit: int, offset: int | None = None) -> APIResponse:
"""Get permissions."""
Expand All @@ -106,7 +106,7 @@ def get_permissions(*, limit: int, offset: int | None = None) -> APIResponse:
return action_collection_schema.dump(ActionCollection(actions=actions, total_entries=total_entries))


@requires_access_custom_view(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_ROLE)
@requires_access_custom_view("DELETE", permissions.RESOURCE_ROLE)
def delete_role(*, role_name: str) -> APIResponse:
"""Delete a role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand All @@ -118,7 +118,7 @@ def delete_role(*, role_name: str) -> APIResponse:
return NoContent, HTTPStatus.NO_CONTENT


@requires_access_custom_view(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
@requires_access_custom_view("PUT", permissions.RESOURCE_ROLE)
def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse:
"""Update a role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand Down Expand Up @@ -151,7 +151,7 @@ def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse
return role_schema.dump(role)


@requires_access_custom_view(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_ROLE)
@requires_access_custom_view("POST", permissions.RESOURCE_ROLE)
def post_role() -> APIResponse:
"""Create a new role."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from airflow.providers.fab.auth_manager.models import Role


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_USER)
@requires_access_custom_view("GET", permissions.RESOURCE_USER)
def get_user(*, username: str) -> APIResponse:
"""Get a user."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand All @@ -54,7 +54,7 @@ def get_user(*, username: str) -> APIResponse:
return user_collection_item_schema.dump(user)


@requires_access_custom_view(permissions.ACTION_CAN_READ, permissions.RESOURCE_USER)
@requires_access_custom_view("GET", permissions.RESOURCE_USER)
@format_parameters({"limit": check_limit})
def get_users(*, limit: int, order_by: str = "id", offset: str | None = None) -> APIResponse:
"""Get users."""
Expand Down Expand Up @@ -86,7 +86,7 @@ def get_users(*, limit: int, order_by: str = "id", offset: str | None = None) ->
return user_collection_schema.dump(UserCollection(users=users, total_entries=total_entries))


@requires_access_custom_view(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_USER)
@requires_access_custom_view("POST", permissions.RESOURCE_USER)
def post_user() -> APIResponse:
"""Create a new user."""
try:
Expand Down Expand Up @@ -129,7 +129,7 @@ def post_user() -> APIResponse:
return user_schema.dump(user)


@requires_access_custom_view(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_USER)
@requires_access_custom_view("PUT", permissions.RESOURCE_USER)
def patch_user(*, username: str, update_mask: UpdateMask = None) -> APIResponse:
"""Update a user."""
try:
Expand Down Expand Up @@ -198,7 +198,7 @@ def patch_user(*, username: str, update_mask: UpdateMask = None) -> APIResponse:
return user_schema.dump(user)


@requires_access_custom_view(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_USER)
@requires_access_custom_view("DELETE", permissions.RESOURCE_USER)
def delete_user(*, username: str) -> APIResponse:
"""Delete a user."""
security_manager = cast(FabAirflowSecurityManagerOverride, get_auth_manager().security_manager)
Expand Down
Loading