diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index a2400d28a85..22fea211230 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -71,15 +71,17 @@ export interface RecordUsageParams { } /** - * Records usage in a single atomic transaction. + * Records usage by inserting into usage_log and incrementing userStats counters. * - * Inserts all entries into usage_log and updates userStats counters - * (totalCost, currentPeriodCost, lastActive) within one Postgres transaction. - * The total cost added to userStats is derived from summing entry costs, - * ensuring usage_log and currentPeriodCost can never drift apart. + * The two writes are intentionally not wrapped in a transaction: under high + * concurrency for the same userId, holding BEGIN/COMMIT across the user_stats + * row-lock wait pins pgbouncer connections and exhausts the pool. * - * If billing is disabled, total cost is zero, or no entries have positive cost, - * this function returns early without writing anything. + * usage_log is the source of truth and the INSERT propagates errors to the + * caller. The userStats UPDATE is best-effort: failures (and missing-row + * cases) are logged as warnings and swallowed. Counter drift is acceptable + * here — the long-term plan is to derive counters from usage_log directly. + * Any drift warning in logs is a signal that needs investigation. */ export async function recordUsage(params: RecordUsageParams): Promise { if (!isBillingEnabled) { @@ -103,47 +105,56 @@ export async function recordUsage(params: RecordUsageParams): Promise { ? Object.fromEntries(Object.entries(additionalStats).filter(([k]) => !RESERVED_KEYS.has(k))) : undefined - await db.transaction(async (tx) => { - if (validEntries.length > 0) { - await tx.insert(usageLog).values( - validEntries.map((entry) => ({ - id: generateId(), - userId, - category: entry.category, - source: entry.source, - description: entry.description, - metadata: entry.metadata ?? null, - cost: entry.cost.toString(), - workspaceId: workspaceId ?? null, - workflowId: workflowId ?? null, - executionId: executionId ?? null, - })) - ) - } + if (validEntries.length > 0) { + await db.insert(usageLog).values( + validEntries.map((entry) => ({ + id: generateId(), + userId, + category: entry.category, + source: entry.source, + description: entry.description, + metadata: entry.metadata ?? null, + cost: entry.cost.toString(), + workspaceId: workspaceId ?? null, + workflowId: workflowId ?? null, + executionId: executionId ?? null, + })) + ) + } - const updateFields: Record = { - lastActive: new Date(), - ...(totalCost > 0 && { - totalCost: sql`total_cost + ${totalCost}`, - currentPeriodCost: sql`current_period_cost + ${totalCost}`, - }), - ...safeStats, - } + const updateFields: Record = { + lastActive: new Date(), + ...(totalCost > 0 && { + totalCost: sql`total_cost + ${totalCost}`, + currentPeriodCost: sql`current_period_cost + ${totalCost}`, + }), + ...safeStats, + } - const result = await tx + try { + const result = await db .update(userStats) .set(updateFields) .where(eq(userStats.userId, userId)) .returning({ userId: userStats.userId }) if (result.length === 0) { - logger.warn('recordUsage: userStats row not found, transaction will roll back', { + logger.warn('recordUsage: userStats row not found; counter increment dropped', { userId, totalCost, + hadEntries: validEntries.length > 0, + additionalStatsKeys: safeStats ? Object.keys(safeStats) : [], }) - throw new Error(`userStats row not found for userId: ${userId}`) } - }) + } catch (error) { + logger.warn('recordUsage: userStats update failed; counter increment dropped', { + error: toError(error).message, + userId, + totalCost, + hadEntries: validEntries.length > 0, + additionalStatsKeys: safeStats ? Object.keys(safeStats) : [], + }) + } logger.debug('Recorded usage', { userId,