Skip to content

Commit 112d8af

Browse files
author
Boris
committed
feat: pass redis connection as option
1 parent 3ebab25 commit 112d8af

27 files changed

+60
-33
lines changed

example/src/demo_queues/fetchMetrics.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { BULL_REDIS_URI, BULL_HOST_ID } from '../config';
22
import { Queue, Worker, QueueScheduler } from 'bullmq';
3-
import { createBullConnection } from '../../../src/connectRedis';
3+
import { createBullConnection } from '../connectRedis';
44

55
if (!BULL_REDIS_URI) {
66
throw new Error(`Env var BULL_REDIS_URI is empty. Cannot init task ${__filename}.`);

example/src/schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { composeBull } from '../../src';
22
import { schemaComposer } from 'graphql-compose';
3+
import { createBullConnection } from './connectRedis';
34

45
const { queryFields, mutationFields } = composeBull({
56
schemaComposer,
@@ -9,6 +10,7 @@ const { queryFields, mutationFields } = composeBull({
910
name: 'fetch_metrics',
1011
prefix: 'bull.demo',
1112
},
13+
//redis: createBullConnection('queue'),
1214
});
1315

1416
schemaComposer.Query.addFields({

src/definitions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export type Options = {
1010
};
1111
redis?:
1212
| {
13-
uri: string;
13+
uri?: string;
1414
opts?: RedisOptions;
1515
}
1616
| Redis.Redis;

src/helpers/getBullConnection.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { Options } from './../definitions';
2+
import Redis from 'ioredis';
3+
4+
let connection: Redis.Redis;
5+
6+
export function getBullConnection(opts: Options): Redis.Redis {
7+
if (connection) {
8+
return connection;
9+
}
10+
11+
if (opts.redis instanceof Redis) {
12+
connection = opts.redis;
13+
} else if (opts?.redis?.uri) {
14+
connection = new Redis(opts.redis.uri);
15+
} else if (opts?.redis?.opts) {
16+
connection = new Redis(opts.redis.opts);
17+
} else {
18+
connection = new Redis();
19+
}
20+
21+
return connection;
22+
}

src/helpers/gettingQueues.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import { Title } from './queueTitles';
22
import { Queue } from 'bullmq';
3-
import { createBullConnection } from '../connectRedis';
43
import { MutationError, ErrorCodeEnum } from './MutationError';
4+
import { getBullConnection } from './getBullConnection';
5+
import { Options } from '../definitions';
56

6-
export function getQueues(titles: Array<Title>): Array<Queue> {
7-
return titles.map((title) => getQueue(title.prefix, title.queueName));
7+
export function getQueues(titles: Array<Title>, opts: Options): Array<Queue> {
8+
return titles.map((title) => getQueue(title.prefix, title.queueName, opts));
89
}
910

10-
export function getQueue(prefix: string, queueName: string): Queue {
11+
export function getQueue(prefix: string, queueName: string, opts: Options): Queue {
1112
const queue = new Queue(queueName, {
1213
prefix,
13-
connection: createBullConnection('queue'),
14+
connection: getBullConnection(opts),
1415
});
1516

1617
return queue;
@@ -19,9 +20,10 @@ export function getQueue(prefix: string, queueName: string): Queue {
1920
export async function findQueue(
2021
prefix: string,
2122
queueName: string,
23+
opts: Options,
2224
checkExistence: boolean = true
2325
): Promise<Queue> {
24-
const connection = createBullConnection('custom');
26+
const connection = getBullConnection(opts);
2527

2628
if (checkExistence) {
2729
const queueExists = await connection.exists([prefix, queueName, 'meta'].join(':'));
@@ -33,7 +35,7 @@ export async function findQueue(
3335

3436
const queue = new Queue(queueName, {
3537
prefix,
36-
connection: createBullConnection('queue'),
38+
connection,
3739
});
3840

3941
return queue;

src/helpers/queueTitles.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
import { createBullConnection } from '../connectRedis';
1+
import { Options } from './../definitions';
22
import { normalizePrefixGlob } from './normalizePrefixGlob';
3+
import { getBullConnection } from './getBullConnection';
34

45
export type Title = { prefix: string; queueName: string };
56

6-
export async function fetchQueueTitles(prefix: string): Promise<Array<Title>> {
7-
const connection = createBullConnection('custom');
7+
export async function fetchQueueTitles(prefix: string, opts: Options): Promise<Array<Title>> {
8+
const connection = getBullConnection(opts);
89
const keys = await connection.keys(normalizePrefixGlob(prefix));
910

1011
return keys.map((key) => {

src/mutation/jobAdd.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export function createJobAddFC(
3939
}),
4040
},
4141
resolve: async (_, { prefix, queueName, jobName, data, options }) => {
42-
const queue = await findQueue(prefix, queueName);
42+
const queue = await findQueue(prefix, queueName, opts);
4343
const job = await queue.add(jobName, data, options);
4444
return {
4545
job,

src/mutation/jobAddBulk.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export function createJobAddBulkFC(
4343
.getTypePlural(),
4444
},
4545
resolve: async (_, { prefix, queueName, jobs }) => {
46-
const queue = await findQueue(prefix, queueName);
46+
const queue = await findQueue(prefix, queueName, opts);
4747
const jobsRes = await queue.addBulk(jobs);
4848
return {
4949
jobs: jobsRes,

src/mutation/jobAddCron.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export function createJobAddCronFC(
4848
}),
4949
},
5050
resolve: async (_, { prefix, queueName, jobName, data, options }) => {
51-
const queue = await findQueue(prefix, queueName);
51+
const queue = await findQueue(prefix, queueName, opts);
5252
const job = await queue.add(jobName, data, options);
5353
return {
5454
job,

0 commit comments

Comments
 (0)