Skip to content

Commit a1ea7ce

Browse files
authored
Merge pull request #13654 from arafsheikh/master
aws_msk_cluster: support in-place Kafka version upgrade
2 parents 87dfe39 + 3a8e662 commit a1ea7ce

2 files changed

Lines changed: 236 additions & 3 deletions

File tree

aws/resource_aws_msk_cluster.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package aws
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"time"
78

89
"github.com/aws/aws-sdk-go/aws"
910
"github.com/aws/aws-sdk-go/service/kafka"
11+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff"
1012
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
1113
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
1214
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
@@ -22,6 +24,11 @@ func resourceAwsMskCluster() *schema.Resource {
2224
Importer: &schema.ResourceImporter{
2325
State: schema.ImportStatePassthrough,
2426
},
27+
CustomizeDiff: customdiff.Sequence(
28+
customdiff.ForceNewIfChange("kafka_version", func(_ context.Context, old, new, meta interface{}) bool {
29+
return new.(string) < old.(string)
30+
}),
31+
),
2532
Schema: map[string]*schema.Schema{
2633
"arn": {
2734
Type: schema.TypeString,
@@ -198,7 +205,6 @@ func resourceAwsMskCluster() *schema.Resource {
198205
"kafka_version": {
199206
Type: schema.TypeString,
200207
Required: true,
201-
ForceNew: true,
202208
ValidateFunc: validation.StringLenBetween(1, 64),
203209
},
204210
"number_of_broker_nodes": {
@@ -563,7 +569,7 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
563569
}
564570
}
565571

566-
if d.HasChange("configuration_info") {
572+
if d.HasChange("configuration_info") && !d.HasChange("kafka_version") {
567573
input := &kafka.UpdateClusterConfigurationInput{
568574
ClusterArn: aws.String(d.Id()),
569575
ConfigurationInfo: expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{})),
@@ -587,6 +593,34 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
587593
}
588594
}
589595

596+
if d.HasChange("kafka_version") {
597+
input := &kafka.UpdateClusterKafkaVersionInput{
598+
ClusterArn: aws.String(d.Id()),
599+
CurrentVersion: aws.String(d.Get("current_version").(string)),
600+
TargetKafkaVersion: aws.String(d.Get("kafka_version").(string)),
601+
}
602+
603+
if d.HasChange("configuration_info") {
604+
input.ConfigurationInfo = expandMskClusterConfigurationInfo(d.Get("configuration_info").([]interface{}))
605+
}
606+
607+
output, err := conn.UpdateClusterKafkaVersion(input)
608+
609+
if err != nil {
610+
return fmt.Errorf("error updating MSK Cluster (%s) kafka version: %w", d.Id(), err)
611+
}
612+
613+
if output == nil {
614+
return fmt.Errorf("error updating MSK Cluster (%s) kafka version: empty response", d.Id())
615+
}
616+
617+
clusterOperationARN := aws.StringValue(output.ClusterOperationArn)
618+
619+
if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil {
620+
return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %w", d.Id(), clusterOperationARN, err)
621+
}
622+
}
623+
590624
if d.HasChange("tags") {
591625
o, n := d.GetChange("tags")
592626

@@ -1110,7 +1144,7 @@ func waitForMskClusterOperation(conn *kafka.Kafka, clusterOperationARN string) e
11101144
Pending: []string{"PENDING", "UPDATE_IN_PROGRESS"},
11111145
Target: []string{"UPDATE_COMPLETE"},
11121146
Refresh: mskClusterOperationRefreshFunc(conn, clusterOperationARN),
1113-
Timeout: 60 * time.Minute,
1147+
Timeout: 2 * time.Hour,
11141148
}
11151149

11161150
log.Printf("[DEBUG] Waiting for MSK Cluster Operation (%s) completion", clusterOperationARN)

aws/resource_aws_msk_cluster_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,128 @@ func TestAccAWSMskCluster_LoggingInfo(t *testing.T) {
514514
})
515515
}
516516

517+
func TestAccAWSMskCluster_KafkaVersionUpgrade(t *testing.T) {
518+
var cluster1, cluster2 kafka.ClusterInfo
519+
rName := acctest.RandomWithPrefix("tf-acc-test")
520+
resourceName := "aws_msk_cluster.test"
521+
522+
resource.ParallelTest(t, resource.TestCase{
523+
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
524+
Providers: testAccProviders,
525+
CheckDestroy: testAccCheckMskClusterDestroy,
526+
Steps: []resource.TestStep{
527+
{
528+
Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"),
529+
Check: resource.ComposeTestCheckFunc(
530+
testAccCheckMskClusterExists(resourceName, &cluster1),
531+
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
532+
),
533+
},
534+
{
535+
ResourceName: resourceName,
536+
ImportState: true,
537+
ImportStateVerify: true,
538+
ImportStateVerifyIgnore: []string{
539+
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
540+
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
541+
},
542+
},
543+
{
544+
Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"),
545+
Check: resource.ComposeTestCheckFunc(
546+
testAccCheckMskClusterExists(resourceName, &cluster2),
547+
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
548+
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
549+
),
550+
},
551+
},
552+
})
553+
}
554+
555+
func TestAccAWSMskCluster_KafkaVersionDowngrade(t *testing.T) {
556+
var cluster1, cluster2 kafka.ClusterInfo
557+
rName := acctest.RandomWithPrefix("tf-acc-test")
558+
resourceName := "aws_msk_cluster.test"
559+
560+
resource.ParallelTest(t, resource.TestCase{
561+
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
562+
Providers: testAccProviders,
563+
CheckDestroy: testAccCheckMskClusterDestroy,
564+
Steps: []resource.TestStep{
565+
{
566+
Config: testAccMskClusterConfigKafkaVersion(rName, "2.4.1.1"),
567+
Check: resource.ComposeTestCheckFunc(
568+
testAccCheckMskClusterExists(resourceName, &cluster1),
569+
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
570+
),
571+
},
572+
{
573+
ResourceName: resourceName,
574+
ImportState: true,
575+
ImportStateVerify: true,
576+
ImportStateVerifyIgnore: []string{
577+
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
578+
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
579+
},
580+
},
581+
{
582+
Config: testAccMskClusterConfigKafkaVersion(rName, "2.2.1"),
583+
Check: resource.ComposeTestCheckFunc(
584+
testAccCheckMskClusterExists(resourceName, &cluster2),
585+
testAccCheckMskClusterRecreated(&cluster1, &cluster2),
586+
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
587+
),
588+
},
589+
},
590+
})
591+
}
592+
593+
func TestAccAWSMskCluster_KafkaVersionUpgradeWithConfigurationInfo(t *testing.T) {
594+
var cluster1, cluster2 kafka.ClusterInfo
595+
rName := acctest.RandomWithPrefix("tf-acc-test")
596+
configurationResourceName1 := "aws_msk_configuration.config1"
597+
configurationResourceName2 := "aws_msk_configuration.config2"
598+
resourceName := "aws_msk_cluster.test"
599+
600+
resource.ParallelTest(t, resource.TestCase{
601+
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
602+
Providers: testAccProviders,
603+
CheckDestroy: testAccCheckMskClusterDestroy,
604+
Steps: []resource.TestStep{
605+
{
606+
Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.2.1", "config1"),
607+
Check: resource.ComposeTestCheckFunc(
608+
testAccCheckMskClusterExists(resourceName, &cluster1),
609+
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.2.1"),
610+
resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"),
611+
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName1, "arn"),
612+
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName1, "latest_revision"),
613+
),
614+
},
615+
{
616+
ResourceName: resourceName,
617+
ImportState: true,
618+
ImportStateVerify: true,
619+
ImportStateVerifyIgnore: []string{
620+
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
621+
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
622+
},
623+
},
624+
{
625+
Config: testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName, "2.4.1.1", "config2"),
626+
Check: resource.ComposeTestCheckFunc(
627+
testAccCheckMskClusterExists(resourceName, &cluster2),
628+
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
629+
resource.TestCheckResourceAttr(resourceName, "kafka_version", "2.4.1.1"),
630+
resource.TestCheckResourceAttr(resourceName, "configuration_info.#", "1"),
631+
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.arn", configurationResourceName2, "arn"),
632+
resource.TestCheckResourceAttrPair(resourceName, "configuration_info.0.revision", configurationResourceName2, "latest_revision"),
633+
),
634+
},
635+
},
636+
})
637+
}
638+
517639
func TestAccAWSMskCluster_Tags(t *testing.T) {
518640
var cluster kafka.ClusterInfo
519641
var td kafka.ListTagsForResourceOutput
@@ -611,6 +733,16 @@ func testAccCheckMskClusterNotRecreated(i, j *kafka.ClusterInfo) resource.TestCh
611733
}
612734
}
613735

736+
func testAccCheckMskClusterRecreated(i, j *kafka.ClusterInfo) resource.TestCheckFunc {
737+
return func(s *terraform.State) error {
738+
if aws.StringValue(i.ClusterArn) == aws.StringValue(j.ClusterArn) {
739+
return fmt.Errorf("MSK Cluster (%s) was not recreated", aws.StringValue(i.ClusterArn))
740+
}
741+
742+
return nil
743+
}
744+
}
745+
614746
func testAccLoadMskTags(cluster *kafka.ClusterInfo, td *kafka.ListTagsForResourceOutput) resource.TestCheckFunc {
615747
return func(s *terraform.State) error {
616748
conn := testAccProvider.Meta().(*AWSClient).kafkaconn
@@ -1106,6 +1238,73 @@ resource "aws_msk_cluster" "test" {
11061238
`, rName, cloudwatchLogsEnabled, cloudwatchLogsLogGroup, firehoseEnabled, firehoseDeliveryStream, s3Enabled, s3Bucket)
11071239
}
11081240

1241+
func testAccMskClusterConfigKafkaVersion(rName string, kafkaVersion string) string {
1242+
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
1243+
resource "aws_msk_cluster" "test" {
1244+
cluster_name = %[1]q
1245+
kafka_version = %[2]q
1246+
number_of_broker_nodes = 3
1247+
1248+
encryption_info {
1249+
encryption_in_transit {
1250+
client_broker = "TLS_PLAINTEXT"
1251+
}
1252+
}
1253+
1254+
broker_node_group_info {
1255+
client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"]
1256+
ebs_volume_size = 10
1257+
instance_type = "kafka.m5.large"
1258+
security_groups = ["${aws_security_group.example_sg.id}"]
1259+
}
1260+
}
1261+
`, rName, kafkaVersion)
1262+
}
1263+
1264+
func testAccMskClusterConfigKafkaVersionWithConfigurationInfo(rName string, kafkaVersion string, configResourceName string) string {
1265+
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
1266+
resource "aws_msk_configuration" "config1" {
1267+
kafka_versions = ["2.2.1"]
1268+
name = "%[1]s-1"
1269+
server_properties = <<PROPERTIES
1270+
log.cleaner.delete.retention.ms = 86400000
1271+
PROPERTIES
1272+
}
1273+
1274+
resource "aws_msk_configuration" "config2" {
1275+
kafka_versions = ["2.4.1.1"]
1276+
name = "%[1]s-2"
1277+
server_properties = <<PROPERTIES
1278+
log.cleaner.delete.retention.ms = 86400001
1279+
PROPERTIES
1280+
}
1281+
1282+
resource "aws_msk_cluster" "test" {
1283+
cluster_name = %[1]q
1284+
kafka_version = %[2]q
1285+
number_of_broker_nodes = 3
1286+
1287+
encryption_info {
1288+
encryption_in_transit {
1289+
client_broker = "TLS_PLAINTEXT"
1290+
}
1291+
}
1292+
1293+
broker_node_group_info {
1294+
client_subnets = ["${aws_subnet.example_subnet_az1.id}", "${aws_subnet.example_subnet_az2.id}", "${aws_subnet.example_subnet_az3.id}"]
1295+
ebs_volume_size = 10
1296+
instance_type = "kafka.m5.large"
1297+
security_groups = ["${aws_security_group.example_sg.id}"]
1298+
}
1299+
1300+
configuration_info {
1301+
arn = aws_msk_configuration.%[3]s.arn
1302+
revision = aws_msk_configuration.%[3]s.latest_revision
1303+
}
1304+
}
1305+
`, rName, kafkaVersion, configResourceName)
1306+
}
1307+
11091308
func testAccMskClusterConfigTags1(rName string) string {
11101309
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
11111310
resource "aws_msk_cluster" "test" {

0 commit comments

Comments
 (0)