Stage 5: Add Prisma integration and enhance mission management
- Introduced Prisma as a dependency in package.json and updated pnpm-lock.yaml. - Created Prisma module and service for database interactions. - Added initial Prisma schema and migration for user, survivor, mission, and related entities. - Implemented throttling in the API using @nestjs/throttler for rate limiting. - Enhanced mission management logic to utilize Prisma for database transactions. - Updated missions controller and service to handle mission state and participant management. - Added Twitch PubSub service for real-time updates on mission states.
This commit is contained in:
@@ -0,0 +1,91 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE "users" (
|
||||
"id" UUID NOT NULL,
|
||||
"twitch_opaque_user_id" TEXT NOT NULL,
|
||||
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
CONSTRAINT "users_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "survivors" (
|
||||
"id" UUID NOT NULL,
|
||||
"user_id" UUID NOT NULL,
|
||||
"channel_id" TEXT NOT NULL,
|
||||
"name" VARCHAR(32) NOT NULL,
|
||||
"state" TEXT NOT NULL DEFAULT 'active',
|
||||
"stats" JSONB NOT NULL,
|
||||
"perk_slots" JSONB NOT NULL,
|
||||
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
CONSTRAINT "survivors_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "missions" (
|
||||
"id" UUID NOT NULL,
|
||||
"group_id" UUID,
|
||||
"channel_id" TEXT NOT NULL,
|
||||
"difficulty" SMALLINT NOT NULL,
|
||||
"status" TEXT NOT NULL DEFAULT 'active',
|
||||
"encounter_library_version" TEXT NOT NULL,
|
||||
"started_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"ended_at" TIMESTAMP(3),
|
||||
"tick_index" INTEGER NOT NULL DEFAULT 0,
|
||||
"next_tick_at" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "missions_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "mission_participants" (
|
||||
"id" UUID NOT NULL,
|
||||
"mission_id" UUID NOT NULL,
|
||||
"survivor_id" UUID NOT NULL,
|
||||
"state" TEXT NOT NULL DEFAULT 'active',
|
||||
"hook_count" SMALLINT NOT NULL DEFAULT 0,
|
||||
|
||||
CONSTRAINT "mission_participants_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "mission_logs" (
|
||||
"id" UUID NOT NULL,
|
||||
"mission_id" UUID NOT NULL,
|
||||
"tick_index" INTEGER NOT NULL,
|
||||
"encounter_key" TEXT NOT NULL,
|
||||
"rendered_text" TEXT NOT NULL,
|
||||
"seed" TEXT NOT NULL,
|
||||
"modifiers_applied" JSONB NOT NULL,
|
||||
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
|
||||
CONSTRAINT "mission_logs_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "users_twitch_opaque_user_id_key" ON "users"("twitch_opaque_user_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "missions_channel_id_idx" ON "missions"("channel_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "missions_status_next_tick_at_idx" ON "missions"("status", "next_tick_at");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "mission_participants_mission_id_survivor_id_key" ON "mission_participants"("mission_id", "survivor_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "mission_logs_mission_id_tick_index_idx" ON "mission_logs"("mission_id", "tick_index");
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "survivors" ADD CONSTRAINT "survivors_user_id_fkey" FOREIGN KEY ("user_id") REFERENCES "users"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "mission_participants" ADD CONSTRAINT "mission_participants_mission_id_fkey" FOREIGN KEY ("mission_id") REFERENCES "missions"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "mission_participants" ADD CONSTRAINT "mission_participants_survivor_id_fkey" FOREIGN KEY ("survivor_id") REFERENCES "survivors"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "mission_logs" ADD CONSTRAINT "mission_logs_mission_id_fkey" FOREIGN KEY ("mission_id") REFERENCES "missions"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
|
||||
|
||||
80
apps/api/prisma/schema.prisma
Normal file
80
apps/api/prisma/schema.prisma
Normal file
@@ -0,0 +1,80 @@
|
||||
generator client {
|
||||
provider = "prisma-client-js"
|
||||
output = "../../../node_modules/.prisma/client"
|
||||
}
|
||||
|
||||
datasource db {
|
||||
provider = "postgresql"
|
||||
url = env("DATABASE_URL")
|
||||
}
|
||||
|
||||
model User {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
opaqueUserId String @unique @map("twitch_opaque_user_id")
|
||||
createdAt DateTime @default(now()) @map("created_at")
|
||||
survivors Survivor[]
|
||||
|
||||
@@map("users")
|
||||
}
|
||||
|
||||
model Survivor {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
userId String @map("user_id") @db.Uuid
|
||||
user User @relation(fields: [userId], references: [id])
|
||||
channelId String @map("channel_id")
|
||||
name String @db.VarChar(32)
|
||||
state String @default("active")
|
||||
stats Json
|
||||
perkSlots Json @map("perk_slots")
|
||||
createdAt DateTime @default(now()) @map("created_at")
|
||||
participants MissionParticipant[]
|
||||
|
||||
@@map("survivors")
|
||||
}
|
||||
|
||||
model Mission {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
groupId String? @map("group_id") @db.Uuid
|
||||
channelId String @map("channel_id")
|
||||
difficulty Int @db.SmallInt
|
||||
status String @default("active")
|
||||
encounterLibraryVersion String @map("encounter_library_version")
|
||||
startedAt DateTime @default(now()) @map("started_at")
|
||||
endedAt DateTime? @map("ended_at")
|
||||
tickIndex Int @default(0) @map("tick_index")
|
||||
nextTickAt DateTime @map("next_tick_at")
|
||||
participants MissionParticipant[]
|
||||
logs MissionLog[]
|
||||
|
||||
@@index([channelId])
|
||||
@@index([status, nextTickAt])
|
||||
@@map("missions")
|
||||
}
|
||||
|
||||
model MissionParticipant {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
missionId String @map("mission_id") @db.Uuid
|
||||
mission Mission @relation(fields: [missionId], references: [id])
|
||||
survivorId String @map("survivor_id") @db.Uuid
|
||||
survivor Survivor @relation(fields: [survivorId], references: [id])
|
||||
state String @default("active")
|
||||
hookCount Int @default(0) @map("hook_count") @db.SmallInt
|
||||
|
||||
@@unique([missionId, survivorId])
|
||||
@@map("mission_participants")
|
||||
}
|
||||
|
||||
model MissionLog {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
missionId String @map("mission_id") @db.Uuid
|
||||
mission Mission @relation(fields: [missionId], references: [id])
|
||||
tickIndex Int @map("tick_index")
|
||||
encounterKey String @map("encounter_key")
|
||||
renderedText String @map("rendered_text")
|
||||
seed String
|
||||
modifiersApplied Json @map("modifiers_applied")
|
||||
createdAt DateTime @default(now()) @map("created_at")
|
||||
|
||||
@@index([missionId, tickIndex])
|
||||
@@map("mission_logs")
|
||||
}
|
||||
@@ -1,13 +1,23 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { APP_GUARD } from '@nestjs/core';
|
||||
import { ScheduleModule } from '@nestjs/schedule';
|
||||
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
|
||||
import { MissionsModule } from './missions/missions.module';
|
||||
import { PrismaModule } from './prisma/prisma.module';
|
||||
import { TickEngineModule } from './tick-engine/tick-engine.module';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
ScheduleModule.forRoot(),
|
||||
ThrottlerModule.forRoot({
|
||||
throttlers: [{ ttl: 10000, limit: 30 }],
|
||||
}),
|
||||
PrismaModule,
|
||||
MissionsModule,
|
||||
TickEngineModule,
|
||||
],
|
||||
providers: [
|
||||
{ provide: APP_GUARD, useClass: ThrottlerGuard },
|
||||
],
|
||||
})
|
||||
export class AppModule {}
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
ForbiddenException,
|
||||
Get,
|
||||
HttpCode,
|
||||
HttpStatus,
|
||||
NotFoundException,
|
||||
Post,
|
||||
UseGuards,
|
||||
} from '@nestjs/common';
|
||||
import { Throttle } from '@nestjs/throttler';
|
||||
import {
|
||||
MissionStateResponse,
|
||||
MissionStateResponseSchema,
|
||||
@@ -26,6 +27,7 @@ export class MissionsController {
|
||||
) {}
|
||||
|
||||
@Get('state')
|
||||
@Throttle({ default: { limit: 10, ttl: 10000 } })
|
||||
async getState(
|
||||
@TwitchClaims() claims: TwitchJwtPayload
|
||||
): Promise<MissionStateResponse> {
|
||||
@@ -38,9 +40,9 @@ export class MissionsController {
|
||||
async startMission(
|
||||
@TwitchClaims() claims: TwitchJwtPayload,
|
||||
@Body() body: unknown
|
||||
): Promise<MissionStateResponse> {
|
||||
): Promise<NonNullable<MissionStateResponse>> {
|
||||
if (!claims.opaque_user_id.startsWith('U')) {
|
||||
throw new NotFoundException('Anonymous viewers cannot start missions');
|
||||
throw new ForbiddenException('Anonymous viewers cannot start missions');
|
||||
}
|
||||
const { difficulty } = StartMissionRequestSchema.parse(body);
|
||||
return this.missions.startMission(claims, difficulty);
|
||||
|
||||
@@ -15,6 +15,6 @@ import { MissionsService } from './missions.service';
|
||||
EncounterService,
|
||||
GroupSynergyService,
|
||||
],
|
||||
exports: [MissionStoreService, EncounterService],
|
||||
exports: [MissionStoreService, EncounterService, GroupSynergyService],
|
||||
})
|
||||
export class MissionsModule {}
|
||||
|
||||
@@ -7,6 +7,7 @@ import type {
|
||||
} from '@fog-explorer/api-interfaces';
|
||||
import { getLibraryVersion } from '@fog-explorer/encounter-library';
|
||||
import { TwitchJwtPayload } from '../auth/twitch-jwt.guard';
|
||||
import { PrismaService } from '../prisma/prisma.service';
|
||||
import { MissionStoreService } from './mission-store.service';
|
||||
|
||||
const TICK_BASE_INTERVAL_MS = 60_000;
|
||||
@@ -14,7 +15,10 @@ const TICK_JITTER_MS = 5_000;
|
||||
|
||||
@Injectable()
|
||||
export class MissionsService {
|
||||
constructor(private readonly store: MissionStoreService) {}
|
||||
constructor(
|
||||
private readonly store: MissionStoreService,
|
||||
private readonly prisma: PrismaService
|
||||
) {}
|
||||
|
||||
async startMission(
|
||||
claims: TwitchJwtPayload,
|
||||
@@ -22,11 +26,50 @@ export class MissionsService {
|
||||
): Promise<NonNullable<MissionStateResponse>> {
|
||||
const missionId = crypto.randomUUID();
|
||||
const survivorId = crypto.randomUUID();
|
||||
const now = new Date().toISOString();
|
||||
const now = new Date();
|
||||
const jitter = Math.floor(Math.random() * TICK_JITTER_MS);
|
||||
const nextTickAt = new Date(Date.now() + TICK_BASE_INTERVAL_MS + jitter).toISOString();
|
||||
const nextTickAt = new Date(now.getTime() + TICK_BASE_INTERVAL_MS + jitter);
|
||||
const stats: SurvivorStats = { objectives: 5, survival: 5, altruism: 5 };
|
||||
|
||||
const stats: SurvivorStats = defaultStats();
|
||||
// Upsert user and create survivor + mission in one transaction.
|
||||
await this.prisma.$transaction(async (tx) => {
|
||||
const user = await tx.user.upsert({
|
||||
where: { opaqueUserId: claims.opaque_user_id },
|
||||
create: { id: crypto.randomUUID(), opaqueUserId: claims.opaque_user_id },
|
||||
update: {},
|
||||
});
|
||||
|
||||
await tx.survivor.create({
|
||||
data: {
|
||||
id: survivorId,
|
||||
userId: user.id,
|
||||
channelId: claims.channel_id,
|
||||
name: defaultName(claims.opaque_user_id),
|
||||
state: 'active',
|
||||
stats,
|
||||
perkSlots: [],
|
||||
},
|
||||
});
|
||||
|
||||
await tx.mission.create({
|
||||
data: {
|
||||
id: missionId,
|
||||
channelId: claims.channel_id,
|
||||
difficulty,
|
||||
status: 'active',
|
||||
encounterLibraryVersion: getLibraryVersion(),
|
||||
nextTickAt,
|
||||
participants: {
|
||||
create: {
|
||||
id: crypto.randomUUID(),
|
||||
survivorId,
|
||||
state: 'active',
|
||||
hookCount: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
const survivor: Survivor = {
|
||||
id: survivorId,
|
||||
@@ -36,7 +79,7 @@ export class MissionsService {
|
||||
state: 'active',
|
||||
stats,
|
||||
perkSlots: [],
|
||||
createdAt: now,
|
||||
createdAt: now.toISOString(),
|
||||
};
|
||||
|
||||
const mission: Mission = {
|
||||
@@ -46,9 +89,9 @@ export class MissionsService {
|
||||
difficulty,
|
||||
status: 'active',
|
||||
encounterLibraryVersion: getLibraryVersion(),
|
||||
nextTickAt,
|
||||
nextTickAt: nextTickAt.toISOString(),
|
||||
tickIndex: 0,
|
||||
startedAt: now,
|
||||
startedAt: now.toISOString(),
|
||||
endedAt: null,
|
||||
};
|
||||
|
||||
@@ -60,16 +103,12 @@ export class MissionsService {
|
||||
|
||||
await this.store.setActiveMission(state);
|
||||
await this.store.setChannelMissionId(claims.channel_id, missionId);
|
||||
await this.store.scheduleTick(missionId, new Date(nextTickAt).getTime());
|
||||
await this.store.scheduleTick(missionId, nextTickAt.getTime());
|
||||
|
||||
return state;
|
||||
}
|
||||
}
|
||||
|
||||
function defaultStats(): SurvivorStats {
|
||||
return { objectives: 5, survival: 5, altruism: 5 };
|
||||
}
|
||||
|
||||
function defaultName(opaqueUserId: string): string {
|
||||
return `Survivor ${opaqueUserId.slice(-4)}`;
|
||||
}
|
||||
|
||||
9
apps/api/src/app/prisma/prisma.module.ts
Normal file
9
apps/api/src/app/prisma/prisma.module.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { Global, Module } from '@nestjs/common';
|
||||
import { PrismaService } from './prisma.service';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
providers: [PrismaService],
|
||||
exports: [PrismaService],
|
||||
})
|
||||
export class PrismaModule {}
|
||||
13
apps/api/src/app/prisma/prisma.service.ts
Normal file
13
apps/api/src/app/prisma/prisma.service.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
|
||||
@Injectable()
|
||||
export class PrismaService extends PrismaClient implements OnModuleInit, OnModuleDestroy {
|
||||
async onModuleInit(): Promise<void> {
|
||||
await this.$connect();
|
||||
}
|
||||
|
||||
async onModuleDestroy(): Promise<void> {
|
||||
await this.$disconnect();
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,10 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { MissionsModule } from '../missions/missions.module';
|
||||
import { TickService } from './tick.service';
|
||||
import { TwitchPubSubService } from './twitch-pubsub.service';
|
||||
|
||||
@Module({
|
||||
imports: [MissionsModule],
|
||||
providers: [TickService],
|
||||
providers: [TickService, TwitchPubSubService],
|
||||
})
|
||||
export class TickEngineModule {}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { MissionStoreService } from '../missions/mission-store.service';
|
||||
import { PrismaService } from '../prisma/prisma.service';
|
||||
import { EncounterService } from '../missions/encounter.service';
|
||||
import { MissionStoreService } from '../missions/mission-store.service';
|
||||
import { TwitchPubSubService } from './twitch-pubsub.service';
|
||||
|
||||
@Injectable()
|
||||
export class TickService {
|
||||
@@ -9,7 +11,9 @@ export class TickService {
|
||||
|
||||
constructor(
|
||||
private readonly store: MissionStoreService,
|
||||
private readonly encounters: EncounterService
|
||||
private readonly encounters: EncounterService,
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly pubsub: TwitchPubSubService
|
||||
) {}
|
||||
|
||||
@Cron(CronExpression.EVERY_10_SECONDS)
|
||||
@@ -24,7 +28,7 @@ export class TickService {
|
||||
|
||||
private async processMission(missionId: string): Promise<void> {
|
||||
const token = await this.store.acquireLock(missionId);
|
||||
if (!token) return; // Another worker has the lock
|
||||
if (!token) return;
|
||||
|
||||
try {
|
||||
const state = await this.store.getActiveMission(missionId);
|
||||
@@ -34,13 +38,57 @@ export class TickService {
|
||||
}
|
||||
|
||||
const updated = this.encounters.processTick(state);
|
||||
const newLogs = updated.recentLog.slice(
|
||||
0,
|
||||
updated.recentLog.length - state.recentLog.length
|
||||
);
|
||||
|
||||
// Write to Postgres in a transaction.
|
||||
await this.prisma.$transaction(async (tx) => {
|
||||
await tx.mission.update({
|
||||
where: { id: missionId },
|
||||
data: {
|
||||
tickIndex: updated.mission.tickIndex,
|
||||
nextTickAt: new Date(updated.mission.nextTickAt),
|
||||
status: updated.mission.status,
|
||||
endedAt: updated.mission.endedAt ? new Date(updated.mission.endedAt) : null,
|
||||
},
|
||||
});
|
||||
|
||||
for (const survivor of updated.survivors) {
|
||||
const participant = updated.mission.participants.find(
|
||||
(p) => p.survivorId === survivor.id
|
||||
);
|
||||
if (!participant) continue;
|
||||
await tx.missionParticipant.updateMany({
|
||||
where: { missionId, survivorId: survivor.id },
|
||||
data: { state: participant.state, hookCount: participant.hookCount },
|
||||
});
|
||||
await tx.survivor.update({
|
||||
where: { id: survivor.id },
|
||||
data: { state: survivor.state },
|
||||
});
|
||||
}
|
||||
|
||||
if (newLogs.length > 0) {
|
||||
await tx.missionLog.createMany({
|
||||
data: newLogs.map((log) => ({
|
||||
id: crypto.randomUUID(),
|
||||
missionId,
|
||||
tickIndex: log.tickIndex,
|
||||
encounterKey: log.encounterKey,
|
||||
renderedText: log.logText,
|
||||
seed: log.seed,
|
||||
modifiersApplied: log.modifiersApplied,
|
||||
})),
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Update Redis cache.
|
||||
await this.store.setActiveMission(updated);
|
||||
|
||||
if (
|
||||
updated.mission.status === 'active' ||
|
||||
updated.mission.status === 'lobby'
|
||||
) {
|
||||
if (updated.mission.status === 'active') {
|
||||
const nextMs = new Date(updated.mission.nextTickAt).getTime();
|
||||
await this.store.scheduleTick(missionId, nextMs);
|
||||
} else {
|
||||
@@ -52,10 +100,22 @@ export class TickService {
|
||||
tickIndex: updated.mission.tickIndex,
|
||||
});
|
||||
}
|
||||
|
||||
await this.pubsub.broadcast(state.mission.participants[0]
|
||||
? await this.getChannelId(missionId)
|
||||
: null, updated);
|
||||
} catch (err) {
|
||||
this.logger.error({ message: 'tick failed', missionId, err });
|
||||
} finally {
|
||||
await this.store.releaseLock(missionId, token);
|
||||
}
|
||||
}
|
||||
|
||||
private async getChannelId(missionId: string): Promise<string | null> {
|
||||
const mission = await this.prisma.mission.findUnique({
|
||||
where: { id: missionId },
|
||||
select: { channelId: true },
|
||||
});
|
||||
return mission?.channelId ?? null;
|
||||
}
|
||||
}
|
||||
|
||||
72
apps/api/src/app/tick-engine/twitch-pubsub.service.ts
Normal file
72
apps/api/src/app/tick-engine/twitch-pubsub.service.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { createHmac } from 'crypto';
|
||||
import type { MissionStateResponse } from '@fog-explorer/api-interfaces';
|
||||
|
||||
const PUBSUB_URL = 'https://api.twitch.tv/extensions/message';
|
||||
const TOKEN_TTL_SECONDS = 60;
|
||||
|
||||
@Injectable()
|
||||
export class TwitchPubSubService {
|
||||
private readonly logger = new Logger(TwitchPubSubService.name);
|
||||
|
||||
async broadcast(
|
||||
channelId: string | null,
|
||||
state: NonNullable<MissionStateResponse>
|
||||
): Promise<void> {
|
||||
if (!channelId) return;
|
||||
|
||||
const clientId = process.env['TWITCH_CLIENT_ID'];
|
||||
const secret = process.env['TWITCH_EXTENSION_SECRET'];
|
||||
if (!clientId || !secret) return;
|
||||
|
||||
const token = buildServerToken(clientId, secret);
|
||||
const payload = JSON.stringify({
|
||||
content_type: 'application/json',
|
||||
message: JSON.stringify(state),
|
||||
targets: ['broadcast'],
|
||||
});
|
||||
|
||||
if (Buffer.byteLength(payload, 'utf8') > 5000) {
|
||||
this.logger.warn({ message: 'pubsub payload too large, skipping', channelId });
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await fetch(`${PUBSUB_URL}/${channelId}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Authorization': `Bearer ${token}`,
|
||||
'Client-Id': clientId,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: payload,
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
this.logger.warn({
|
||||
message: 'pubsub broadcast failed',
|
||||
channelId,
|
||||
status: res.status,
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.error({ message: 'pubsub broadcast error', channelId, err });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function buildServerToken(clientId: string, secretB64: string): string {
|
||||
const secretBytes = Buffer.from(secretB64, 'base64');
|
||||
const exp = Math.floor(Date.now() / 1000) + TOKEN_TTL_SECONDS;
|
||||
|
||||
const header = Buffer.from(JSON.stringify({ alg: 'HS256', typ: 'JWT' })).toString('base64url');
|
||||
const payload = Buffer.from(
|
||||
JSON.stringify({ exp, user_id: clientId, role: 'external' })
|
||||
).toString('base64url');
|
||||
|
||||
const sig = createHmac('sha256', secretBytes)
|
||||
.update(`${header}.${payload}`)
|
||||
.digest('base64url');
|
||||
|
||||
return `${header}.${payload}.${sig}`;
|
||||
}
|
||||
Reference in New Issue
Block a user