Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/queue/src/domain/handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { MailJob } from "./model/mail-job";
import type { ZapierJob } from "./model/zapier-job";
import notificationQueue from "./notification-queue";
import mailQueue from "./queue";
import zapierQueue from "./zapier-queue";

export async function addMailJob({ to, subject, body, from }: MailJob) {
for (const recipient of to) {
Expand All @@ -16,3 +18,7 @@ export async function addMailJob({ to, subject, body, from }: MailJob) {
export async function addNotificationJob(notification) {
await notificationQueue.add("notification", notification);
}

export async function addZapierJob(job: ZapierJob) {
await zapierQueue.add("zapier", job);
}
9 changes: 9 additions & 0 deletions apps/queue/src/domain/model/zapier-job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { z } from "zod";

export const ZapierJob = z.object({
domainId: z.string(),
action: z.string(),
payload: z.any(),
});

export type ZapierJob = z.infer<typeof ZapierJob>;
8 changes: 8 additions & 0 deletions apps/queue/src/domain/zapier-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Queue } from "bullmq";
import redis from "../redis";

const zapierQueue = new Queue("zapier", {
connection: redis,
});

export default zapierQueue;
1 change: 1 addition & 0 deletions apps/queue/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import sseRoutes from "./sse/routes";
// start workers
import "./domain/worker";
import "./workers/notifications";
import "./workers/zapier";

// start loops
import { startEmailAutomation } from "./start-email-automation";
Expand Down
17 changes: 16 additions & 1 deletion apps/queue/src/job/routes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import express from "express";
import { addMailJob, addNotificationJob } from "../domain/handler";
import { addMailJob, addNotificationJob, addZapierJob } from "../domain/handler";
import { logger } from "../logger";
import { MailJob } from "../domain/model/mail-job";
import { ZapierJob } from "../domain/model/zapier-job";
import NotificationModel from "../domain/model/notification";
import { ObjectId } from "mongodb";
import { User } from "@courselit/common-models";
Expand Down Expand Up @@ -56,4 +57,18 @@ router.post(
},
);

router.post("/zapier", async (req: express.Request, res: express.Response) => {
try {
const { domainId, action, payload } = req.body;
ZapierJob.parse({ domainId, action, payload });

await addZapierJob({ domainId, action, payload });

res.status(200).json({ message: "Success" });
} catch (err: any) {
logger.error(err);
res.status(500).json({ error: err.message });
}
});

export default router;
26 changes: 26 additions & 0 deletions apps/queue/src/workers/zapier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Worker } from "bullmq";
import redis from "../redis";
import { logger } from "../logger";
import { ZapierJob } from "../domain/model/zapier-job";

const worker = new Worker(
"zapier",
async (job) => {
const { domainId, action, payload } = job.data as ZapierJob;
logger.info(`Processing zapier job for domain ${domainId}`, {
action,
payload,
});
},
{
connection: redis,
},
);

worker.on("completed", (job) => {
logger.info(`Zapier job ${job.id} completed`);
});

worker.on("failed", (job, err) => {
logger.error(`Zapier job ${job.id} failed with error ${err.message}`);
});
28 changes: 23 additions & 5 deletions apps/web/app/api/payment/webhook/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import { error } from "@/services/logger";
import mongoose from "mongoose";
import Payment from "@/payments-new/payment";
import { activateMembership } from "../helpers";
import UserModel from "@models/User";
import CourseModel from "@models/Course";
import { addZapierJob } from "@/services/queue";

export async function POST(req: NextRequest) {
try {
Expand Down Expand Up @@ -56,7 +59,7 @@ export async function POST(req: NextRequest) {
membership,
);

await handleInvoice(
const invoice = await handleInvoice(
domain,
invoiceId,
membership,
Expand All @@ -81,6 +84,19 @@ export async function POST(req: NextRequest) {

await activateMembership(domain, membership, paymentPlan);

const user = await UserModel.findOne({ userId: membership.userId, domain: domain._id });
const course = await CourseModel.findOne({ courseId: membership.entityId, domain: domain._id });

await addZapierJob({
domainId: domain._id.toString(),
action: "product_purchased",
payload: {
user,
course,
invoice,
},
});

return Response.json({ message: "success" });
} catch (e) {
error(`Error in payment webhook: ${e.message}`, {
Expand Down Expand Up @@ -145,8 +161,8 @@ async function handleInvoice(
paymentMethod: any,
currencyISOCode: string,
body: any,
) {
const invoice = await InvoiceModel.findOne<Invoice>({
): Promise<Invoice> {
let invoice = await InvoiceModel.findOne<Invoice>({
domain: domain._id,
invoiceId,
status: Constants.InvoiceStatus.PENDING,
Expand All @@ -155,9 +171,9 @@ async function handleInvoice(
invoice.paymentProcessorTransactionId =
paymentMethod.getPaymentIdentifier(body);
invoice.status = Constants.InvoiceStatus.PAID;
await (invoice as any).save();
invoice = await (invoice as any).save();
} else {
await InvoiceModel.create({
invoice = await InvoiceModel.create({
domain: domain._id,
membershipId: membership.membershipId,
membershipSessionId: membership.sessionId,
Expand All @@ -174,6 +190,8 @@ async function handleInvoice(
currencyISOCode,
});
}

return invoice!;
}

async function handleEMICancellation(
Expand Down
13 changes: 12 additions & 1 deletion apps/web/graphql/communities/logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import {
} from "./helpers";
import { error } from "@/services/logger";
import NotificationModel from "@models/Notification";
import { addNotification } from "@/services/queue";
import { addNotification, addZapierJob } from "@/services/queue";
import { hasActiveSubscription } from "../users/logic";
import { internal } from "@config/strings";
import { hasCommunityPermission as hasPermission } from "@ui-lib/utils";
Expand Down Expand Up @@ -492,6 +492,17 @@ export async function joinCommunity({
forUserIds: communityManagers.map((m) => m.userId),
userId: ctx.user.userId,
});

if (member.status === Constants.MembershipStatus.ACTIVE) {
await addZapierJob({
domainId: ctx.subdomain._id.toString(),
action: "community_joined",
payload: {
user: ctx.user,
community,
},
});
}
}

return member;
Expand Down
11 changes: 11 additions & 0 deletions apps/web/graphql/lessons/logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import LessonEvaluation from "../../models/LessonEvaluation";
import { checkPermission } from "@courselit/utils";
import { recordActivity } from "../../lib/record-activity";
import { InternalCourse } from "@courselit/common-logic";
import { addZapierJob } from "@/services/queue";

const { permissions, quiz } = constants;

Expand Down Expand Up @@ -314,6 +315,7 @@ export const markLessonCompleted = async (
lessonId,
courseId: lesson.courseId,
user: ctx.user,
domainId: ctx.subdomain._id.toString(),
});

await recordActivity({
Expand Down Expand Up @@ -357,6 +359,15 @@ const recordCourseCompleted = async (courseId: string, ctx: GQLContext) => {
type: Constants.ActivityType.COURSE_COMPLETED,
entityId: courseId,
});

await addZapierJob({
domainId: ctx.subdomain._id.toString(),
action: "course_completed",
payload: {
user: ctx.user,
course,
},
});
};

export const evaluateLesson = async (
Expand Down
35 changes: 33 additions & 2 deletions apps/web/graphql/users/logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import { generateEmailFrom } from "@/lib/utils";
import MembershipModel from "@models/Membership";
import CommunityModel from "@models/Community";
import CourseModel from "@models/Course";
import { addMailJob } from "@/services/queue";
import LessonModel from "@models/Lesson";
import { addMailJob, addZapierJob } from "@/services/queue";
import { getPaymentMethodFromSettings } from "@/payments-new";
import { checkForInvalidPermissions } from "@/lib/check-invalid-permissions";
import { activateMembership } from "@/app/api/payment/helpers";
Expand Down Expand Up @@ -183,6 +184,15 @@ export const inviteCustomer = async (

await activateMembership(ctx.subdomain!, membership, paymentPlan);

await addZapierJob({
domainId: ctx.subdomain._id.toString(),
action: "course_enrolled",
payload: {
user,
course,
},
});

try {
const emailBody = pug.render(courseEnrollTemplate, {
courseName: course.title,
Expand Down Expand Up @@ -289,10 +299,12 @@ export const recordProgress = async ({
lessonId,
courseId,
user,
domainId,
}: {
lessonId: string;
courseId: string;
courseId:string;
user: User;
domainId: string;
}) => {
const enrolledItemIndex = user.purchases.findIndex(
(progress: Progress) => progress.courseId === courseId,
Expand All @@ -308,6 +320,19 @@ export const recordProgress = async ({
) {
user.purchases[enrolledItemIndex].completedLessons.push(lessonId);
await (user as any).save();

const course = await CourseModel.findOne({ courseId, domain: domainId });
const lesson = await LessonModel.findOne({ lessonId, courseId });

await addZapierJob({
domainId,
action: "lesson_completed",
payload: {
user,
course,
lesson,
},
});
}
};

Expand Down Expand Up @@ -398,6 +423,12 @@ export async function createUser({
type: "newsletter_subscribed",
});
}

await addZapierJob({
domainId: domain._id.toString(),
action: "new_user",
payload: createdUser,
});
}

return createdUser;
Expand Down
10 changes: 10 additions & 0 deletions apps/web/lib/trigger-sequences.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import RuleModel from "@models/Rule";
import SequenceModel from "@models/Sequence";
import mongoose from "mongoose";
import { error } from "../services/logger";
import { addZapierJob } from "@/services/queue";

export async function triggerSequences({
user,
Expand Down Expand Up @@ -73,6 +74,15 @@ export async function triggerSequences({
{ _id: sequence._id },
{ $addToSet: { entrants: user.userId } },
);

await addZapierJob({
domainId: user.domain.toString(),
action: "sequence_subscribed",
payload: {
user,
sequence,
},
});
}
} catch (err: any) {
error(err.message, {
Expand Down
24 changes: 24 additions & 0 deletions apps/web/services/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,30 @@ export async function addMailJob({ to, from, subject, body }: MailProps) {
}
}

export async function addZapierJob(payload: any) {
try {
const jwtSecret = getJwtSecret();
const token = jwtUtils.generateToken({ service: "app" }, jwtSecret);
const response = await fetch(`${queueServer}/job/zapier`, {
method: "POST",
headers: {
"content-type": "application/json",
Authorization: `Bearer ${token}`,
},
body: JSON.stringify(payload),
});
const jsonResponse = await response.json();

if (response.status !== 200) {
throw new Error(jsonResponse.error);
}
} catch (err) {
error(`Error adding zapier job: ${err.message}`, {
payload
});
}
}

export async function addNotification({
domain,
entityId,
Expand Down
Loading