Skip to content

Commit

Permalink
Handle realtime with large payloads or outputs #1451
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Nov 18, 2024
1 parent 07daf8e commit 83384c9
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 36 deletions.
42 changes: 21 additions & 21 deletions apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -39,28 +40,27 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ presignedUrl });
}

export async function loader({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);
export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
},
async ({ params, authentication }) => {
const filename = params["*"];

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}
const presignedUrl = await generatePresignedUrl(
authentication.environment.project.externalRef,
authentication.environment.slug,
filename,
"GET"
);

const parsedParams = ParamsSchema.parse(params);
const filename = parsedParams["*"];

const presignedUrl = await generatePresignedUrl(
authenticationResult.environment.project.externalRef,
authenticationResult.environment.slug,
filename,
"GET"
);
if (!presignedUrl) {
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
}

if (!presignedUrl) {
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
// Caller can now use this URL to fetch that object.
return json({ presignedUrl });
}

// Caller can now use this URL to fetch that object.
return json({ presignedUrl });
}
);
3 changes: 3 additions & 0 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ export class ApiClient {
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
closeOnComplete: true,
headers: this.#getRealtimeHeaders(),
client: this,
});
}

Expand All @@ -611,6 +612,7 @@ export class ApiClient {
{
closeOnComplete: false,
headers: this.#getRealtimeHeaders(),
client: this,
}
);
}
Expand All @@ -619,6 +621,7 @@ export class ApiClient {
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/batches/${batchId}`, {
closeOnComplete: false,
headers: this.#getRealtimeHeaders(),
client: this,
});
}

Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/v3/apiClient/runStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
IOPacket,
parsePacket,
} from "../utils/ioSerialization.js";
import { ApiClient } from "./index.js";
import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js";

export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
Expand Down Expand Up @@ -50,6 +51,7 @@ export type RunShapeStreamOptions = {
fetchClient?: typeof fetch;
closeOnComplete?: boolean;
signal?: AbortSignal;
client?: ApiClient;
};

export type StreamPartResult<TRun, TStreams extends Record<string, any>> = {
Expand Down Expand Up @@ -84,6 +86,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
signal: options?.signal,
}
),
...options,
};

return new RunSubscription<TRunTypes>($options);
Expand Down Expand Up @@ -306,7 +309,7 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
return cachedResult;
}

const result = await conditionallyImportAndParsePacket(packet);
const result = await conditionallyImportAndParsePacket(packet, this.options.client);
this.packetCache.set(`${row.friendlyId}/${key}`, result);

return result;
Expand Down
23 changes: 15 additions & 8 deletions packages/core/src/v3/utils/ioSerialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { apiClientManager } from "../apiClientManager-api.js";
import { zodfetch } from "../zodfetch.js";
import { z } from "zod";
import type { RetryOptions } from "../schemas/index.js";
import { ApiClient } from "../apiClient/index.js";

export type IOPacket = {
data?: string | undefined;
Expand Down Expand Up @@ -36,8 +37,11 @@ export async function parsePacket(value: IOPacket): Promise<any> {
}
}

export async function conditionallyImportAndParsePacket(value: IOPacket): Promise<any> {
const importedPacket = await conditionallyImportPacket(value);
export async function conditionallyImportAndParsePacket(
value: IOPacket,
client?: ApiClient
): Promise<any> {
const importedPacket = await conditionallyImportPacket(value, undefined, client);

return await parsePacket(importedPacket);
}
Expand Down Expand Up @@ -159,19 +163,20 @@ async function exportPacket(packet: IOPacket, pathPrefix: string): Promise<IOPac

export async function conditionallyImportPacket(
packet: IOPacket,
tracer?: TriggerTracer
tracer?: TriggerTracer,
client?: ApiClient
): Promise<IOPacket> {
if (packet.dataType !== "application/store") {
return packet;
}

if (!tracer) {
return await importPacket(packet);
return await importPacket(packet, undefined, client);
} else {
const result = await tracer.startActiveSpan(
"store.downloadPayload",
async (span) => {
return await importPacket(packet, span);
return await importPacket(packet, span, client);
},
{
attributes: {
Expand Down Expand Up @@ -209,16 +214,18 @@ export async function resolvePresignedPacketUrl(
}
}

async function importPacket(packet: IOPacket, span?: Span): Promise<IOPacket> {
async function importPacket(packet: IOPacket, span?: Span, client?: ApiClient): Promise<IOPacket> {
if (!packet.data) {
return packet;
}

if (!apiClientManager.client) {
const $client = client ?? apiClientManager.client;

if (!$client) {
return packet;
}

const presignedResponse = await apiClientManager.client.getPayloadUrl(packet.data);
const presignedResponse = await $client.getPayloadUrl(packet.data);

const response = await zodfetch(z.any(), presignedResponse.presignedUrl, undefined, {
retry: ioRetryOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import RunDetails from "@/components/RunDetails";
import { Card, CardContent } from "@/components/ui/card";
import { TriggerAuthContext, useRun } from "@trigger.dev/react-hooks";
import { TriggerAuthContext, useRealtimeRun } from "@trigger.dev/react-hooks";
import type { exampleTask } from "@/trigger/example";

function RunDetailsWrapper({ runId }: { runId: string }) {
const { run, error } = useRun<typeof exampleTask>(runId, { refreshInterval: 1000 });
const { run, error } = useRealtimeRun<typeof exampleTask>(runId);

if (error) {
return (
Expand Down
4 changes: 2 additions & 2 deletions references/nextjs-realtime/src/components/RunDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Badge } from "@/components/ui/badge";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { ScrollArea } from "@/components/ui/scroll-area";
import { exampleTask } from "@/trigger/example";
import type { RetrieveRunResult } from "@trigger.dev/sdk/v3";
import type { TaskRunShape } from "@trigger.dev/sdk/v3";
import { AlertTriangleIcon, CheckCheckIcon, XIcon } from "lucide-react";

function formatDate(date: Date | undefined) {
Expand All @@ -17,7 +17,7 @@ function JsonDisplay({ data }: { data: any }) {
);
}

export default function RunDetails({ record }: { record: RetrieveRunResult<typeof exampleTask> }) {
export default function RunDetails({ record }: { record: TaskRunShape<typeof exampleTask> }) {
return (
<Card className="w-full max-w-4xl mx-auto bg-gray-800 border-gray-700 text-gray-200">
<CardHeader>
Expand Down
5 changes: 4 additions & 1 deletion references/nextjs-realtime/src/trigger/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ export const exampleTask = schemaTask({

metadata.set("status", { type: "finished", progress: 1.0 });

return { message: "All good here!" };
// Generate a return payload that is more than 128KB
const bigPayload = new Array(100000).fill("a".repeat(10)).join("");

return { message: bigPayload };
},
});
2 changes: 1 addition & 1 deletion references/nextjs-realtime/trigger.config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { defineConfig } from "@trigger.dev/sdk/v3";

export default defineConfig({
project: "proj_xyxzzpnujsnhjiskihvs",
project: "proj_bzhdaqhlymtuhlrcgbqy",
dirs: ["./src/trigger"],
});

0 comments on commit 83384c9

Please sign in to comment.