Skip to content

Commit 785f67c

Browse files
author
Boris
committed
feat: get Queue as new instance w.o. context
1 parent e6932e7 commit 785f67c

31 files changed

+185
-73
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { MutationError, ErrorCodeEnum } from './Error';
2+
import { createBullConnection } from '../../../connectRedis';
3+
import { Queue } from 'bullmq';
4+
5+
export async function findQueue(prefix: string, queueName: string): Promise<Queue> {
6+
const connection = createBullConnection('custom');
7+
const queueExists = await connection.exists([prefix, queueName, 'meta'].join(':'));
8+
9+
if (!queueExists) {
10+
throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND);
11+
}
12+
13+
const queue = new Queue(queueName, {
14+
prefix,
15+
connection: createBullConnection('queue'),
16+
});
17+
18+
return queue;
19+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { createBullConnection } from '../../../connectRedis';
2+
import { Queue } from 'bullmq';
3+
4+
export function getQueue(prefix: string, queueName: string): Queue {
5+
const queue = new Queue(queueName, {
6+
prefix,
7+
connection: createBullConnection('queue'),
8+
});
9+
10+
return queue;
11+
}
12+
13+
// export function getQueue(queueName: string, context: any): Queue {
14+
// const queue = context?.Queues?.get(queueName);
15+
// if (!queue) {
16+
// throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND);
17+
// }
18+
// return queue;
19+
// }

example/src/schema/mutation/helpers/wrapMutationFC.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { Queue } from 'bullmq';
21
import {
32
SchemaComposer,
43
getFlatProjectionFromAST,
@@ -13,14 +12,6 @@ export enum MutationStatusEnum {
1312
ERROR = 'error',
1413
}
1514

16-
export function getQueue(queueName: string, context: any): Queue {
17-
const queue = context?.Queues?.get(queueName);
18-
if (!queue) {
19-
throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND);
20-
}
21-
return queue;
22-
}
23-
2415
type Generator = (
2516
fieldConfig: ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>
2617
) => ObjectTypeComposerFieldConfigAsObjectDefinition<any, any>;

example/src/schema/mutation/jobAdd.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { getQueue } from './helpers/wrapMutationFC';
21
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
32
import { getJobTC } from '../types/job/Job';
3+
import { getQueue } from './helpers/queueGet';
44

55
export function createJobAddFC(
66
sc: SchemaComposer<any>
@@ -13,6 +13,10 @@ export function createJobAddFC(
1313
},
1414
}),
1515
args: {
16+
prefix: {
17+
type: 'String',
18+
defaultValue: 'bull',
19+
},
1620
queueName: 'String!',
1721
jobName: 'String!',
1822
data: 'JSON!',
@@ -32,8 +36,8 @@ export function createJobAddFC(
3236
},
3337
}),
3438
},
35-
resolve: async (_, { queueName, jobName, data, options }, context) => {
36-
const queue = getQueue(queueName, context);
39+
resolve: async (_, { prefix, queueName, jobName, data, options }) => {
40+
const queue = getQueue(prefix, queueName);
3741
const job = await queue.add(jobName, data, options);
3842
return {
3943
job,

example/src/schema/mutation/jobAddBulk.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getQueue } from './helpers/wrapMutationFC';
1+
import { getQueue } from './helpers/queueGet';
22
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
33
import { getJobTC } from '../types/job/Job';
44

@@ -13,6 +13,10 @@ export function createJobAddBulkFC(
1313
},
1414
}),
1515
args: {
16+
prefix: {
17+
type: 'String',
18+
defaultValue: 'bull',
19+
},
1620
queueName: 'String!',
1721
jobs: sc
1822
.createInputTC({
@@ -39,9 +43,8 @@ export function createJobAddBulkFC(
3943
})
4044
.getTypePlural(),
4145
},
42-
resolve: async (_, { queueName, jobs }, context) => {
43-
console.log('Я здесб!');
44-
const queue = getQueue(queueName, context);
46+
resolve: async (_, { prefix, queueName, jobs }) => {
47+
const queue = getQueue(prefix, queueName);
4548
const jobsRes = await queue.addBulk(jobs);
4649
return {
4750
jobs: jobsRes,

example/src/schema/mutation/jobAddCron.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getQueue } from './helpers/wrapMutationFC';
1+
import { getQueue } from './helpers/queueGet';
22
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
33
import { getJobTC } from '../types/job/Job';
44

@@ -13,6 +13,10 @@ export function createJobAddCronFC(
1313
},
1414
}),
1515
args: {
16+
prefix: {
17+
type: 'String',
18+
defaultValue: 'bull',
19+
},
1620
queueName: 'String!',
1721
jobName: 'String!',
1822
data: 'JSON!',
@@ -44,8 +48,8 @@ export function createJobAddCronFC(
4448
},
4549
}),
4650
},
47-
resolve: async (_, { queueName, jobName, data, options }, context) => {
48-
const queue = getQueue(queueName, context);
51+
resolve: async (_, { prefix, queueName, jobName, data, options }) => {
52+
const queue = getQueue(prefix, queueName);
4953
const job = await queue.add(jobName, data, options);
5054
return {
5155
job,

example/src/schema/mutation/jobAddEvery.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getQueue } from './helpers/wrapMutationFC';
1+
import { getQueue } from './helpers/queueGet';
22
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
33
import { getJobTC } from '../types/job/Job';
44

@@ -13,6 +13,10 @@ export function createJobAddEveryFC(
1313
},
1414
}),
1515
args: {
16+
prefix: {
17+
type: 'String',
18+
defaultValue: 'bull',
19+
},
1620
queueName: 'String!',
1721
jobName: 'String!',
1822
data: 'JSON!',
@@ -43,8 +47,8 @@ export function createJobAddEveryFC(
4347
},
4448
}),
4549
},
46-
resolve: async (_, { queueName, jobName, data, options }, context) => {
47-
const queue = getQueue(queueName, context);
50+
resolve: async (_, { prefix, queueName, jobName, data, options }) => {
51+
const queue = getQueue(prefix, queueName);
4852
const job = await queue.add(jobName, data, options);
4953
return {
5054
job,

example/src/schema/mutation/jobDiscard.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
22
import { MutationError, ErrorCodeEnum } from './helpers/Error';
33
import { getJobStatusEnumTC } from '../types';
4-
import { getQueue } from './helpers/wrapMutationFC';
4+
import { getQueue } from './helpers/queueGet';
55

66
export function createJobDiscardFC(
77
sc: SchemaComposer<any>
@@ -15,11 +15,15 @@ export function createJobDiscardFC(
1515
},
1616
}),
1717
args: {
18+
prefix: {
19+
type: 'String',
20+
defaultValue: 'bull',
21+
},
1822
queueName: 'String!',
1923
id: 'String!',
2024
},
21-
resolve: async (_, { queueName, id }, context) => {
22-
const queue = getQueue(queueName, context);
25+
resolve: async (_, { prefix, queueName, id }) => {
26+
const queue = getQueue(prefix, queueName);
2327
const job = await queue.getJob(id);
2428
if (!job) throw new MutationError('Job not found!', ErrorCodeEnum.JOB_NOT_FOUND);
2529
await job.discard();

example/src/schema/mutation/jobLogAdd.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
22
import { MutationError, ErrorCodeEnum } from './helpers/Error';
33
import { getJobStatusEnumTC } from '../types';
4-
import { getQueue } from './helpers/wrapMutationFC';
4+
import { getQueue } from './helpers/queueGet';
55

66
export function createJobLogAddFC(
77
sc: SchemaComposer<any>
@@ -15,12 +15,16 @@ export function createJobLogAddFC(
1515
},
1616
}),
1717
args: {
18+
prefix: {
19+
type: 'String',
20+
defaultValue: 'bull',
21+
},
1822
queueName: 'String!',
1923
id: 'String!',
2024
row: 'String!',
2125
},
22-
resolve: async (_, { queueName, id, row }, context) => {
23-
const queue = getQueue(queueName, context);
26+
resolve: async (_, { prefix, queueName, id, row }) => {
27+
const queue = getQueue(prefix, queueName);
2428
const job = await queue.getJob(id);
2529
if (!job) throw new MutationError('Job not found!', ErrorCodeEnum.JOB_NOT_FOUND);
2630
const logRes = await job.log(row);

example/src/schema/mutation/jobMoveToCompleted.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getQueue } from './helpers/wrapMutationFC';
1+
import { getQueue } from './helpers/queueGet';
22
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
33
import { getJobTC } from '../types/job/Job';
44

@@ -14,11 +14,15 @@ export function jobMoveToCompletedFC(
1414
},
1515
}),
1616
args: {
17+
prefix: {
18+
type: 'String',
19+
defaultValue: 'bull',
20+
},
1721
queueName: 'String!',
1822
id: 'String!',
1923
},
20-
resolve: async (_, { queueName, id }, context) => {
21-
const queue = getQueue(queueName, context);
24+
resolve: async (_, { prefix, queueName, id }) => {
25+
const queue = getQueue(prefix, queueName);
2226
const job = await queue.getJob(id);
2327
if (job) {
2428
await job.moveToCompleted({}, 'tokenmustbehere'); //TODO: нати где брать токен

0 commit comments

Comments
 (0)