@@ -1,91 +1,121 @@
-try:
-    from fs_gcsfs import GCSFS
-    from google.cloud.storage import Client
-    from google.oauth2 import service_account
-    from google.oauth2.credentials import Credentials
-except ImportError:
-    GCSFS = None
-
-import os
+import logging
 from typing import (
     Optional,
     Union,
 )
 
 from galaxy.files.models import (
     AnyRemoteEntry,
-    BaseFileSourceConfiguration,
-    BaseFileSourceTemplateConfiguration,
     FilesSourceRuntimeContext,
-    RemoteDirectory,
-    RemoteFile,
+)
+from galaxy.files.sources._fsspec import (
+    CacheOptionsDictType,
+    FsspecBaseFileSourceConfiguration,
+    FsspecBaseFileSourceTemplateConfiguration,
+    FsspecFilesSource,
 )
 from galaxy.util.config_templates import TemplateExpansion
-from ._pyfilesystem2 import PyFilesystem2FilesSource
 
+try:
+    from gcsfs import GCSFileSystem
+except ImportError:
+    GCSFileSystem = None
+
+
+REQUIRED_PACKAGE = "gcsfs"
+
+log = logging.getLogger(__name__)
 
-class GoogleCloudStorageFileSourceTemplateConfiguration(BaseFileSourceTemplateConfiguration):
+
+class GoogleCloudStorageFileSourceTemplateConfiguration(FsspecBaseFileSourceTemplateConfiguration):
     bucket_name: Union[str, TemplateExpansion]
-    root_path: Union[str, TemplateExpansion, None] = None
     project: Union[str, TemplateExpansion, None] = None
     anonymous: Union[bool, TemplateExpansion, None] = True
-    service_account_json: Union[str, TemplateExpansion, None] = None
-    token: Union[str, TemplateExpansion, None] = None
-    token_uri: Union[str, TemplateExpansion, None] = None
+    service_account_credentials: Union[str, TemplateExpansion, None] = None
+    # OAuth credentials
     client_id: Union[str, TemplateExpansion, None] = None
     client_secret: Union[str, TemplateExpansion, None] = None
+    access_token: Union[str, TemplateExpansion, None] = None
     refresh_token: Union[str, TemplateExpansion, None] = None
+    token_uri: Union[str, TemplateExpansion, None] = "https://oauth2.googleapis.com/token"
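+    # A hypothetical file_sources entry using these fields (the id and bucket
+    # name are placeholders, not part of this change):
+    #   - type: googlecloudstorage
+    #     id: my_gcs
+    #     bucket_name: my-example-bucket
+    #     anonymous: true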
 
 
-class GoogleCloudStorageFileSourceConfiguration(BaseFileSourceConfiguration):
+class GoogleCloudStorageFileSourceConfiguration(FsspecBaseFileSourceConfiguration):
     bucket_name: str
-    root_path: Optional[str] = None
     project: Optional[str] = None
     anonymous: Optional[bool] = True
-    service_account_json: Optional[str] = None
-    token: Optional[str] = None
-    token_uri: Optional[str] = None
+    service_account_credentials: Optional[str] = None
+    # OAuth credentials
     client_id: Optional[str] = None
     client_secret: Optional[str] = None
+    access_token: Optional[str] = None
     refresh_token: Optional[str] = None
+    token_uri: Optional[str] = "https://oauth2.googleapis.com/token"
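+    # At most one auth mechanism is used, in the order checked by _open_fs
+    # below: anonymous, then service_account_credentials, then the OAuth token
+    # fields, falling back to application default credentials.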
 
 
 class GoogleCloudStorageFilesSource(
-    PyFilesystem2FilesSource[
-        GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration
-    ]
+    FsspecFilesSource[GoogleCloudStorageFileSourceTemplateConfiguration, GoogleCloudStorageFileSourceConfiguration]
 ):
     plugin_type = "googlecloudstorage"
-    required_module = GCSFS
-    required_package = "fs-gcsfs"
+    required_module = GCSFileSystem
+    required_package = REQUIRED_PACKAGE
 
     template_config_class = GoogleCloudStorageFileSourceTemplateConfiguration
     resolved_config_class = GoogleCloudStorageFileSourceConfiguration
 
-    def _open_fs(self, context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration]):
-        if GCSFS is None:
+    def _open_fs(
+        self,
+        context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
+        cache_options: CacheOptionsDictType,
+    ):
+        if GCSFileSystem is None:
             raise self.required_package_exception
 
         config = context.config
+        token: Union[str, dict[str, Optional[str]], None]
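+        # gcsfs accepts several token forms: "anon", a path to a service
+        # account JSON file, a dict of OAuth credentials, or None to fall back
+        # to application default credentials.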
+
         if config.anonymous:
-            client = Client.create_anonymous_client()
-        elif config.service_account_json:
-            credentials = service_account.Credentials.from_service_account_file(config.service_account_json)
-            client = Client(project=config.project, credentials=credentials)
-        elif config.token:
-            client = Client(
-                project=config.project,
-                credentials=Credentials(
-                    token=config.token,
-                    token_uri=config.token_uri,
-                    client_id=config.client_id,
-                    client_secret=config.client_secret,
-                    refresh_token=config.refresh_token,
-                ),
-            )
-
-        handle = GCSFS(bucket_name=config.bucket_name, root_path=config.root_path or "", retry=0, client=client)
-        return handle
+            # Use token='anon' for anonymous access to public buckets
+            token = "anon"
+        elif config.service_account_credentials:
+            # Path to service account JSON file
+            token = config.service_account_credentials
+        elif config.access_token:
+            # OAuth credentials passed as a dictionary
+            token = {
+                "access_token": config.access_token,
+                "refresh_token": config.refresh_token,
+                "client_id": config.client_id,
+                "client_secret": config.client_secret,
+                "token_uri": config.token_uri,
+            }
+        else:
+            # Default: use application default credentials
+            token = None
+
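+        # cache_options holds cache-related keyword arguments supplied by the
+        # fsspec base class (CacheOptionsDictType); they are expanded into the
+        # filesystem constructor unchanged.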
+        fs = GCSFileSystem(
+            project=config.project,
+            token=token,
+            **cache_options,
+        )
+        return fs
+
+    def _to_bucket_path(self, path: str, config: GoogleCloudStorageFileSourceConfiguration) -> str:
+        """Adapt the path to the GCS bucket format."""
+        bucket = config.bucket_name
+        if path.startswith("/"):
+            path = path[1:]
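+        # e.g. "/data/a.txt" with bucket "my-bucket" becomes "my-bucket/data/a.txt"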
+        return f"{bucket}/{path}" if path else bucket
+
+    def _adapt_entry_path(self, filesystem_path: str) -> str:
+        """Remove the GCS bucket name from the filesystem path."""
+        if self.template_config.bucket_name:
+            bucket_prefix = f"{self.template_config.bucket_name}/"
+            if filesystem_path.startswith(bucket_prefix):
+                return "/" + filesystem_path[len(bucket_prefix) :]
+            elif filesystem_path == self.template_config.bucket_name:
+                return "/"
+        return "/" + filesystem_path
 
     def _list(
         self,
@@ -98,128 +128,44 @@ def _list(
         query: Optional[str] = None,
         sort_by: Optional[str] = None,
     ) -> tuple[list[AnyRemoteEntry], int]:
-        """
-        Override base class _list to work around fs_gcsfs limitation with virtual directories.
-
-        GCS doesn't require directory marker objects, but fs_gcsfs's getinfo() requires them.
-        This implementation uses the GCS API directly to list blobs, bypassing the problematic
-        getinfo() validation that fails for virtual directories.
-        """
-        if recursive:
-            # For recursive listing, fall back to the base implementation
-            return super()._list(context, path, recursive, write_intent, limit, offset, query, sort_by)
-
-        # Open filesystem to get access to the bucket
-        with self._open_fs(context) as fs_handle:
-            # Access the bucket from the GCSFS object
-            bucket = fs_handle.bucket
-
-            # Convert path to GCS prefix format
-            # Remove leading/trailing slashes and add trailing slash for directory prefix
-            normalized_path = path.strip("/")
-            if normalized_path:
-                prefix = normalized_path + "/"
-            else:
-                prefix = ""
-
-            # List blobs with delimiter to get immediate children only (non-recursive)
-            delimiter = "/"
-
-            # Collect directories (prefixes) and files (blobs)
-            entries: list[AnyRemoteEntry] = []
-
-            # First iterator: Get directories from prefixes
-            page_iterator_dirs = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
-            for page in page_iterator_dirs.pages:
-                for dir_prefix in page.prefixes:
-                    # Remove the parent prefix and trailing slash to get just the dir name
-                    dir_name = dir_prefix[len(prefix) :].rstrip("/")
-                    if dir_name:
-                        full_path = os.path.join("/", normalized_path, dir_name) if normalized_path else f"/{dir_name}"
-                        uri = self.uri_from_path(full_path)
-                        entries.append(RemoteDirectory(name=dir_name, uri=uri, path=full_path))
-
-            # Second iterator: Get files from blobs
-            page_iterator_files = bucket.list_blobs(prefix=prefix, delimiter=delimiter)
-            for blob in page_iterator_files:
-                # Skip directory marker objects (empty blobs ending with /)
-                if blob.name.endswith("/"):
-                    continue
-
-                # Get just the filename (remove prefix)
-                file_name = blob.name[len(prefix) :]
-                if file_name:
-                    full_path = os.path.join("/", normalized_path, file_name) if normalized_path else f"/{file_name}"
-                    uri = self.uri_from_path(full_path)
-
-                    # Convert blob metadata to RemoteFile
-                    ctime = None
-                    if blob.time_created:
-                        ctime = blob.time_created.isoformat()
-
-                    entries.append(
-                        RemoteFile(name=file_name, size=blob.size or 0, ctime=ctime, uri=uri, path=full_path)
-                    )
-
-            # Apply query filter if provided
-            if query:
-                query_lower = query.lower()
-                entries = [e for e in entries if query_lower in e.name.lower()]
-
-            # Get total count before pagination
-            total_count = len(entries)
-
-            # Apply pagination
-            if offset is not None or limit is not None:
-                start = offset or 0
-                end = start + limit if limit is not None else None
-                entries = entries[start:end]
-
-        return entries, total_count
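+        # The fsspec base class lists paths relative to the filesystem root, so
+        # translate the source path to "<bucket>/<path>" before delegating.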
+        bucket_path = self._to_bucket_path(path, context.config)
+        return super()._list(
+            context=context,
+            path=bucket_path,
+            recursive=recursive,
+            limit=limit,
+            offset=offset,
+            query=query,
+            sort_by=sort_by,
+        )
 
     def _realize_to(
         self,
         source_path: str,
         native_path: str,
         context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
     ):
-        """
-        Override to download files directly from GCS, bypassing fs_gcsfs's directory marker checks.
-        """
-        with self._open_fs(context) as fs_handle:
-            bucket = fs_handle.bucket
-
-            # Convert path to GCS blob key
-            normalized_path = source_path.strip("/")
-
-            # Get the blob
-            blob = bucket.get_blob(normalized_path)
-            if not blob:
-                raise Exception(f"File not found: {source_path}")
-
-            # Download directly to file
-            with open(native_path, "wb") as write_file:
-                blob.download_to_file(write_file)
+        bucket_path = self._to_bucket_path(source_path, context.config)
+        super()._realize_to(source_path=bucket_path, native_path=native_path, context=context)
 
     def _write_from(
         self,
         target_path: str,
         native_path: str,
         context: FilesSourceRuntimeContext[GoogleCloudStorageFileSourceConfiguration],
     ):
-        """
-        Override to upload files directly to GCS, bypassing fs_gcsfs's directory marker checks.
-        """
-        with self._open_fs(context) as fs_handle:
-            bucket = fs_handle.bucket
-
-            # Convert path to GCS blob key
-            normalized_path = target_path.strip("/")
-
-            # Create blob and upload
-            blob = bucket.blob(normalized_path)
-            with open(native_path, "rb") as read_file:
-                blob.upload_from_file(read_file)
+        bucket_path = self._to_bucket_path(target_path, context.config)
+        super()._write_from(target_path=bucket_path, native_path=native_path, context=context)
+
+    def score_url_match(self, url: str):
+        bucket_name = self.template_config.bucket_name
+        # For security, we need to ensure that a partial match doesn't work
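+        # (e.g. "gs://my-bucket-2/x" must not match a source configured for
+        # bucket "my-bucket", hence the trailing-slash / exact-match checks)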
+        if bucket_name and (url.startswith(f"gs://{bucket_name}/") or url == f"gs://{bucket_name}"):
+            return len(f"gs://{bucket_name}")
+        elif bucket_name and (url.startswith(f"gcs://{bucket_name}/") or url == f"gcs://{bucket_name}"):
+            return len(f"gcs://{bucket_name}")
+        else:
+            return super().score_url_match(url)
 
 
 __all__ = ("GoogleCloudStorageFilesSource",)