Skip to content

Commit 6f935ce

Browse files
authored
Add runtime per-bucket pub/sub notification configuration API (#2157)
* add runtime per-bucket pub/sub notification configuration API * chore: lint fix * chore(fix) goroutine leak, close all clients on stop and some nitpicks * cr: always set standard GCS attributes regardless of PayloadFormat * cr: use t.Cleanup(srv.Stop) before returning * cr: extract event trigger helper * empty commit, re-trigger build
1 parent 7aca559 commit 6f935ce

File tree

8 files changed

+603
-37
lines changed

8 files changed

+603
-37
lines changed

fakestorage/bucket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func (s *Server) deleteBucket(r *http.Request) jsonResponse {
166166
if err != nil {
167167
return jsonResponse{status: http.StatusInternalServerError, errorMessage: err.Error()}
168168
}
169+
s.notificationRegistry.DeleteBucket(bucketName)
169170
return jsonResponse{}
170171
}
171172

fakestorage/notification.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package fakestorage
2+
3+
import (
4+
"encoding/json"
5+
"net/http"
6+
7+
"github.com/fsouza/fake-gcs-server/internal/notification"
8+
"github.com/gorilla/mux"
9+
)
10+
11+
func (s *Server) insertNotification(r *http.Request) jsonResponse {
12+
bucketName := unescapeMuxVars(mux.Vars(r))["bucketName"]
13+
14+
if _, err := s.backend.GetBucket(bucketName); err != nil {
15+
return jsonResponse{status: http.StatusNotFound}
16+
}
17+
18+
var cfg notification.NotificationConfig
19+
if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil {
20+
return jsonResponse{status: http.StatusBadRequest, errorMessage: err.Error()}
21+
}
22+
if cfg.Topic == "" {
23+
return jsonResponse{status: http.StatusBadRequest, errorMessage: "topic is required"}
24+
}
25+
26+
created := s.notificationRegistry.Insert(bucketName, cfg)
27+
return jsonResponse{status: http.StatusCreated, data: created}
28+
}
29+
30+
func (s *Server) getNotification(r *http.Request) jsonResponse {
31+
vars := unescapeMuxVars(mux.Vars(r))
32+
bucketName := vars["bucketName"]
33+
notificationID := vars["notificationId"]
34+
35+
cfg, ok := s.notificationRegistry.Get(bucketName, notificationID)
36+
if !ok {
37+
return jsonResponse{status: http.StatusNotFound}
38+
}
39+
return jsonResponse{data: cfg}
40+
}
41+
42+
func (s *Server) listNotifications(r *http.Request) jsonResponse {
43+
bucketName := unescapeMuxVars(mux.Vars(r))["bucketName"]
44+
45+
if _, err := s.backend.GetBucket(bucketName); err != nil {
46+
return jsonResponse{status: http.StatusNotFound}
47+
}
48+
49+
cfgs := s.notificationRegistry.List(bucketName)
50+
if cfgs == nil {
51+
cfgs = []notification.NotificationConfig{}
52+
}
53+
return jsonResponse{data: map[string]interface{}{"kind": "storage#notifications", "items": cfgs}}
54+
}
55+
56+
func (s *Server) deleteNotification(r *http.Request) jsonResponse {
57+
vars := unescapeMuxVars(mux.Vars(r))
58+
bucketName := vars["bucketName"]
59+
notificationID := vars["notificationId"]
60+
61+
if !s.notificationRegistry.Delete(bucketName, notificationID) {
62+
return jsonResponse{status: http.StatusNotFound}
63+
}
64+
return jsonResponse{status: http.StatusNoContent}
65+
}

fakestorage/notification_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package fakestorage
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"testing"
9+
10+
"github.com/fsouza/fake-gcs-server/internal/notification"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func newNotificationServer(t *testing.T) *Server {
16+
t.Helper()
17+
srv, err := NewServerWithOptions(Options{NoListener: true})
18+
require.NoError(t, err)
19+
srv.CreateBucketWithOpts(CreateBucketOpts{Name: "test-bucket"})
20+
t.Cleanup(srv.Stop)
21+
return srv
22+
}
23+
24+
func postNotification(t *testing.T, client *http.Client, bucket string, cfg notification.NotificationConfig) (*http.Response, notification.NotificationConfig) {
25+
t.Helper()
26+
body, _ := json.Marshal(cfg)
27+
resp, err := client.Post(
28+
fmt.Sprintf("https://storage.googleapis.com/storage/v1/b/%s/notificationConfigs", bucket),
29+
"application/json",
30+
bytes.NewReader(body),
31+
)
32+
require.NoError(t, err)
33+
defer resp.Body.Close()
34+
var created notification.NotificationConfig
35+
if resp.StatusCode == http.StatusCreated {
36+
require.NoError(t, json.NewDecoder(resp.Body).Decode(&created))
37+
}
38+
return resp, created
39+
}
40+
41+
func TestInsertNotification(t *testing.T) {
42+
srv := newNotificationServer(t)
43+
cfg := notification.NotificationConfig{Topic: "projects/p/topics/t", PayloadFormat: "JSON_API_V1"}
44+
45+
resp, created := postNotification(t, srv.HTTPClient(), "test-bucket", cfg)
46+
assert.Equal(t, http.StatusCreated, resp.StatusCode)
47+
assert.NotEmpty(t, created.ID)
48+
assert.Equal(t, "storage#notification", created.Kind)
49+
assert.Equal(t, cfg.Topic, created.Topic)
50+
}
51+
52+
func TestInsertNotification_BucketNotFound(t *testing.T) {
53+
srv := newNotificationServer(t)
54+
cfg := notification.NotificationConfig{Topic: "projects/p/topics/t"}
55+
resp, _ := postNotification(t, srv.HTTPClient(), "no-such-bucket", cfg)
56+
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
57+
}
58+
59+
func TestInsertNotification_MissingTopic(t *testing.T) {
60+
srv := newNotificationServer(t)
61+
resp, _ := postNotification(t, srv.HTTPClient(), "test-bucket", notification.NotificationConfig{})
62+
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
63+
}
64+
65+
func TestGetNotification(t *testing.T) {
66+
srv := newNotificationServer(t)
67+
cfg := notification.NotificationConfig{Topic: "projects/p/topics/t"}
68+
_, inserted := postNotification(t, srv.HTTPClient(), "test-bucket", cfg)
69+
70+
resp, err := srv.HTTPClient().Get(fmt.Sprintf(
71+
"https://storage.googleapis.com/storage/v1/b/test-bucket/notificationConfigs/%s",
72+
inserted.ID,
73+
))
74+
require.NoError(t, err)
75+
defer resp.Body.Close()
76+
assert.Equal(t, http.StatusOK, resp.StatusCode)
77+
78+
var got notification.NotificationConfig
79+
require.NoError(t, json.NewDecoder(resp.Body).Decode(&got))
80+
assert.Equal(t, inserted.ID, got.ID)
81+
assert.Equal(t, cfg.Topic, got.Topic)
82+
}
83+
84+
func TestGetNotification_NotFound(t *testing.T) {
85+
srv := newNotificationServer(t)
86+
resp, err := srv.HTTPClient().Get(
87+
"https://storage.googleapis.com/storage/v1/b/test-bucket/notificationConfigs/9999",
88+
)
89+
require.NoError(t, err)
90+
defer resp.Body.Close()
91+
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
92+
}
93+
94+
func TestListNotifications(t *testing.T) {
95+
srv := newNotificationServer(t)
96+
client := srv.HTTPClient()
97+
listURL := "https://storage.googleapis.com/storage/v1/b/test-bucket/notificationConfigs"
98+
cfg := notification.NotificationConfig{Topic: "projects/p/topics/t"}
99+
100+
// empty list
101+
resp, err := client.Get(listURL)
102+
require.NoError(t, err)
103+
defer resp.Body.Close()
104+
assert.Equal(t, http.StatusOK, resp.StatusCode)
105+
var listResp struct {
106+
Items []notification.NotificationConfig `json:"items"`
107+
}
108+
require.NoError(t, json.NewDecoder(resp.Body).Decode(&listResp))
109+
assert.Empty(t, listResp.Items)
110+
111+
// insert two
112+
for i := 0; i < 2; i++ {
113+
postNotification(t, client, "test-bucket", cfg)
114+
}
115+
116+
resp2, err := client.Get(listURL)
117+
require.NoError(t, err)
118+
defer resp2.Body.Close()
119+
require.NoError(t, json.NewDecoder(resp2.Body).Decode(&listResp))
120+
assert.Len(t, listResp.Items, 2)
121+
}
122+
123+
func TestDeleteNotification(t *testing.T) {
124+
srv := newNotificationServer(t)
125+
client := srv.HTTPClient()
126+
cfg := notification.NotificationConfig{Topic: "projects/p/topics/t"}
127+
_, inserted := postNotification(t, client, "test-bucket", cfg)
128+
129+
deleteURL := fmt.Sprintf(
130+
"https://storage.googleapis.com/storage/v1/b/test-bucket/notificationConfigs/%s",
131+
inserted.ID,
132+
)
133+
134+
req, _ := http.NewRequest(http.MethodDelete, deleteURL, nil)
135+
resp, err := client.Do(req)
136+
require.NoError(t, err)
137+
resp.Body.Close()
138+
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
139+
140+
// second delete → 404
141+
req2, _ := http.NewRequest(http.MethodDelete, deleteURL, nil)
142+
resp2, err := client.Do(req2)
143+
require.NoError(t, err)
144+
resp2.Body.Close()
145+
assert.Equal(t, http.StatusNotFound, resp2.StatusCode)
146+
}

fakestorage/object.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package fakestorage
77
import (
88
"bytes"
99
"compress/gzip"
10+
"context"
1011
"encoding/json"
1112
"encoding/xml"
1213
"errors"
@@ -328,17 +329,22 @@ func (s *Server) createObject(obj StreamingObject, conditions backend.Conditions
328329

329330
bucket, _ := s.backend.GetBucket(obj.BucketName)
330331
if bucket.VersioningEnabled {
331-
s.eventManager.Trigger(&oldBackendObj, notification.EventArchive, oldObjEventAttr)
332+
s.triggerEvent(&oldBackendObj, notification.EventArchive, oldObjEventAttr)
332333
} else {
333-
s.eventManager.Trigger(&oldBackendObj, notification.EventDelete, oldObjEventAttr)
334+
s.triggerEvent(&oldBackendObj, notification.EventDelete, oldObjEventAttr)
334335
}
335336
}
336337

337338
newObj := fromBackendObjects([]backend.StreamingObject{newBackendObj})[0]
338-
s.eventManager.Trigger(&newBackendObj, notification.EventFinalize, newObjEventAttr)
339+
s.triggerEvent(&newBackendObj, notification.EventFinalize, newObjEventAttr)
339340
return newObj, nil
340341
}
341342

343+
func (s *Server) triggerEvent(obj *backend.StreamingObject, eventType notification.EventType, attrs map[string]string) {
344+
s.eventManager.Trigger(obj, eventType, attrs)
345+
s.notificationRegistry.Trigger(context.Background(), obj, eventType, attrs)
346+
}
347+
342348
type ListOptions struct {
343349
Prefix string
344350
Delimiter string
@@ -803,9 +809,9 @@ func (s *Server) deleteObject(r *http.Request) jsonResponse {
803809
bucket, _ := s.backend.GetBucket(obj.BucketName)
804810
backendObj := toBackendObjects([]StreamingObject{obj})[0]
805811
if bucket.VersioningEnabled {
806-
s.eventManager.Trigger(&backendObj, notification.EventArchive, nil)
812+
s.triggerEvent(&backendObj, notification.EventArchive, nil)
807813
} else {
808-
s.eventManager.Trigger(&backendObj, notification.EventDelete, nil)
814+
s.triggerEvent(&backendObj, notification.EventDelete, nil)
809815
}
810816
return jsonResponse{}
811817
}
@@ -1298,7 +1304,7 @@ func (s *Server) patchObject(r *http.Request) jsonResponse {
12981304
}
12991305
defer backendObj.Close()
13001306

1301-
s.eventManager.Trigger(&backendObj, notification.EventMetadata, nil)
1307+
s.triggerEvent(&backendObj, notification.EventMetadata, nil)
13021308
return jsonResponse{data: fromBackendObjects([]backend.StreamingObject{backendObj})[0]}
13031309
}
13041310

@@ -1365,7 +1371,7 @@ func (s *Server) updateObject(r *http.Request) jsonResponse {
13651371
}
13661372
defer backendObj.Close()
13671373

1368-
s.eventManager.Trigger(&backendObj, notification.EventMetadata, nil)
1374+
s.triggerEvent(&backendObj, notification.EventMetadata, nil)
13691375
return jsonResponse{data: fromBackendObjects([]backend.StreamingObject{backendObj})[0]}
13701376
}
13711377

@@ -1423,7 +1429,7 @@ func (s *Server) composeObject(r *http.Request) jsonResponse {
14231429

14241430
obj := fromBackendObjects([]backend.StreamingObject{backendObj})[0]
14251431

1426-
s.eventManager.Trigger(&backendObj, notification.EventFinalize, nil)
1432+
s.triggerEvent(&backendObj, notification.EventFinalize, nil)
14271433

14281434
return jsonResponse{data: newObjectResponse(obj.ObjectAttrs, urlhelper.GetBaseURL(r))}
14291435
}

fakestorage/server.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ const defaultPublicHost = "storage.googleapis.com"
4242
//
4343
// It provides a fake implementation of the Google Cloud Storage API.
4444
type Server struct {
45-
backend backend.Storage
46-
uploads sync.Map
47-
transport *muxTransport
48-
ts *httptest.Server
49-
handler http.Handler
50-
options Options
51-
externalURL string
52-
publicHost string
53-
eventManager notification.EventManager
45+
backend backend.Storage
46+
uploads sync.Map
47+
transport *muxTransport
48+
ts *httptest.Server
49+
handler http.Handler
50+
options Options
51+
externalURL string
52+
publicHost string
53+
eventManager notification.EventManager
54+
notificationRegistry *notification.NotificationRegistry
5455
}
5556

5657
// NewServer creates a new instance of the server, pre-loaded with the given
@@ -220,12 +221,13 @@ func newServer(options Options) (*Server, error) {
220221
}
221222

222223
s := Server{
223-
backend: backendStorage,
224-
uploads: sync.Map{},
225-
externalURL: options.ExternalURL,
226-
publicHost: publicHost,
227-
options: options,
228-
eventManager: &notification.PubsubEventManager{},
224+
backend: backendStorage,
225+
uploads: sync.Map{},
226+
externalURL: options.ExternalURL,
227+
publicHost: publicHost,
228+
options: options,
229+
eventManager: &notification.PubsubEventManager{},
230+
notificationRegistry: notification.NewNotificationRegistry(options.Writer),
229231
}
230232
s.buildMuxer()
231233
_, err = s.seed()
@@ -282,6 +284,10 @@ func (s *Server) buildMuxer() {
282284
r.Path("/b/{sourceBucket}/o/{sourceObject:.+}/{copyType:rewriteTo|copyTo}/b/{destinationBucket}/o/{destinationObject:.+}").Methods(http.MethodPost).HandlerFunc(jsonToHTTPHandler(s.rewriteObject))
283285
r.Path("/b/{bucketName}/o/{destinationObject:.+}/compose").Methods(http.MethodPost).HandlerFunc(jsonToHTTPHandler(s.composeObject))
284286
r.Path("/b/{bucketName}/o/{objectName:.+}").Methods(http.MethodPut, http.MethodPost).HandlerFunc(jsonToHTTPHandler(s.updateObject))
287+
r.Path("/b/{bucketName}/notificationConfigs").Methods(http.MethodPost).HandlerFunc(jsonToHTTPHandler(s.insertNotification))
288+
r.Path("/b/{bucketName}/notificationConfigs").Methods(http.MethodGet).HandlerFunc(jsonToHTTPHandler(s.listNotifications))
289+
r.Path("/b/{bucketName}/notificationConfigs/{notificationId}").Methods(http.MethodGet).HandlerFunc(jsonToHTTPHandler(s.getNotification))
290+
r.Path("/b/{bucketName}/notificationConfigs/{notificationId}").Methods(http.MethodDelete).HandlerFunc(jsonToHTTPHandler(s.deleteNotification))
285291
}
286292

287293
// Internal / update server configuration
@@ -458,6 +464,7 @@ func (s *Server) Stop() {
458464
if s.ts != nil {
459465
s.ts.Close()
460466
}
467+
s.notificationRegistry.Close()
461468
}
462469

463470
// URL returns the server URL.

0 commit comments

Comments
 (0)