Skip to content

Commit 5496db6

Browse files
committed
Read-atomic/crash-atomic NodeFSStorageAdapter
1 parent f0849a6 commit 5496db6

2 files changed

Lines changed: 403 additions & 15 deletions

File tree

packages/automerge-repo-storage-nodefs/src/index.ts

Lines changed: 237 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,82 @@
11
/**
22
* @packageDocumentation
3-
* A `StorageAdapter` which stores data in the local filesystem
3+
* A `StorageAdapter` which stores data in the local filesystem.
4+
*
5+
* ## Durability and atomicity
6+
*
7+
* Writes use the standard POSIX "write-to-temporary + fsync + rename"
8+
* pattern so that a reader or a crash never observes a half-written file:
9+
*
10+
* 1. The payload is written to `<baseDirectory>/.tmp/<pid>.<uuid>`.
11+
* 2. The temporary file is `fsync`ed so its bytes reach disk.
12+
* 3. `rename(2)` replaces the target atomically (POSIX) or via
13+
* `MoveFileExW(MOVEFILE_REPLACE_EXISTING)` (Windows NTFS).
14+
* 4. On POSIX, the parent directory is `fsync`ed so the rename itself
15+
* is durable across a crash. Windows does not expose directory
16+
* fsync from user-space Node; directory metadata durability falls
17+
* back to the operating system's own guarantees.
18+
*
19+
* Temporary files live in a dedicated `<baseDirectory>/.tmp/` directory
20+
* rather than as siblings of their target files. This keeps them on the
21+
* same filesystem as their targets (required for atomic rename) while
22+
* making them invisible to older adapters and to {@link loadRange} /
23+
* {@link load}: both are prefix-scoped, real-world keys are not expected to
24+
* shard into a `.t` prefix, and the directory walk skips `.tmp/` outright.
425
*/
526

627
import {
728
Chunk,
829
StorageAdapterInterface,
930
type StorageKey,
1031
} from "@automerge/automerge-repo/slim"
11-
import fs from "fs"
12-
import path from "path"
32+
import crypto from "node:crypto"
33+
import fs from "node:fs"
34+
import os from "node:os"
35+
import path from "node:path"
1336
import { rimraf } from "rimraf"
1437

38+
const IS_POSIX = os.platform() !== "win32"
39+
1540
export class NodeFSStorageAdapter implements StorageAdapterInterface {
1641
private baseDirectory: string
42+
private tmpDirectory: string
43+
private tmpDirectoryReady: Promise<void> | undefined
1744
private cache: { [key: string]: Uint8Array } = {}
1845

1946
/**
 * Record where documents live and where in-flight temporary files go.
 * Nothing is created on disk here; directories are made lazily on write.
 *
 * @param baseDirectory - The path to the directory to store data in. Defaults to "./automerge-repo-data".
 */
constructor(baseDirectory = "automerge-repo-data") {
  // The scratch directory is always a direct child of the base directory.
  this.tmpDirectory = path.join(baseDirectory, TMP_DIR_NAME)
  this.baseDirectory = baseDirectory
}
53+
54+
/**
 * Lazily create `tmpDirectory`, sharing one in-flight/settled promise
 * between all callers. Invoked from the write path only, so constructing
 * or merely reading through the adapter never creates `.tmp/`.
 *
 * @returns A promise that fulfills once the directory exists.
 */
private ensureTmpDirectory(): Promise<void> {
  if (this.tmpDirectoryReady !== undefined) return this.tmpDirectoryReady

  const pending = (async (): Promise<void> => {
    try {
      await fs.promises.mkdir(this.tmpDirectory, { recursive: true })
    } catch (err) {
      // Forget the failed attempt so the next write retries the mkdir
      // instead of replaying a possibly transient error forever.
      this.tmpDirectoryReady = undefined
      throw err
    }
  })()

  this.tmpDirectoryReady = pending
  return pending
}
73+
74+
/**
 * Produce a new, unique path inside {@link tmpDirectory}, named
 * `<pid>.<uuid without dashes>`.
 */
private makeTmpPath(): string {
  const uniqueSuffix = crypto.randomUUID().split("-").join("")
  const fileName = [process.pid, uniqueSuffix].join(".")
  return path.join(this.tmpDirectory, fileName)
}
2581

2682
async load(keyArray: StorageKey): Promise<Uint8Array | undefined> {
@@ -33,31 +89,52 @@ export class NodeFSStorageAdapter implements StorageAdapterInterface {
3389
const fileContent = await fs.promises.readFile(filePath)
3490
return new Uint8Array(fileContent)
3591
} catch (error: any) {
36-
// don't throw if file not found
37-
if (error.code === "ENOENT") return undefined
92+
// Treat both "file not found" and "path component is not a
93+
// directory" as absent. ENOTDIR can surface when a key's
94+
// logical path passes through a location where some other
95+
// entry occupies the expected directory slot — from load()'s
96+
// perspective that still means "no file at this key".
97+
if (error.code === "ENOENT" || error.code === "ENOTDIR") {
98+
return undefined
99+
}
38100
throw error
39101
}
40102
}
41103

42104
/**
 * Persist `binary` under `keyArray` using write-to-temp + fsync + rename,
 * then make the rename durable by fsyncing the affected directories.
 *
 * Rollback semantics: if the rename has not yet completed, on-disk state
 * is unchanged and the cache is rolled back to match. Once the rename
 * completes, on-disk state is the new bytes (visible to concurrent
 * readers), so the cache is NOT rolled back — it matches disk. A later
 * directory-fsync failure means the write may not survive a crash, but
 * the bytes are still present and observable; the caller learns about
 * the durability gap via the rejection.
 *
 * @param keyArray - Storage key identifying the chunk.
 * @param binary - Bytes to store.
 * @throws Any filesystem error from directory creation, the atomic write,
 *   or the directory fsyncs.
 */
async save(keyArray: StorageKey, binary: Uint8Array): Promise<void> {
  const key = getKey(keyArray)
  const prev = this.cache[key]
  this.cache[key] = binary

  const filePath = this.getFilePath(keyArray)
  const dir = path.dirname(filePath)

  // With `recursive: true`, mkdir fulfills with the first directory it had
  // to create, or undefined when everything already existed.
  let firstCreated: string | undefined
  try {
    // Create the target directory before the tmp directory so that
    // `firstCreated` also captures a freshly created base directory.
    firstCreated = await fs.promises.mkdir(dir, { recursive: true })
    await this.ensureTmpDirectory()
    await atomicWrite(filePath, this.makeTmpPath(), binary)
  } catch (err) {
    rollbackCache(this.cache, key, prev)
    throw err
  }

  // Makes the renamed file's directory entry durable.
  await fsyncDir(dir)

  // A directory created just now is itself only a not-yet-durable entry
  // in ITS parent. Fsync every ancestor from `dir` up to the parent of
  // the first directory created; otherwise a crash could drop the whole
  // new shard directory together with the file that was just written.
  if (firstCreated !== undefined) {
    const stopAt = path.dirname(path.resolve(firstCreated))
    let current = path.resolve(dir)
    while (current !== stopAt) {
      const parent = path.dirname(current)
      if (parent === current) break // reached the filesystem root
      await fsyncDir(parent)
      current = parent
    }
  }
}
51131

52132
/**
 * Delete the chunk stored under `keyArray` from the cache and from disk.
 *
 * Removing a key that is not present is a no-op. "Not present" covers the
 * same cases `load()` treats as absent: the file does not exist (`ENOENT`)
 * or a component of its path is not a directory (`ENOTDIR`).
 *
 * @param keyArray - Storage key identifying the chunk.
 * @throws Any other filesystem error raised by `unlink`.
 */
async remove(keyArray: string[]): Promise<void> {
  delete this.cache[getKey(keyArray)]
  const filePath = this.getFilePath(keyArray)
  try {
    await fs.promises.unlink(filePath)
  } catch (error: any) {
    // Stay consistent with load(): both codes mean "no file at this key".
    if (error.code !== "ENOENT" && error.code !== "ENOTDIR") throw error
  }
}
@@ -75,7 +152,9 @@ export class NodeFSStorageAdapter implements StorageAdapterInterface {
75152
const diskFiles = await walkdir(dirPath)
76153

77154
// The "keys" in the cache don't include the baseDirectory.
78-
// We want to de-dupe with the cached keys so we'll use getKey to normalize them.
155+
// We want to de-dupe with the cached keys so we'll use getKey to
156+
// normalize them. Tmp files live under <baseDirectory>/.tmp/, which
157+
// walkdir skips, so diskFiles never contains an in-flight tmp file.
79158
const diskKeys: string[] = diskFiles.map((fileName: string) => {
80159
const k = getKey([path.relative(this.baseDirectory, fileName)])
81160
return k.slice(0, 2) + k.slice(3)
@@ -127,20 +206,164 @@ export class NodeFSStorageAdapter implements StorageAdapterInterface {
127206

128207
/** Join the segments of a storage key into one platform-specific path string. */
const getKey = (key: StorageKey): string => {
  const segments = [...key]
  return path.join(...segments)
}
129208

209+
/**
 * Undo a speculative cache write after the matching disk write failed.
 *
 * When the key previously held bytes, those bytes are put back. When it
 * had no entry (`prev === undefined`), the entry is removed, so the cache
 * again mirrors what is on disk.
 *
 * @param cache - The adapter's in-memory key → bytes map (mutated in place).
 * @param key - Cache key that was overwritten.
 * @param prev - Value the key held before the failed write, if any.
 */
const rollbackCache = (
  cache: Record<string, Uint8Array>,
  key: string,
  prev: Uint8Array | undefined
): void => {
  if (prev !== undefined) {
    cache[key] = prev
    return
  }
  delete cache[key]
}
228+
229+
/**
230+
* Name of the subdirectory under `baseDirectory` that holds in-flight
231+
* temporary files. Hidden by the leading dot. The `.t` two-character
232+
* prefix cannot collide with any real sharded storage key, which shards
233+
* by hex or base58 (see {@link NodeFSStorageAdapter.getFilePath}), so
234+
* an older adapter walking a shard subtree never descends here.
235+
*/
236+
const TMP_DIR_NAME = ".tmp"
237+
238+
/**
 * Best-effort removal of a temporary file. A failure is logged at debug
 * level and never thrown: the caller is already propagating the error that
 * made the cleanup necessary.
 */
const unlinkTmpQuietly = async (tmpPath: string): Promise<void> => {
  try {
    await fs.promises.unlink(tmpPath)
  } catch (cleanupErr) {
    console.debug(
      `[automerge-repo-storage-nodefs] failed to clean up tmp file ${tmpPath}:`,
      cleanupErr
    )
  }
}

/**
 * Write `bytes` to `targetPath` atomically:
 *   1. create `tmpPath` exclusively and write the payload into it
 *   2. fsync the temporary file
 *   3. rename it over the target
 *
 * The temporary file is opened with `"wx"`, so an entry that already
 * exists at `tmpPath` makes the call fail with `EEXIST` instead of being
 * truncated or followed. In that case nothing is unlinked, because the
 * existing entry is not ours.
 *
 * `tmpPath` MUST be on the same filesystem as `targetPath`; otherwise
 * `rename` fails with `EXDEV` and no fallback is attempted.
 *
 * @param targetPath - Final location of the file.
 * @param tmpPath - Fresh, not-yet-existing scratch path.
 * @param bytes - Payload to write.
 * @throws The open, write, fsync or rename error. After a write, fsync or
 *   rename failure the temporary file is removed on a best-effort basis.
 */
const atomicWrite = async (
  targetPath: string,
  tmpPath: string,
  bytes: Uint8Array
): Promise<void> => {
  const fh = await fs.promises.open(tmpPath, "wx")
  let wroteTmp = false
  try {
    await fh.writeFile(bytes)
    await fh.sync()
    wroteTmp = true
  } finally {
    // close() can itself fail (e.g. EIO). Log it instead of throwing so
    // it neither masks the write/sync error nor skips the cleanup below.
    try {
      await fh.close()
    } catch (closeErr) {
      console.debug(
        `[automerge-repo-storage-nodefs] fh.close() failed for ${tmpPath}:`,
        closeErr
      )
    }
    if (!wroteTmp) await unlinkTmpQuietly(tmpPath)
  }

  try {
    await fs.promises.rename(tmpPath, targetPath)
  } catch (err) {
    // The rename did not happen, so the tmp file is still lying around.
    await unlinkTmpQuietly(tmpPath)
    throw err
  }
}
315+
316+
/**
 * Flush a directory to disk so that recent `rename(2)` calls into it are
 * durable. A no-op when `IS_POSIX` is false.
 *
 * `EISDIR` and `EINVAL` are tolerated: some filesystems (tmpfs, certain
 * network mounts) refuse fsync on directories, in which case this is a
 * best-effort guarantee rather than a hard failure.
 *
 * @param dir - Directory to sync.
 * @throws Any other error from opening or syncing the directory.
 */
const fsyncDir = async (dir: string): Promise<void> => {
  if (IS_POSIX) {
    let handle: fs.promises.FileHandle | undefined
    try {
      handle = await fs.promises.open(dir, "r")
      await handle.sync()
    } catch (err: any) {
      const tolerated = err.code === "EISDIR" || err.code === "EINVAL"
      if (!tolerated) throw err
    } finally {
      if (handle) {
        // A close() failure is logged, never thrown, so it cannot turn a
        // tolerated fsync outcome into a hard failure.
        await handle.close().catch(closeErr => {
          console.debug(
            `[automerge-repo-storage-nodefs] fsyncDir close() failed for ${dir}:`,
            closeErr
          )
        })
      }
    }
  }
}
346+
130347
/**
 * Recursively list every file below `dirPath` as an absolute path.
 *
 * Directories named `TMP_DIR_NAME` are never descended into, so in-flight
 * temporary files are not reported. A `dirPath` that does not exist, or
 * that is not a directory, yields an empty list.
 *
 * @param dirPath - Directory to walk.
 * @returns Paths of all files found; subdirectories are expanded in place.
 * @throws Any `readdir` error other than `ENOENT` / `ENOTDIR`.
 */
const walkdir = async (dirPath: string): Promise<string[]> => {
  try {
    const listing = await fs.promises.readdir(dirPath, {
      withFileTypes: true,
    })
    const nested = await Promise.all(
      listing.map((item): string | string[] | Promise<string[]> => {
        const isDir = item.isDirectory()
        // Skip the temporary-file directory wherever it shows up.
        if (isDir && item.name === TMP_DIR_NAME) return []
        const fullPath = path.resolve(dirPath, item.name)
        if (isDir) return walkdir(fullPath)
        return fullPath
      })
    )
    return nested.flat()
  } catch (error: any) {
    const missing = error.code === "ENOENT" || error.code === "ENOTDIR"
    if (missing) return []
    throw error
  }
}

0 commit comments

Comments
 (0)