Skip to content

Commit 0faad24

Browse files
committed
fix(adapter): eliminate data race on shared event field
The listener struct had a shared `event *info.Event` field that was written to by concurrent HTTP request handlers. When two requests arrived simultaneously (e.g. a push event and an incoming webhook), both would write to l.event, causing the push handler to read the incoming handler's event data. This resulted in the push event being processed as event_type=incoming, matching the wrong PipelineRun triggers and creating duplicate PipelineRuns. Remove `event` from the listener struct and make it a local variable per request in handleEvent. Pass it as a parameter to detectIncoming and processIncoming so each concurrent request operates on its own isolated event object. Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com> Assisted-by: Claude Opus 4.6 (via Claude Code)
1 parent 1d04509 commit 0faad24

File tree

3 files changed

+36
-36
lines changed

3 files changed

+36
-36
lines changed

pkg/adapter/adapter.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ type listener struct {
5353
run *params.Run
5454
kint kubeinteraction.Interface
5555
logger *zap.SugaredLogger
56-
event *info.Event
5756
}
5857

5958
type Response struct {
@@ -131,9 +130,9 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc {
131130
return
132131
}
133132

134-
var event map[string]any
133+
var eventBody map[string]any
135134
if string(payload) != "" {
136-
if err := json.Unmarshal(payload, &event); err != nil {
135+
if err := json.Unmarshal(payload, &eventBody); err != nil {
137136
l.logger.Errorf("Invalid event body format format: %s", err)
138137
response.WriteHeader(http.StatusBadRequest)
139138
return
@@ -143,7 +142,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc {
143142
var gitProvider provider.Interface
144143
var logger *zap.SugaredLogger
145144

146-
l.event = info.NewEvent()
145+
event := info.NewEvent()
147146
pacInfo := l.run.Info.GetPacOpts()
148147

149148
globalRepo, err := l.run.Clients.PipelineAsCode.PipelinesascodeV1alpha1().Repositories(l.run.Info.Kube.Namespace).Get(
@@ -170,7 +169,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc {
170169
return
171170
}
172171

173-
isIncoming, targettedRepo, err := l.detectIncoming(ctx, request, payload)
172+
isIncoming, targettedRepo, err := l.detectIncoming(ctx, event, request, payload)
174173
if err != nil {
175174
if errors.Is(err, errMissingFields) {
176175
l.writeResponse(response, http.StatusBadRequest, err.Error())
@@ -180,7 +179,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc {
180179
}
181180

182181
if isIncoming {
183-
gitProvider, logger, err = l.processIncoming(targettedRepo)
182+
gitProvider, logger, err = l.processIncoming(event, targettedRepo)
184183
} else {
185184
gitProvider, logger, err = l.detectProvider(request, string(payload))
186185
}
@@ -196,7 +195,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc {
196195
run: l.run,
197196
vcx: gitProvider,
198197
kint: l.kint,
199-
event: l.event,
198+
event: event,
200199
logger: logger,
201200
payload: payload,
202201
pacInfo: &pacInfo,

pkg/adapter/incoming.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func applyIncomingParams(req *http.Request, payloadBody []byte, params []string)
112112
// detectIncoming checks if the request is for an "incoming" webhook request.
113113
// If the request is for an "incoming" webhook request the request is parsed and matched to the expected
114114
// repository.
115-
func (l *listener) detectIncoming(ctx context.Context, req *http.Request, payloadBody []byte) (bool, *v1alpha1.Repository, error) {
115+
func (l *listener) detectIncoming(ctx context.Context, event *info.Event, req *http.Request, payloadBody []byte) (bool, *v1alpha1.Repository, error) {
116116
if req.URL.Path != "/incoming" {
117117
return false, nil, nil
118118
}
@@ -182,45 +182,45 @@ func (l *listener) detectIncoming(ctx context.Context, req *http.Request, payloa
182182
if err != nil {
183183
return false, nil, err
184184
}
185-
l.event.Provider.URL = enterpriseURL
186-
l.event.Provider.Token = token
187-
l.event.InstallationID = installationID
185+
event.Provider.URL = enterpriseURL
186+
event.Provider.Token = token
187+
event.InstallationID = installationID
188188
// Github app is not installed for provided repository url
189-
if l.event.InstallationID == 0 {
189+
if event.InstallationID == 0 {
190190
return false, nil, fmt.Errorf("GithubApp is not installed for the provided repository url %s ", repo.Spec.URL)
191191
}
192192
}
193193

194194
// make sure accepted is json
195195
if string(payloadBody) != "" {
196-
if l.event.Event, err = applyIncomingParams(req, payloadBody, hook.Params); err != nil {
196+
if event.Event, err = applyIncomingParams(req, payloadBody, hook.Params); err != nil {
197197
return false, nil, err
198198
}
199199
}
200200

201201
// TODO: more than i think about it and more i think triggertarget should be
202202
// eventType and vice versa, but keeping as is for now.
203-
l.event.EventType = "incoming"
204-
l.event.TriggerTarget = "push"
205-
l.event.TargetPipelineRun = payload.PipelineRun
206-
l.event.HeadBranch = payload.Branch
207-
l.event.BaseBranch = payload.Branch
208-
l.event.Request.Header = req.Header
209-
l.event.Request.Payload = payloadBody
210-
l.event.URL = repo.Spec.URL
211-
l.event.Sender = "incoming"
203+
event.EventType = "incoming"
204+
event.TriggerTarget = "push"
205+
event.TargetPipelineRun = payload.PipelineRun
206+
event.HeadBranch = payload.Branch
207+
event.BaseBranch = payload.Branch
208+
event.Request.Header = req.Header
209+
event.Request.Payload = payloadBody
210+
event.URL = repo.Spec.URL
211+
event.Sender = "incoming"
212212

213213
return true, repo, err
214214
}
215215

216-
func (l *listener) processIncoming(targetRepo *v1alpha1.Repository) (provider.Interface, *zap.SugaredLogger, error) {
216+
func (l *listener) processIncoming(event *info.Event, targetRepo *v1alpha1.Repository) (provider.Interface, *zap.SugaredLogger, error) {
217217
// can a git ssh URL be a Repo URL? I don't think this will even ever work
218218
org, repo, err := formatting.GetRepoOwnerSplitted(targetRepo.Spec.URL)
219219
if err != nil {
220220
return nil, nil, err
221221
}
222-
l.event.Organization = org
223-
l.event.Repository = repo
222+
event.Organization = org
223+
event.Repository = repo
224224

225225
var provider provider.Interface
226226
if targetRepo.Spec.GitProvider == nil || targetRepo.Spec.GitProvider.Type == "" {

pkg/adapter/incoming_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -830,16 +830,16 @@ func Test_listener_detectIncoming(t *testing.T) {
830830
run: client,
831831
logger: logger,
832832
kint: kint,
833-
event: info.NewEvent(),
834833
}
834+
event := info.NewEvent()
835835

836836
// make a new request
837837
req := httptest.NewRequestWithContext(ctx, tt.args.method,
838838
fmt.Sprintf("http://localhost%s?repository=%s&secret=%s&pipelinerun=%s&branch=%s&namespace=%s", tt.args.queryURL,
839839
tt.args.queryRepository, tt.args.querySecret, tt.args.queryPipelineRun, tt.args.queryBranch, tt.args.queryNamespace),
840840
strings.NewReader(tt.args.incomingBody))
841841
req.Header = tt.args.queryHeaders
842-
got, _, err := l.detectIncoming(ctx, req, []byte(tt.args.incomingBody))
842+
got, _, err := l.detectIncoming(ctx, event, req, []byte(tt.args.incomingBody))
843843
if tt.wantSubstrErr != "" {
844844
assert.Assert(t, err != nil)
845845
assert.ErrorContains(t, err, tt.wantSubstrErr)
@@ -850,7 +850,7 @@ func Test_listener_detectIncoming(t *testing.T) {
850850
return
851851
}
852852
assert.Equal(t, got, tt.want, "err = %v", err)
853-
assert.Equal(t, l.event.TargetPipelineRun, tt.args.queryPipelineRun)
853+
assert.Equal(t, event.TargetPipelineRun, tt.args.queryPipelineRun)
854854
})
855855
}
856856
}
@@ -1012,16 +1012,17 @@ func Test_listener_processIncoming(t *testing.T) {
10121012
observer, _ := zapobserver.New(zap.InfoLevel)
10131013
logger := zap.New(observer).Sugar()
10141014
l := &listener{
1015-
run: client, kint: kint, logger: logger, event: info.NewEvent(),
1015+
run: client, kint: kint, logger: logger,
10161016
}
1017-
pintf, _, err := l.processIncoming(tt.targetRepo)
1017+
event := info.NewEvent()
1018+
pintf, _, err := l.processIncoming(event, tt.targetRepo)
10181019
if tt.wantErr {
10191020
assert.Assert(t, err != nil)
10201021
return
10211022
}
10221023
assert.Assert(t, reflect.TypeOf(pintf).Elem() == reflect.TypeOf(tt.want).Elem())
1023-
assert.Assert(t, l.event.Organization == tt.wantOrg)
1024-
assert.Assert(t, l.event.Repository == tt.wantRepo)
1024+
assert.Assert(t, event.Organization == tt.wantOrg)
1025+
assert.Assert(t, event.Repository == tt.wantRepo)
10251026
})
10261027
}
10271028
}
@@ -1158,9 +1159,9 @@ func Test_detectIncoming_legacy_warning(t *testing.T) {
11581159
run: client,
11591160
logger: logger,
11601161
kint: kint,
1161-
event: info.NewEvent(),
11621162
}
1163-
got, _, err := l.detectIncoming(ctx, tt.req, tt.body)
1163+
event := info.NewEvent()
1164+
got, _, err := l.detectIncoming(ctx, event, tt.req, tt.body)
11641165
assert.NilError(t, err)
11651166
assert.Assert(t, got)
11661167
found := false
@@ -1229,9 +1230,9 @@ func Test_detectIncoming_body_params_are_parsed(t *testing.T) {
12291230
run: client,
12301231
logger: zap.NewNop().Sugar(),
12311232
kint: kint,
1232-
event: info.NewEvent(),
12331233
}
1234-
got, _, err := l.detectIncoming(ctx, req, []byte(payload))
1234+
event := info.NewEvent()
1235+
got, _, err := l.detectIncoming(ctx, event, req, []byte(payload))
12351236
assert.NilError(t, err)
12361237
assert.Assert(t, got)
12371238
}

0 commit comments

Comments
 (0)