|
1 | 1 | """Pytest fixtures for the files unit test suite.""" |
2 | 2 |
|
3 | | -import os |
4 | | -import shutil |
5 | | -import socket |
6 | | -import tempfile |
7 | | -import threading |
8 | 3 | from collections.abc import Generator |
9 | | -from dataclasses import dataclass |
10 | 4 |
|
11 | 5 | import pytest |
12 | 6 |
|
|
16 | 10 | start_mock_http_server, |
17 | 11 | ) |
18 | 12 |
|
19 | | -_SFTP_USER = "testuser" |
20 | | -_SFTP_PASS = "testpass" |
21 | | - |
22 | | - |
23 | | -@dataclass |
24 | | -class SftpServerInfo: |
25 | | - host: str |
26 | | - port: int |
27 | | - root: str |
28 | | - user: str |
29 | | - passwd: str |
30 | | - |
31 | | - |
32 | | -@pytest.fixture() |
33 | | -def sftp_server() -> Generator[SftpServerInfo, None, None]: |
34 | | - """Spin up an ephemeral in-process SFTP server backed by a temporary directory. |
35 | | -
|
36 | | - The temp directory is pre-populated with a file ``a`` containing ``a\\n`` |
37 | | - so that ``assert_simple_file_realize`` passes out of the box. |
38 | | -
|
39 | | - Yields an :class:`SftpServerInfo` with connection details. |
40 | | - Skips automatically if paramiko is not installed. |
41 | | - """ |
42 | | - from typing import IO |
43 | | - |
44 | | - import paramiko |
45 | | - import paramiko.common |
46 | | - import paramiko.sftp |
47 | | - |
48 | | - # ------------------------------------------------------------------ # |
49 | | - # In-process server implementation # |
50 | | - # ------------------------------------------------------------------ # |
51 | | - |
52 | | - class _ServerInterface(paramiko.ServerInterface): |
53 | | - def check_channel_request(self, kind: str, chanid: int) -> int: |
54 | | - if kind == "session": |
55 | | - return paramiko.common.OPEN_SUCCEEDED |
56 | | - return paramiko.common.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED |
57 | | - |
58 | | - def check_auth_password(self, username: str, password: str) -> int: |
59 | | - if username == _SFTP_USER and password == _SFTP_PASS: |
60 | | - return paramiko.common.AUTH_SUCCESSFUL |
61 | | - return paramiko.common.AUTH_FAILED |
62 | | - |
63 | | - def get_allowed_auths(self, username: str) -> str: |
64 | | - return "password" |
65 | | - |
66 | | - class _SFTPHandle(paramiko.SFTPHandle): |
67 | | - readfile: IO[bytes] |
68 | | - writefile: IO[bytes] |
69 | | - |
70 | | - def stat(self) -> paramiko.SFTPAttributes: |
71 | | - return paramiko.SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) |
72 | | - |
73 | | - def chattr(self, attr: paramiko.SFTPAttributes) -> int: |
74 | | - return paramiko.sftp.SFTP_OK |
75 | | - |
76 | | - def _errno_to_sftp(errno: int | None) -> int: |
77 | | - return paramiko.SFTPServer.convert_errno(errno if errno is not None else 5) |
78 | | - |
79 | | - class _SFTPServerInterface(paramiko.SFTPServerInterface): |
80 | | - def __init__(self, server: paramiko.ServerInterface, *args: object, **kwargs: object) -> None: |
81 | | - self._root = root |
82 | | - super().__init__(server, *args, **kwargs) |
83 | | - |
84 | | - def _realpath(self, path: str) -> str: |
85 | | - return os.path.join(self._root, path.lstrip("/")) |
86 | | - |
87 | | - def list_folder(self, path: str) -> list[paramiko.SFTPAttributes] | int: |
88 | | - real = self._realpath(path) |
89 | | - try: |
90 | | - entries = [] |
91 | | - for name in os.listdir(real): |
92 | | - attr = paramiko.SFTPAttributes.from_stat(os.stat(os.path.join(real, name))) |
93 | | - attr.filename = name |
94 | | - entries.append(attr) |
95 | | - return entries |
96 | | - except OSError as e: |
97 | | - return _errno_to_sftp(e.errno) |
98 | | - |
99 | | - def stat(self, path: str) -> paramiko.SFTPAttributes | int: |
100 | | - try: |
101 | | - return paramiko.SFTPAttributes.from_stat(os.stat(self._realpath(path))) |
102 | | - except OSError as e: |
103 | | - return _errno_to_sftp(e.errno) |
104 | | - |
105 | | - def lstat(self, path: str) -> paramiko.SFTPAttributes | int: |
106 | | - try: |
107 | | - return paramiko.SFTPAttributes.from_stat(os.lstat(self._realpath(path))) |
108 | | - except OSError as e: |
109 | | - return _errno_to_sftp(e.errno) |
110 | | - |
111 | | - def open(self, path: str, flags: int, attr: paramiko.SFTPAttributes) -> _SFTPHandle | int: |
112 | | - real = self._realpath(path) |
113 | | - try: |
114 | | - binary_flags = flags | getattr(os, "O_BINARY", 0) |
115 | | - fd = os.open(real, binary_flags, 0o666) |
116 | | - fobj = _SFTPHandle(flags) |
117 | | - if flags & os.O_WRONLY: |
118 | | - fobj.writefile = os.fdopen(fd, "wb") |
119 | | - elif flags & os.O_RDWR: |
120 | | - f = os.fdopen(fd, "r+b") |
121 | | - fobj.readfile = f |
122 | | - fobj.writefile = f |
123 | | - else: |
124 | | - fobj.readfile = os.fdopen(fd, "rb") |
125 | | - return fobj |
126 | | - except OSError as e: |
127 | | - return _errno_to_sftp(e.errno) |
128 | | - |
129 | | - def remove(self, path: str) -> int: |
130 | | - try: |
131 | | - os.remove(self._realpath(path)) |
132 | | - except OSError as e: |
133 | | - return _errno_to_sftp(e.errno) |
134 | | - return paramiko.sftp.SFTP_OK |
135 | | - |
136 | | - def rename(self, oldpath: str, newpath: str) -> int: |
137 | | - try: |
138 | | - os.rename(self._realpath(oldpath), self._realpath(newpath)) |
139 | | - except OSError as e: |
140 | | - return _errno_to_sftp(e.errno) |
141 | | - return paramiko.sftp.SFTP_OK |
142 | | - |
143 | | - def mkdir(self, path: str, attr: paramiko.SFTPAttributes) -> int: |
144 | | - try: |
145 | | - os.mkdir(self._realpath(path)) |
146 | | - except OSError as e: |
147 | | - return _errno_to_sftp(e.errno) |
148 | | - return paramiko.sftp.SFTP_OK |
149 | | - |
150 | | - def rmdir(self, path: str) -> int: |
151 | | - try: |
152 | | - os.rmdir(self._realpath(path)) |
153 | | - except OSError as e: |
154 | | - return _errno_to_sftp(e.errno) |
155 | | - return paramiko.sftp.SFTP_OK |
156 | | - |
157 | | - def chattr(self, path: str, attr: paramiko.SFTPAttributes) -> int: |
158 | | - return paramiko.sftp.SFTP_OK |
159 | | - |
160 | | - def canonicalize(self, path: str) -> str: |
161 | | - return "/" + os.path.relpath(self._realpath(path), self._root).lstrip(".") |
162 | | - |
163 | | - # ------------------------------------------------------------------ # |
164 | | - # Server lifecycle # |
165 | | - # ------------------------------------------------------------------ # |
166 | | - |
167 | | - root = tempfile.mkdtemp() |
168 | | - with open(os.path.join(root, "a"), "w") as fh: |
169 | | - fh.write("a\n") |
170 | | - |
171 | | - host_key = paramiko.RSAKey.generate(2048) |
172 | | - |
173 | | - srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
174 | | - srv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) |
175 | | - srv_sock.bind(("127.0.0.1", 0)) |
176 | | - port = srv_sock.getsockname()[1] |
177 | | - srv_sock.listen(10) |
178 | | - srv_sock.settimeout(5) |
179 | | - |
180 | | - stop_event = threading.Event() |
181 | | - active_transports: list[paramiko.Transport] = [] |
182 | | - |
183 | | - def _accept_loop() -> None: |
184 | | - while not stop_event.is_set(): |
185 | | - try: |
186 | | - conn, _ = srv_sock.accept() |
187 | | - except OSError: |
188 | | - continue |
189 | | - transport = paramiko.Transport(conn) |
190 | | - transport.add_server_key(host_key) |
191 | | - transport.set_subsystem_handler("sftp", paramiko.SFTPServer, _SFTPServerInterface) |
192 | | - transport.start_server(event=threading.Event(), server=_ServerInterface()) |
193 | | - active_transports.append(transport) |
194 | | - |
195 | | - accept_thread = threading.Thread(target=_accept_loop, daemon=True) |
196 | | - accept_thread.start() |
197 | | - |
198 | | - yield SftpServerInfo(host="127.0.0.1", port=port, root=root, user=_SFTP_USER, passwd=_SFTP_PASS) |
199 | | - |
200 | | - stop_event.set() |
201 | | - srv_sock.close() |
202 | | - accept_thread.join(timeout=5) |
203 | | - for transport in active_transports: |
204 | | - transport.close() |
205 | | - shutil.rmtree(root, ignore_errors=True) |
206 | | - |
207 | 13 |
|
208 | 14 | @pytest.fixture(scope="session") |
209 | 15 | def mock_http_server() -> Generator[MockHttpServer, None, None]: |
|
0 commit comments