From e8f8302127239da634d8fd18d5334a813f7dbeae Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 13 Nov 2025 23:31:02 +0000 Subject: [PATCH] opfs: enable opfs temp_directory via pre-registered pool Previously attempting to set temp_directory to an opfs:// path did not work. Specifically, while 'auto' file handling scanned SQL stmts for opfs:// paths to pre-open mentioned files via the async OPFS API and register them where the sync openFile call could find them, it only worked for specific _files_, not for a temp directory _in which_ open file would later attempt to create files. The sync vs async APIs of openFile vs OPFS presents a challenge here: when the DB decides to spill to a tempfile and attempts to create it, it does so via the openFile call, which is a sync call. This makes creating the OPFS file at that point, via the async OPFS API, a problem. To get around this, this change, when an OPFS temp directory is registered, sets up a 'temp pool' of pre-created files, with sync access handles, which can be handed out as needed in openFile calls. When closed, a file can be truncated and returned to the pool, or deleted if the pool is already full. When the pool runs low, new files can be opened and added to it async. This approach works around the sync vs async API mismatch, while still allowing openFile calls to create arbitrarily named -- from its point of view -- files at arbitrary times. Fixes #2061. --- .../duckdb-wasm/src/bindings/bindings_base.ts | 16 +++ .../src/bindings/bindings_interface.ts | 1 + packages/duckdb-wasm/src/bindings/config.ts | 31 +++++ .../src/bindings/runtime_browser.ts | 32 ++++- .../src/parallel/async_bindings.ts | 33 +++++ .../src/parallel/worker_dispatcher.ts | 5 +- .../src/parallel/worker_request.ts | 3 + packages/duckdb-wasm/src/utils/opfs_util.ts | 124 +++++++++++++++++- packages/duckdb-wasm/test/opfs.test.ts | 70 ++++++++++ 9 files changed, 311 insertions(+), 4 deletions(-) diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index b4b00188b..03e796ec3 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -13,6 +13,7 @@ import { arrowToSQLField, arrowToSQLType } from '../json_typedef'; import { WebFile } from './web_file'; import { UDFFunction, UDFFunctionDeclaration } from './udf_function'; import * as arrow from 'apache-arrow'; +import { createOPFSTempPool } from '../utils/opfs_util'; const TEXT_ENCODER = new TextEncoder(); @@ -701,6 +702,21 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { throw new Error('Not an OPFS file name: ' + file); } } + + public async registerOPFSTempDir(tempPath?: string, maxPoolSize?: number, minPoolSize?: number): Promise { + // Access BROWSER_RUNTIME through the runtime field + const runtime = this._runtime as any; + + if (runtime._opfsTmpPool) { + await runtime._opfsTmpPool.destroy(); + runtime._opfsTmpPool = null; + } + + if (tempPath) { + runtime._opfsTmpPool = await createOPFSTempPool(tempPath, { maxUnused: maxPoolSize, minUnused: minPoolSize }); + } + } + public collectFileStatistics(file: string, enable: boolean): void { const [s, d, n] = callSRet(this.mod, 'duckdb_web_collect_file_stats', ['string', 'boolean'], [file, enable]); if (s !== StatusCode.SUCCESS) { diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 06e712f9a..7011c495f 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -63,6 +63,7 @@ export interface DuckDBBindings { copyFileToPath(name: string, path: string): void; copyFileToBuffer(name: string): Uint8Array; registerOPFSFileName(file: string): Promise; + registerOPFSTempDir(tempPath?: string, maxPoolSize?: number, minPoolSize?: number): Promise; collectFileStatistics(file: string, enable: boolean): void; exportFileStatistics(file: string): FileStatistics; } diff --git a/packages/duckdb-wasm/src/bindings/config.ts b/packages/duckdb-wasm/src/bindings/config.ts index a36e4ab5e..9cf69e303 100644 --- a/packages/duckdb-wasm/src/bindings/config.ts +++ b/packages/duckdb-wasm/src/bindings/config.ts @@ -40,6 +40,37 @@ export interface DuckDBOPFSConfig { * - "manual": Files must be manually registered and dropped. */ fileHandling?: "auto" | "manual"; + + /** + * OPFS path for temporary files (e.g., "opfs://tmp"). + * + * When set, a "pool" of pre-allocated temp files is maintained for use by + * DuckDB when it opens a tempfile on-demand. Pre-allocation of tempfiles is + * required when using OPFS due to the OPFS API providing only asynchronous + * file creation, while DuckDB's temp file creation must be synchronous. By + * maintaining a pool of pre-created temp files, DuckDB can synchronously + * claim a temp file from the pool when needed. + * + * `SET temp_directory='opfs://...` can also be used to initialize or change + * the temp directory at runtime when using "auto" fileHandling. + */ + tempPath?: string; + + /** + * Maximum number of pre-allocated file handles in the temp pool beyond + * returned files will be deleted when closed. + */ + tempPoolMax?: number; + + /** + * Minimum number of unused pre-allocated handles to maintain in any temp + * file pools. When a tempfile is opened from the pool causing the remaining + * unused handles to drop below this number, new handles will be created + * asynchronously in the background to refill the pool up to tempPoolMax. + * + * Must be less than tempPoolMax. + */ + tempPoolMin?: number; } export enum DuckDBAccessMode { diff --git a/packages/duckdb-wasm/src/bindings/runtime_browser.ts b/packages/duckdb-wasm/src/bindings/runtime_browser.ts index 34a1ed165..b1c54e893 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_browser.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_browser.ts @@ -16,6 +16,7 @@ import { } from './runtime'; import { DuckDBModule } from './duckdb_module'; import * as udf from './udf_runtime'; +import { TmpPool } from '../utils/opfs_util'; const OPFS_PREFIX_LEN = 'opfs://'.length; const PATH_SEP_REGEX = /\/|\\/; @@ -26,6 +27,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { _globalFileInfo: DuckDBGlobalFileInfo | null; _preparedHandles: Record; _opfsRoot: FileSystemDirectoryHandle | null; + _opfsTmpPool: TmpPool | null; getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null; getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null; @@ -37,6 +39,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { _globalFileInfo: null, _preparedHandles: {} as any, _opfsRoot: null, + _opfsTmpPool: null, getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null { try { @@ -417,7 +420,21 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return result; } case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); + let handle: FileSystemSyncAccessHandle | undefined = BROWSER_RUNTIME._files?.get(file.fileName); + + // Check if this file belongs to a registered temp directory. + // DuckDB creates temp files on-the-fly during query execution (for spilling), + // calling openFile() synchronously. Since OPFS file creation is async, we can't + // create new files here. Instead, we use pre-allocated file handles from the + // temp pool that was set up when the temp directory was registered. + if (!handle && BROWSER_RUNTIME._opfsTmpPool) { + const pool = BROWSER_RUNTIME._opfsTmpPool; + if (pool.matches(file.fileName)) { + handle = pool.openFile(file.fileName); + BROWSER_RUNTIME._files.set(file.fileName, handle); + } + } + if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } @@ -529,6 +546,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { closeFile: (mod: DuckDBModule, fileId: number) => { const file = BROWSER_RUNTIME.getFileInfo(mod, fileId); BROWSER_RUNTIME._fileInfoCache.delete(fileId); + try { switch (file?.dataProtocol) { case DuckDBDataProtocol.BUFFER: @@ -556,6 +574,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { const fileName = readString(mod, fileNamePtr, fileNameLen); const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(fileName); if (handle) { + if (BROWSER_RUNTIME._opfsTmpPool && BROWSER_RUNTIME._opfsTmpPool.matches(fileName)) { + BROWSER_RUNTIME._opfsTmpPool.dropFile(fileName); + return + } BROWSER_RUNTIME._files.delete(fileName); if (handle instanceof FileSystemSyncAccessHandle) { try { @@ -769,7 +791,13 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } return true; }, - removeFile: (_mod: DuckDBModule, _pathPtr: number, _pathLen: number) => {}, + removeFile: (mod: DuckDBModule, pathPtr: number, pathLen: number) => { + const path = readString(mod, pathPtr, pathLen); + + if (BROWSER_RUNTIME._opfsTmpPool && BROWSER_RUNTIME._opfsTmpPool.matches(path)) { + BROWSER_RUNTIME._opfsTmpPool.dropFile(path); + } + }, callScalarUDF: ( mod: DuckDBModule, response: number, diff --git a/packages/duckdb-wasm/src/parallel/async_bindings.ts b/packages/duckdb-wasm/src/parallel/async_bindings.ts index 0ca302d8c..9aaa5934b 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings.ts @@ -179,6 +179,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { case WorkerRequestType.CLOSE_PREPARED: case WorkerRequestType.COLLECT_FILE_STATISTICS: case WorkerRequestType.REGISTER_OPFS_FILE_NAME: + case WorkerRequestType.REGISTER_OPFS_TEMP_DIR: case WorkerRequestType.COPY_FILE_TO_PATH: case WorkerRequestType.DISCONNECT: case WorkerRequestType.DROP_FILE: @@ -386,6 +387,19 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { this._config = config; const task = new WorkerTask(WorkerRequestType.OPEN, config); await this.postTask(task); + + // If OPFS temp path is configured, eagerly initialize the pool + if (config.opfs?.tempPath) { + await this.registerOPFSTempDir(config.opfs.tempPath, config.opfs.tempPoolMax, config.opfs.tempPoolMin); + + // Configure DuckDB to use this temp directory + const conn = await this.connect(); + try { + await conn.send(`SET temp_directory = '${config.opfs.tempPath}'`); + } finally { + await conn.close(); + } + } } /** Tokenize a script text */ @@ -614,6 +628,13 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { ); await this.postTask(task, []); } + public async registerOPFSTempDir(name?: string, maxPoolSize?: number, minPoolSize?: number): Promise { + const task = new WorkerTask( + WorkerRequestType.REGISTER_OPFS_TEMP_DIR, + [name, maxPoolSize, minPoolSize], + ); + await this.postTask(task, []); + } /** Enable file statistics */ public async collectFileStatistics(name: string, enable: boolean): Promise { @@ -715,6 +736,18 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { } private async registerOPFSFileFromSQL(text: string) { + const opfsTempDir = text.match(/(?:SET|PRAGMA)\s+temp_directory\s*=\s*['"]?(opfs:\/\/[^\s'";]+)['"]?/i); + if (opfsTempDir ) { + const newPath = opfsTempDir[1]; + + // Register the temp directory with the worker using config sizes if available + await this.registerOPFSTempDir(newPath, this._config?.opfs?.tempPoolMax, this._config?.opfs?.tempPoolMin); + // Remove the 'SET' or 'PRAGMA' temp_directory from the text that is + // searched for opfs urls to avoid detecting and attempting to + // register the temp *directory* as a file. + text = text.replace(opfsTempDir[0], ""); + } + const files = searchOPFSFiles(text); const result: string[] = []; for (const file of files) { diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index a030999f4..e983efd73 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -365,7 +365,10 @@ export abstract class AsyncDuckDBDispatcher implements Logger { await this._bindings.registerOPFSFileName(request.data[0]); this.sendOK(request); break; - + case WorkerRequestType.REGISTER_OPFS_TEMP_DIR: + await this._bindings.registerOPFSTempDir(request.data[0], request.data[1], request.data[2]); + this.sendOK(request); + break; case WorkerRequestType.EXPORT_FILE_STATISTICS: { this.postMessage( { diff --git a/packages/duckdb-wasm/src/parallel/worker_request.ts b/packages/duckdb-wasm/src/parallel/worker_request.ts index d0e1b5c35..dd017d438 100644 --- a/packages/duckdb-wasm/src/parallel/worker_request.ts +++ b/packages/duckdb-wasm/src/parallel/worker_request.ts @@ -15,6 +15,7 @@ export enum WorkerRequestType { CLOSE_PREPARED = 'CLOSE_PREPARED', COLLECT_FILE_STATISTICS = 'COLLECT_FILE_STATISTICS', REGISTER_OPFS_FILE_NAME = 'REGISTER_OPFS_FILE_NAME', + REGISTER_OPFS_TEMP_DIR = 'REGISTER_OPFS_TEMP_DIR', CONNECT = 'CONNECT', COPY_FILE_TO_BUFFER = 'COPY_FILE_TO_BUFFER', COPY_FILE_TO_PATH = 'COPY_FILE_TO_PATH', @@ -111,6 +112,7 @@ export type WorkerRequestVariant = | WorkerRequest | WorkerRequest | WorkerRequest + | WorkerRequest | WorkerRequest | WorkerRequest | WorkerRequest @@ -171,6 +173,7 @@ export type WorkerResponseVariant = export type WorkerTaskVariant = | WorkerTask | WorkerTask + | WorkerTask | WorkerTask | WorkerTask | WorkerTask diff --git a/packages/duckdb-wasm/src/utils/opfs_util.ts b/packages/duckdb-wasm/src/utils/opfs_util.ts index 822eb4a7c..039bc25e6 100644 --- a/packages/duckdb-wasm/src/utils/opfs_util.ts +++ b/packages/duckdb-wasm/src/utils/opfs_util.ts @@ -7,4 +7,126 @@ export function isOPFSProtocol(path: string): boolean { export function searchOPFSFiles(text: string) { return [...text.matchAll(REGEX_OPFS_FILE)].map(match => match[1]); -} \ No newline at end of file +} + +type HandleEntry = { name: string; handle: FileSystemSyncAccessHandle }; + +export type TmpPoolConfig = { maxUnused?: number; minUnused?: number }; + +/** + * TmpPool manages pre-allocated OPFS file handles for temporary files. + * + * DuckDB's file operations (openFile, dropFile, etc.) are synchronous C++ calls + * that cannot be made async. However, OPFS file creation requires async APIs. + * When DuckDB needs to create temporary files for spilling data to disk, it + * calls openFile synchronously, expecting an immediate file handle. + * + * To work around this API mismatch, we pre-create a pool of OPFS files with + * their sync access handles during the async temp directory registration. When + * DuckDB needs a temp file, we can synchronously hand out one of these + * pre-created handles from the pool. When the file is closed, we clear it and + * return it to the pool for reuse. + */ +export class TmpPool { + public readonly path: string; + private dir: FileSystemDirectoryHandle; + private maxUnused: number; + private minUnused: number; + private pool: HandleEntry[] = []; // unused files. + private openMap = new Map(); // checked out files. + private nextId = 1; + private refillInFlight = false; + + constructor( + path: string, + dir: FileSystemDirectoryHandle, + maxUnused: number = 4, + minUnused: number = 2 + ) { + if (minUnused >= maxUnused) throw new Error("minUnused must be < maxUnused"); + this.path = canonicalDirUrl(path); + this.dir = dir; + this.maxUnused = maxUnused; + this.minUnused = minUnused; + } + + async init(): Promise { await this.refillTo(this.maxUnused); } + + matches(path: string): boolean { + const canonical = canonicalDirUrl(path); + return canonical === this.path || canonical.startsWith(this.path); + } + + openFile(path: string): FileSystemSyncAccessHandle { + const existing = this.openMap.get(path); if (existing) return existing.handle; + if (this.pool.length === 0) throw new Error("OPFS tmp pool exhausted"); + const entry = this.pool.pop()!; this.openMap.set(path, entry); + if (this.pool.length < this.minUnused) this.maybeRefillAsync(); + return entry.handle; + } + + dropFile(path: string): void { + const entry = this.openMap.get(path); if (!entry) return; + entry.handle.flush(); + + if (this.pool.length >= this.maxUnused) { + this.asyncDelete(entry).catch(() => {}); + } else { + entry.handle.truncate(0); + this.pool.push(entry); + } + this.openMap.delete(path); + } + + async destroy(): Promise { + await Promise.all(this.pool.splice(0).map(e => this.asyncDelete(e))); + } + + private async createEntry(): Promise { + const name = `tmp${this.nextId++}`; + const fh = await this.dir.getFileHandle(name, { create: true }); + const sah = await fh.createSyncAccessHandle(); + sah.truncate(0); + return { name, handle: sah }; + } + private async refillTo(target: number): Promise { + while (this.pool.length < target) { + const e = await this.createEntry(); this.pool.push(e); + } + } + private maybeRefillAsync() { + if (this.refillInFlight) return; + this.refillInFlight = true; + this.refillTo(this.maxUnused).finally(() => { this.refillInFlight = false; }); + } + private async asyncDelete(e: HandleEntry) { + try { e.handle.flush(); } catch { /* ignore errors */ } + try { e.handle.close(); } catch { /* ignore errors */ } + try { await this.dir.removeEntry(e.name); } catch { /* ignore errors */ } + } +} + +export function canonicalDirUrl(url: string) { + return url.replace(/\/+$/, ""); +} + +export async function resolveOpfsDirectory(opfsUrl: string): Promise { + const root = await (navigator as any).storage.getDirectory(); + const rel = opfsUrl.slice("opfs://".length).replace(/^\/+/, ""); + const parts = rel.split("/").filter(Boolean); + let dir: FileSystemDirectoryHandle = root; + for (const p of parts) { + dir = await dir.getDirectoryHandle(p, { create: true }); + } + return dir; +} + +export async function createOPFSTempPool(opfsDirUrl: string, cfg: TmpPoolConfig = {}) : Promise { + const key = canonicalDirUrl(opfsDirUrl); + const dir = await resolveOpfsDirectory(key); + const maxUnused = cfg.maxUnused ?? 10; + const minUnused = cfg.minUnused ?? 2; + const pool = new TmpPool(key, dir, maxUnused, minUnused); + await pool.init(); + return pool; +} diff --git a/packages/duckdb-wasm/test/opfs.test.ts b/packages/duckdb-wasm/test/opfs.test.ts index 2fddc8cfb..91d857ace 100644 --- a/packages/duckdb-wasm/test/opfs.test.ts +++ b/packages/duckdb-wasm/test/opfs.test.ts @@ -57,6 +57,75 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void { await removeFiles(); }); + + describe('OPFS Spilling', () => { + const spillQuery = `SELECT COUNT(*) FROM (SELECT i FROM range(0, 800_000) i ORDER BY hash(i) LIMIT 100_000)`; + + it('via tempPath specified in db.open', async () => { + await conn.close(); await db.reset(); + + await db.open({ + path: 'opfs://spilltest/duck.db', + accessMode: DuckDBAccessMode.READ_WRITE, + opfs: {tempPath: 'opfs://spilltest/tmp'}, + }); + conn = await db.connect(); + await conn.send(`PRAGMA memory_limit='4MB'; SET max_temp_directory_size = '100MiB'`); + + const res = await conn.send(spillQuery); + await res.next(); + }); + + it('via SET temp_directory', async () => { + await conn.close(); await db.reset(); + + await db.open({ + path: 'opfs://spilltest/duck.db', + accessMode: DuckDBAccessMode.READ_WRITE, + opfs: { + // No tempPath here, but in "auto" mode an opfs URL in a SET + // command is automatically detected to initialize the pool. + fileHandling: "auto", + }, + }); + conn = await db.connect(); + await conn.send(`PRAGMA memory_limit='4MB'; SET max_temp_directory_size = '100MiB'`); + + // Set (and then just to exercise re-setting reset) the temp dir via + // SET. NB: the DB only allows changing temp prior to first usage. + await conn.send(`SET temp_directory = 'opfs://spilltest/other'`); + await conn.send(`SET temp_directory = 'opfs://spilltest/tmp'`); + + const res = await conn.send(spillQuery); + await res.next(); + }); + + it('via repeated concurrent spills', async () => { + await conn.close(); + await db.reset(); + + await db.open({ + path: 'opfs://spilltest/duck.db', + accessMode: DuckDBAccessMode.READ_WRITE, + opfs: { tempPath: 'opfs://spilltest/tmp', tempPoolMax: 6, tempPoolMin: 3, }, + }); + conn = await db.connect(); + await conn.send(`PRAGMA memory_limit='12MB'; SET max_temp_directory_size = '100MiB'`); + await conn.close(); + + // Multiple rounds of running concurrent spilling queries, exercising + // the temp pool's refill and closed file return mechanics. + for (let i = 0; i < 5; i++) { + await Promise.all(Array.from({ length: 4 }, async () => { + const c = await db.connect(); + const res = await c.send(spillQuery); + await res.next(); + await c.close(); + })); + } + }); + }); + describe('Load Data in OPFS', () => { it('Import Small Parquet file', async () => { //1. data preparation @@ -405,6 +474,7 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void { await opfsRoot.removeEntry('test2.csv').catch(_ignore); await opfsRoot.removeEntry('test3.csv').catch(_ignore); await opfsRoot.removeEntry('test.parquet').catch(_ignore); + await opfsRoot.removeEntry('spilltest', { recursive: true }).catch(_ignore); try { const datadir = await opfsRoot.getDirectoryHandle('datadir'); datadir.removeEntry('test.parquet').catch(_ignore);