Skip to content

Commit 2566cc7

Browse files
ko3a4okgcf-owl-bot[bot]surbhigarg92
authored
feat: inline BeginTransaction with first statement (#1692)
* feat: inline BeginTransaction with first statement * fix: prevent multiple begin transactions happen during a race condition happens; make sure stream requests uses a transaction id from the first response even if the stream fails halfway with an UNAVAILABLE error * fix: minor * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: call explicit begin transaction only after the first attempt. * feat: add inline begin transaction for batch DML requests * fix: lint * fix: explicit begin transaction for blind commit if a transaction runs over the runner. * feat: adding more unit tests * fix: explicit begin transaction for unknown error * fix: format * fix: tests after merge * Lint fix * fix: lint * fix: system test * fix: system emulator test Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: surbhigarg92 <surbhigarg.92@gmail.com>
1 parent b5ccf94 commit 2566cc7

10 files changed

Lines changed: 475 additions & 131 deletions

File tree

handwritten/spanner/src/session-pool.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
11591159
const transaction = session.transaction(
11601160
(session.parent as Database).queryOptions_
11611161
);
1162-
await transaction.begin();
11631162
session.txn = transaction;
11641163
}
11651164

handwritten/spanner/src/transaction-runner.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ export abstract class Runner<T> {
120120
this.attempts = 0;
121121
this.session = session;
122122
this.transaction = transaction;
123+
this.transaction.useInRunner();
123124

124125
const defaults = {timeout: 3600000};
125126

@@ -194,7 +195,9 @@ export abstract class Runner<T> {
194195
const transaction = this.session.transaction(
195196
(this.session.parent as Database).queryOptions_
196197
);
197-
await transaction.begin();
198+
if (this.attempts > 0) {
199+
await transaction.begin();
200+
}
198201
return transaction;
199202
}
200203
/**

handwritten/spanner/src/transaction.ts

Lines changed: 112 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,9 @@ export type CommitCallback =
216216
export class Snapshot extends EventEmitter {
217217
protected _options!: spannerClient.spanner.v1.ITransactionOptions;
218218
protected _seqno = 1;
219+
protected _idWaiter: Readable;
220+
protected _inlineBeginStarted;
221+
protected _useInRunner = false;
219222
id?: Uint8Array | string;
220223
ended: boolean;
221224
metadata?: spannerClient.spanner.v1.ITransaction;
@@ -289,6 +292,10 @@ export class Snapshot extends EventEmitter {
289292
this.resourceHeader_ = {
290293
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
291294
};
295+
this._idWaiter = new Readable({
296+
read() {},
297+
});
298+
this._inlineBeginStarted = false;
292299
}
293300

294301
/**
@@ -378,17 +385,7 @@ export class Snapshot extends EventEmitter {
378385
callback!(err, resp);
379386
return;
380387
}
381-
382-
const {id, readTimestamp} = resp;
383-
384-
this.id = id!;
385-
this.metadata = resp;
386-
387-
if (readTimestamp) {
388-
this.readTimestampProto = readTimestamp;
389-
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
390-
}
391-
388+
this._update(resp);
392389
callback!(null, resp);
393390
}
394391
);
@@ -573,6 +570,8 @@ export class Snapshot extends EventEmitter {
573570

574571
if (this.id) {
575572
transaction.id = this.id as Uint8Array;
573+
} else if (this._options.readWrite) {
574+
transaction.begin = this._options;
576575
} else {
577576
transaction.singleUse = this._options;
578577
}
@@ -603,6 +602,10 @@ export class Snapshot extends EventEmitter {
603602
);
604603

605604
const makeRequest = (resumeToken?: ResumeToken): Readable => {
605+
if (this.id && transaction.begin) {
606+
delete transaction.begin;
607+
transaction.id = this.id;
608+
}
606609
return this.requestStream({
607610
client: 'SpannerClient',
608611
method: 'streamingRead',
@@ -612,11 +615,21 @@ export class Snapshot extends EventEmitter {
612615
});
613616
};
614617

615-
return partialResultStream(makeRequest, {
618+
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
616619
json,
617620
jsonOptions,
618621
maxResumeRetries,
619-
});
622+
})
623+
?.on('response', response => {
624+
if (response.metadata && response.metadata!.transaction && !this.id) {
625+
this._update(response.metadata!.transaction);
626+
}
627+
})
628+
.on('error', () => {
629+
if (!this.id && this._useInRunner) {
630+
this.begin();
631+
}
632+
});
620633
}
621634

622635
/**
@@ -909,6 +922,9 @@ export class Snapshot extends EventEmitter {
909922
.on('response', response => {
910923
if (response.metadata) {
911924
metadata = response.metadata;
925+
if (metadata.transaction && !this.id) {
926+
this._update(metadata.transaction);
927+
}
912928
}
913929
})
914930
.on('data', row => rows.push(row))
@@ -1034,6 +1050,8 @@ export class Snapshot extends EventEmitter {
10341050
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
10351051
if (this.id) {
10361052
transaction.id = this.id as Uint8Array;
1053+
} else if (this._options.readWrite) {
1054+
transaction.begin = this._options;
10371055
} else {
10381056
transaction.singleUse = this._options;
10391057
}
@@ -1059,7 +1077,7 @@ export class Snapshot extends EventEmitter {
10591077
};
10601078

10611079
const makeRequest = (resumeToken?: ResumeToken): Readable => {
1062-
if (!reqOpts) {
1080+
if (!reqOpts || (this.id && !reqOpts.transaction.id)) {
10631081
try {
10641082
sanitizeRequest();
10651083
} catch (e) {
@@ -1078,11 +1096,21 @@ export class Snapshot extends EventEmitter {
10781096
});
10791097
};
10801098

1081-
return partialResultStream(makeRequest, {
1099+
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
10821100
json,
10831101
jsonOptions,
10841102
maxResumeRetries,
1085-
});
1103+
})
1104+
.on('response', response => {
1105+
if (response.metadata && response.metadata!.transaction && !this.id) {
1106+
this._update(response.metadata!.transaction);
1107+
}
1108+
})
1109+
.on('error', () => {
1110+
if (!this.id && this._useInRunner) {
1111+
this.begin();
1112+
}
1113+
});
10861114
}
10871115

10881116
/**
@@ -1226,6 +1254,51 @@ export class Snapshot extends EventEmitter {
12261254

12271255
return {params, paramTypes};
12281256
}
1257+
1258+
/**
1259+
* Update transaction properties from the response.
1260+
*
1261+
* @private
1262+
*
1263+
* @param {spannerClient.spanner.v1.ITransaction} resp Response object.
1264+
*/
1265+
protected _update(resp: spannerClient.spanner.v1.ITransaction): void {
1266+
const {id, readTimestamp} = resp;
1267+
1268+
this.id = id!;
1269+
this.metadata = resp;
1270+
1271+
if (readTimestamp) {
1272+
this.readTimestampProto = readTimestamp;
1273+
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
1274+
}
1275+
this._idWaiter.emit('notify');
1276+
}
1277+
1278+
/**
1279+
* Wrap `makeRequest` function with the lock to make sure the inline begin
1280+
* transaction can happen only once.
1281+
*
1282+
* @param makeRequest
1283+
* @private
1284+
*/
1285+
private _wrapWithIdWaiter(
1286+
makeRequest: (resumeToken?: ResumeToken) => Readable
1287+
): (resumeToken?: ResumeToken) => Readable {
1288+
if (this.id || !this._options.readWrite) {
1289+
return makeRequest;
1290+
}
1291+
if (!this._inlineBeginStarted) {
1292+
this._inlineBeginStarted = true;
1293+
return makeRequest;
1294+
}
1295+
return (resumeToken?: ResumeToken): Readable =>
1296+
this._idWaiter.once('notify', () =>
1297+
makeRequest(resumeToken)
1298+
.on('data', chunk => this._idWaiter.emit('data', chunk))
1299+
.once('end', () => this._idWaiter.emit('end'))
1300+
);
1301+
}
12291302
}
12301303

12311304
/*! Developer Documentation
@@ -1528,14 +1601,20 @@ export class Transaction extends Dml {
15281601
return {sql, params, paramTypes};
15291602
});
15301603

1604+
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
1605+
if (this.id) {
1606+
transaction.id = this.id as Uint8Array;
1607+
} else {
1608+
transaction.begin = this._options;
1609+
}
15311610
const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = {
15321611
session: this.session.formattedName_!,
15331612
requestOptions: this.configureTagOptions(
15341613
false,
15351614
this.requestOptions?.transactionTag ?? undefined,
15361615
(options as BatchUpdateOptions).requestOptions
15371616
),
1538-
transaction: {id: this.id!},
1617+
transaction,
15391618
seqno: this._seqno++,
15401619
statements,
15411620
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest;
@@ -1562,6 +1641,11 @@ export class Transaction extends Dml {
15621641
}
15631642

15641643
const {resultSets, status} = resp;
1644+
for (const resultSet of resultSets) {
1645+
if (!this.id && resultSet.metadata?.transaction) {
1646+
this._update(resultSet.metadata.transaction);
1647+
}
1648+
}
15651649
const rowCounts: number[] = resultSets.map(({stats}) => {
15661650
return (
15671651
(stats &&
@@ -1686,8 +1770,11 @@ export class Transaction extends Dml {
16861770

16871771
if (this.id) {
16881772
reqOpts.transactionId = this.id as Uint8Array;
1689-
} else {
1773+
} else if (!this._useInRunner) {
16901774
reqOpts.singleUseTransaction = this._options;
1775+
} else {
1776+
this.begin().then(() => this.commit(options, callback));
1777+
return;
16911778
}
16921779

16931780
if (
@@ -2184,6 +2271,13 @@ export class Transaction extends Dml {
21842271
const unique = new Set(allKeys);
21852272
return Array.from(unique).sort();
21862273
}
2274+
2275+
/**
2276+
* Mark transaction as started from the runner.
2277+
*/
2278+
useInRunner(): void {
2279+
this._useInRunner = true;
2280+
}
21872281
}
21882282

21892283
/*! Developer Documentation

handwritten/spanner/system-test/spanner.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6799,10 +6799,11 @@ describe('Spanner', () => {
67996799
NumberValue: 0,
68006800
};
68016801

6802-
beforeEach(() => {
6803-
return googleSqlTable.update(defaultRowValues).then(() => {
6804-
postgreSqlTable.update(defaultRowValues);
6805-
});
6802+
beforeEach(async () => {
6803+
await googleSqlTable.update(defaultRowValues);
6804+
if (!IS_EMULATOR_ENABLED) {
6805+
await postgreSqlTable.update(defaultRowValues);
6806+
}
68066807
});
68076808

68086809
const readConcurrentTransaction = (done, database, table) => {

handwritten/spanner/test/database.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,6 @@ class FakeSession {
9191
}
9292
}
9393

94-
interface ReadSessionCallback {
95-
(err: Error, session?: null): void;
96-
(err: null, session: FakeSession): void;
97-
}
98-
99-
interface WriteSessionCallback {
100-
(err: Error, session?: null, transaction?: null): void;
101-
(err: null, session: FakeSession, transaction: FakeTransaction): void;
102-
}
103-
10494
class FakeSessionPool extends EventEmitter {
10595
calledWith_: IArguments;
10696
constructor() {

0 commit comments

Comments
 (0)