Skip to content

Commit fa2eb35

Browse files
committed
pubsub: Receive does not retry ResourceExhausted errors
The reasoning here is that StreamingPull should be a long-lived RPC. If a user is seeing ResourceExhausted from StreamingPull, it's likely to be something that they want to act on (increase quota, not over-deploy, etc.).

As part of this CL, retrying in pubsub gets re-wired to use the gax.Retryer pattern. This allows us to pass call options to pullStream.call, which means that for the StreamingPull call we can pass a custom retryer that does not retry ResourceExhausted.

As part of this CL, pstest's GServer is exposed. This makes it easier to create fakes with slightly different semantics, which is a bit more scalable long-term than exposing knobs for everything testers might want to do.

Fixes #1166

Change-Id: I2073607bcae410e49b0d139859d9b7c48e065a3c
Reviewed-on: https://code-review.googlesource.com/c/36050
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Eno Compton <enocom@google.com>
1 parent 3cd5a7b commit fa2eb35

6 files changed

Lines changed: 159 additions & 57 deletions

File tree

internal/kokoro/vet.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ golint ./... 2>&1 | ( \
4141
grep -vE "exported const AllUsers|AllAuthenticatedUsers|RoleOwner|SSD|HDD|PRODUCTION|DEVELOPMENT should have comment" | \
4242
grep -v "exported func Value returns unexported type pretty.val, which can be annoying to use" | \
4343
grep -v "ExecuteStreamingSql" | \
44+
grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" | \
4445
grep -v "ClusterId" | \
4546
grep -v "InstanceId" | \
4647
grep -v "firestore.arrayUnion" | \

pubsub/pstest/fake.go

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,13 @@ func timeNow() time.Time {
6060
// Server is a fake Pub/Sub server.
6161
type Server struct {
6262
srv *testutil.Server
63-
Addr string // The address that the server is listening on.
64-
gServer gServer
63+
Addr string // The address that the server is listening on.
64+
GServer GServer // Not intended to be used directly.
6565
}
6666

67-
type gServer struct {
67+
// GServer is the underlying service implementor. It is not intended to be used
68+
// directly.
69+
type GServer struct {
6870
pb.PublisherServer
6971
pb.SubscriberServer
7072

@@ -87,14 +89,14 @@ func NewServer() *Server {
8789
s := &Server{
8890
srv: srv,
8991
Addr: srv.Addr,
90-
gServer: gServer{
92+
GServer: GServer{
9193
topics: map[string]*topic{},
9294
subs: map[string]*subscription{},
9395
msgsByID: map[string]*Message{},
9496
},
9597
}
96-
pb.RegisterPublisherServer(srv.Gsrv, &s.gServer)
97-
pb.RegisterSubscriberServer(srv.Gsrv, &s.gServer)
98+
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
99+
pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
98100
srv.Start()
99101
return s
100102
}
@@ -113,12 +115,12 @@ func (s *Server) Publish(topic string, data []byte, attrs map[string]string) str
113115
if !ok {
114116
panic(fmt.Sprintf("topic name must be of the form %q", topicPattern))
115117
}
116-
_, _ = s.gServer.CreateTopic(context.TODO(), &pb.Topic{Name: topic})
118+
_, _ = s.GServer.CreateTopic(context.TODO(), &pb.Topic{Name: topic})
117119
req := &pb.PublishRequest{
118120
Topic: topic,
119121
Messages: []*pb.PubsubMessage{{Data: data, Attributes: attrs}},
120122
}
121-
res, err := s.gServer.Publish(context.TODO(), req)
123+
res, err := s.GServer.Publish(context.TODO(), req)
122124
if err != nil {
123125
panic(fmt.Sprintf("pstest.Server.Publish: %v", err))
124126
}
@@ -130,9 +132,9 @@ func (s *Server) Publish(topic string, data []byte, attrs map[string]string) str
130132
// minutes. If SetStreamTimeout is never called or is passed zero, streams never shut
131133
// down.
132134
func (s *Server) SetStreamTimeout(d time.Duration) {
133-
s.gServer.mu.Lock()
134-
defer s.gServer.mu.Unlock()
135-
s.gServer.streamTimeout = d
135+
s.GServer.mu.Lock()
136+
defer s.GServer.mu.Unlock()
137+
s.GServer.streamTimeout = d
136138
}
137139

138140
// A Message is a message that was published to the server.
@@ -160,11 +162,11 @@ type Modack struct {
160162

161163
// Messages returns information about all messages ever published.
162164
func (s *Server) Messages() []*Message {
163-
s.gServer.mu.Lock()
164-
defer s.gServer.mu.Unlock()
165+
s.GServer.mu.Lock()
166+
defer s.GServer.mu.Unlock()
165167

166168
var msgs []*Message
167-
for _, m := range s.gServer.msgs {
169+
for _, m := range s.GServer.msgs {
168170
m.Deliveries = m.deliveries
169171
m.Acks = m.acks
170172
msgs = append(msgs, m)
@@ -175,10 +177,10 @@ func (s *Server) Messages() []*Message {
175177
// Message returns the message with the given ID, or nil if no message
176178
// with that ID was published.
177179
func (s *Server) Message(id string) *Message {
178-
s.gServer.mu.Lock()
179-
defer s.gServer.mu.Unlock()
180+
s.GServer.mu.Lock()
181+
defer s.GServer.mu.Unlock()
180182

181-
m := s.gServer.msgsByID[id]
183+
m := s.GServer.msgsByID[id]
182184
if m != nil {
183185
m.Deliveries = m.deliveries
184186
m.Acks = m.acks
@@ -188,21 +190,21 @@ func (s *Server) Message(id string) *Message {
188190

189191
// Wait blocks until all server activity has completed.
190192
func (s *Server) Wait() {
191-
s.gServer.wg.Wait()
193+
s.GServer.wg.Wait()
192194
}
193195

194196
// Close shuts down the server and releases all resources.
195197
func (s *Server) Close() error {
196198
s.srv.Close()
197-
s.gServer.mu.Lock()
198-
defer s.gServer.mu.Unlock()
199-
for _, sub := range s.gServer.subs {
199+
s.GServer.mu.Lock()
200+
defer s.GServer.mu.Unlock()
201+
for _, sub := range s.GServer.subs {
200202
sub.stop()
201203
}
202204
return nil
203205
}
204206

205-
func (s *gServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error) {
207+
func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error) {
206208
s.mu.Lock()
207209
defer s.mu.Unlock()
208210

@@ -214,7 +216,7 @@ func (s *gServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)
214216
return top.proto, nil
215217
}
216218

217-
func (s *gServer) GetTopic(_ context.Context, req *pb.GetTopicRequest) (*pb.Topic, error) {
219+
func (s *GServer) GetTopic(_ context.Context, req *pb.GetTopicRequest) (*pb.Topic, error) {
218220
s.mu.Lock()
219221
defer s.mu.Unlock()
220222

@@ -224,7 +226,7 @@ func (s *gServer) GetTopic(_ context.Context, req *pb.GetTopicRequest) (*pb.Topi
224226
return nil, status.Errorf(codes.NotFound, "topic %q", req.Topic)
225227
}
226228

227-
func (s *gServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*pb.Topic, error) {
229+
func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*pb.Topic, error) {
228230
s.mu.Lock()
229231
defer s.mu.Unlock()
230232

@@ -245,7 +247,7 @@ func (s *gServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
245247
return t.proto, nil
246248
}
247249

248-
func (s *gServer) ListTopics(_ context.Context, req *pb.ListTopicsRequest) (*pb.ListTopicsResponse, error) {
250+
func (s *GServer) ListTopics(_ context.Context, req *pb.ListTopicsRequest) (*pb.ListTopicsResponse, error) {
249251
s.mu.Lock()
250252
defer s.mu.Unlock()
251253

@@ -267,7 +269,7 @@ func (s *gServer) ListTopics(_ context.Context, req *pb.ListTopicsRequest) (*pb.
267269
return res, nil
268270
}
269271

270-
func (s *gServer) ListTopicSubscriptions(_ context.Context, req *pb.ListTopicSubscriptionsRequest) (*pb.ListTopicSubscriptionsResponse, error) {
272+
func (s *GServer) ListTopicSubscriptions(_ context.Context, req *pb.ListTopicSubscriptionsRequest) (*pb.ListTopicSubscriptionsResponse, error) {
271273
s.mu.Lock()
272274
defer s.mu.Unlock()
273275

@@ -288,7 +290,7 @@ func (s *gServer) ListTopicSubscriptions(_ context.Context, req *pb.ListTopicSub
288290
}, nil
289291
}
290292

291-
func (s *gServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*emptypb.Empty, error) {
293+
func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*emptypb.Empty, error) {
292294
s.mu.Lock()
293295
defer s.mu.Unlock()
294296

@@ -301,7 +303,7 @@ func (s *gServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*e
301303
return &emptypb.Empty{}, nil
302304
}
303305

304-
func (s *gServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*pb.Subscription, error) {
306+
func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*pb.Subscription, error) {
305307
s.mu.Lock()
306308
defer s.mu.Unlock()
307309

@@ -382,7 +384,7 @@ func checkMRD(pmrd *durpb.Duration) error {
382384
return nil
383385
}
384386

385-
func (s *gServer) GetSubscription(_ context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error) {
387+
func (s *GServer) GetSubscription(_ context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error) {
386388
s.mu.Lock()
387389
defer s.mu.Unlock()
388390
sub, err := s.findSubscription(req.Subscription)
@@ -392,7 +394,7 @@ func (s *gServer) GetSubscription(_ context.Context, req *pb.GetSubscriptionRequ
392394
return sub.proto, nil
393395
}
394396

395-
func (s *gServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error) {
397+
func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error) {
396398
if req.Subscription == nil {
397399
return nil, status.Errorf(codes.InvalidArgument, "missing subscription")
398400
}
@@ -433,7 +435,7 @@ func (s *gServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
433435
return sub.proto, nil
434436
}
435437

436-
func (s *gServer) ListSubscriptions(_ context.Context, req *pb.ListSubscriptionsRequest) (*pb.ListSubscriptionsResponse, error) {
438+
func (s *GServer) ListSubscriptions(_ context.Context, req *pb.ListSubscriptionsRequest) (*pb.ListSubscriptionsResponse, error) {
437439
s.mu.Lock()
438440
defer s.mu.Unlock()
439441

@@ -455,7 +457,7 @@ func (s *gServer) ListSubscriptions(_ context.Context, req *pb.ListSubscriptions
455457
return res, nil
456458
}
457459

458-
func (s *gServer) DeleteSubscription(_ context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) {
460+
func (s *GServer) DeleteSubscription(_ context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error) {
459461
s.mu.Lock()
460462
defer s.mu.Unlock()
461463
sub, err := s.findSubscription(req.Subscription)
@@ -468,7 +470,7 @@ func (s *gServer) DeleteSubscription(_ context.Context, req *pb.DeleteSubscripti
468470
return &emptypb.Empty{}, nil
469471
}
470472

471-
func (s *gServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error) {
473+
func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error) {
472474
s.mu.Lock()
473475
defer s.mu.Unlock()
474476

@@ -586,7 +588,7 @@ func (s *subscription) stop() {
586588
close(s.done)
587589
}
588590

589-
func (s *gServer) Acknowledge(_ context.Context, req *pb.AcknowledgeRequest) (*emptypb.Empty, error) {
591+
func (s *GServer) Acknowledge(_ context.Context, req *pb.AcknowledgeRequest) (*emptypb.Empty, error) {
590592
s.mu.Lock()
591593
defer s.mu.Unlock()
592594

@@ -600,7 +602,7 @@ func (s *gServer) Acknowledge(_ context.Context, req *pb.AcknowledgeRequest) (*e
600602
return &emptypb.Empty{}, nil
601603
}
602604

603-
func (s *gServer) ModifyAckDeadline(_ context.Context, req *pb.ModifyAckDeadlineRequest) (*emptypb.Empty, error) {
605+
func (s *GServer) ModifyAckDeadline(_ context.Context, req *pb.ModifyAckDeadlineRequest) (*emptypb.Empty, error) {
604606
s.mu.Lock()
605607
defer s.mu.Unlock()
606608
sub, err := s.findSubscription(req.Subscription)
@@ -618,7 +620,7 @@ func (s *gServer) ModifyAckDeadline(_ context.Context, req *pb.ModifyAckDeadline
618620
return &emptypb.Empty{}, nil
619621
}
620622

621-
func (s *gServer) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullResponse, error) {
623+
func (s *GServer) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullResponse, error) {
622624
s.mu.Lock()
623625
sub, err := s.findSubscription(req.Subscription)
624626
if err != nil {
@@ -655,7 +657,7 @@ func (s *gServer) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullRespon
655657
return &pb.PullResponse{ReceivedMessages: msgs}, nil
656658
}
657659

658-
func (s *gServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error {
660+
func (s *GServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error {
659661
// Receive initial message configuring the pull.
660662
req, err := sps.Recv()
661663
if err != nil {
@@ -674,7 +676,7 @@ func (s *gServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error {
674676
return err
675677
}
676678

677-
func (s *gServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekResponse, error) {
679+
func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekResponse, error) {
678680
// Only handle time-based seeking for now.
679681
// This fake doesn't deal with snapshots.
680682
var target time.Time
@@ -729,7 +731,7 @@ func (s *gServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekRespon
729731

730732
// Gets a subscription that must exist.
731733
// Must be called with the lock held.
732-
func (s *gServer) findSubscription(name string) (*subscription, error) {
734+
func (s *GServer) findSubscription(name string) (*subscription, error) {
733735
if name == "" {
734736
return nil, status.Errorf(codes.InvalidArgument, "missing subscription")
735737
}

pubsub/pstest/fake_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestTopics(t *testing.T) {
4242
Labels: map[string]string{"num": fmt.Sprintf("%d", i)},
4343
}))
4444
}
45-
if got, want := len(server.gServer.topics), len(topics); got != want {
45+
if got, want := len(server.GServer.topics), len(topics); got != want {
4646
t.Fatalf("got %d topics, want %d", got, want)
4747
}
4848
for _, top := range topics {
@@ -68,7 +68,7 @@ func TestTopics(t *testing.T) {
6868
t.Fatal(err)
6969
}
7070
}
71-
if got, want := len(server.gServer.topics), 0; got != want {
71+
if got, want := len(server.GServer.topics), 0; got != want {
7272
t.Fatalf("got %d topics, want %d", got, want)
7373
}
7474
}
@@ -88,7 +88,7 @@ func TestSubscriptions(t *testing.T) {
8888
}))
8989
}
9090

91-
if got, want := len(server.gServer.subs), len(subs); got != want {
91+
if got, want := len(server.GServer.subs), len(subs); got != want {
9292
t.Fatalf("got %d subscriptions, want %d", got, want)
9393
}
9494
for _, s := range subs {
@@ -128,7 +128,7 @@ func TestSubscriptions(t *testing.T) {
128128
t.Fatal(err)
129129
}
130130
}
131-
if got, want := len(server.gServer.subs), 0; got != want {
131+
if got, want := len(server.GServer.subs), 0; got != want {
132132
t.Fatalf("got %d subscriptions, want %d", got, want)
133133
}
134134
}

pubsub/pullstream.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,14 @@ func (s *pullStream) get(spc *pb.Subscriber_StreamingPullClient) (*pb.Subscriber
9595
}
9696

9797
func (s *pullStream) openWithRetry() (pb.Subscriber_StreamingPullClient, error) {
98-
var bo gax.Backoff
98+
r := defaultRetryer{}
9999
for {
100100
recordStat(s.ctx, StreamOpenCount, 1)
101101
spc, err := s.open()
102-
if err != nil && isRetryable(err) {
102+
bo, shouldRetry := r.Retry(err)
103+
if err != nil && shouldRetry {
103104
recordStat(s.ctx, StreamRetryCount, 1)
104-
if err := gax.Sleep(s.ctx, bo.Pause()); err != nil {
105+
if err := gax.Sleep(s.ctx, bo); err != nil {
105106
return nil, err
106107
}
107108
continue
@@ -110,11 +111,19 @@ func (s *pullStream) openWithRetry() (pb.Subscriber_StreamingPullClient, error)
110111
}
111112
}
112113

113-
func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error) error {
114+
func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error, opts ...gax.CallOption) error {
115+
var settings gax.CallSettings
116+
for _, opt := range opts {
117+
opt.Resolve(&settings)
118+
}
119+
var r gax.Retryer = &defaultRetryer{}
120+
if settings.Retry != nil {
121+
r = settings.Retry()
122+
}
123+
114124
var (
115125
spc *pb.Subscriber_StreamingPullClient
116126
err error
117-
bo gax.Backoff
118127
)
119128
for {
120129
spc, err = s.get(spc)
@@ -124,10 +133,11 @@ func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error) error
124133
start := time.Now()
125134
err = f(*spc)
126135
if err != nil {
127-
if isRetryable(err) {
136+
bo, shouldRetry := r.Retry(err)
137+
if shouldRetry {
128138
recordStat(s.ctx, StreamRetryCount, 1)
129139
if time.Since(start) < 30*time.Second { // don't sleep if we've been blocked for a while
130-
if err := gax.Sleep(s.ctx, bo.Pause()); err != nil {
140+
if err := gax.Sleep(s.ctx, bo); err != nil {
131141
return err
132142
}
133143
}
@@ -167,7 +177,7 @@ func (s *pullStream) Recv() (*pb.StreamingPullResponse, error) {
167177
recordStat(s.ctx, PullCount, int64(len(res.ReceivedMessages)))
168178
}
169179
return err
170-
})
180+
}, gax.WithRetry(func() gax.Retryer { return &streamingPullRetryer{defaultRetryer: &defaultRetryer{}} }))
171181
return res, err
172182
}
173183

0 commit comments

Comments
 (0)