Skip to content

Commit 2b2366d

Browse files
crehbichlerChristoph Rehbichlertadjik1baileympearson
authored
fix(NODE-7298): ensure commonWireVersion is computed from server maxWireVersion (#4805)
Co-authored-by: Christoph Rehbichler <c.rehbichler@appmea.com> Co-authored-by: Sergey Zelenov <sergey.zelenov@mongodb.com> Co-authored-by: Bailey Pearson <bailey.pearson@mongodb.com>
1 parent 7a8276e commit 2b2366d

File tree

6 files changed

+140
-8
lines changed

6 files changed

+140
-8
lines changed

src/sdam/server_selection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec
8888
* server potentially for a write on a secondary.
8989
*/
9090
export function secondaryWritableServerSelector(
91-
wireVersion?: number,
91+
wireVersion: number,
9292
readPreference?: ReadPreference
9393
): ServerSelector {
9494
// If server version < 5.0, read preference always primary.

src/sdam/topology.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
778778
return result;
779779
}
780780

781-
get commonWireVersion(): number | undefined {
781+
get commonWireVersion(): number {
782782
return this.description.commonWireVersion;
783783
}
784784

src/sdam/topology_description.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ export class TopologyDescription {
192192

193193
// update common wire version
194194
if (serverDescription.maxWireVersion !== 0) {
195-
if (commonWireVersion == null) {
195+
if (commonWireVersion === 0) {
196196
commonWireVersion = serverDescription.maxWireVersion;
197197
} else {
198198
commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion);

test/integration/crud/aggregation.test.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,4 +870,73 @@ describe('Aggregation', function () {
870870
.finally(() => client.close());
871871
}
872872
});
873+
874+
it(
875+
'should perform aggregations with a write stage on secondary when readPreference is secondary',
876+
{
877+
metadata: { requires: { topology: 'replicaset', mongodb: '>=5.0' } },
878+
async test() {
879+
const databaseName = this.configuration.db;
880+
const client = this.configuration.newClient(this.configuration.writeConcernMax(), {
881+
maxPoolSize: 1,
882+
monitorCommands: true
883+
});
884+
885+
const events = [];
886+
client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events));
887+
888+
// Discover primary to be able to check the actual server address
889+
await client.db('admin').command({ hello: 1 });
890+
const [helloEvent] = events;
891+
const primaryAddress = helloEvent.address;
892+
893+
// Clear events
894+
events.length = 0;
895+
896+
const src = client.db(databaseName).collection('read_pref_src');
897+
const outMerge = client.db(databaseName).collection('read_pref_merge_out');
898+
const outOut = client.db(databaseName).collection('read_pref_out_out');
899+
900+
await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]);
901+
await src.insertMany([{ a: 1 }, { a: 2 }]);
902+
await Promise.all([
903+
src
904+
.aggregate(
905+
[
906+
{
907+
$merge: {
908+
into: 'read_pref_merge_out',
909+
whenMatched: 'replace',
910+
whenNotMatched: 'insert'
911+
}
912+
}
913+
],
914+
{ readPreference: 'secondary' }
915+
)
916+
.toArray(),
917+
src
918+
.aggregate(
919+
[
920+
{
921+
$out: 'read_pref_out_out'
922+
}
923+
],
924+
{ readPreference: 'secondary' }
925+
)
926+
.toArray()
927+
]);
928+
929+
expect(events).to.have.length(2);
930+
events.forEach(event => {
931+
expect(event).to.have.property('commandName', 'aggregate');
932+
expect(event.address).to.not.equal(primaryAddress);
933+
expect(event).to.have.deep.nested.property('command.$readPreference', {
934+
mode: 'secondary'
935+
});
936+
});
937+
938+
await client.close();
939+
}
940+
}
941+
);
873942
});

test/unit/sdam/server_selection.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ describe('server selection', function () {
295295
new ObjectId(),
296296
MIN_SECONDARY_WRITE_WIRE_VERSION
297297
);
298-
const selector = secondaryWritableServerSelector();
298+
const selector = secondaryWritableServerSelector(0);
299299
const servers = selector(
300300
topologyDescription,
301301
Array.from(serverDescriptions.values()),
@@ -401,7 +401,7 @@ describe('server selection', function () {
401401
new ObjectId(),
402402
MIN_SECONDARY_WRITE_WIRE_VERSION
403403
);
404-
const selector = secondaryWritableServerSelector();
404+
const selector = secondaryWritableServerSelector(0);
405405
const servers = selector(
406406
topologyDescription,
407407
Array.from(serverDescriptions.values()),
@@ -507,7 +507,7 @@ describe('server selection', function () {
507507
new ObjectId(),
508508
MIN_SECONDARY_WRITE_WIRE_VERSION
509509
);
510-
const selector = secondaryWritableServerSelector();
510+
const selector = secondaryWritableServerSelector(0);
511511
const servers = selector(
512512
topologyDescription,
513513
Array.from(serverDescriptions.values()),
@@ -564,7 +564,7 @@ describe('server selection', function () {
564564
new ObjectId(),
565565
MIN_SECONDARY_WRITE_WIRE_VERSION
566566
);
567-
const selector = secondaryWritableServerSelector();
567+
const selector = secondaryWritableServerSelector(0);
568568
const servers = selector(
569569
topologyDescription,
570570
Array.from(serverDescriptions.values()),
@@ -587,7 +587,7 @@ describe('server selection', function () {
587587
MIN_SECONDARY_WRITE_WIRE_VERSION,
588588
{ localThresholdMS: 5 }
589589
);
590-
const selector = secondaryWritableServerSelector();
590+
const selector = secondaryWritableServerSelector(0);
591591
const servers = selector(
592592
topologyDescription,
593593
Array.from(serverDescriptions.values()),
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { expect } from 'chai';
2+
3+
import { TopologyType } from '../../../src/sdam/common';
4+
import { ServerDescription } from '../../../src/sdam/server_description';
5+
import { TopologyDescription } from '../../../src/sdam/topology_description';
6+
7+
describe('TopologyDescription', function () {
8+
describe('#constructor', function () {
9+
it('sets commonWireVersion to 0', function () {
10+
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);
11+
12+
expect(initial.commonWireVersion).to.equal(0);
13+
});
14+
});
15+
16+
describe('update()', function () {
17+
it('initializes commonWireVersion from first non-zero maxWireVersion', function () {
18+
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);
19+
20+
const sd1 = new ServerDescription('a:27017', {
21+
maxWireVersion: 25
22+
});
23+
24+
const updated = initial.update(sd1);
25+
26+
expect(updated.commonWireVersion).to.equal(25);
27+
});
28+
29+
it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () {
30+
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);
31+
32+
const sd1 = new ServerDescription('a:27017', {
33+
maxWireVersion: 25
34+
});
35+
36+
const sd2 = new ServerDescription('b:27017', {
37+
maxWireVersion: 21
38+
});
39+
40+
let updated = initial.update(sd1);
41+
updated = updated.update(sd2);
42+
43+
expect(updated.commonWireVersion).to.equal(21);
44+
});
45+
46+
it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () {
47+
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);
48+
49+
const sd1 = new ServerDescription('a:27017', {
50+
maxWireVersion: 25
51+
});
52+
53+
const sdUnknown = new ServerDescription('b:27017', {
54+
maxWireVersion: 0
55+
});
56+
57+
let updated = initial.update(sd1);
58+
updated = updated.update(sdUnknown);
59+
60+
expect(updated.commonWireVersion).to.equal(25);
61+
});
62+
});
63+
});

0 commit comments

Comments
 (0)