|
| 1 | +""" |
| 2 | +Sourcify platform. |
| 3 | +
|
| 4 | +Fetches verified contract source code and compilation artifacts from the Sourcify API. |
| 5 | +""" |
| 6 | + |
| 7 | +import json |
| 8 | +import logging |
| 9 | +import os |
| 10 | +import re |
| 11 | +import urllib.request |
| 12 | +from importlib.metadata import version |
| 13 | +from pathlib import Path, PurePosixPath |
| 14 | +from typing import TYPE_CHECKING, Any, Dict, List, Tuple |
| 15 | +from urllib.error import HTTPError, URLError |
| 16 | + |
| 17 | +from crytic_compile.compilation_unit import CompilationUnit |
| 18 | +from crytic_compile.compiler.compiler import CompilerVersion |
| 19 | +from crytic_compile.platform import solc_standard_json |
| 20 | +from crytic_compile.platform.abstract_platform import AbstractPlatform |
| 21 | +from crytic_compile.platform.etherscan import _sanitize_remappings |
| 22 | +from crytic_compile.platform.exceptions import InvalidCompilation |
| 23 | +from crytic_compile.platform.types import Type |
| 24 | + |
| 25 | +if TYPE_CHECKING: |
| 26 | + from crytic_compile import CryticCompile |
| 27 | + |
| 28 | +LOGGER = logging.getLogger("CryticCompile") |
| 29 | + |
| 30 | +SOURCIFY_API_BASE = "https://sourcify.dev/server/v2/contract" |
| 31 | + |
| 32 | + |
| 33 | +def _get_user_agent() -> str: |
| 34 | + """Get the User-Agent string with package version. |
| 35 | +
|
| 36 | + Returns: |
| 37 | + str: User-Agent string in format "crytic-compile/<version>" |
| 38 | + """ |
| 39 | + try: |
| 40 | + pkg_version = version("crytic-compile") |
| 41 | + except Exception: # pylint: disable=broad-except |
| 42 | + pkg_version = "unknown" |
| 43 | + return f"crytic-compile/{pkg_version}" |
| 44 | + |
| 45 | + |
| 46 | +def _parse_chain_id(chain_id_str: str) -> str: |
| 47 | + """Convert hex or decimal chain ID to decimal string. |
| 48 | +
|
| 49 | + Args: |
| 50 | + chain_id_str: Chain ID as decimal string or hex string (0x prefix) |
| 51 | +
|
| 52 | + Returns: |
| 53 | + str: Chain ID as decimal string |
| 54 | + """ |
| 55 | + if chain_id_str.lower().startswith("0x"): |
| 56 | + return str(int(chain_id_str, 16)) |
| 57 | + return chain_id_str |
| 58 | + |
| 59 | + |
| 60 | +def _write_source_files( |
| 61 | + sources: Dict[str, Dict[str, str]], |
| 62 | + addr: str, |
| 63 | + chain_id: str, |
| 64 | + export_dir: str, |
| 65 | +) -> Tuple[str, List[str]]: |
| 66 | + """Write source files to disk. |
| 67 | +
|
| 68 | + Args: |
| 69 | + sources: Dict mapping filename to {"content": source_code} |
| 70 | + addr: Contract address |
| 71 | + chain_id: Chain ID |
| 72 | + export_dir: Base export directory |
| 73 | +
|
| 74 | + Returns: |
| 75 | + Tuple of (working_directory, list_of_filenames) |
| 76 | +
|
| 77 | + Raises: |
| 78 | + IOError: If a path would escape the allowed directory |
| 79 | + """ |
| 80 | + directory = os.path.join(export_dir, f"sourcify-{chain_id}-{addr}") |
| 81 | + |
| 82 | + filenames: List[str] = [] |
| 83 | + |
| 84 | + for filename, source_info in sources.items(): |
| 85 | + content = source_info.get("content", "") |
| 86 | + |
| 87 | + path_filename = PurePosixPath(filename) |
| 88 | + |
| 89 | + # Skip non-Solidity/Vyper files |
| 90 | + if path_filename.suffix not in (".sol", ".vy"): |
| 91 | + continue |
| 92 | + |
| 93 | + # Handle absolute paths by making them relative |
| 94 | + if path_filename.is_absolute(): |
| 95 | + path_filename = PurePosixPath(*path_filename.parts[1:]) |
| 96 | + |
| 97 | + filenames.append(path_filename.as_posix()) |
| 98 | + path_filename_disk = Path(directory, path_filename) |
| 99 | + |
| 100 | + # Security check: ensure path stays within allowed directory |
| 101 | + allowed_path = os.path.abspath(directory) |
| 102 | + if os.path.commonpath((allowed_path, os.path.abspath(path_filename_disk))) != allowed_path: |
| 103 | + raise IOError( |
| 104 | + f"Path '{path_filename_disk}' is outside of the allowed directory: {allowed_path}" |
| 105 | + ) |
| 106 | + |
| 107 | + if not os.path.exists(path_filename_disk.parent): |
| 108 | + os.makedirs(path_filename_disk.parent) |
| 109 | + |
| 110 | + with open(path_filename_disk, "w", encoding="utf8") as file_desc: |
| 111 | + file_desc.write(content) |
| 112 | + |
| 113 | + return directory, filenames |
| 114 | + |
| 115 | + |
| 116 | +def _fetch_sourcify_data(chain_id: str, addr: str) -> Dict[str, Any]: |
| 117 | + """Fetch contract data from Sourcify API. |
| 118 | +
|
| 119 | + Args: |
| 120 | + chain_id: Chain ID (decimal string) |
| 121 | + addr: Contract address |
| 122 | +
|
| 123 | + Returns: |
| 124 | + Dict containing sources and compilation data |
| 125 | +
|
| 126 | + Raises: |
| 127 | + InvalidCompilation: If the contract is not found or API request fails |
| 128 | + """ |
| 129 | + fields = ",".join( |
| 130 | + [ |
| 131 | + "sources", |
| 132 | + "compilation.compilerVersion", |
| 133 | + "compilation.compilerSettings", |
| 134 | + "compilation.name", |
| 135 | + ] |
| 136 | + ) |
| 137 | + url = f"{SOURCIFY_API_BASE}/{chain_id}/{addr}?fields={fields}" |
| 138 | + |
| 139 | + LOGGER.info("Fetching contract from Sourcify: chain=%s address=%s", chain_id, addr) |
| 140 | + |
| 141 | + try: |
| 142 | + req = urllib.request.Request(url, headers={"User-Agent": _get_user_agent()}) |
| 143 | + with urllib.request.urlopen(req) as response: |
| 144 | + data = json.loads(response.read().decode("utf-8")) |
| 145 | + except HTTPError as e: |
| 146 | + if e.code == 404: |
| 147 | + raise InvalidCompilation( |
| 148 | + f"Contract not verified on Sourcify: chain={chain_id} address={addr}" |
| 149 | + ) from e |
| 150 | + raise InvalidCompilation(f"Sourcify API error: {e}") from e |
| 151 | + except URLError as e: |
| 152 | + raise InvalidCompilation(f"Failed to fetch from Sourcify: {e}") from e |
| 153 | + |
| 154 | + # Log match type |
| 155 | + match_type = data.get("match", "unknown") |
| 156 | + match_messages = { |
| 157 | + "exact_match": "exact match found", |
| 158 | + "match": "partial match found (metadata may differ)", |
| 159 | + } |
| 160 | + if match_type in match_messages: |
| 161 | + LOGGER.info("Sourcify: %s", match_messages[match_type]) |
| 162 | + else: |
| 163 | + LOGGER.warning("Sourcify: unexpected match type: %s", match_type) |
| 164 | + |
| 165 | + return data |
| 166 | + |
| 167 | + |
| 168 | +def _write_config_file(working_dir: str, compiler_version: str, settings: Dict[str, Any]) -> None: |
| 169 | + """Write crytic_compile.config.json file. |
| 170 | +
|
| 171 | + Args: |
| 172 | + working_dir: Directory to write config to |
| 173 | + compiler_version: Solc version string |
| 174 | + settings: Compiler settings from Sourcify |
| 175 | + """ |
| 176 | + optimizer = settings.get("optimizer", {}) |
| 177 | + optimization_used = optimizer.get("enabled", False) |
| 178 | + optimize_runs = optimizer.get("runs") if optimization_used else None |
| 179 | + evm_version = settings.get("evmVersion") |
| 180 | + via_ir = settings.get("viaIR") |
| 181 | + remappings = settings.get("remappings", []) |
| 182 | + |
| 183 | + solc_args: List[str] = [] |
| 184 | + if via_ir: |
| 185 | + solc_args.append("--via-ir") |
| 186 | + if optimization_used: |
| 187 | + solc_args.append(f"--optimize --optimize-runs {optimize_runs}") |
| 188 | + if evm_version: |
| 189 | + solc_args.append(f"--evm-version {evm_version}") |
| 190 | + |
| 191 | + metadata_config: Dict[str, Any] = { |
| 192 | + "solc_remaps": _sanitize_remappings(remappings, working_dir) if remappings else {}, |
| 193 | + "solc_solcs_select": compiler_version, |
| 194 | + "solc_args": " ".join(solc_args), |
| 195 | + } |
| 196 | + |
| 197 | + config_path = os.path.join(working_dir, "crytic_compile.config.json") |
| 198 | + with open(config_path, "w", encoding="utf-8") as f: |
| 199 | + json.dump(metadata_config, f) |
| 200 | + |
| 201 | + |
| 202 | +class Sourcify(AbstractPlatform): |
| 203 | + """ |
| 204 | + Sourcify platform - fetches verified contracts from sourcify.dev |
| 205 | + """ |
| 206 | + |
| 207 | + NAME = "Sourcify" |
| 208 | + PROJECT_URL = "https://sourcify.dev/" |
| 209 | + TYPE = Type.SOURCIFY |
| 210 | + |
| 211 | + def compile( # pylint: disable=too-many-locals |
| 212 | + self, crytic_compile: "CryticCompile", **kwargs: str |
| 213 | + ) -> None: |
| 214 | + """Run the compilation by fetching from Sourcify. |
| 215 | +
|
| 216 | + Args: |
| 217 | + crytic_compile: Associated CryticCompile object |
| 218 | + **kwargs: Optional arguments. Used: "export_dir", "solc" |
| 219 | +
|
| 220 | + Raises: |
| 221 | + InvalidCompilation: If the contract is not found or API request fails |
| 222 | + """ |
| 223 | + # Parse target: sourcify-<chainId>:0x<address> |
| 224 | + match = re.match(r"^sourcify-(0x[a-fA-F0-9]+|\d+):(0x[a-fA-F0-9]{40})$", self._target) |
| 225 | + if not match: |
| 226 | + raise InvalidCompilation(f"Invalid Sourcify target format: {self._target}") |
| 227 | + |
| 228 | + chain_id = _parse_chain_id(match.group(1)) |
| 229 | + addr = match.group(2) |
| 230 | + |
| 231 | + # Prepare export directory |
| 232 | + export_dir = os.path.join(kwargs.get("export_dir", "crytic-export"), "sourcify-contracts") |
| 233 | + if not os.path.exists(export_dir): |
| 234 | + os.makedirs(export_dir) |
| 235 | + |
| 236 | + # Fetch from Sourcify API |
| 237 | + data = _fetch_sourcify_data(chain_id, addr) |
| 238 | + |
| 239 | + sources = data.get("sources", {}) |
| 240 | + if not sources: |
| 241 | + raise InvalidCompilation("No source files returned from Sourcify") |
| 242 | + |
| 243 | + working_dir, filenames = _write_source_files(sources, addr, chain_id, export_dir) |
| 244 | + |
| 245 | + # Extract compilation settings |
| 246 | + compilation = data.get("compilation", {}) |
| 247 | + compiler_version_str = compilation.get("compilerVersion", "") |
| 248 | + version_match = re.search(r"(\d+\.\d+\.\d+)", compiler_version_str) |
| 249 | + if not version_match: |
| 250 | + raise InvalidCompilation( |
| 251 | + f"Could not parse compiler version from: {compiler_version_str}" |
| 252 | + ) |
| 253 | + compiler_version = version_match.group(1) |
| 254 | + |
| 255 | + settings = compilation.get("compilerSettings", {}) |
| 256 | + optimizer = settings.get("optimizer", {}) |
| 257 | + optimization_used = optimizer.get("enabled", False) |
| 258 | + remappings = _sanitize_remappings(settings.get("remappings", []), working_dir) or None |
| 259 | + |
| 260 | + # Create and configure compilation unit |
| 261 | + compilation_unit = CompilationUnit(crytic_compile, compilation.get("name", "Contract")) |
| 262 | + compilation_unit.compiler_version = CompilerVersion( |
| 263 | + compiler=kwargs.get("solc", "solc"), |
| 264 | + version=compiler_version, |
| 265 | + optimized=optimization_used, |
| 266 | + optimize_runs=optimizer.get("runs") if optimization_used else None, |
| 267 | + ) |
| 268 | + compilation_unit.compiler_version.look_for_installed_version() |
| 269 | + |
| 270 | + solc_standard_json.standalone_compile( |
| 271 | + filenames, |
| 272 | + compilation_unit, |
| 273 | + working_dir=working_dir, |
| 274 | + remappings=remappings, |
| 275 | + evm_version=settings.get("evmVersion"), |
| 276 | + via_ir=settings.get("viaIR"), |
| 277 | + ) |
| 278 | + |
| 279 | + _write_config_file(working_dir, compiler_version, settings) |
| 280 | + |
| 281 | + def clean(self, **_kwargs: str) -> None: |
| 282 | + # No-op for Sourcify (remote platform) |
| 283 | + pass |
| 284 | + |
| 285 | + @staticmethod |
| 286 | + def is_supported(target: str, **kwargs: str) -> bool: |
| 287 | + """Check if the target is a Sourcify target. |
| 288 | +
|
| 289 | + Args: |
| 290 | + target: Path to the target |
| 291 | + **kwargs: Optional arguments (unused) |
| 292 | +
|
| 293 | + Returns: |
| 294 | + bool: True if the target is a Sourcify target |
| 295 | + """ |
| 296 | + # Match sourcify-<chainId>:0x<address> where chainId is decimal or 0x hex |
| 297 | + return bool(re.match(r"^sourcify-(0x[a-fA-F0-9]+|\d+):0x[a-fA-F0-9]{40}$", target)) |
| 298 | + |
| 299 | + def is_dependency(self, _path: str) -> bool: |
| 300 | + """Check if the path is a dependency. |
| 301 | +
|
| 302 | + Args: |
| 303 | + _path: Path to the target |
| 304 | +
|
| 305 | + Returns: |
| 306 | + bool: Always False for Sourcify |
| 307 | + """ |
| 308 | + return False |
| 309 | + |
| 310 | + def _guessed_tests(self) -> List[str]: |
| 311 | + """Guess the potential unit tests commands. |
| 312 | +
|
| 313 | + Returns: |
| 314 | + List[str]: Empty list (no tests for remote contracts) |
| 315 | + """ |
| 316 | + return [] |
0 commit comments