Skip to content

Commit 7d497a5

Browse files
authored
Add opt-in multithreading (#85)
1 parent 3b1a6f2 commit 7d497a5

6 files changed

Lines changed: 406 additions & 10 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ The optional `options` object may contain:
201201
- `createIfMissing` (boolean, default: `true`): If `true`, create an empty database if one doesn't already exist. If `false` and the database doesn't exist, opening will fail.
202202
- `errorIfExists` (boolean, default: `false`): If `true` and the database already exists, opening will fail.
203203
- `passive` (boolean, default: `false`): Wait for, but do not initiate, opening of the database.
204+
- `multithreading` (boolean, default: `false`): Allow multiple threads to access the database. This is only relevant when using [worker threads](https://nodejs.org/api/worker_threads.html).
204205

205206
For advanced performance tuning, the `options` object may also contain the following. Modify these options only if you can prove actual benefit for your particular application.
206207

binding.cc

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,36 @@
1111

1212
#include <map>
1313
#include <vector>
14+
#include <mutex>
1415

1516
/**
1617
* Forward declarations.
1718
*/
1819
struct Database;
1920
struct Iterator;
2021
static void iterator_close_do (napi_env env, Iterator* iterator, napi_value cb);
22+
static leveldb::Status threadsafe_open(const leveldb::Options &options,
23+
bool multithreading,
24+
Database &db_instance);
25+
static leveldb::Status threadsafe_close(Database &db_instance);
2126

2227
/**
23-
* Macros.
28+
 * Global declarations for multi-threaded access. These are not context-aware
 * by definition and exist specifically to allow for cross-thread access to the
 * single database handle.
2431
*/
32+
struct LevelDbHandle
33+
{
34+
leveldb::DB *db;
35+
size_t open_handle_count;
36+
};
37+
static std::mutex handles_mutex;
38+
// only access this when protected by the handles_mutex!
39+
static std::map<std::string, LevelDbHandle> db_handles;
2540

41+
/**
42+
* Macros.
43+
*/
2644
#define NAPI_DB_CONTEXT() \
2745
Database* database = NULL; \
2846
NAPI_STATUS_THROWS(napi_get_value_external(env, argv[0], (void**)&database));
@@ -495,19 +513,21 @@ struct Database {
495513

496514
~Database () {
497515
if (db_ != NULL) {
498-
delete db_;
499-
db_ = NULL;
516+
threadsafe_close(*this);
500517
}
501518
}
502519

503520
leveldb::Status Open (const leveldb::Options& options,
504-
const char* location) {
505-
return leveldb::DB::Open(options, location, &db_);
521+
const std::string &location,
522+
bool multithreading) {
523+
location_ = location;
524+
return threadsafe_open(options, multithreading, *this);
506525
}
507526

508527
void CloseDatabase () {
509-
delete db_;
510-
db_ = NULL;
528+
if (db_ != NULL) {
529+
threadsafe_close(*this);
530+
}
511531
if (blockCache_) {
512532
delete blockCache_;
513533
blockCache_ = NULL;
@@ -600,8 +620,66 @@ struct Database {
600620

601621
private:
602622
uint32_t priorityWork_;
623+
std::string location_;
624+
625+
  // for separation of concerns the threadsafe functionality was kept at the global
  // level and made a friend so it is explicit where the threadsafe boundary exists
627+
friend leveldb::Status threadsafe_open(const leveldb::Options &options,
628+
bool multithreading,
629+
Database &db_instance);
630+
friend leveldb::Status threadsafe_close(Database &db_instance);
603631
};
604632

633+
634+
/**
 * Opens the LevelDB store for `db_instance` at `db_instance.location_`.
 *
 * When `multithreading` is false the database is opened directly and no
 * shared-handle bookkeeping happens. Otherwise the global `db_handles` map
 * is consulted under `handles_mutex` so that multiple Database instances
 * (e.g. across worker threads) share a single leveldb::DB per location.
 */
leveldb::Status threadsafe_open(const leveldb::Options &options,
                                bool multithreading,
                                Database &db_instance) {
  if (!multithreading) {
    // Single-threaded use: bypass the lock and the handle map entirely.
    return leveldb::DB::Open(options, db_instance.location_, &db_instance.db_);
  }

  std::unique_lock<std::mutex> lock(handles_mutex);

  auto entry = db_handles.find(db_instance.location_);
  if (entry != db_handles.end()) {
    // A multithreaded handle already exists for this location: share it.
    ++(entry->second.open_handle_count);
    db_instance.db_ = entry->second.db;
    return leveldb::Status::OK();
  }

  // First multithreaded open for this location. If the database is already
  // held open by a non-multithreaded instance, LevelDB's own lock makes this
  // fail, which is the expected outcome.
  LevelDbHandle handle = {nullptr, 0};
  leveldb::Status status =
      leveldb::DB::Open(options, db_instance.location_, &handle.db);

  if (status.ok()) {
    handle.open_handle_count++;
    db_instance.db_ = handle.db;
    db_handles[db_instance.location_] = handle;
  }

  return status;
}
665+
666+
/**
 * Closes the LevelDB handle held by `db_instance`.
 *
 * If the location is tracked in `db_handles` the instance was opened with
 * multithreading enabled; the underlying handle is deleted only when the
 * last sharer closes. Otherwise the instance owns the handle exclusively
 * and it is deleted directly. `db_instance.db_` is always nulled out.
 */
leveldb::Status threadsafe_close(Database &db_instance) {
  std::unique_lock<std::mutex> lock(handles_mutex);

  auto entry = db_handles.find(db_instance.location_);
  if (entry == db_handles.end()) {
    // Not in the shared map: was not opened with multithreading enabled.
    delete db_instance.db_;
  } else if (--(entry->second.open_handle_count) == 0) {
    // Last shared user of this location; release the handle for real.
    delete entry->second.db;
    db_handles.erase(entry);
  }

  // ensure db_ pointer is nullified in Database instance
  db_instance.db_ = NULL;
  return leveldb::Status::OK();
}
682+
605683
/**
606684
* Base worker class for doing async work that defers closing the database.
607685
*/
@@ -974,13 +1052,15 @@ struct OpenWorker final : public BaseWorker {
9741052
const bool createIfMissing,
9751053
const bool errorIfExists,
9761054
const bool compression,
1055+
const bool multithreading,
9771056
const uint32_t writeBufferSize,
9781057
const uint32_t blockSize,
9791058
const uint32_t maxOpenFiles,
9801059
const uint32_t blockRestartInterval,
9811060
const uint32_t maxFileSize)
9821061
: BaseWorker(env, database, callback, "classic_level.db.open"),
983-
location_(location) {
1062+
location_(location),
1063+
multithreading_(multithreading) {
9841064
options_.block_cache = database->blockCache_;
9851065
options_.filter_policy = database->filterPolicy_;
9861066
options_.create_if_missing = createIfMissing;
@@ -998,11 +1078,12 @@ struct OpenWorker final : public BaseWorker {
9981078
~OpenWorker () {}
9991079

10001080
void DoExecute () override {
1001-
SetStatus(database_->Open(options_, location_.c_str()));
1081+
SetStatus(database_->Open(options_, location_, multithreading_));
10021082
}
10031083

10041084
leveldb::Options options_;
10051085
std::string location_;
1086+
bool multithreading_;
10061087
};
10071088

10081089
/**
@@ -1017,6 +1098,7 @@ NAPI_METHOD(db_open) {
10171098
const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true);
10181099
const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false);
10191100
const bool compression = BooleanProperty(env, options, "compression", true);
1101+
const bool multithreading = BooleanProperty(env, options, "multithreading", false);
10201102

10211103
const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20);
10221104
const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20);
@@ -1031,7 +1113,8 @@ NAPI_METHOD(db_open) {
10311113
napi_value callback = argv[3];
10321114
OpenWorker* worker = new OpenWorker(env, database, callback, location,
10331115
createIfMissing, errorIfExists,
1034-
compression, writeBufferSize, blockSize,
1116+
compression, multithreading,
1117+
writeBufferSize, blockSize,
10351118
maxOpenFiles, blockRestartInterval,
10361119
maxFileSize);
10371120
worker->Queue(env);

index.d.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ export interface OpenOptions extends AbstractOpenOptions {
213213
* @defaultValue `2 * 1024 * 1024`
214214
*/
215215
maxFileSize?: number | undefined
216+
217+
/**
218+
* Allows multi-threaded access to a single DB instance for sharing a DB
219+
* across multiple worker threads within the same process.
220+
*
221+
* @defaultValue `false`
222+
*/
223+
multithreading?: boolean | undefined
216224
}
217225

218226
/**

test/multithreading-test.js

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
'use strict'
2+
3+
const test = require('tape')
4+
const tempy = require('tempy')
5+
const path = require('path')
6+
const { Worker } = require('worker_threads')
7+
const { ClassicLevel } = require('..')
8+
const {
9+
MIN_KEY,
10+
MID_KEY,
11+
MAX_KEY,
12+
CLOSED_DB_MESSAGE,
13+
WORKER_CREATING_KEYS_MESSAGE,
14+
WORKER_READY_TO_READ_MESSAGE,
15+
WORKER_ERROR_MESSAGE,
16+
START_READING_MESSAGE,
17+
createRandomKeys,
18+
getRandomKeys
19+
} = require('./worker-utils')
20+
21+
/**
 * Makes sure that the multithreading flag is working as expected:
 * all instances sharing a location must opt in, the flag may be passed
 * either to open() or to the constructor, and a non-multithreaded open
 * still works once every multithreaded instance has closed.
 */
test('check multithreading flag works as expected', async function (t) {
  t.plan(9)
  const location = tempy.directory()
  const db1 = new ClassicLevel(location)
  const db2 = new ClassicLevel(location)

  // check that must set multithreading flag on all instances
  await db1.open()
  t.is(db1.location, location)
  try {
    await db2.open({ multithreading: true })
  } catch (err) {
    t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'second instance failed to open')
    t.is(err.cause.code, 'LEVEL_LOCKED', 'second instance got lock error')
  }
  await db1.close()

  await db1.open({ multithreading: true })
  t.is(db1.location, location)
  await db2.open({ multithreading: true })
  t.is(db2.location, location)
  // test that passing to the constructor works
  const db3 = new ClassicLevel(location, { multithreading: true })
  await db3.open()
  t.is(db3.location, location)
  const db4 = new ClassicLevel(location)
  try {
    await db4.open({ location, multithreading: false })
  } catch (err) {
    // FIX: assertion messages previously said "second instance" here,
    // copy-pasted from the db2 checks above — this is the fourth instance.
    t.is(err.code, 'LEVEL_DATABASE_NOT_OPEN', 'fourth instance failed to open')
    t.is(err.cause.code, 'LEVEL_LOCKED', 'fourth instance got lock error')
  }
  await db1.close()
  await db2.close()
  await db3.close()

  const db5 = new ClassicLevel(location)
  await db5.open({ location, multithreading: false })
  t.is(db5.location, location)
  await db5.close()
})
65+
66+
/**
 * Tests for interleaved opening and closing of the database to check
 * that the mutex for guarding the handles is working as expected. Creates
 * many workers that only open and then close the db after a random delay. Goal
 * is to interleave the open and close processes to ensure that the mutex is
 * guarding the handles correctly. After all workers have completed the main
 * thread closes the db and then opens it again as a non-multi-threaded instance
 * to make sure the handle was deleted correctly.
 */
test('open/close mutex works as expected', async function (t) {
  t.plan(3)
  const location = tempy.directory()
  const db1 = new ClassicLevel(location)
  await db1.open({ multithreading: true })
  t.is(db1.location, location)

  // One promise per worker; each resolves when its worker reports a clean close.
  const activeWorkers = []

  for (let i = 0; i < 100; i++) {
    const worker = new Worker(path.join(__dirname, 'worker.js'), {
      workerData: { location, workerStartup: true }
    })

    activeWorkers.push(
      new Promise((resolve, reject) => {
        // `once`: a worker is expected to send exactly one terminal message.
        worker.once('message', ({ message, error }) => {
          if (message === WORKER_ERROR_MESSAGE) {
            return reject(error)
          }
          if (message === CLOSED_DB_MESSAGE) {
            return resolve()
          }
          return reject(new Error('unexpected error\n>>> ' + error))
        })
      })
    )
  }

  // allSettled (not all) so every worker's outcome is collected rather than
  // bailing on the first rejection.
  const results = await Promise.allSettled(activeWorkers)
  const rejected = results.filter((res) => res.status === 'rejected')
  t.is(rejected.length, 0)
  await db1.close()

  // reopen the db non-multithreaded to check that the handle record was fully
  // deleted from the handle map
  await db1.open({ multithreading: false })
  t.is(db1.location, location)
  await db1.close()
})
115+
116+
/**
 * Tests for reading and writing to a single db from multiple threads.
 *
 * Starts by setting up worker and then worker reports its ready and immediately
 * starts writing to the database. Main thread gets message and also writes to
 * the same db but to a different key space. Goal is to concurrently write
 * consecutively numbered records. Once records are all written the worker
 * reports to the main thread and the main thread waits until both threads are
 * complete with the writing process. When both are ready they concurrently read
 * random records from the full key space for a set interval.
 */
test('allow multi-threading by same process', async function (t) {
  try {
    const location = tempy.directory()
    const db = new ClassicLevel(location, { multithreading: true })
    await db.open()

    const worker = new Worker(path.join(__dirname, 'worker.js'), {
      workerData: { location, readWrite: true }
    })

    // Detach listeners and stop the worker on any failure path.
    // NOTE(review): when invoked from an event callback the `throw err` below
    // will NOT reach the outer try/catch (the callback runs on a later tick);
    // it surfaces as an uncaught exception instead — confirm this is intended.
    function cleanup (err) {
      worker.removeAllListeners('message')
      worker.removeAllListeners('error')
      worker.terminate()
      if (err) {
        throw err
      }
    }

    worker.on('error', cleanup)
    worker.on('message', ({ message, error }) => {
      if (message === WORKER_ERROR_MESSAGE) {
        cleanup(new Error(error))
      }
    })

    // Concurrently write keys to the db on both thread and wait
    // until ready before attempting to concurrently read keys
    const workerReady = new Promise((resolve) => {
      let mainThreadReady = false
      worker.on('message', ({ message }) => {
        if (message === WORKER_CREATING_KEYS_MESSAGE) {
          // Main thread seeds [MID_KEY, MAX_KEY] while the worker seeds the
          // lower half of the key space.
          createRandomKeys(db, MID_KEY, MAX_KEY).then(() => {
            mainThreadReady = true
          })
        } else if (message === WORKER_READY_TO_READ_MESSAGE) {
          // Worker finished writing; poll until the main thread has too.
          const interval = setInterval(() => {
            if (mainThreadReady) {
              clearInterval(interval)
              resolve()
            }
          }, 100)
        }
      })
    })

    await workerReady

    // once db is seeded start reading keys from both threads
    worker.postMessage({ message: START_READING_MESSAGE })
    await getRandomKeys(db, MIN_KEY, MAX_KEY)
    await db.close()

    t.end()
  } catch (error) {
    t.fail(error.message)
    t.end()
  }
})

0 commit comments

Comments
 (0)