Skip to content

Commit b5f0e01

Browse files
committed
fix conflict; enhance minicluster for ITs; fix some UTs
Signed-off-by: tinswzy <zhenyuan.wei@zilliz.com>
1 parent 6810f23 commit b5f0e01

21 files changed

+366
-221
lines changed

common/funcutil/func.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package funcutil
18+
19+
import (
20+
"context"
21+
"time"
22+
)
23+
24+
// CheckGrpcReady waits 100ms and then sends nil to targetCh to signal that the
// caller may treat the gRPC server as ready.
//
// It returns without sending when ctx is done, either before the 100ms wait
// elapses or while the send is still waiting for a receiver, so a cancelled
// caller is never left blocked on an unread channel.
func CheckGrpcReady(ctx context.Context, targetCh chan error) {
	timer := time.NewTimer(100 * time.Millisecond)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-ctx.Done():
		return
	}

	// Guard the send with ctx as well: on an unbuffered channel with no
	// receiver a bare send would block forever, even after cancellation.
	select {
	case targetCh <- nil:
	case <-ctx.Done():
	}
}

common/funcutil/func_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package funcutil
18+
19+
import (
20+
"context"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
)
26+
27+
func Test_CheckGrpcReady(t *testing.T) {
28+
errChan := make(chan error)
29+
30+
// test errChan can receive nil after interval
31+
go CheckGrpcReady(context.TODO(), errChan)
32+
33+
err := <-errChan
34+
assert.NoError(t, err)
35+
36+
// test CheckGrpcReady can finish after context done
37+
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Millisecond)
38+
CheckGrpcReady(ctx, errChan)
39+
cancel()
40+
}

common/membership/client_node.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type ClientConfig struct {
4141
func NewClientNode(config *ClientConfig) (*ClientNode, error) {
4242
discovery := NewServiceDiscovery()
4343
delegate := NewClientDelegate()
44-
eventDel := NewEventDelegate(discovery, RoleClient)
44+
eventDel := NewEventDelegate(discovery, RoleClient, fmt.Sprintf("%s:%d", config.BindAddr, config.BindPort))
4545

4646
mlConfig := ml.DefaultLocalConfig()
4747
mlConfig.Name = config.NodeID // client as the unique name identifier for gossip protocol node
@@ -65,7 +65,7 @@ func (n *ClientNode) Join(existing []string) error {
6565
if len(existing) > 0 {
6666
count, err := n.memberlist.Join(existing)
6767
if err != nil {
68-
return fmt.Errorf("failed to join cluster: %w", err)
68+
return fmt.Errorf("failed to join cluster %+v: %w", existing, err)
6969
}
7070
log.Printf("[CLIENT] Successfully connected to %d nodes", count)
7171
}

common/membership/event_delegate.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ var _ memberlist.EventDelegate = (*EventDelegate)(nil)
2626
type EventDelegate struct {
2727
discovery *ServiceDiscovery
2828
role NodeRole
29+
addrPort string // only used for logging
2930
}
3031

31-
func NewEventDelegate(discovery *ServiceDiscovery, role NodeRole) *EventDelegate {
32-
return &EventDelegate{discovery: discovery, role: role}
32+
func NewEventDelegate(discovery *ServiceDiscovery, role NodeRole, addrPort string) *EventDelegate {
33+
return &EventDelegate{discovery: discovery, role: role, addrPort: addrPort}
3334
}
3435

3536
// NotifyJoin node joins
@@ -39,9 +40,9 @@ func (e *EventDelegate) NotifyJoin(node *memberlist.Node) {
3940
if err := pb.Unmarshal(node.Meta, &meta); err == nil {
4041
e.discovery.UpdateServer(node.Name, &meta)
4142
if e.role == RoleClient {
42-
log.Printf("[CLIENT-WATCH] Server joined: %s (RG: %s, AZ: %s, Endpoint: %s)", node.Name, meta.ResourceGroup, meta.Az, meta.Endpoint)
43+
log.Printf("[CLIENT-WATCH] Server[%s] joined: %s (RG: %s, AZ: %s, Endpoint: %s)", e.addrPort, node.Name, meta.ResourceGroup, meta.Az, meta.Endpoint)
4344
} else {
44-
log.Printf("[SERVER-EVENT] Server joined: %s (RG: %s, AZ: %s)", node.Name, meta.ResourceGroup, meta.Az)
45+
log.Printf("[SERVER-EVENT] Server[%s] joined: %s (RG: %s, AZ: %s)", e.addrPort, node.Name, meta.ResourceGroup, meta.Az)
4546
}
4647
}
4748
} else { // no meta, indicating a client role joining

common/membership/server_node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func NewServerNode(config *ServerConfig) (*ServerNode, error) {
8888
}
8989
discovery := NewServiceDiscovery()
9090
delegate := NewServerDelegate(meta)
91-
eventDel := NewEventDelegate(discovery, RoleServer)
91+
eventDel := NewEventDelegate(discovery, RoleServer, fmt.Sprintf("%s:%d", endpointAddr, endpointPort))
9292

9393
mlConfig := ml.DefaultLocalConfig()
9494
mlConfig.Name = config.NodeID // unique identifier name for a node

server/service.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"github.com/zilliztech/woodpecker/common/channel"
3232
"github.com/zilliztech/woodpecker/common/config"
33+
"github.com/zilliztech/woodpecker/common/funcutil"
3334
"github.com/zilliztech/woodpecker/common/logger"
3435
"github.com/zilliztech/woodpecker/common/membership"
3536
wpNet "github.com/zilliztech/woodpecker/common/net"
@@ -42,7 +43,7 @@ type Server struct {
4243
cfg *config.Configuration
4344
serverNode *membership.ServerNode
4445
serverConfig *membership.ServerConfig // Configuration to be used for creating server node
45-
seeds []string // Seeds for cluster joining
46+
gossipSeeds []string // Seeds for cluster joining
4647
logStore LogStore
4748
grpcWG sync.WaitGroup
4849
grpcErrChan chan error
@@ -54,7 +55,7 @@ type Server struct {
5455
}
5556

5657
// NewServer creates a new server instance with same bind/advertise ip/port
57-
func NewServer(ctx context.Context, configuration *config.Configuration, bindPort int, servicePort int, seeds []string) *Server {
58+
func NewServer(ctx context.Context, configuration *config.Configuration, bindPort int, servicePort int, gossipSeeds []string) *Server {
5859
return NewServerWithConfig(ctx, configuration, &membership.ServerConfig{
5960
NodeID: "", // Will be set in Prepare()
6061
BindPort: bindPort,
@@ -64,11 +65,11 @@ func NewServer(ctx context.Context, configuration *config.Configuration, bindPor
6465
ResourceGroup: "default", // Default resource group
6566
AZ: "default", // Default availability zone
6667
Tags: map[string]string{"role": "logstore"},
67-
}, seeds)
68+
}, gossipSeeds)
6869
}
6970

7071
// NewServerWithConfig creates a new server instance with custom configuration
71-
func NewServerWithConfig(ctx context.Context, configuration *config.Configuration, serverConfig *membership.ServerConfig, seeds []string) *Server {
72+
func NewServerWithConfig(ctx context.Context, configuration *config.Configuration, serverConfig *membership.ServerConfig, gossipSeeds []string) *Server {
7273
ctx, cancel := context.WithCancel(context.Background())
7374
var storageCli storageclient.ObjectStorage
7475
if configuration.Woodpecker.Storage.IsStorageMinio() || configuration.Woodpecker.Storage.IsStorageService() {
@@ -87,7 +88,7 @@ func NewServerWithConfig(ctx context.Context, configuration *config.Configuratio
8788
s.logStore = NewLogStore(ctx, configuration, storageCli)
8889
// Store the server config and seeds for later use in Prepare()
8990
s.serverConfig = serverConfig
90-
s.seeds = seeds
91+
s.gossipSeeds = gossipSeeds
9192
return s
9293
}
9394

@@ -101,8 +102,8 @@ func (s *Server) Prepare() error {
101102
s.logStore.SetAddress(s.listener.Addr().String())
102103

103104
// Start async join if seeds are provided
104-
if len(s.seeds) > 0 {
105-
go s.asyncStartAndJoinSeeds(s.ctx, s.seeds)
105+
if len(s.gossipSeeds) > 0 {
106+
go s.asyncStartAndJoinSeeds(s.ctx, s.gossipSeeds)
106107
}
107108

108109
return nil
@@ -142,9 +143,13 @@ func (s *Server) startGrpcLoop() {
142143
}
143144
s.grpcServer = grpc.NewServer(grpcOpts...)
144145
proto.RegisterLogStoreServer(s.grpcServer, s)
146+
funcutil.CheckGrpcReady(s.ctx, s.grpcErrChan)
147+
logger.Ctx(s.ctx).Info("start grpc server", zap.String("nodeID", s.serverConfig.NodeID), zap.String("address", s.listener.Addr().String()))
145148
if err := s.grpcServer.Serve(s.listener); err != nil {
149+
logger.Ctx(s.ctx).Error("grpc server failed", zap.Error(err))
146150
s.grpcErrChan <- err
147151
}
152+
logger.Ctx(s.ctx).Info("grpc server stopped", zap.String("nodeID", s.serverConfig.NodeID), zap.String("address", s.logStore.GetAddress()))
148153
}
149154

150155
func (s *Server) start() error {
@@ -157,18 +162,39 @@ func (s *Server) start() error {
157162
}
158163

159164
func (s *Server) Stop() error {
165+
// First, stop accepting new connections by closing the listener
166+
if s.listener != nil {
167+
if err := s.listener.Close(); err != nil {
168+
logger.Ctx(s.ctx).Warn("failed to close listener", zap.Error(err))
169+
}
170+
}
171+
172+
// Leave and shutdown the gossip cluster
160173
if s.serverNode != nil {
174+
// First, notify other nodes we're leaving
161175
leaveErr := s.serverNode.Leave()
162176
if leaveErr != nil {
163177
logger.Ctx(s.ctx).Error("server node leave failed", zap.Error(leaveErr))
164178
}
179+
// Then shutdown the memberlist to release ports immediately
180+
shutdownErr := s.serverNode.Shutdown()
181+
if shutdownErr != nil {
182+
logger.Ctx(s.ctx).Error("server node shutdown failed", zap.Error(shutdownErr))
183+
}
165184
}
185+
186+
// Stop the log store
166187
stopErr := s.logStore.Stop()
167188
if stopErr != nil {
168189
logger.Ctx(s.ctx).Error("log store stop failed", zap.Error(stopErr))
169190
}
191+
192+
// Gracefully stop the gRPC server (wait for in-flight requests)
170193
s.grpcServer.GracefulStop()
194+
195+
// Cancel the context
171196
s.cancel()
197+
172198
logger.Ctx(s.ctx).Info("server stopped", zap.String("nodeID", s.serverConfig.NodeID), zap.String("address", s.logStore.GetAddress()))
173199
return nil
174200
}
@@ -190,7 +216,8 @@ func (s *Server) AddEntry(request *proto.AddEntryRequest, serverStream grpc.Serv
190216
sendErr := serverStream.Send(&proto.AddEntryResponse{
191217
State: proto.AddEntryState_Failed,
192218
EntryId: bufferedId,
193-
Status: werr.Status(err)},
219+
Status: werr.Status(err),
220+
},
194221
)
195222
if sendErr != nil {
196223
logger.Ctx(streamCtx).Warn("failed to send error response", zap.Error(sendErr))
@@ -202,7 +229,8 @@ func (s *Server) AddEntry(request *proto.AddEntryRequest, serverStream grpc.Serv
202229
sendErr := serverStream.Send(&proto.AddEntryResponse{
203230
State: proto.AddEntryState_Buffered,
204231
EntryId: bufferedId,
205-
Status: werr.Success()},
232+
Status: werr.Success(),
233+
},
206234
)
207235
if sendErr != nil {
208236
logger.Ctx(streamCtx).Warn("failed to send buffered response", zap.Error(sendErr))
@@ -218,15 +246,17 @@ func (s *Server) AddEntry(request *proto.AddEntryRequest, serverStream grpc.Serv
218246
sendErr = serverStream.Send(&proto.AddEntryResponse{
219247
State: proto.AddEntryState_Failed,
220248
EntryId: id,
221-
Status: werr.Status(err)},
249+
Status: werr.Status(err),
250+
},
222251
)
223252
return sendErr
224253
}
225254
// persist added entry success
226255
sendErr = serverStream.Send(&proto.AddEntryResponse{
227256
State: proto.AddEntryState_Synced,
228257
EntryId: result.SyncedId,
229-
Status: werr.Success()},
258+
Status: werr.Success(),
259+
},
230260
)
231261
return sendErr // Return nil for normal closure, not the context error
232262
}
@@ -372,16 +402,7 @@ func (s *Server) GetServiceAdvertiseAddrPort(ctx context.Context) string {
372402

373403
// GetAdvertiseAddrPort Use for test only
374404
func (s *Server) GetAdvertiseAddrPort(ctx context.Context) string {
375-
if s.serverNode == nil {
376-
return ""
377-
}
378-
// Get the actual gossip advertise address from the memberlist configuration
379-
ml := s.serverNode.GetMemberlist()
380-
config := ml.LocalNode()
381-
// Use the actual advertise address from memberlist, fall back to bind address if not set
382-
addr := config.Addr
383-
port := config.Port
384-
return fmt.Sprintf("%s:%d", addr, port)
405+
return fmt.Sprintf("%s:%d", s.serverConfig.AdvertiseAddr, s.serverConfig.AdvertisePort)
385406
}
386407

387408
// asyncJoinSeeds continuously monitors and joins missing seed nodes with adaptive backoff

tests/integration/log_writer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func TestClientLogWriterEtcdFailure(t *testing.T) {
328328

329329
// 9. Verify data integrity with a reader
330330
// Wait for sync before read
331-
flushInterval := cfg.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval
331+
flushInterval := cfg.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval.Milliseconds()
332332
time.Sleep(time.Duration(1000 + flushInterval*int(time.Millisecond)))
333333

334334
reader, err := logHandle2.OpenLogReader(context.Background(), &log.LogMessageId{SegmentId: firstMsgID.SegmentId, EntryId: firstMsgID.EntryId}, "test-reader")
@@ -446,7 +446,7 @@ func TestClientLogWriterRealEtcdServiceFailureManually(t *testing.T) {
446446

447447
// 9. Verify data integrity with a reader
448448
// Wait for sync before read
449-
flushInterval := cfg.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval
449+
flushInterval := cfg.Woodpecker.Logstore.SegmentSyncPolicy.MaxInterval.Milliseconds()
450450
time.Sleep(time.Duration(1000 + flushInterval*int(time.Millisecond)))
451451

452452
reader, err := logHandle2.OpenLogReader(context.Background(), &log.LogMessageId{SegmentId: firstMsgID.SegmentId, EntryId: firstMsgID.EntryId}, "test-reader")

0 commit comments

Comments
 (0)