Skip to content

Commit 4b89874

Browse files
authored
SecretsManager: rotate_secret() now supports the RotateImmediately-parameter (#7347)
1 parent b9d7c20 commit 4b89874

4 files changed

Lines changed: 114 additions & 7 deletions

File tree

moto/secretsmanager/models.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,7 @@ def rotate_secret(
660660
client_request_token: Optional[str] = None,
661661
rotation_lambda_arn: Optional[str] = None,
662662
rotation_rules: Optional[Dict[str, Any]] = None,
663+
rotate_immediately: bool = True,
663664
) -> str:
664665
rotation_days = "AutomaticallyAfterDays"
665666

@@ -758,25 +759,32 @@ def rotate_secret(
758759
response_headers: Dict[str, Any] = {}
759760

760761
try:
761-
func = lambda_backend.get_function(secret.rotation_lambda_arn)
762+
lambda_backend.get_function(secret.rotation_lambda_arn)
762763
except Exception:
763764
msg = f"Resource not found for ARN '{secret.rotation_lambda_arn}'."
764765
raise ResourceNotFoundException(msg)
765766

766-
for step in ["create", "set", "test", "finish"]:
767-
func.invoke(
768-
json.dumps(
767+
rotation_steps = ["create", "set", "test", "finish"]
768+
if not rotate_immediately:
769+
# if you don't immediately rotate the secret,
770+
# Secrets Manager tests the rotation configuration by running the testSecretstep of the Lambda rotation function.
771+
rotation_steps = ["test"]
772+
for step in rotation_steps:
773+
lambda_backend.invoke(
774+
secret.rotation_lambda_arn,
775+
qualifier=None,
776+
body=json.dumps(
769777
{
770778
"Step": step + "Secret",
771779
"SecretId": secret.name,
772780
"ClientRequestToken": new_version_id,
773781
}
774782
),
775-
request_headers,
776-
response_headers,
783+
headers=request_headers,
784+
response_headers=response_headers,
777785
)
778-
779786
secret.set_default_version_id(new_version_id)
787+
780788
elif secret.versions:
781789
# AWS will always require a Lambda ARN
782790
# without that, Moto can still apply the 'AWSCURRENT'-label

moto/secretsmanager/responses.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,13 @@ def rotate_secret(self) -> str:
120120
rotation_lambda_arn = self._get_param("RotationLambdaARN")
121121
rotation_rules = self._get_param("RotationRules")
122122
secret_id = self._get_param("SecretId")
123+
rotate_immediately = self._get_bool_param("RotateImmediately", True)
123124
return self.backend.rotate_secret(
124125
secret_id=secret_id,
125126
client_request_token=client_request_token,
126127
rotation_lambda_arn=rotation_lambda_arn,
127128
rotation_rules=rotation_rules,
129+
rotate_immediately=rotate_immediately,
128130
)
129131

130132
def put_secret_value(self) -> str:

tests/test_awslambda/test_lambda_invoke.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,9 @@ def test_invoke_lambda_with_proxy():
379379
assert json.loads(payload) == expected_payload
380380

381381

382+
@pytest.mark.network
382383
@mock_aws
384+
@requires_docker
383385
def test_invoke_lambda_with_entrypoint():
384386
conn = boto3.client("lambda", _lambda_region)
385387
function_name = str(uuid4())[0:6]
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import io
2+
import json
3+
import zipfile
4+
from unittest import SkipTest
5+
from unittest.mock import patch
6+
7+
import boto3
8+
from botocore.exceptions import ClientError
9+
10+
from moto import mock_aws, settings
11+
12+
secret_steps = []
13+
14+
15+
def mock_lambda_invoke(*args, **kwarg):
16+
secret_steps.append(json.loads(kwarg["body"])["Step"])
17+
return "n/a"
18+
19+
20+
@mock_aws(config={"lambda": {"use_docker": False}})
21+
@patch(
22+
"moto.awslambda_simple.models.LambdaSimpleBackend.invoke", new=mock_lambda_invoke
23+
)
24+
def test_simple_lambda_is_invoked():
25+
if not settings.TEST_DECORATOR_MODE:
26+
raise SkipTest("Can only test patched code in DecoratorMode")
27+
sm_client = boto3.client("secretsmanager", region_name="us-east-1")
28+
secret_arn = sm_client.create_secret(Name="some", SecretString="secret")["ARN"]
29+
30+
lambda_res = create_mock_rotator_lambda()
31+
sm_client.rotate_secret(
32+
SecretId=secret_arn,
33+
RotationLambdaARN=lambda_res["FunctionArn"],
34+
RotationRules={"AutomaticallyAfterDays": 1, "Duration": "1h"},
35+
RotateImmediately=True,
36+
)
37+
assert secret_steps == ["createSecret", "setSecret", "testSecret", "finishSecret"]
38+
secret_steps.clear()
39+
40+
41+
@mock_aws(config={"lambda": {"use_docker": False}})
42+
@patch(
43+
"moto.awslambda_simple.models.LambdaSimpleBackend.invoke", new=mock_lambda_invoke
44+
)
45+
def test_simple_lambda_is_invoked__do_not_rotate_immediately():
46+
if not settings.TEST_DECORATOR_MODE:
47+
raise SkipTest("Can only test patched code in DecoratorMode")
48+
sm_client = boto3.client("secretsmanager", region_name="us-east-1")
49+
secret_arn = sm_client.create_secret(Name="some", SecretString="secret")["ARN"]
50+
51+
lambda_res = create_mock_rotator_lambda()
52+
sm_client.rotate_secret(
53+
SecretId=secret_arn,
54+
RotationLambdaARN=lambda_res["FunctionArn"],
55+
RotationRules={"AutomaticallyAfterDays": 1, "Duration": "1h"},
56+
RotateImmediately=False,
57+
)
58+
assert secret_steps == ["testSecret"]
59+
secret_steps.clear()
60+
61+
62+
def mock_lambda_zip():
63+
code = """
64+
def lambda_handler(event, context):
65+
return event
66+
"""
67+
zip_output = io.BytesIO()
68+
zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED)
69+
zip_file.writestr("lambda_function.py", code)
70+
zip_file.close()
71+
zip_output.seek(0)
72+
return zip_output.read()
73+
74+
75+
def create_mock_rotator_lambda():
76+
client = boto3.client("lambda", region_name="us-east-1")
77+
return client.create_function(
78+
FunctionName="mock-rotator",
79+
Runtime="python3.9",
80+
Role=get_mock_role_arn(),
81+
Handler="lambda_function.lambda_handler",
82+
Code={"ZipFile": mock_lambda_zip()},
83+
)
84+
85+
86+
def get_mock_role_arn():
87+
iam = boto3.client("iam", region_name="us-east-1")
88+
try:
89+
return iam.get_role(RoleName="my-role")["Role"]["Arn"]
90+
except ClientError:
91+
return iam.create_role(
92+
RoleName="my-role",
93+
AssumeRolePolicyDocument="some policy",
94+
Path="/my-path/",
95+
)["Role"]["Arn"]

0 commit comments

Comments
 (0)