1414from itertools import zip_longest
1515from typing import TYPE_CHECKING , Any , Final , cast
1616
17- from music_assistant_models .auth import UserRole
1817from music_assistant_models .background_task import BackgroundTask , TaskMetadata , TaskSchedule
1918from music_assistant_models .config_entries import ConfigEntry , ConfigValueType
2019from music_assistant_models .enums import (
2625 TaskStatus ,
2726)
2827from music_assistant_models .errors import (
29- InsufficientPermissions ,
3028 InvalidDataError ,
3129 InvalidProviderID ,
3230 InvalidProviderURI ,
8078 VACUUM_MIN_RECLAIM_RATIO ,
8179)
8280from music_assistant .controllers .tasks .context import update_current_task_progress_text
83- from music_assistant .controllers .webserver .helpers .auth_middleware import get_current_user
81+ from music_assistant .controllers .webserver .helpers .auth_middleware import (
82+ UseImpersonatedUser ,
83+ get_current_user ,
84+ )
8485from music_assistant .helpers .api import api_command
8586from music_assistant .helpers .compare import compare_strings , compare_version , create_safe_string
8687from music_assistant .helpers .database import UNSET , DatabaseConnection
@@ -3659,57 +3660,58 @@ async def get_item_by_name(
36593660 username_or_user_id : str | None = None ,
36603661 ) -> MediaItemType | ItemMapping | None :
36613662 """Try to find a media item (such as a playlist) by name."""
3662- # Future todo: enhance this method with AI capabilities to allow typos and
3663- # natural language.
3664- searchname = name .lower ()
3665- library_functions = [
3666- x
3667- for x in (
3668- self .playlists .library_items ,
3669- self .radio .library_items ,
3670- self .tracks .library_items ,
3671- self .albums .library_items ,
3672- self .artists .library_items ,
3673- self .audiobooks .library_items ,
3674- self .podcasts .library_items ,
3675- )
3676- if not media_type or media_type .value .lower () in x .__name__
3677- ]
3678- # prefer (exact) lookup in the library by name
3679- for func in library_functions :
3680- result = await func (search = searchname , username_or_user_id = username_or_user_id )
3681- for item in result :
3682- # handle optional artist filter
3683- if (
3684- artist
3685- and (artists := getattr (item , "artists" , None ))
3686- and not any (x for x in artists if x .name .lower () == artist .lower ())
3687- ):
3688- continue
3689- # handle optional album filter
3690- if (
3691- album
3692- and (item_album := getattr (item , "album" , None ))
3693- and item_album .name .lower () != album .lower ()
3694- ):
3695- continue
3696- if searchname == item .name .lower ():
3697- return item
3698- # nothing found in the library, fallback to global search
3699- search_name = name
3700- if album and artist :
3701- search_name = f"{ artist } - { album } - { name } "
3702- elif album :
3703- search_name = f"{ album } - { name } "
3704- elif artist :
3705- search_name = f"{ artist } - { name } "
3706- search_results = await self .search (
3707- search_query = search_name ,
3708- media_types = [media_type ]
3709- if media_type and media_type != MediaType .UNKNOWN
3710- else MediaType .ALL ,
3711- limit = 8 ,
3712- )
3663+ async with UseImpersonatedUser (self .mass , username_or_user_id ):
3664+ # Future todo: enhance this method with AI capabilities to allow typos and
3665+ # natural language.
3666+ searchname = name .lower ()
3667+ library_functions = [
3668+ x
3669+ for x in (
3670+ self .playlists .library_items ,
3671+ self .radio .library_items ,
3672+ self .tracks .library_items ,
3673+ self .albums .library_items ,
3674+ self .artists .library_items ,
3675+ self .audiobooks .library_items ,
3676+ self .podcasts .library_items ,
3677+ )
3678+ if not media_type or media_type .value .lower () in x .__name__
3679+ ]
3680+ # prefer (exact) lookup in the library by name
3681+ for func in library_functions :
3682+ result = await func (search = searchname , username_or_user_id = username_or_user_id )
3683+ for item in result :
3684+ # handle optional artist filter
3685+ if (
3686+ artist
3687+ and (artists := getattr (item , "artists" , None ))
3688+ and not any (x for x in artists if x .name .lower () == artist .lower ())
3689+ ):
3690+ continue
3691+ # handle optional album filter
3692+ if (
3693+ album
3694+ and (item_album := getattr (item , "album" , None ))
3695+ and item_album .name .lower () != album .lower ()
3696+ ):
3697+ continue
3698+ if searchname == item .name .lower ():
3699+ return item
3700+ # nothing found in the library, fallback to global search
3701+ search_name = name
3702+ if album and artist :
3703+ search_name = f"{ artist } - { album } - { name } "
3704+ elif album :
3705+ search_name = f"{ album } - { name } "
3706+ elif artist :
3707+ search_name = f"{ artist } - { name } "
3708+ search_results = await self .search (
3709+ search_query = search_name ,
3710+ media_types = [media_type ]
3711+ if media_type and media_type != MediaType .UNKNOWN
3712+ else MediaType .ALL ,
3713+ limit = 8 ,
3714+ )
37133715 for results in (
37143716 search_results .tracks ,
37153717 search_results .albums ,
@@ -3730,81 +3732,46 @@ async def verify_item_uri(self, uri: str, username_or_user_id: str | None = None
37303732
37313733 If username_or_user_id is specified, verifies additionally, if this user may access this item. This requires the requesting (i.e. authorized user) to be able to access this item as well.
37323734 """
3733- user : User | None = None
3734- if username_or_user_id :
3735- # below raises if permissions are insufficient
3736- user = await self .mass .music .get_requested_user_if_authorized (username_or_user_id )
3737-
3738- try :
3739- media_type , provider_instance_id_or_domain , item_id = await parse_uri (uri )
3740- except (InvalidProviderURI , InvalidProviderID ):
3741- return False
3735+ async with UseImpersonatedUser (self .mass , username_or_user_id ):
3736+ user = get_current_user ()
37423737
3743- # fast return for a provider uri which is not part of a user with a provider filter
3744- if (
3745- provider_instance_id_or_domain != "library"
3746- and user
3747- and user .provider_filter
3748- and provider_instance_id_or_domain not in user .provider_filter
3749- ):
3750- return False
3738+ try :
3739+ media_type , provider_instance_id_or_domain , item_id = await parse_uri (uri )
3740+ except (InvalidProviderURI , InvalidProviderID ):
3741+ return False
37513742
3752- # verify that item itself exists
3753- try :
3754- item = await self .get_item (
3755- media_type = media_type ,
3756- item_id = item_id ,
3757- provider_instance_id_or_domain = provider_instance_id_or_domain ,
3758- allow_update_metadata = False , # no need trigger more methods
3759- )
3760- except MediaNotFoundError :
3761- return False
3743+ # fast return for a provider uri which is not part of a user with a provider filter
3744+ if (
3745+ provider_instance_id_or_domain != "library"
3746+ and user
3747+ and user .provider_filter
3748+ and provider_instance_id_or_domain not in user .provider_filter
3749+ ):
3750+ return False
37623751
3763- # non library item handling for users with no filter, or no user at all
3764- if (
3765- provider_instance_id_or_domain != "library"
3766- or not user
3767- or (user and not user .provider_filter )
3768- or isinstance (item , BrowseFolder )
3769- ):
3770- return True
3752+ # verify that item itself exists
3753+ try :
3754+ item = await self .get_item (
3755+ media_type = media_type ,
3756+ item_id = item_id ,
3757+ provider_instance_id_or_domain = provider_instance_id_or_domain ,
3758+ allow_update_metadata = False , # no need trigger more methods
3759+ )
3760+ except MediaNotFoundError :
3761+ return False
37713762
3772- # library item handling for users with provider filter
3773- for provider_mapping in item .provider_mappings :
3774- if provider_mapping .provider_instance in user .provider_filter :
3763+ # non library item handling for users with no filter, or no user at all
3764+ if (
3765+ provider_instance_id_or_domain != "library"
3766+ or not user
3767+ or (user and not user .provider_filter )
3768+ or isinstance (item , BrowseFolder )
3769+ ):
37753770 return True
37763771
3777- return False
3778-
3779- async def get_requested_user_if_authorized (self , username_or_user_id : str ) -> User :
3780- """Return requested user if authenticated user may access all music providers of this user.
3781-
3782- Raises InsufficientPermissions otherwise.
3783- """
3784- requested_user = await self .mass .webserver .auth .get_user_by_id_or_name (username_or_user_id )
3785- if not requested_user :
3786- raise InvalidDataError (
3787- f"A user with user id or name { username_or_user_id } is not available."
3788- )
3789-
3790- authenticated_user = get_current_user ()
3791- if not authenticated_user :
3792- raise InsufficientPermissions ("Only an authenticated user may request another user." )
3772+ # library item handling for users with provider filter
3773+ for provider_mapping in item .provider_mappings :
3774+ if provider_mapping .provider_instance in user .provider_filter :
3775+ return True
37933776
3794- if requested_user == authenticated_user :
3795- return requested_user
3796-
3797- if authenticated_user .role == UserRole .ADMIN or not authenticated_user .provider_filter :
3798- # If no provider filter is set, a user is allowed to access any provider.
3799- return requested_user
3800-
3801- if not requested_user .provider_filter or not set (requested_user .provider_filter ).issubset (
3802- authenticated_user .provider_filter
3803- ):
3804- # Requested user may access any provider, but we excluded that the authenticated user can do the
3805- # same already
3806- raise InsufficientPermissions (
3807- f"The authenticated user { authenticated_user .display_name } lacks permission to access all music providers accessible to { requested_user .display_name } ."
3808- )
3809-
3810- return requested_user
3777+ return False
0 commit comments