Skip to content

Commit c2da2be

Browse files
authored
Update pgwire to 0.8.1, improve type-safety when decoding rows (#433)
1 parent 8fdbf8d commit c2da2be

File tree

21 files changed

+551
-129
lines changed

21 files changed

+551
-129
lines changed

.changeset/ten-walls-cough.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
'@powersync/service-jpgwire': patch
4+
---
5+
6+
Update `pgwire` to version `0.8.1`.

libs/lib-postgres/src/db/connection/AbstractPostgresConnection.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,11 @@ export abstract class AbstractPostgresConnection<Listener = {}> extends framewor
6969
}
7070

7171
async *streamRows<T>(...args: pgwire.Statement[]): AsyncIterableIterator<T[]> {
72-
let columns: Array<keyof T> = [];
72+
let columns: Array<pgwire.ColumnDescription> = [];
7373

7474
for await (const chunk of this.stream(...args)) {
7575
if (chunk.tag == 'RowDescription') {
76-
columns = chunk.payload.map((c, index) => {
77-
return c.name as keyof T;
78-
});
76+
columns = chunk.payload;
7977
continue;
8078
}
8179

@@ -86,7 +84,7 @@ export abstract class AbstractPostgresConnection<Listener = {}> extends framewor
8684
yield chunk.rows.map((row) => {
8785
let q: Partial<T> = {};
8886
for (const [index, c] of columns.entries()) {
89-
q[c] = row[index];
87+
q[c.name as keyof T] = row.decodeWithoutCustomTypes(index);
9088
}
9189
return q as T;
9290
});

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
108108
results: {
109109
columns: result.columns.map((c) => c.name),
110110
rows: result.rows.map((row) => {
111-
return row.map((value) => {
111+
return row.raw.map((raw, i) => {
112+
const value = pgwire.PgType.decode(raw, row.columns[i].typeOid);
112113
const sqlValue = sync_rules.applyValueContext(
113114
sync_rules.toSyncRulesValue(value),
114115
sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
@@ -252,7 +253,7 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`,
252253
// For those, we need to use pg_current_wal_lsn() instead.
253254
const { results } = await lib_postgres.retriedQuery(this.pool, `SELECT pg_current_wal_lsn() as lsn`);
254255

255-
const lsn = results[0].rows[0][0];
256+
const lsn = results[0].rows[0].decodeWithoutCustomTypes(0);
256257
return String(lsn);
257258
}
258259

modules/module-postgres/src/replication/SnapshotQuery.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,30 +127,34 @@ export class ChunkedSnapshotQuery implements SnapshotQuery {
127127
if (this.key.typeId == null) {
128128
throw new Error(`typeId required for primary key ${this.key.name}`);
129129
}
130-
let type: StatementParam['type'] = Number(this.key.typeId);
130+
const type = Number(this.key.typeId);
131131
stream = this.connection.stream({
132132
statement: `SELECT * FROM ${this.table.qualifiedName} WHERE ${escapedKeyName} > $1 ORDER BY ${escapedKeyName} LIMIT ${this.chunkSize}`,
133133
params: [{ value: this.lastKey, type }]
134134
});
135135
}
136136
let primaryKeyIndex: number = -1;
137+
let typeOid = 0;
137138

138139
for await (let chunk of stream) {
139140
if (chunk.tag == 'RowDescription') {
140141
// We get a RowDescription for each FETCH call, but they should
141142
// all be the same.
142-
let i = 0;
143143
const pk = chunk.payload.findIndex((c) => c.name == this.key.name);
144144
if (pk < 0) {
145145
throw new Error(
146146
`Cannot find primary key column ${this.key} in results. Keys: ${chunk.payload.map((c) => c.name).join(', ')}`
147147
);
148148
}
149149
primaryKeyIndex = pk;
150+
typeOid = chunk.payload[pk].typeOid;
150151
}
151152

152153
if (chunk.rows.length > 0) {
153-
this.lastKey = chunk.rows[chunk.rows.length - 1][primaryKeyIndex];
154+
this.lastKey = PgType.decode(chunk.rows[chunk.rows.length - 1].raw[primaryKeyIndex], typeOid) as
155+
| string
156+
| bigint
157+
| null;
154158
}
155159
yield chunk;
156160
}

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -361,11 +361,7 @@ WHERE oid = $1::regclass`,
361361
params: [{ value: table.qualifiedName, type: 'varchar' }]
362362
});
363363
const row = results.rows[0];
364-
if ((row?.[0] ?? -1n) == -1n) {
365-
return -1;
366-
} else {
367-
return Number(row[0]);
368-
}
364+
return Number(row?.decodeWithoutCustomTypes(0) ?? -1n);
369365
}
370366

371367
/**
@@ -499,7 +495,7 @@ WHERE oid = $1::regclass`,
499495
// 2. Wait until logical replication has caught up with all the change between A and B.
500496
// Calling `markSnapshotDone(LSN B)` covers that.
501497
const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
502-
tableLsnNotBefore = rs.rows[0][0];
498+
tableLsnNotBefore = rs.rows[0].decodeWithoutCustomTypes(0);
503499
// Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
504500
await db.query('COMMIT');
505501
const [resultTable] = await batch.markSnapshotDone([table], tableLsnNotBefore);
@@ -545,7 +541,7 @@ WHERE oid = $1::regclass`,
545541
}
546542
await q.initialize();
547543

548-
let columns: { i: number; name: string }[] = [];
544+
let columns: { i: number; name: string; typeOid: number }[] = [];
549545
let columnMap: Record<string, number> = {};
550546
let hasRemainingData = true;
551547
while (hasRemainingData) {
@@ -564,7 +560,7 @@ WHERE oid = $1::regclass`,
564560
// all be the same.
565561
let i = 0;
566562
columns = chunk.payload.map((c) => {
567-
return { i: i++, name: c.name };
563+
return { i: i++, name: c.name, typeOid: c.typeOid };
568564
});
569565
for (let column of chunk.payload) {
570566
columnMap[column.name] = column.typeOid;
@@ -575,7 +571,7 @@ WHERE oid = $1::regclass`,
575571
const rows = chunk.rows.map((row) => {
576572
let q: DatabaseInputRow = {};
577573
for (let c of columns) {
578-
q[c.name] = row[c.i];
574+
q[c.name] = pgwire.PgType.decode(row.raw[c.i], c.typeOid);
579575
}
580576
return q;
581577
});

modules/module-postgres/src/replication/replication-utils.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ export async function getPrimaryKeyColumns(
3232

3333
return attrRows.rows.map((row) => {
3434
return {
35-
name: row[0] as string,
36-
typeId: row[1] as number
35+
name: row.decodeWithoutCustomTypes(0) as string,
36+
typeId: row.decodeWithoutCustomTypes(1) as number
3737
} satisfies storage.ColumnDescriptor;
3838
});
3939
}
@@ -50,8 +50,8 @@ export async function getAllColumns(db: pgwire.PgClient, relationId: number): Pr
5050
});
5151
return attrRows.rows.map((row) => {
5252
return {
53-
name: row[0] as string,
54-
typeId: row[1] as number
53+
name: row.decodeWithoutCustomTypes(0) as string,
54+
typeId: row.decodeWithoutCustomTypes(1) as number
5555
} satisfies storage.ColumnDescriptor;
5656
});
5757
}
@@ -71,7 +71,7 @@ FROM pg_class
7171
WHERE oid = $1::oid LIMIT 1`,
7272
params: [{ type: 'int8', value: relationId }]
7373
});
74-
const idType: string = rows.rows[0]?.[0];
74+
const idType: string = rows.rows[0]?.decodeWithoutCustomTypes(0);
7575
if (idType == 'nothing' || idType == null) {
7676
return { replicationIdentity: 'nothing', replicationColumns: [] };
7777
} else if (idType == 'full') {

modules/module-postgres/src/types/resolver.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,16 @@ WHERE a.attnum > 0
197197
*/
198198
decodeTuple(relation: pgwire.PgoutputRelation, tupleRaw: Record<string, any>): DatabaseInputRow {
199199
let result: Record<string, any> = {};
200-
for (let columnName in tupleRaw) {
201-
const rawval = tupleRaw[columnName];
202-
const typeOid = (relation as any)._tupleDecoder._typeOids.get(columnName);
203-
if (typeof rawval == 'string' && typeOid) {
204-
result[columnName] = this.registry.decodeDatabaseValue(rawval, typeOid);
205-
} else {
206-
result[columnName] = rawval;
207-
}
200+
for (const column of relation.columns) {
201+
const rawval = tupleRaw[column.name];
202+
result[column.name] =
203+
rawval == null
204+
? // We can't decode null values, but it's important that null and undefined stay distinct because undefined
205+
// represents a TOASTed value.
206+
rawval
207+
: this.registry.decodeDatabaseValue(rawval, column.typeOid);
208208
}
209+
209210
return result;
210211
}
211212

modules/module-postgres/src/utils/migration_lib.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export class Migrations {
6666
limit 1
6767
`
6868
)
69-
.then((results) => ({ id: results.rows[0][0] as number }));
69+
.then((results) => ({ id: results.rows[0].decodeWithoutCustomTypes(0) as number }));
7070
}
7171

7272
async ensureMigrationsTable(db: pgwire.PgConnection) {

modules/module-postgres/src/utils/postgres_version.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ import semver, { type SemVer } from 'semver';
44
export async function getServerVersion(db: pgwire.PgClient): Promise<SemVer | null> {
55
const result = await db.query(`SHOW server_version;`);
66
// The result is usually of the form "16.2 (Debian 16.2-1.pgdg120+2)"
7-
return semver.coerce(result.rows[0][0].split(' ')[0]);
7+
return semver.coerce(result.rows[0].decodeWithoutCustomTypes(0).split(' ')[0]);
88
}

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import type { LookupFunction } from 'node:net';
2+
import * as dns from 'node:dns';
3+
14
import * as pgwire from '@powersync/service-jpgwire';
25
import {
36
applyRowContext,
@@ -8,11 +11,32 @@ import {
811
CompatibilityEdition,
912
TimeValuePrecision
1013
} from '@powersync/service-sync-rules';
11-
import { describe, expect, test } from 'vitest';
12-
import { clearTestDb, connectPgPool, connectPgWire, TEST_URI } from './util.js';
14+
import { describe, expect, Mock, test, vi } from 'vitest';
15+
import { clearTestDb, connectPgPool, connectPgWire, TEST_CONNECTION_OPTIONS, TEST_URI } from './util.js';
1316
import { WalStream } from '@module/replication/WalStream.js';
1417
import { PostgresTypeResolver } from '@module/types/resolver.js';
15-
import { CustomTypeRegistry } from '@module/types/registry.js';
18+
19+
describe('connection options', () => {
20+
test('uses custom lookup', async () => {
21+
const lookup: Mock<LookupFunction> = vi.fn((hostname, options, cb) => {
22+
expect(hostname).toStrictEqual('powersynctest.example.org');
23+
dns.lookup('localhost', options, cb);
24+
});
25+
26+
const db = await pgwire.connectPgWire({
27+
...TEST_CONNECTION_OPTIONS,
28+
hostname: 'powersynctest.example.org',
29+
lookup
30+
});
31+
expect(lookup).toHaveBeenCalled();
32+
33+
try {
34+
await db.query('SELECT 1');
35+
} finally {
36+
await db.end();
37+
}
38+
});
39+
});
1640

1741
describe('pg data types', () => {
1842
async function setupTable(db: pgwire.PgClient) {

0 commit comments

Comments
 (0)