Skip to content

Commit bb97333

Browse files
committed
Read-atomic/crash-atomic NodeFSStorageAdapter
1 parent f0849a6 commit bb97333

2 files changed

Lines changed: 439 additions & 18 deletions

File tree

packages/automerge-repo-storage-nodefs/src/index.ts

Lines changed: 255 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,82 @@
11
/**
22
* @packageDocumentation
3-
* A `StorageAdapter` which stores data in the local filesystem
3+
* A `StorageAdapter` which stores data in the local filesystem.
4+
*
5+
* ## Durability and atomicity
6+
*
7+
* Writes use the standard POSIX "write-to-temporary + fsync + rename"
8+
* pattern so that a reader or a crash never observes a half-written file:
9+
*
10+
* 1. The payload is written to `<baseDirectory>/.tmp/<pid>.<uuid>`.
11+
* 2. The temporary file is `fsync`ed so its bytes reach disk.
12+
* 3. `rename(2)` replaces the target atomically (POSIX) or via
13+
* `MoveFileExW(MOVEFILE_REPLACE_EXISTING)` (Windows NTFS).
14+
* 4. On POSIX, the parent directory is `fsync`ed so the rename itself
15+
* is durable across a crash. Windows does not expose directory
16+
* fsync from user-space Node; directory metadata durability falls
17+
* back to the operating system's own guarantees.
18+
*
19+
* Temporary files live in a dedicated `<baseDirectory>/.tmp/` directory
20+
* rather than as siblings of their target files. This keeps them on the
21+
* same filesystem as their targets (required for atomic rename) while
22+
* making them invisible to older adapters and to {@link loadRange} /
23+
* {@link load}, which are always prefix-scoped and are unlikely to walk
24+
* `.tmp/` since real-world keys won't shard into a `.t` prefix.
425
*/
526

627
import {
728
Chunk,
829
StorageAdapterInterface,
930
type StorageKey,
1031
} from "@automerge/automerge-repo/slim"
11-
import fs from "fs"
12-
import path from "path"
32+
import crypto from "node:crypto"
33+
import fs from "node:fs"
34+
import os from "node:os"
35+
import path from "node:path"
1336
import { rimraf } from "rimraf"
1437

38+
/** True when `os.platform()` reports anything other than "win32"; `fsyncDir` returns early when this is false. */
const IS_POSIX = os.platform() !== "win32"
39+
1540
export class NodeFSStorageAdapter implements StorageAdapterInterface {
1641
private baseDirectory: string
42+
private tmpDirectory: string
43+
private tmpDirectoryReady: Promise<void> | undefined
1744
private cache: { [key: string]: Uint8Array } = {}
1845

1946
/**
 * @param baseDirectory - The path to the directory to store data in. Defaults to "./automerge-repo-data".
 */
constructor(baseDirectory = "automerge-repo-data") {
  this.baseDirectory = baseDirectory
  // Only the path is computed here; nothing is created on disk by the
  // constructor itself.
  this.tmpDirectory = path.join(baseDirectory, TMP_DIR_NAME)
}
53+
54+
/**
 * Create `tmpDirectory` once (idempotent). Called lazily on the first
 * write so a read-only consumer of this adapter never creates the
 * `.tmp/` directory as a side effect of construction.
 *
 * @returns A promise, shared by every caller while it is cached, that
 *   resolves once `mkdir` has succeeded and rejects with the `mkdir`
 *   error otherwise.
 */
private ensureTmpDirectory(): Promise<void> {
  if (!this.tmpDirectoryReady) {
    this.tmpDirectoryReady = fs.promises
      .mkdir(this.tmpDirectory, { recursive: true })
      .then(() => undefined)
      .catch(err => {
        // Reset so subsequent writes retry the mkdir. Otherwise a
        // transient failure would be remembered forever.
        this.tmpDirectoryReady = undefined
        throw err
      })
  }
  return this.tmpDirectoryReady
}
73+
74+
/**
 * Build a fresh unique path inside {@link tmpDirectory}.
 *
 * @returns `<tmpDirectory>/<pid>.<uuid-without-dashes>`; a new random
 *   UUID is drawn on every call.
 */
private makeTmpPath(): string {
  const uniqueSuffix = crypto.randomUUID().replace(/-/g, "")
  return path.join(this.tmpDirectory, `${process.pid}.${uniqueSuffix}`)
}
2581

2682
async load(keyArray: StorageKey): Promise<Uint8Array | undefined> {
@@ -33,31 +89,52 @@ export class NodeFSStorageAdapter implements StorageAdapterInterface {
3389
const fileContent = await fs.promises.readFile(filePath)
3490
return new Uint8Array(fileContent)
3591
} catch (error: any) {
36-
// don't throw if file not found
37-
if (error.code === "ENOENT") return undefined
92+
// Treat both "file not found" and "path component is not a
93+
// directory" as absent. ENOTDIR can surface when a key's
94+
// logical path passes through a location where some other
95+
// entry occupies the expected directory slot — from load()'s
96+
// perspective that still means "no file at this key".
97+
if (error.code === "ENOENT" || error.code === "ENOTDIR") {
98+
return undefined
99+
}
38100
throw error
39101
}
40102
}
41103

42104
/**
 * Store `binary` under `keyArray`, updating the in-memory cache and
 * writing the bytes to disk via a temporary file that is renamed over
 * the target.
 *
 * Rollback semantics: if the rename has not yet completed, on-disk
 * state is unchanged and the cache is rolled back to match. Once the
 * rename completes, on-disk state is the new bytes (visible to
 * concurrent readers), so the cache is NOT rolled back — it matches
 * disk. A subsequent `fsyncDir` failure means the rename may not be
 * durable across a crash, but the bytes are still present and
 * observable; rolling back in that case would make the cache diverge
 * from disk. The caller learns about the durability gap via the
 * rejection.
 *
 * @param keyArray - Storage key identifying the entry.
 * @param binary - Bytes to persist.
 * @throws Whatever error the tmp-dir creation, `mkdir`, atomic write,
 *   or directory fsync rejects with.
 */
async save(keyArray: StorageKey, binary: Uint8Array): Promise<void> {
  const key = getKey(keyArray)
  // Remember the previous cache entry so a failed write can restore it.
  const prev = this.cache[key]
  this.cache[key] = binary

  const filePath = this.getFilePath(keyArray)
  const dir = path.dirname(filePath)

  try {
    await this.ensureTmpDirectory()
    await fs.promises.mkdir(dir, { recursive: true })
    await atomicWrite(filePath, this.makeTmpPath(), binary)
  } catch (err) {
    rollbackCache(this.cache, key, prev)
    throw err
  }

  // Deliberately outside the try block: the rename has already happened,
  // so a failure here must not roll the cache back.
  // NOTE(review): only the immediate parent directory is fsynced; any
  // ancestor directories newly created by the mkdir above are not —
  // confirm that is acceptable for the durability guarantee.
  await fsyncDir(dir)
}
51131

52132
/**
 * Remove the entry for `keyArray` from the in-memory cache and delete
 * its file from disk.
 *
 * A missing file is not an error. Both `ENOENT` (no such file) and
 * `ENOTDIR` (a path component is not a directory) mean there is nothing
 * stored at this key, so both are ignored.
 *
 * @param keyArray - Storage key identifying the entry to delete.
 * @throws Any `unlink` error other than `ENOENT` / `ENOTDIR`.
 */
async remove(keyArray: string[]): Promise<void> {
  delete this.cache[getKey(keyArray)]
  const filePath = this.getFilePath(keyArray)
  try {
    await fs.promises.unlink(filePath)
  } catch (error: any) {
    if (error.code !== "ENOENT" && error.code !== "ENOTDIR") throw error
  }
}
@@ -76,10 +153,12 @@ export class NodeFSStorageAdapter implements StorageAdapterInterface {
76153

77154
// The "keys" in the cache don't include the baseDirectory.
78155
// We want to de-dupe with the cached keys so we'll use getKey to normalize them.
79-
const diskKeys: string[] = diskFiles.map((fileName: string) => {
80-
const k = getKey([path.relative(this.baseDirectory, fileName)])
81-
return k.slice(0, 2) + k.slice(3)
82-
})
156+
const diskKeys: string[] = diskFiles
157+
.filter((fileName: string) => !isTmpPath(fileName))
158+
.map((fileName: string) => {
159+
const k = getKey([path.relative(this.baseDirectory, fileName)])
160+
return k.slice(0, 2) + k.slice(3)
161+
})
83162

84163
// Combine and deduplicate the lists of keys
85164
const allKeys = [...new Set([...cachedKeys, ...diskKeys])]
@@ -127,20 +206,179 @@ export class NodeFSStorageAdapter implements StorageAdapterInterface {
127206

128207
/** Join the segments of a storage key into a single relative path string using the platform separator. */
const getKey = (key: StorageKey): string => {
  const segments = [...key]
  return path.join(...segments)
}
129208

209+
/**
 * Restore a cache entry to its value prior to a failed write.
 *
 * If the key had no prior entry (`prev === undefined`), the entry is
 * deleted entirely. Otherwise it is overwritten with the prior bytes.
 * This keeps the in-memory cache consistent with on-disk state when
 * save() rejects partway through.
 *
 * @param cache - The cache object to mutate in place.
 * @param key - Cache key whose entry is being restored.
 * @param prev - The value the entry held before the write, or
 *   `undefined` if there was none.
 */
const rollbackCache = (
  cache: Record<string, Uint8Array>,
  key: string,
  prev: Uint8Array | undefined
): void => {
  if (prev === undefined) {
    delete cache[key]
    return
  }
  cache[key] = prev
}
228+
229+
/**
230+
* Name of the subdirectory under `baseDirectory` that holds in-flight
231+
* temporary files. Hidden by the leading dot. The `.t` two-character
232+
* prefix cannot collide with any real sharded storage key, which shards
233+
* by hex or base58 (see {@link NodeFSStorageAdapter.getFilePath}), so
234+
* an older adapter walking a shard subtree never descends here.
235+
*/
236+
const TMP_DIR_NAME = ".tmp"
237+
238+
/**
239+
* Matches the legacy tmp-file suffix format produced by earlier
240+
* versions of this adapter, when tmp files were siblings of their
241+
* target: `<target>.tmp.<pid>.<uuid-without-dashes>`. Current code
242+
* places tmp files in the {@link TMP_DIR_NAME} directory instead, so
243+
* this predicate exists only to filter stale siblings left behind by
244+
* crashes under the older layout during an upgrade window.
245+
*/
246+
const TMP_PATH_PATTERN = /\.tmp\.\d+\.[0-9a-f]{32}$/i
247+
248+
const isTmpPath = (p: string): boolean =>
249+
TMP_PATH_PATTERN.test(path.basename(p))
250+
251+
/**
 * Best-effort removal of a leftover temporary file. A failure is logged
 * at debug level and never thrown, so it cannot mask the error the
 * caller is already about to see.
 */
const unlinkTmpBestEffort = async (tmpPath: string): Promise<void> => {
  try {
    await fs.promises.unlink(tmpPath)
  } catch (cleanupErr) {
    console.debug(
      `[automerge-repo-storage-nodefs] failed to clean up tmp file ${tmpPath}:`,
      cleanupErr
    )
  }
}

/**
 * Write `bytes` to `targetPath` atomically:
 *   1. write to `tmpPath` on the same filesystem as `targetPath`
 *   2. fsync the temporary file
 *   3. rename over the target
 *
 * On POSIX, rename is atomic with respect to concurrent readers and a
 * crash. On Windows NTFS, `fs.promises.rename` uses
 * `MoveFileExW(MOVEFILE_REPLACE_EXISTING)` which is atomic for
 * concurrent readers; post-crash state on Windows depends on NTFS and
 * the OS flush policy.
 *
 * `tmpPath` MUST be on the same filesystem as `targetPath`; otherwise
 * `rename` will fail with `EXDEV` and no fallback is attempted.
 *
 * @param targetPath - Final destination of the bytes.
 * @param tmpPath - Scratch path to write to first; opened with flag
 *   `"w"`, so an existing file there is truncated.
 * @param bytes - Payload to write.
 * @throws The error from `open`, `writeFile`, `sync`, or `rename`. In
 *   the latter three cases an attempt is made to delete `tmpPath`
 *   before the error propagates.
 */
const atomicWrite = async (
  targetPath: string,
  tmpPath: string,
  bytes: Uint8Array
): Promise<void> => {
  const fh = await fs.promises.open(tmpPath, "w")
  let wroteTmp = false
  try {
    await fh.writeFile(bytes)
    await fh.sync()
    wroteTmp = true
  } finally {
    // fh.close() can itself throw (e.g. EIO). Contain it so the cleanup
    // below still runs and so a close failure cannot mask the original
    // writeFile/sync error, which is the signal the caller cares about.
    try {
      await fh.close()
    } catch (closeErr) {
      console.debug(
        `[automerge-repo-storage-nodefs] fh.close() failed for ${tmpPath}:`,
        closeErr
      )
    }
    if (!wroteTmp) {
      // writeFile/sync threw, so the tmp file is incomplete. It was
      // never renamed, so the target is untouched; a stray tmp file
      // that cannot be removed only wastes space.
      await unlinkTmpBestEffort(tmpPath)
    }
  }

  try {
    await fs.promises.rename(tmpPath, targetPath)
  } catch (err) {
    // The rename failed, so the tmp file is still lying around.
    await unlinkTmpBestEffort(tmpPath)
    throw err
  }
}
328+
329+
/**
 * `fsync` a directory so that recent `rename(2)` calls into it are
 * durable. Does nothing when `IS_POSIX` is false; the original author
 * notes that opening a directory on Windows fails with `EISDIR`, so
 * that platform relies on the OS's native guarantees instead.
 *
 * @param dir - Directory to open read-only and sync.
 * @throws Any `open`/`sync` error other than `EISDIR` or `EINVAL`.
 *   Errors from closing the handle are logged at debug level, not
 *   thrown.
 */
const fsyncDir = async (dir: string): Promise<void> => {
  if (!IS_POSIX) return

  let fh: fs.promises.FileHandle | undefined
  try {
    fh = await fs.promises.open(dir, "r")
    await fh.sync()
  } catch (err: any) {
    // Some filesystems (tmpfs, certain network mounts) refuse fsync on
    // directories. Treat that as a best-effort guarantee rather than a
    // hard failure — the file fsync still gave us data durability.
    const tolerated = err.code === "EISDIR" || err.code === "EINVAL"
    if (!tolerated) throw err
  } finally {
    // Swallow close() failures so they can't turn a tolerated fsync
    // outcome into a hard failure.
    if (fh) {
      try {
        await fh.close()
      } catch (closeErr) {
        console.debug(
          `[automerge-repo-storage-nodefs] fsyncDir close() failed for ${dir}:`,
          closeErr
        )
      }
    }
  }
}
}
361+
130362
/**
 * Returns all files in a directory, recursively.
 *
 * @param dirPath - Directory to start from.
 * @returns Absolute paths (built with `path.resolve`) of every
 *   non-directory entry beneath `dirPath`. A directory that does not
 *   exist, or a path that turns out not to be a directory, yields `[]`.
 * @throws Any `readdir` error other than `ENOENT` / `ENOTDIR`.
 */
const walkdir = async (dirPath: string): Promise<string[]> => {
  try {
    const entries = await fs.promises.readdir(dirPath, {
      withFileTypes: true,
    })
    const files = await Promise.all(
      entries.map(entry => {
        // Defensive: never descend into a directory named like the tmp
        // directory, so in-flight temporary files are never listed.
        if (entry.isDirectory() && entry.name === TMP_DIR_NAME) return []
        const subpath = path.resolve(dirPath, entry.name)
        return entry.isDirectory() ? walkdir(subpath) : subpath
      })
    )
    return files.flat()
  } catch (error: any) {
    if (error.code === "ENOENT" || error.code === "ENOTDIR") return []
    throw error
  }
}

0 commit comments

Comments
 (0)