Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { arrowToSQLField, arrowToSQLType } from '../json_typedef';
import { WebFile } from './web_file';
import { UDFFunction, UDFFunctionDeclaration } from './udf_function';
import * as arrow from 'apache-arrow';
import { createOPFSTempPool } from '../utils/opfs_util';

const TEXT_ENCODER = new TextEncoder();

Expand Down Expand Up @@ -701,6 +702,21 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
throw new Error('Not an OPFS file name: ' + file);
}
}

public async registerOPFSTempDir(tempPath?: string, maxPoolSize?: number, minPoolSize?: number): Promise<void> {
// Access BROWSER_RUNTIME through the runtime field
const runtime = this._runtime as any;

if (runtime._opfsTmpPool) {
await runtime._opfsTmpPool.destroy();
runtime._opfsTmpPool = null;
}

if (tempPath) {
runtime._opfsTmpPool = await createOPFSTempPool(tempPath, { maxUnused: maxPoolSize, minUnused: minPoolSize });
}
}

public collectFileStatistics(file: string, enable: boolean): void {
const [s, d, n] = callSRet(this.mod, 'duckdb_web_collect_file_stats', ['string', 'boolean'], [file, enable]);
if (s !== StatusCode.SUCCESS) {
Expand Down
1 change: 1 addition & 0 deletions packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface DuckDBBindings {
copyFileToPath(name: string, path: string): void;
copyFileToBuffer(name: string): Uint8Array;
registerOPFSFileName(file: string): Promise<void>;
registerOPFSTempDir(tempPath?: string, maxPoolSize?: number, minPoolSize?: number): Promise<void>;
collectFileStatistics(file: string, enable: boolean): void;
exportFileStatistics(file: string): FileStatistics;
}
31 changes: 31 additions & 0 deletions packages/duckdb-wasm/src/bindings/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,37 @@ export interface DuckDBOPFSConfig {
* - "manual": Files must be manually registered and dropped.
*/
fileHandling?: "auto" | "manual";

/**
* OPFS path for temporary files (e.g., "opfs://tmp").
*
* When set, a "pool" of pre-allocated temp files is maintained for use by
* DuckDB when it opens a tempfile on-demand. Pre-allocation of tempfiles is
* required when using OPFS due to the OPFS API providing only asynchronous
* file creation, while DuckDB's temp file creation must be synchronous. By
* maintaining a pool of pre-created temp files, DuckDB can synchronously
* claim a temp file from the pool when needed.
*
* `SET temp_directory='opfs://...` can also be used to initialize or change
* the temp directory at runtime when using "auto" fileHandling.
*/
tempPath?: string;

/**
* Maximum number of pre-allocated file handles in the temp pool beyond
* returned files will be deleted when closed.
*/
tempPoolMax?: number;

/**
* Minimum number of unused pre-allocated handles to maintain in any temp
* file pools. When a tempfile is opened from the pool causing the remaining
* unused handles to drop below this number, new handles will be created
* asynchronously in the background to refill the pool up to tempPoolMax.
*
* Must be less than tempPoolMax.
*/
tempPoolMin?: number;
}

export enum DuckDBAccessMode {
Expand Down
32 changes: 30 additions & 2 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from './runtime';
import { DuckDBModule } from './duckdb_module';
import * as udf from './udf_runtime';
import { TmpPool } from '../utils/opfs_util';

const OPFS_PREFIX_LEN = 'opfs://'.length;
const PATH_SEP_REGEX = /\/|\\/;
Expand All @@ -26,6 +27,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
_globalFileInfo: DuckDBGlobalFileInfo | null;
_preparedHandles: Record<string, FileSystemSyncAccessHandle>;
_opfsRoot: FileSystemDirectoryHandle | null;
_opfsTmpPool: TmpPool | null;

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null;
getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null;
Expand All @@ -37,6 +39,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
_globalFileInfo: null,
_preparedHandles: {} as any,
_opfsRoot: null,
_opfsTmpPool: null,

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null {
try {
Expand Down Expand Up @@ -417,7 +420,21 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return result;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
let handle: FileSystemSyncAccessHandle | undefined = BROWSER_RUNTIME._files?.get(file.fileName);

// Check if this file belongs to a registered temp directory.
// DuckDB creates temp files on-the-fly during query execution (for spilling),
// calling openFile() synchronously. Since OPFS file creation is async, we can't
// create new files here. Instead, we use pre-allocated file handles from the
// temp pool that was set up when the temp directory was registered.
if (!handle && BROWSER_RUNTIME._opfsTmpPool) {
const pool = BROWSER_RUNTIME._opfsTmpPool;
if (pool.matches(file.fileName)) {
handle = pool.openFile(file.fileName);
BROWSER_RUNTIME._files.set(file.fileName, handle);
}
}

if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
Expand Down Expand Up @@ -529,6 +546,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
closeFile: (mod: DuckDBModule, fileId: number) => {
const file = BROWSER_RUNTIME.getFileInfo(mod, fileId);
BROWSER_RUNTIME._fileInfoCache.delete(fileId);

try {
switch (file?.dataProtocol) {
case DuckDBDataProtocol.BUFFER:
Expand Down Expand Up @@ -556,6 +574,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
const fileName = readString(mod, fileNamePtr, fileNameLen);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(fileName);
if (handle) {
if (BROWSER_RUNTIME._opfsTmpPool && BROWSER_RUNTIME._opfsTmpPool.matches(fileName)) {
BROWSER_RUNTIME._opfsTmpPool.dropFile(fileName);
return
}
BROWSER_RUNTIME._files.delete(fileName);
if (handle instanceof FileSystemSyncAccessHandle) {
try {
Expand Down Expand Up @@ -769,7 +791,13 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
return true;
},
removeFile: (_mod: DuckDBModule, _pathPtr: number, _pathLen: number) => {},
removeFile: (mod: DuckDBModule, pathPtr: number, pathLen: number) => {
const path = readString(mod, pathPtr, pathLen);

if (BROWSER_RUNTIME._opfsTmpPool && BROWSER_RUNTIME._opfsTmpPool.matches(path)) {
BROWSER_RUNTIME._opfsTmpPool.dropFile(path);
}
},
callScalarUDF: (
mod: DuckDBModule,
response: number,
Expand Down
33 changes: 33 additions & 0 deletions packages/duckdb-wasm/src/parallel/async_bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
case WorkerRequestType.CLOSE_PREPARED:
case WorkerRequestType.COLLECT_FILE_STATISTICS:
case WorkerRequestType.REGISTER_OPFS_FILE_NAME:
case WorkerRequestType.REGISTER_OPFS_TEMP_DIR:
case WorkerRequestType.COPY_FILE_TO_PATH:
case WorkerRequestType.DISCONNECT:
case WorkerRequestType.DROP_FILE:
Expand Down Expand Up @@ -386,6 +387,19 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
this._config = config;
const task = new WorkerTask<WorkerRequestType.OPEN, DuckDBConfig, null>(WorkerRequestType.OPEN, config);
await this.postTask(task);

// If OPFS temp path is configured, eagerly initialize the pool
if (config.opfs?.tempPath) {
await this.registerOPFSTempDir(config.opfs.tempPath, config.opfs.tempPoolMax, config.opfs.tempPoolMin);

// Configure DuckDB to use this temp directory
const conn = await this.connect();
try {
await conn.send(`SET temp_directory = '${config.opfs.tempPath}'`);
} finally {
await conn.close();
}
}
}

/** Tokenize a script text */
Expand Down Expand Up @@ -614,6 +628,13 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
);
await this.postTask(task, []);
}
public async registerOPFSTempDir(name?: string, maxPoolSize?: number, minPoolSize?: number): Promise<void> {
const task = new WorkerTask<WorkerRequestType.REGISTER_OPFS_TEMP_DIR, [string?, number?, number?], null>(
WorkerRequestType.REGISTER_OPFS_TEMP_DIR,
[name, maxPoolSize, minPoolSize],
);
await this.postTask(task, []);
}

/** Enable file statistics */
public async collectFileStatistics(name: string, enable: boolean): Promise<void> {
Expand Down Expand Up @@ -715,6 +736,18 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
}

private async registerOPFSFileFromSQL(text: string) {
const opfsTempDir = text.match(/(?:SET|PRAGMA)\s+temp_directory\s*=\s*['"]?(opfs:\/\/[^\s'";]+)['"]?/i);
if (opfsTempDir ) {
const newPath = opfsTempDir[1];

// Register the temp directory with the worker using config sizes if available
await this.registerOPFSTempDir(newPath, this._config?.opfs?.tempPoolMax, this._config?.opfs?.tempPoolMin);
// Remove the 'SET' or 'PRAGMA' temp_directory from the text that is
// searched for opfs urls to avoid detecting and attempting to
// register the temp *directory* as a file.
text = text.replace(opfsTempDir[0], "");
}

const files = searchOPFSFiles(text);
const result: string[] = [];
for (const file of files) {
Expand Down
5 changes: 4 additions & 1 deletion packages/duckdb-wasm/src/parallel/worker_dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,10 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
await this._bindings.registerOPFSFileName(request.data[0]);
this.sendOK(request);
break;

case WorkerRequestType.REGISTER_OPFS_TEMP_DIR:
await this._bindings.registerOPFSTempDir(request.data[0], request.data[1], request.data[2]);
this.sendOK(request);
break;
case WorkerRequestType.EXPORT_FILE_STATISTICS: {
this.postMessage(
{
Expand Down
3 changes: 3 additions & 0 deletions packages/duckdb-wasm/src/parallel/worker_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export enum WorkerRequestType {
CLOSE_PREPARED = 'CLOSE_PREPARED',
COLLECT_FILE_STATISTICS = 'COLLECT_FILE_STATISTICS',
REGISTER_OPFS_FILE_NAME = 'REGISTER_OPFS_FILE_NAME',
REGISTER_OPFS_TEMP_DIR = 'REGISTER_OPFS_TEMP_DIR',
CONNECT = 'CONNECT',
COPY_FILE_TO_BUFFER = 'COPY_FILE_TO_BUFFER',
COPY_FILE_TO_PATH = 'COPY_FILE_TO_PATH',
Expand Down Expand Up @@ -111,6 +112,7 @@ export type WorkerRequestVariant =
| WorkerRequest<WorkerRequestType.CANCEL_PENDING_QUERY, number>
| WorkerRequest<WorkerRequestType.COLLECT_FILE_STATISTICS, [string, boolean]>
| WorkerRequest<WorkerRequestType.REGISTER_OPFS_FILE_NAME, [string]>
| WorkerRequest<WorkerRequestType.REGISTER_OPFS_TEMP_DIR, [string?, number?, number?]>
| WorkerRequest<WorkerRequestType.CONNECT, null>
| WorkerRequest<WorkerRequestType.COPY_FILE_TO_BUFFER, string>
| WorkerRequest<WorkerRequestType.COPY_FILE_TO_PATH, [string, string]>
Expand Down Expand Up @@ -171,6 +173,7 @@ export type WorkerResponseVariant =
export type WorkerTaskVariant =
| WorkerTask<WorkerRequestType.COLLECT_FILE_STATISTICS, [string, boolean], null>
| WorkerTask<WorkerRequestType.REGISTER_OPFS_FILE_NAME, [string], null>
| WorkerTask<WorkerRequestType.REGISTER_OPFS_TEMP_DIR, [string?, number?, number?], null>
| WorkerTask<WorkerRequestType.CLOSE_PREPARED, [number, number], null>
| WorkerTask<WorkerRequestType.CONNECT, null, ConnectionID>
| WorkerTask<WorkerRequestType.COPY_FILE_TO_BUFFER, string, Uint8Array>
Expand Down
124 changes: 123 additions & 1 deletion packages/duckdb-wasm/src/utils/opfs_util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,126 @@ export function isOPFSProtocol(path: string): boolean {

export function searchOPFSFiles(text: string) {
return [...text.matchAll(REGEX_OPFS_FILE)].map(match => match[1]);
}
}

type HandleEntry = { name: string; handle: FileSystemSyncAccessHandle };

export type TmpPoolConfig = { maxUnused?: number; minUnused?: number };

/**
* TmpPool manages pre-allocated OPFS file handles for temporary files.
*
* DuckDB's file operations (openFile, dropFile, etc.) are synchronous C++ calls
* that cannot be made async. However, OPFS file creation requires async APIs.
* When DuckDB needs to create temporary files for spilling data to disk, it
* calls openFile synchronously, expecting an immediate file handle.
*
* To work around this API mismatch, we pre-create a pool of OPFS files with
* their sync access handles during the async temp directory registration. When
* DuckDB needs a temp file, we can synchronously hand out one of these
* pre-created handles from the pool. When the file is closed, we clear it and
* return it to the pool for reuse.
*/
export class TmpPool {
public readonly path: string;
private dir: FileSystemDirectoryHandle;
private maxUnused: number;
private minUnused: number;
private pool: HandleEntry[] = []; // unused files.
private openMap = new Map<string, HandleEntry>(); // checked out files.
private nextId = 1;
private refillInFlight = false;

constructor(
path: string,
dir: FileSystemDirectoryHandle,
maxUnused: number = 4,
minUnused: number = 2
) {
if (minUnused >= maxUnused) throw new Error("minUnused must be < maxUnused");
this.path = canonicalDirUrl(path);
this.dir = dir;
this.maxUnused = maxUnused;
this.minUnused = minUnused;
}

async init(): Promise<void> { await this.refillTo(this.maxUnused); }

matches(path: string): boolean {
const canonical = canonicalDirUrl(path);
return canonical === this.path || canonical.startsWith(this.path);
}

openFile(path: string): FileSystemSyncAccessHandle {
const existing = this.openMap.get(path); if (existing) return existing.handle;
if (this.pool.length === 0) throw new Error("OPFS tmp pool exhausted");
const entry = this.pool.pop()!; this.openMap.set(path, entry);
if (this.pool.length < this.minUnused) this.maybeRefillAsync();
return entry.handle;
}

dropFile(path: string): void {
const entry = this.openMap.get(path); if (!entry) return;
entry.handle.flush();

if (this.pool.length >= this.maxUnused) {
this.asyncDelete(entry).catch(() => {});
} else {
entry.handle.truncate(0);
this.pool.push(entry);
}
this.openMap.delete(path);
}

async destroy(): Promise<void> {
await Promise.all(this.pool.splice(0).map(e => this.asyncDelete(e)));
}

private async createEntry(): Promise<HandleEntry> {
const name = `tmp${this.nextId++}`;
const fh = await this.dir.getFileHandle(name, { create: true });
const sah = await fh.createSyncAccessHandle();
sah.truncate(0);
return { name, handle: sah };
}
private async refillTo(target: number): Promise<void> {
while (this.pool.length < target) {
const e = await this.createEntry(); this.pool.push(e);
}
}
private maybeRefillAsync() {
if (this.refillInFlight) return;
this.refillInFlight = true;
this.refillTo(this.maxUnused).finally(() => { this.refillInFlight = false; });
}
private async asyncDelete(e: HandleEntry) {
try { e.handle.flush(); } catch { /* ignore errors */ }
try { e.handle.close(); } catch { /* ignore errors */ }
try { await this.dir.removeEntry(e.name); } catch { /* ignore errors */ }
}
}

export function canonicalDirUrl(url: string) {
return url.replace(/\/+$/, "");
}

export async function resolveOpfsDirectory(opfsUrl: string): Promise<FileSystemDirectoryHandle> {
const root = await (navigator as any).storage.getDirectory();
const rel = opfsUrl.slice("opfs://".length).replace(/^\/+/, "");
const parts = rel.split("/").filter(Boolean);
let dir: FileSystemDirectoryHandle = root;
for (const p of parts) {
dir = await dir.getDirectoryHandle(p, { create: true });
}
return dir;
}

export async function createOPFSTempPool(opfsDirUrl: string, cfg: TmpPoolConfig = {}) : Promise<TmpPool> {
const key = canonicalDirUrl(opfsDirUrl);
const dir = await resolveOpfsDirectory(key);
const maxUnused = cfg.maxUnused ?? 10;
const minUnused = cfg.minUnused ?? 2;
const pool = new TmpPool(key, dir, maxUnused, minUnused);
await pool.init();
return pool;
}
Loading