Skip to content

Commit 48b972f

Browse files
committed
controllers: implements woodpecker as mq
Signed-off-by: haorenfsa <shaoyue.chen@zilliz.com>
1 parent 63b7e72 commit 48b972f

File tree

14 files changed

+100
-48
lines changed

14 files changed

+100
-48
lines changed

apis/milvus.io/v1beta1/dependencies_types.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ type MilvusDependencies struct {
3131
Kafka MilvusKafka `json:"kafka,omitempty"`
3232

3333
// +kubebuilder:validation:Optional
34-
WoodPecker MilvusBuildInMQ `json:"woodpecker,omitempty"`
34+
WoodPecker MilvusBuiltInMQ `json:"woodpecker,omitempty"`
3535

3636
// +kubebuilder:validation:Optional
37-
RocksMQ MilvusBuildInMQ `json:"rocksmq,omitempty"`
37+
RocksMQ MilvusBuiltInMQ `json:"rocksmq,omitempty"`
3838

3939
// +kubebuilder:validation:Optional
40-
NatsMQ MilvusBuildInMQ `json:"natsmq,omitempty"`
40+
NatsMQ MilvusBuiltInMQ `json:"natsmq,omitempty"`
4141

4242
// +kubebuilder:validation:Optional
4343
Storage MilvusStorage `json:"storage"`
@@ -50,7 +50,7 @@ type MilvusDependencies struct {
5050
CustomMsgStream Values `json:"customMsgStream,omitempty"`
5151
}
5252

53-
func (m *MilvusDependencies) GetMilvusBuildInMQ() *MilvusBuildInMQ {
53+
func (m *MilvusDependencies) GetMilvusBuiltInMQ() *MilvusBuiltInMQ {
5454
switch m.MsgStreamType {
5555
case MsgStreamTypePulsar, MsgStreamTypeKafka, MsgStreamTypeCustom:
5656
return nil
@@ -139,8 +139,8 @@ type MilvusStorage struct {
139139
External bool `json:"external,omitempty"`
140140
}
141141

142-
// MilvusBuildInMQ (rocksmq or natsmq) configuration
143-
type MilvusBuildInMQ struct {
142+
// MilvusBuiltInMQ (rocksmq or natsmq) configuration
143+
type MilvusBuiltInMQ struct {
144144
Persistence Persistence `json:"persistence,omitempty"`
145145
}
146146

apis/milvus.io/v1beta1/dependencies_types_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,29 @@ import (
66
"github.com/stretchr/testify/assert"
77
)
88

9-
func TestMilvusDependencies_GetMilvusBuildInMQ(t *testing.T) {
9+
func TestMilvusDependencies_GetMilvusBuiltInMQ(t *testing.T) {
1010
m := &MilvusDependencies{
1111
MsgStreamType: MsgStreamTypePulsar,
1212
}
13-
assert.Nil(t, m.GetMilvusBuildInMQ())
13+
assert.Nil(t, m.GetMilvusBuiltInMQ())
1414
m.MsgStreamType = MsgStreamTypeKafka
15-
assert.Nil(t, m.GetMilvusBuildInMQ())
15+
assert.Nil(t, m.GetMilvusBuiltInMQ())
1616

1717
m.MsgStreamType = MsgStreamTypeCustom
18-
assert.Nil(t, m.GetMilvusBuildInMQ())
18+
assert.Nil(t, m.GetMilvusBuiltInMQ())
1919

2020
m.MsgStreamType = MsgStreamType("unknown")
21-
assert.Nil(t, m.GetMilvusBuildInMQ())
21+
assert.Nil(t, m.GetMilvusBuiltInMQ())
2222

2323
m.MsgStreamType = MsgStreamTypeWoodPecker
24-
assert.NotNil(t, m.GetMilvusBuildInMQ())
25-
assert.Equal(t, &m.WoodPecker, m.GetMilvusBuildInMQ())
24+
assert.NotNil(t, m.GetMilvusBuiltInMQ())
25+
assert.Equal(t, &m.WoodPecker, m.GetMilvusBuiltInMQ())
2626

2727
m.MsgStreamType = MsgStreamTypeRocksMQ
28-
assert.NotNil(t, m.GetMilvusBuildInMQ())
29-
assert.Equal(t, &m.RocksMQ, m.GetMilvusBuildInMQ())
28+
assert.NotNil(t, m.GetMilvusBuiltInMQ())
29+
assert.Equal(t, &m.RocksMQ, m.GetMilvusBuiltInMQ())
3030

3131
m.MsgStreamType = MsgStreamTypeNatsMQ
32-
assert.NotNil(t, m.GetMilvusBuildInMQ())
33-
assert.Equal(t, &m.NatsMQ, m.GetMilvusBuildInMQ())
32+
assert.NotNil(t, m.GetMilvusBuiltInMQ())
33+
assert.Equal(t, &m.NatsMQ, m.GetMilvusBuiltInMQ())
3434
}

apis/milvus.io/v1beta1/milvus_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ func (ms *MilvusSpec) GetPersistenceConfig() *Persistence {
128128
return &ms.Dep.RocksMQ.Persistence
129129
case MsgStreamTypeNatsMQ:
130130
return &ms.Dep.NatsMQ.Persistence
131+
case MsgStreamTypeWoodPecker:
132+
return &ms.Dep.WoodPecker.Persistence
131133
}
132134
return nil
133135
}

apis/milvus.io/v1beta1/milvus_types_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,15 @@ func TestGetPersistenceConfig(t *testing.T) {
224224
assert.Nil(t, m.Spec.GetPersistenceConfig())
225225

226226
m.Spec.Dep.MsgStreamType = MsgStreamTypeRocksMQ
227-
m.Spec.Dep.NatsMQ.Persistence.Enabled = true
228227
assert.Same(t, &m.Spec.Dep.RocksMQ.Persistence, m.Spec.GetPersistenceConfig())
229228

229+
m.Spec.Dep.NatsMQ.Persistence.Enabled = true
230230
m.Spec.Dep.MsgStreamType = MsgStreamTypeNatsMQ
231231
assert.Same(t, &m.Spec.Dep.NatsMQ.Persistence, m.Spec.GetPersistenceConfig())
232+
233+
m.Spec.Dep.WoodPecker.Persistence.Enabled = true
234+
m.Spec.Dep.MsgStreamType = MsgStreamTypeWoodPecker
235+
assert.Same(t, &m.Spec.Dep.WoodPecker.Persistence, m.Spec.GetPersistenceConfig())
232236
}
233237

234238
func TestGetActiveConfigMap_SetActiveConfigMap(t *testing.T) {

apis/milvus.io/v1beta1/milvus_webhook.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,15 +434,18 @@ func (r *Milvus) defaultValuesMerged() bool {
434434
return r.Annotations[DependencyValuesMergedAnnotation] == TrueStr
435435
}
436436

437-
func (r *Milvus) defaultMsgStream() {
437+
func (r *Milvus) setDefaultMsgStreamType() {
438438
if r.Spec.Dep.MsgStreamType == "" {
439439
switch r.Spec.Mode {
440440
case MilvusModeStandalone:
441441
r.Spec.Dep.MsgStreamType = MsgStreamTypeRocksMQ
442-
case MilvusModeCluster:
442+
default:
443443
r.Spec.Dep.MsgStreamType = MsgStreamTypePulsar
444444
}
445445
}
446+
}
447+
448+
func (r *Milvus) setDefaultMsgStreamConfigs() {
446449
switch r.Spec.Dep.MsgStreamType {
447450
case MsgStreamTypeKafka:
448451
if !r.Spec.Dep.Kafka.External {
@@ -483,6 +486,11 @@ func (r *Milvus) defaultMsgStream() {
483486
}
484487
}
485488

489+
func (r *Milvus) defaultMsgStream() {
490+
r.setDefaultMsgStreamType()
491+
r.setDefaultMsgStreamConfigs()
492+
}
493+
486494
func (r *Milvus) defaultStorage() {
487495
setDefaultStr(&r.Spec.Dep.Storage.Type, "MinIO")
488496
if !r.Spec.Dep.Storage.External {

apis/milvus.io/v1beta1/zz_generated.deepcopy.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# the manifest to enable streaming node for milvus cluster mode
2+
# ref https://milvus.io/docs/release_notes.md#Streaming-Node-Beta
3+
apiVersion: milvus.io/v1beta1
4+
kind: Milvus
5+
metadata:
6+
name: milvus
7+
namespace: mc-sit
8+
labels:
9+
app: milvus
10+
spec:
11+
mode: 'cluster'
12+
components:
13+
# image not formally released yet
14+
image: harbor.milvus.io/milvus/milvus:tinswzy-support_woodpecker_wal_type-36319c0-20250326
15+
streamingNode:
16+
replicas: 1
17+
indexNode:
18+
replicas: 0
19+
dependencies:
20+
msgStreamType: woodpecker

pkg/controllers/dependencies.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,11 @@ func (r *MilvusReconciler) ReconcileMsgStream(ctx context.Context, mc v1beta1.Mi
224224
switch mc.Spec.Dep.MsgStreamType {
225225
case v1beta1.MsgStreamTypeKafka:
226226
return r.ReconcileKafka(ctx, mc)
227-
case v1beta1.MsgStreamTypeRocksMQ, v1beta1.MsgStreamTypeNatsMQ, v1beta1.MsgStreamTypeCustom:
228-
// built in, do nothing
229-
return nil
230-
default:
227+
case v1beta1.MsgStreamTypePulsar:
231228
return r.ReconcilePulsar(ctx, mc)
229+
default:
230+
// built in mq or custom mq, do nothing
231+
return nil
232232
}
233233
}
234234

pkg/controllers/deployment_updater.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ type deploymentUpdater interface {
2828
GetMergedComponentSpec() ComponentSpec
2929
GetArgs() []string
3030
GetSecretRef() string
31-
GetPersistenceConfig() *v1beta1.Persistence
3231
GetMilvus() *v1beta1.Milvus
3332
RollingUpdateImageDependencyReady() bool
3433
HasHookConfig() bool
@@ -207,13 +206,20 @@ func updateUserDefinedVolumes(template *corev1.PodTemplateSpec, updater deployme
207206
fillConfigMapVolumeDefaultValues(&volume)
208207
userDefinedVolumes = append(userDefinedVolumes, volume)
209208
}
210-
if persistence := updater.GetPersistenceConfig(); persistence != nil && persistence.Enabled {
211-
rocketMqPvcName := getPVCNameByInstName(updater.GetIntanceName())
212-
if len(persistence.PersistentVolumeClaim.ExistingClaim) > 0 {
213-
rocketMqPvcName = persistence.PersistentVolumeClaim.ExistingClaim
209+
builtInMq := updater.GetMilvus().Spec.Dep.GetMilvusBuiltInMQ()
210+
211+
if builtInMq != nil {
212+
if builtInMq.Persistence.Enabled {
213+
rocketMqPvcName := getPVCNameByInstName(updater.GetIntanceName())
214+
if len(builtInMq.Persistence.PersistentVolumeClaim.ExistingClaim) > 0 {
215+
rocketMqPvcName = builtInMq.Persistence.PersistentVolumeClaim.ExistingClaim
216+
}
217+
userDefinedVolumes = append(userDefinedVolumes, persisentDataVolumeByName(rocketMqPvcName))
218+
} else {
219+
userDefinedVolumes = append(userDefinedVolumes, emptyDirDataVolume())
214220
}
215-
userDefinedVolumes = append(userDefinedVolumes, persisentVolumeByName(rocketMqPvcName))
216221
}
222+
217223
for _, volume := range userDefinedVolumes {
218224
addVolume(&template.Spec.Volumes, volume)
219225
}
@@ -315,8 +321,9 @@ func updateBuiltInVolumeMounts(template *corev1.PodTemplateSpec, updater deploym
315321

316322
func getUserDefinedVolumeMounts(updater deploymentUpdater) []corev1.VolumeMount {
317323
ret := updater.GetMergedComponentSpec().VolumeMounts
318-
if persistence := updater.GetPersistenceConfig(); persistence != nil && persistence.Enabled {
319-
ret = append(ret, persistentVolumeMount())
324+
builtInMq := updater.GetMilvus().Spec.Dep.GetMilvusBuiltInMQ()
325+
if builtInMq != nil {
326+
ret = append(ret, dataVolumeMount())
320327
}
321328
return ret
322329
}

pkg/controllers/deployments.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,16 @@ func configVolumeByName(name string) corev1.Volume {
320320
}
321321
}
322322

323-
func persisentVolumeByName(name string) corev1.Volume {
323+
func emptyDirDataVolume() corev1.Volume {
324+
return corev1.Volume{
325+
Name: MilvusDataVolumeName,
326+
VolumeSource: corev1.VolumeSource{
327+
EmptyDir: &corev1.EmptyDirVolumeSource{},
328+
},
329+
}
330+
}
331+
332+
func persisentDataVolumeByName(name string) corev1.Volume {
324333
return corev1.Volume{
325334
Name: MilvusDataVolumeName,
326335
VolumeSource: corev1.VolumeSource{
@@ -332,7 +341,7 @@ func persisentVolumeByName(name string) corev1.Volume {
332341
}
333342
}
334343

335-
func persistentVolumeMount() corev1.VolumeMount {
344+
func dataVolumeMount() corev1.VolumeMount {
336345
return corev1.VolumeMount{
337346
Name: MilvusDataVolumeName,
338347
ReadOnly: false,

0 commit comments

Comments
 (0)