|
7 | 7 | import os |
8 | 8 | import zipfile |
9 | 9 |
|
| 10 | +from samcli.commands.exceptions import UserException |
| 11 | + |
10 | 12 | LOG = logging.getLogger(__name__) |
11 | 13 |
|
12 | 14 | S_IFLNK = 0xA |
@@ -48,15 +50,27 @@ def _extract(file_info, output_dir, zip_ref): |
48 | 50 | ------- |
49 | 51 | string |
50 | 52 | Returns the target path the Zip Entry was extracted to. |
| 53 | +
|
| 54 | + Raises |
| 55 | + ------ |
| 56 | + ValueError |
| 57 | + If the extraction path is not valid |
51 | 58 | """ |
52 | 59 |
|
53 | 60 | # Handle any regular file/directory entries |
54 | 61 | if not _is_symlink(file_info): |
55 | 62 | return zip_ref.extract(file_info, output_dir) |
56 | 63 |
|
57 | 64 | source = zip_ref.read(file_info.filename).decode("utf8") |
| 65 | + output_dir = os.path.normpath(output_dir) |
58 | 66 | link_name = os.path.normpath(os.path.join(output_dir, file_info.filename)) |
59 | 67 |
|
| 68 | + output_dir_abs = os.path.abspath(output_dir) |
| 69 | + link_name_abs = os.path.abspath(link_name) |
| 70 | + |
| 71 | + if not link_name_abs.startswith(output_dir_abs + os.sep) and link_name_abs != output_dir_abs: |
| 72 | + raise UserException(f"Failed to extract file from the zip file. The '{file_info.filename}' is invalid") |
| 73 | + |
60 | 74 | # make leading dirs if needed |
61 | 75 | leading_dirs = os.path.dirname(link_name) |
62 | 76 | if not os.path.exists(leading_dirs): |
@@ -85,19 +99,22 @@ def unzip(zip_file_path, output_dir, permission=None): |
85 | 99 | permission : int |
86 | 100 | Permission to set in an octal int form |
87 | 101 | """ |
88 | | - |
| 102 | + extracted_path = None |
89 | 103 | with zipfile.ZipFile(zip_file_path, "r") as zip_ref: |
90 | 104 | # For each item in the zip file, extract the file and set permissions if available |
91 | 105 | for file_info in zip_ref.infolist(): |
92 | | - extracted_path = _extract(file_info, output_dir, zip_ref) |
93 | | - |
94 | | - # If the extracted_path is a symlink, do not set the permissions. If the target of the symlink does not |
95 | | - # exist, then os.chmod will fail with FileNotFoundError |
96 | | - if not os.path.islink(extracted_path): |
97 | | - _set_permissions(file_info, extracted_path) |
98 | | - _override_permissions(extracted_path, permission) |
99 | | - |
100 | | - if not os.path.islink(extracted_path): |
| 106 | + try: |
| 107 | + extracted_path = _extract(file_info, output_dir, zip_ref) |
| 108 | + |
| 109 | + # If the extracted_path is a symlink, do not set the permissions. If the target of the symlink does not |
| 110 | + # exist, then os.chmod will fail with FileNotFoundError |
| 111 | + if not os.path.islink(extracted_path): |
| 112 | + _set_permissions(file_info, extracted_path) |
| 113 | + _override_permissions(extracted_path, permission) |
| 114 | + except Exception as ex: |
| 115 | + LOG.debug("Failed to extract '%s' from %s: %s", file_info.filename, zip_file_path, ex) |
| 116 | + |
| 117 | + if extracted_path is not None and not os.path.islink(extracted_path): |
101 | 118 | _override_permissions(output_dir, permission) |
102 | 119 |
|
103 | 120 |
|
|
0 commit comments