diff --git a/api/v1beta1/queue_types.go b/api/v1beta1/queue_types.go index e1b9b71a..f20f9ced 100644 --- a/api/v1beta1/queue_types.go +++ b/api/v1beta1/queue_types.go @@ -32,6 +32,10 @@ type QueueSpec struct { Durable bool `json:"durable,omitempty"` // when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes. AutoDelete bool `json:"autoDelete,omitempty"` + // when set to true, queues are deleted only if empty. + DeleteIfEmpty bool `json:"deleteIfEmpty,omitempty"` + // when set to true, queues are delete only if they have no consumer. + DeleteIfUnused bool `json:"deleteIfUnused,omitempty"` // Queue arguments in the format of KEY: VALUE. e.g. x-delivery-limit: 10000. // Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead. // +kubebuilder:validation:Type=object diff --git a/api/v1beta1/queue_webhook_test.go b/api/v1beta1/queue_webhook_test.go index 6bbbbe1f..cb9790e2 100644 --- a/api/v1beta1/queue_webhook_test.go +++ b/api/v1beta1/queue_webhook_test.go @@ -16,11 +16,13 @@ var _ = Describe("queue webhook", func() { Name: "test-queue", }, Spec: QueueSpec{ - Name: "test", - Vhost: "/a-vhost", - Type: "quorum", - Durable: false, - AutoDelete: true, + Name: "test", + Vhost: "/a-vhost", + Type: "quorum", + Durable: false, + AutoDelete: true, + DeleteIfEmpty: true, + DeleteIfUnused: false, RabbitmqClusterReference: RabbitmqClusterReference{ Name: "some-cluster", }, diff --git a/config/crd/bases/rabbitmq.com_queues.yaml b/config/crd/bases/rabbitmq.com_queues.yaml index 6e66724c..18042bca 100644 --- a/config/crd/bases/rabbitmq.com_queues.yaml +++ b/config/crd/bases/rabbitmq.com_queues.yaml @@ -52,6 +52,13 @@ spec: description: when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes. type: boolean + deleteIfEmpty: + description: when set to true, queues are deleted only if empty. + type: boolean + deleteIfUnused: + description: when set to true, queues are delete only if they have + no consumer. + type: boolean durable: description: When set to false queues does not survive server restart. type: boolean diff --git a/controllers/queue_controller.go b/controllers/queue_controller.go index c4884292..9f473fcc 100644 --- a/controllers/queue_controller.go +++ b/controllers/queue_controller.go @@ -47,11 +47,30 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient. logger.Info("Deleting queues from ReconcilerFunc DeleteObj") queue := obj.(*topology.Queue) - err := validateResponseForDeletion(client.DeleteQueue(queue.Spec.Vhost, queue.Spec.Name)) - if errors.Is(err, NotFound) { + queueDeleteOptions, err := internal.GenerateQueueDeleteOptions(queue) + if err != nil { + return fmt.Errorf("failed to generate queue delete options: %w", err) + } + // Manage Quorum queue deletion if DeleteIfEmpty or DeleteIfUnused is true + if queue.Spec.Type == "quorum" && (queue.Spec.DeleteIfEmpty || queue.Spec.DeleteIfUnused) { + qInfo, err := client.GetQueue(queue.Spec.Vhost, queue.Spec.Name) + if err != nil { + return fmt.Errorf("cannot get %w queue information to verify queue is empty/unused: %s", err, queue.Spec.Name) + } + if qInfo.Messages > 0 && queue.Spec.DeleteIfEmpty { + return fmt.Errorf("cannot delete queue %s because it has ready messages", queue.Spec.Name) + } + if qInfo.Consumers > 0 && queue.Spec.DeleteIfUnused { + return fmt.Errorf("cannot delete queue %s because queue has consumers", queue.Spec.Name) + } + + } + + errdel := validateResponseForDeletion(client.DeleteQueue(queue.Spec.Vhost, queue.Spec.Name, *queueDeleteOptions)) + if errors.Is(errdel, NotFound) { logger.Info("cannot find queue in rabbitmq server; already deleted", "queue", queue.Spec.Name) - } else if err != nil { - return err + } else if errdel != nil { + return errdel } return nil } diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index c9a9bc38..50b5c41a 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -902,6 +902,8 @@ QueueSpec defines the desired state of Queue | *`type`* __string__ | | *`durable`* __boolean__ | When set to false queues does not survive server restart. | *`autoDelete`* __boolean__ | when set to true, queues that have had at least one consumer before are deleted after the last consumer unsubscribes. +| *`deleteIfEmpty`* __boolean__ | when set to true, queues are deleted only if empty. +| *`deleteIfUnused`* __boolean__ | when set to true, queues are delete only if they have no consumer. | *`arguments`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | Queue arguments in the format of KEY: VALUE. e.g. x-delivery-limit: 10000. Configuring queues through arguments is not recommended because they cannot be updated once set; we recommend configuring queues through policies instead. | *`rabbitmqClusterReference`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-rabbitmqclusterreference[$$RabbitmqClusterReference$$]__ | Reference to the RabbitmqCluster that the queue will be created in. diff --git a/internal/queue_delete_options.go b/internal/queue_delete_options.go new file mode 100644 index 00000000..2f801682 --- /dev/null +++ b/internal/queue_delete_options.go @@ -0,0 +1,26 @@ +/* +RabbitMQ Messaging Topology Kubernetes Operator +Copyright 2021 VMware, Inc. + +This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License. + +This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file. +*/ + +package internal + +import ( + rabbithole "github.com/michaelklishin/rabbit-hole/v2" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" +) + +// GenerateQueueDeleteOptions generates rabbithole.QueueDeleteOptions for a given Queue +// queue.Spec.Arguments (type k8s runtime.RawExtensions) is unmarshalled +func GenerateQueueDeleteOptions(q *topology.Queue) (*rabbithole.QueueDeleteOptions, error) { + + return &rabbithole.QueueDeleteOptions{ + // Set these values to false if q.Spec.Type = Quorum, not supported by the API + IfEmpty: q.Spec.Type != "quorum" && q.Spec.DeleteIfEmpty, + IfUnused: q.Spec.Type != "quorum" && q.Spec.DeleteIfUnused, + }, nil +} diff --git a/internal/queue_delete_options_test.go b/internal/queue_delete_options_test.go new file mode 100644 index 00000000..d83b67ec --- /dev/null +++ b/internal/queue_delete_options_test.go @@ -0,0 +1,73 @@ +package internal_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" + "github.com/rabbitmq/messaging-topology-operator/internal" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("GenerateQueueDeleteOptionsQuorum", func() { + var q *topology.Queue + + BeforeEach(func() { + q = &topology.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a-queue", + }, + Spec: topology.QueueSpec{ + Type: "quorum", + AutoDelete: false, + Durable: true, + DeleteIfEmpty: true, + DeleteIfUnused: false, + }, + } + }) + + It("sets QueueDeleteOptions.IfEmpty to false because we handle a quorum queue", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfEmpty).To(BeFalse()) + }) + + It("sets QueueDeleteOptions.IfUnused to false because we handle a quorum queue", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfUnused).To(BeFalse()) + }) + +}) + +var _ = Describe("GenerateQueueDeleteOptionsClassic", func() { + var q *topology.Queue + + BeforeEach(func() { + q = &topology.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "a-queue", + }, + Spec: topology.QueueSpec{ + Type: "classic", + AutoDelete: false, + Durable: true, + DeleteIfEmpty: true, + DeleteIfUnused: false, + }, + } + }) + + It("sets QueueDeleteOptions.IfEmpty according to queue.spec", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfEmpty).To(BeTrue()) + }) + + It("sets QueueDeleteOptions.IfUnused according to queue.spec", func() { + options, err := internal.GenerateQueueDeleteOptions(q) + Expect(err).NotTo(HaveOccurred()) + Expect(options.IfUnused).To(BeFalse()) + }) + +}) diff --git a/rabbitmqclient/rabbitmq_client_factory.go b/rabbitmqclient/rabbitmq_client_factory.go index 9a3d3cb1..27b54435 100644 --- a/rabbitmqclient/rabbitmq_client_factory.go +++ b/rabbitmqclient/rabbitmq_client_factory.go @@ -33,6 +33,7 @@ type Client interface { DeletePolicy(string, string) (*http.Response, error) DeclareQueue(string, string, rabbithole.QueueSettings) (*http.Response, error) DeleteQueue(string, string, ...rabbithole.QueueDeleteOptions) (*http.Response, error) + GetQueue(string, string) (*rabbithole.DetailedQueueInfo, error) DeclareExchange(string, string, rabbithole.ExchangeSettings) (*http.Response, error) DeleteExchange(string, string) (*http.Response, error) PutVhost(string, rabbithole.VhostSettings) (*http.Response, error) diff --git a/rabbitmqclient/rabbitmqclientfakes/fake_client.go b/rabbitmqclient/rabbitmqclientfakes/fake_client.go index 8658deec..b5a9d1b8 100644 --- a/rabbitmqclient/rabbitmqclientfakes/fake_client.go +++ b/rabbitmqclient/rabbitmqclientfakes/fake_client.go @@ -236,6 +236,20 @@ type FakeClient struct { result1 *http.Response result2 error } + GetQueueStub func(string, string) (*rabbithole.DetailedQueueInfo, error) + getQueueMutex sync.RWMutex + getQueueArgsForCall []struct { + arg1 string + arg2 string + } + getQueueReturns struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + } + getQueueReturnsOnCall map[int]struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + } GetVhostStub func(string) (*rabbithole.VhostInfo, error) getVhostMutex sync.RWMutex getVhostArgsForCall []struct { @@ -1442,6 +1456,71 @@ func (fake *FakeClient) DeleteVhostReturnsOnCall(i int, result1 *http.Response, }{result1, result2} } +func (fake *FakeClient) GetQueue(arg1 string, arg2 string) (*rabbithole.DetailedQueueInfo, error) { + fake.getQueueMutex.Lock() + ret, specificReturn := fake.getQueueReturnsOnCall[len(fake.getQueueArgsForCall)] + fake.getQueueArgsForCall = append(fake.getQueueArgsForCall, struct { + arg1 string + arg2 string + }{arg1, arg2}) + stub := fake.GetQueueStub + fakeReturns := fake.getQueueReturns + fake.recordInvocation("GetQueue", []interface{}{arg1, arg2}) + fake.getQueueMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeClient) GetQueueCallCount() int { + fake.getQueueMutex.RLock() + defer fake.getQueueMutex.RUnlock() + return len(fake.getQueueArgsForCall) +} + +func (fake *FakeClient) GetQueueCalls(stub func(string, string) (*rabbithole.DetailedQueueInfo, error)) { + fake.getQueueMutex.Lock() + defer fake.getQueueMutex.Unlock() + fake.GetQueueStub = stub +} + +func (fake *FakeClient) GetQueueArgsForCall(i int) (string, string) { + fake.getQueueMutex.RLock() + defer fake.getQueueMutex.RUnlock() + argsForCall := fake.getQueueArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeClient) GetQueueReturns(result1 *rabbithole.DetailedQueueInfo, result2 error) { + fake.getQueueMutex.Lock() + defer fake.getQueueMutex.Unlock() + fake.GetQueueStub = nil + fake.getQueueReturns = struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + }{result1, result2} +} + +func (fake *FakeClient) GetQueueReturnsOnCall(i int, result1 *rabbithole.DetailedQueueInfo, result2 error) { + fake.getQueueMutex.Lock() + defer fake.getQueueMutex.Unlock() + fake.GetQueueStub = nil + if fake.getQueueReturnsOnCall == nil { + fake.getQueueReturnsOnCall = make(map[int]struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + }) + } + fake.getQueueReturnsOnCall[i] = struct { + result1 *rabbithole.DetailedQueueInfo + result2 error + }{result1, result2} +} + func (fake *FakeClient) GetVhost(arg1 string) (*rabbithole.VhostInfo, error) { fake.getVhostMutex.Lock() ret, specificReturn := fake.getVhostReturnsOnCall[len(fake.getVhostArgsForCall)] @@ -2198,6 +2277,8 @@ func (fake *FakeClient) Invocations() map[string][][]interface{} { defer fake.deleteUserMutex.RUnlock() fake.deleteVhostMutex.RLock() defer fake.deleteVhostMutex.RUnlock() + fake.getQueueMutex.RLock() + defer fake.getQueueMutex.RUnlock() fake.getVhostMutex.RLock() defer fake.getVhostMutex.RUnlock() fake.listExchangeBindingsBetweenMutex.RLock()