diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index b4b00188b..03e796ec3 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -13,6 +13,7 @@ import { arrowToSQLField, arrowToSQLType } from '../json_typedef'; import { WebFile } from './web_file'; import { UDFFunction, UDFFunctionDeclaration } from './udf_function'; import * as arrow from 'apache-arrow'; +import { createOPFSTempPool } from '../utils/opfs_util'; const TEXT_ENCODER = new TextEncoder(); @@ -701,6 +702,21 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { throw new Error('Not an OPFS file name: ' + file); } } + + public async registerOPFSTempDir(tempPath?: string, maxPoolSize?: number, minPoolSize?: number): Promise { + // Access BROWSER_RUNTIME through the runtime field + const runtime = this._runtime as any; + + if (runtime._opfsTmpPool) { + await runtime._opfsTmpPool.destroy(); + runtime._opfsTmpPool = null; + } + + if (tempPath) { + runtime._opfsTmpPool = await createOPFSTempPool(tempPath, { maxUnused: maxPoolSize, minUnused: minPoolSize }); + } + } + public collectFileStatistics(file: string, enable: boolean): void { const [s, d, n] = callSRet(this.mod, 'duckdb_web_collect_file_stats', ['string', 'boolean'], [file, enable]); if (s !== StatusCode.SUCCESS) { diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 06e712f9a..7011c495f 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -63,6 +63,7 @@ export interface DuckDBBindings { copyFileToPath(name: string, path: string): void; copyFileToBuffer(name: string): Uint8Array; registerOPFSFileName(file: string): Promise; + registerOPFSTempDir(tempPath?: string, maxPoolSize?: number, minPoolSize?: number): Promise; collectFileStatistics(file: string, enable: boolean): void; exportFileStatistics(file: string): FileStatistics; } diff --git a/packages/duckdb-wasm/src/bindings/config.ts b/packages/duckdb-wasm/src/bindings/config.ts index a36e4ab5e..9cf69e303 100644 --- a/packages/duckdb-wasm/src/bindings/config.ts +++ b/packages/duckdb-wasm/src/bindings/config.ts @@ -40,6 +40,37 @@ export interface DuckDBOPFSConfig { * - "manual": Files must be manually registered and dropped. */ fileHandling?: "auto" | "manual"; + + /** + * OPFS path for temporary files (e.g., "opfs://tmp"). + * + * When set, a "pool" of pre-allocated temp files is maintained for use by + * DuckDB when it opens a tempfile on-demand. Pre-allocation of tempfiles is + * required when using OPFS due to the OPFS API providing only asynchronous + * file creation, while DuckDB's temp file creation must be synchronous. By + * maintaining a pool of pre-created temp files, DuckDB can synchronously + * claim a temp file from the pool when needed. + * + * `SET temp_directory='opfs://...` can also be used to initialize or change + * the temp directory at runtime when using "auto" fileHandling. + */ + tempPath?: string; + + /** + * Maximum number of pre-allocated file handles in the temp pool beyond + * returned files will be deleted when closed. + */ + tempPoolMax?: number; + + /** + * Minimum number of unused pre-allocated handles to maintain in any temp + * file pools. When a tempfile is opened from the pool causing the remaining + * unused handles to drop below this number, new handles will be created + * asynchronously in the background to refill the pool up to tempPoolMax. + * + * Must be less than tempPoolMax. + */ + tempPoolMin?: number; } export enum DuckDBAccessMode { diff --git a/packages/duckdb-wasm/src/bindings/runtime_browser.ts b/packages/duckdb-wasm/src/bindings/runtime_browser.ts index 34a1ed165..b1c54e893 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_browser.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_browser.ts @@ -16,6 +16,7 @@ import { } from './runtime'; import { DuckDBModule } from './duckdb_module'; import * as udf from './udf_runtime'; +import { TmpPool } from '../utils/opfs_util'; const OPFS_PREFIX_LEN = 'opfs://'.length; const PATH_SEP_REGEX = /\/|\\/; @@ -26,6 +27,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { _globalFileInfo: DuckDBGlobalFileInfo | null; _preparedHandles: Record; _opfsRoot: FileSystemDirectoryHandle | null; + _opfsTmpPool: TmpPool | null; getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null; getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null; @@ -37,6 +39,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { _globalFileInfo: null, _preparedHandles: {} as any, _opfsRoot: null, + _opfsTmpPool: null, getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null { try { @@ -417,7 +420,21 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return result; } case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); + let handle: FileSystemSyncAccessHandle | undefined = BROWSER_RUNTIME._files?.get(file.fileName); + + // Check if this file belongs to a registered temp directory. + // DuckDB creates temp files on-the-fly during query execution (for spilling), + // calling openFile() synchronously. Since OPFS file creation is async, we can't + // create new files here. Instead, we use pre-allocated file handles from the + // temp pool that was set up when the temp directory was registered. + if (!handle && BROWSER_RUNTIME._opfsTmpPool) { + const pool = BROWSER_RUNTIME._opfsTmpPool; + if (pool.matches(file.fileName)) { + handle = pool.openFile(file.fileName); + BROWSER_RUNTIME._files.set(file.fileName, handle); + } + } + if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } @@ -529,6 +546,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { closeFile: (mod: DuckDBModule, fileId: number) => { const file = BROWSER_RUNTIME.getFileInfo(mod, fileId); BROWSER_RUNTIME._fileInfoCache.delete(fileId); + try { switch (file?.dataProtocol) { case DuckDBDataProtocol.BUFFER: @@ -556,6 +574,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { const fileName = readString(mod, fileNamePtr, fileNameLen); const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(fileName); if (handle) { + if (BROWSER_RUNTIME._opfsTmpPool && BROWSER_RUNTIME._opfsTmpPool.matches(fileName)) { + BROWSER_RUNTIME._opfsTmpPool.dropFile(fileName); + return + } BROWSER_RUNTIME._files.delete(fileName); if (handle instanceof FileSystemSyncAccessHandle) { try { @@ -769,7 +791,13 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } return true; }, - removeFile: (_mod: DuckDBModule, _pathPtr: number, _pathLen: number) => {}, + removeFile: (mod: DuckDBModule, pathPtr: number, pathLen: number) => { + const path = readString(mod, pathPtr, pathLen); + + if (BROWSER_RUNTIME._opfsTmpPool && BROWSER_RUNTIME._opfsTmpPool.matches(path)) { + BROWSER_RUNTIME._opfsTmpPool.dropFile(path); + } + }, callScalarUDF: ( mod: DuckDBModule, response: number, diff --git a/packages/duckdb-wasm/src/parallel/async_bindings.ts b/packages/duckdb-wasm/src/parallel/async_bindings.ts index 0ca302d8c..9aaa5934b 100644 --- a/packages/duckdb-wasm/src/parallel/async_bindings.ts +++ b/packages/duckdb-wasm/src/parallel/async_bindings.ts @@ -179,6 +179,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { case WorkerRequestType.CLOSE_PREPARED: case WorkerRequestType.COLLECT_FILE_STATISTICS: case WorkerRequestType.REGISTER_OPFS_FILE_NAME: + case WorkerRequestType.REGISTER_OPFS_TEMP_DIR: case WorkerRequestType.COPY_FILE_TO_PATH: case WorkerRequestType.DISCONNECT: case WorkerRequestType.DROP_FILE: @@ -386,6 +387,19 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { this._config = config; const task = new WorkerTask(WorkerRequestType.OPEN, config); await this.postTask(task); + + // If OPFS temp path is configured, eagerly initialize the pool + if (config.opfs?.tempPath) { + await this.registerOPFSTempDir(config.opfs.tempPath, config.opfs.tempPoolMax, config.opfs.tempPoolMin); + + // Configure DuckDB to use this temp directory + const conn = await this.connect(); + try { + await conn.send(`SET temp_directory = '${config.opfs.tempPath}'`); + } finally { + await conn.close(); + } + } } /** Tokenize a script text */ @@ -614,6 +628,13 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { ); await this.postTask(task, []); } + public async registerOPFSTempDir(name?: string, maxPoolSize?: number, minPoolSize?: number): Promise { + const task = new WorkerTask( + WorkerRequestType.REGISTER_OPFS_TEMP_DIR, + [name, maxPoolSize, minPoolSize], + ); + await this.postTask(task, []); + } /** Enable file statistics */ public async collectFileStatistics(name: string, enable: boolean): Promise { @@ -715,6 +736,18 @@ export class AsyncDuckDB implements AsyncDuckDBBindings { } private async registerOPFSFileFromSQL(text: string) { + const opfsTempDir = text.match(/(?:SET|PRAGMA)\s+temp_directory\s*=\s*['"]?(opfs:\/\/[^\s'";]+)['"]?/i); + if (opfsTempDir ) { + const newPath = opfsTempDir[1]; + + // Register the temp directory with the worker using config sizes if available + await this.registerOPFSTempDir(newPath, this._config?.opfs?.tempPoolMax, this._config?.opfs?.tempPoolMin); + // Remove the 'SET' or 'PRAGMA' temp_directory from the text that is + // searched for opfs urls to avoid detecting and attempting to + // register the temp *directory* as a file. + text = text.replace(opfsTempDir[0], ""); + } + const files = searchOPFSFiles(text); const result: string[] = []; for (const file of files) { diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index a030999f4..e983efd73 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -365,7 +365,10 @@ export abstract class AsyncDuckDBDispatcher implements Logger { await this._bindings.registerOPFSFileName(request.data[0]); this.sendOK(request); break; - + case WorkerRequestType.REGISTER_OPFS_TEMP_DIR: + await this._bindings.registerOPFSTempDir(request.data[0], request.data[1], request.data[2]); + this.sendOK(request); + break; case WorkerRequestType.EXPORT_FILE_STATISTICS: { this.postMessage( { diff --git a/packages/duckdb-wasm/src/parallel/worker_request.ts b/packages/duckdb-wasm/src/parallel/worker_request.ts index d0e1b5c35..dd017d438 100644 --- a/packages/duckdb-wasm/src/parallel/worker_request.ts +++ b/packages/duckdb-wasm/src/parallel/worker_request.ts @@ -15,6 +15,7 @@ export enum WorkerRequestType { CLOSE_PREPARED = 'CLOSE_PREPARED', COLLECT_FILE_STATISTICS = 'COLLECT_FILE_STATISTICS', REGISTER_OPFS_FILE_NAME = 'REGISTER_OPFS_FILE_NAME', + REGISTER_OPFS_TEMP_DIR = 'REGISTER_OPFS_TEMP_DIR', CONNECT = 'CONNECT', COPY_FILE_TO_BUFFER = 'COPY_FILE_TO_BUFFER', COPY_FILE_TO_PATH = 'COPY_FILE_TO_PATH', @@ -111,6 +112,7 @@ export type WorkerRequestVariant = | WorkerRequest | WorkerRequest | WorkerRequest + | WorkerRequest | WorkerRequest | WorkerRequest | WorkerRequest @@ -171,6 +173,7 @@ export type WorkerResponseVariant = export type WorkerTaskVariant = | WorkerTask | WorkerTask + | WorkerTask | WorkerTask | WorkerTask | WorkerTask diff --git a/packages/duckdb-wasm/src/utils/opfs_util.ts b/packages/duckdb-wasm/src/utils/opfs_util.ts index 822eb4a7c..039bc25e6 100644 --- a/packages/duckdb-wasm/src/utils/opfs_util.ts +++ b/packages/duckdb-wasm/src/utils/opfs_util.ts @@ -7,4 +7,126 @@ export function isOPFSProtocol(path: string): boolean { export function searchOPFSFiles(text: string) { return [...text.matchAll(REGEX_OPFS_FILE)].map(match => match[1]); -} \ No newline at end of file +} + +type HandleEntry = { name: string; handle: FileSystemSyncAccessHandle }; + +export type TmpPoolConfig = { maxUnused?: number; minUnused?: number }; + +/** + * TmpPool manages pre-allocated OPFS file handles for temporary files. + * + * DuckDB's file operations (openFile, dropFile, etc.) are synchronous C++ calls + * that cannot be made async. However, OPFS file creation requires async APIs. + * When DuckDB needs to create temporary files for spilling data to disk, it + * calls openFile synchronously, expecting an immediate file handle. + * + * To work around this API mismatch, we pre-create a pool of OPFS files with + * their sync access handles during the async temp directory registration. When + * DuckDB needs a temp file, we can synchronously hand out one of these + * pre-created handles from the pool. When the file is closed, we clear it and + * return it to the pool for reuse. + */ +export class TmpPool { + public readonly path: string; + private dir: FileSystemDirectoryHandle; + private maxUnused: number; + private minUnused: number; + private pool: HandleEntry[] = []; // unused files. + private openMap = new Map(); // checked out files. + private nextId = 1; + private refillInFlight = false; + + constructor( + path: string, + dir: FileSystemDirectoryHandle, + maxUnused: number = 4, + minUnused: number = 2 + ) { + if (minUnused >= maxUnused) throw new Error("minUnused must be < maxUnused"); + this.path = canonicalDirUrl(path); + this.dir = dir; + this.maxUnused = maxUnused; + this.minUnused = minUnused; + } + + async init(): Promise { await this.refillTo(this.maxUnused); } + + matches(path: string): boolean { + const canonical = canonicalDirUrl(path); + return canonical === this.path || canonical.startsWith(this.path); + } + + openFile(path: string): FileSystemSyncAccessHandle { + const existing = this.openMap.get(path); if (existing) return existing.handle; + if (this.pool.length === 0) throw new Error("OPFS tmp pool exhausted"); + const entry = this.pool.pop()!; this.openMap.set(path, entry); + if (this.pool.length < this.minUnused) this.maybeRefillAsync(); + return entry.handle; + } + + dropFile(path: string): void { + const entry = this.openMap.get(path); if (!entry) return; + entry.handle.flush(); + + if (this.pool.length >= this.maxUnused) { + this.asyncDelete(entry).catch(() => {}); + } else { + entry.handle.truncate(0); + this.pool.push(entry); + } + this.openMap.delete(path); + } + + async destroy(): Promise { + await Promise.all(this.pool.splice(0).map(e => this.asyncDelete(e))); + } + + private async createEntry(): Promise { + const name = `tmp${this.nextId++}`; + const fh = await this.dir.getFileHandle(name, { create: true }); + const sah = await fh.createSyncAccessHandle(); + sah.truncate(0); + return { name, handle: sah }; + } + private async refillTo(target: number): Promise { + while (this.pool.length < target) { + const e = await this.createEntry(); this.pool.push(e); + } + } + private maybeRefillAsync() { + if (this.refillInFlight) return; + this.refillInFlight = true; + this.refillTo(this.maxUnused).finally(() => { this.refillInFlight = false; }); + } + private async asyncDelete(e: HandleEntry) { + try { e.handle.flush(); } catch { /* ignore errors */ } + try { e.handle.close(); } catch { /* ignore errors */ } + try { await this.dir.removeEntry(e.name); } catch { /* ignore errors */ } + } +} + +export function canonicalDirUrl(url: string) { + return url.replace(/\/+$/, ""); +} + +export async function resolveOpfsDirectory(opfsUrl: string): Promise { + const root = await (navigator as any).storage.getDirectory(); + const rel = opfsUrl.slice("opfs://".length).replace(/^\/+/, ""); + const parts = rel.split("/").filter(Boolean); + let dir: FileSystemDirectoryHandle = root; + for (const p of parts) { + dir = await dir.getDirectoryHandle(p, { create: true }); + } + return dir; +} + +export async function createOPFSTempPool(opfsDirUrl: string, cfg: TmpPoolConfig = {}) : Promise { + const key = canonicalDirUrl(opfsDirUrl); + const dir = await resolveOpfsDirectory(key); + const maxUnused = cfg.maxUnused ?? 10; + const minUnused = cfg.minUnused ?? 2; + const pool = new TmpPool(key, dir, maxUnused, minUnused); + await pool.init(); + return pool; +} diff --git a/packages/duckdb-wasm/test/opfs.test.ts b/packages/duckdb-wasm/test/opfs.test.ts index 2fddc8cfb..91d857ace 100644 --- a/packages/duckdb-wasm/test/opfs.test.ts +++ b/packages/duckdb-wasm/test/opfs.test.ts @@ -57,6 +57,75 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void { await removeFiles(); }); + + describe('OPFS Spilling', () => { + const spillQuery = `SELECT COUNT(*) FROM (SELECT i FROM range(0, 800_000) i ORDER BY hash(i) LIMIT 100_000)`; + + it('via tempPath specified in db.open', async () => { + await conn.close(); await db.reset(); + + await db.open({ + path: 'opfs://spilltest/duck.db', + accessMode: DuckDBAccessMode.READ_WRITE, + opfs: {tempPath: 'opfs://spilltest/tmp'}, + }); + conn = await db.connect(); + await conn.send(`PRAGMA memory_limit='4MB'; SET max_temp_directory_size = '100MiB'`); + + const res = await conn.send(spillQuery); + await res.next(); + }); + + it('via SET temp_directory', async () => { + await conn.close(); await db.reset(); + + await db.open({ + path: 'opfs://spilltest/duck.db', + accessMode: DuckDBAccessMode.READ_WRITE, + opfs: { + // No tempPath here, but in "auto" mode an opfs URL in a SET + // command is automatically detected to initialize the pool. + fileHandling: "auto", + }, + }); + conn = await db.connect(); + await conn.send(`PRAGMA memory_limit='4MB'; SET max_temp_directory_size = '100MiB'`); + + // Set (and then just to exercise re-setting reset) the temp dir via + // SET. NB: the DB only allows changing temp prior to first usage. + await conn.send(`SET temp_directory = 'opfs://spilltest/other'`); + await conn.send(`SET temp_directory = 'opfs://spilltest/tmp'`); + + const res = await conn.send(spillQuery); + await res.next(); + }); + + it('via repeated concurrent spills', async () => { + await conn.close(); + await db.reset(); + + await db.open({ + path: 'opfs://spilltest/duck.db', + accessMode: DuckDBAccessMode.READ_WRITE, + opfs: { tempPath: 'opfs://spilltest/tmp', tempPoolMax: 6, tempPoolMin: 3, }, + }); + conn = await db.connect(); + await conn.send(`PRAGMA memory_limit='12MB'; SET max_temp_directory_size = '100MiB'`); + await conn.close(); + + // Multiple rounds of running concurrent spilling queries, exercising + // the temp pool's refill and closed file return mechanics. + for (let i = 0; i < 5; i++) { + await Promise.all(Array.from({ length: 4 }, async () => { + const c = await db.connect(); + const res = await c.send(spillQuery); + await res.next(); + await c.close(); + })); + } + }); + }); + describe('Load Data in OPFS', () => { it('Import Small Parquet file', async () => { //1. data preparation @@ -405,6 +474,7 @@ export function testOPFS(baseDir: string, bundle: () => DuckDBBundle): void { await opfsRoot.removeEntry('test2.csv').catch(_ignore); await opfsRoot.removeEntry('test3.csv').catch(_ignore); await opfsRoot.removeEntry('test.parquet').catch(_ignore); + await opfsRoot.removeEntry('spilltest', { recursive: true }).catch(_ignore); try { const datadir = await opfsRoot.getDirectoryHandle('datadir'); datadir.removeEntry('test.parquet').catch(_ignore);