Files
quixos-ts-package-runtime/src/index.ts
T
Timothy J. Aveni 2cdd578848 Initial commit
germanium cutoff
2026-05-24 16:06:53 -07:00

1854 lines
51 KiB
TypeScript

import { AsyncLocalStorage } from "node:async_hooks";
import crypto from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import WebSocket from "ws";
import z, { type ZodTypeAny, type output as ZodOutput } from "zod";
import {
BootRequestParamsSchema,
CallRequestParamsSchema,
ContextCloseRequestParamsSchema,
ContextOpenRequestParamsSchema,
EventSubscribeRequestParamsSchema,
EventUnsubscribeRequestParamsSchema,
type RuntimeErrorRequestParams,
ServerMessageSchema,
TargetContextOpenRequestParamsSchema,
ValueGetRequestParamsSchema,
ValueUnwatchRequestParamsSchema,
ValueWatchRequestParamsSchema,
parseJson,
sendClientMessage,
} from "./rpc.js";
export * from "./rpc.js";
export type PayloadMode = "snapshot" | "delta";
export type SchemaAnnotation = {
type: string;
[key: string]: unknown;
};
export type PackageFunctionSchema = {
description: string;
annotations?: SchemaAnnotation[];
inputSchema: ZodTypeAny;
outputSchema: ZodTypeAny;
};
type StaticResourceSchema = {
description: string;
annotations?: SchemaAnnotation[];
dataSchema: ZodTypeAny;
history?: boolean;
payloadMode?: PayloadMode;
};
type ParameterizedResourceSchema = StaticResourceSchema & {
paramsSchema: ZodTypeAny;
};
export type StaticEventSchema = StaticResourceSchema;
export type ParameterizedEventSchema = ParameterizedResourceSchema;
export type EventSchema = StaticEventSchema | ParameterizedEventSchema;
export type EventSchemaMap = Record<string, EventSchema>;
export type StaticValueSchema = StaticResourceSchema;
export type ParameterizedValueSchema = ParameterizedResourceSchema;
export type ValueSchema = StaticValueSchema | ParameterizedValueSchema;
export type ValueSchemaMap = Record<string, ValueSchema>;
export type PackageSchema<
Functions extends Record<string, PackageFunctionSchema> = Record<
string,
PackageFunctionSchema
>,
Events extends EventSchemaMap = EventSchemaMap,
Values extends ValueSchemaMap = ValueSchemaMap,
> = {
schemaVersion: 1;
majorVersion: number;
description: string;
annotations?: SchemaAnnotation[];
functions: Functions;
events: Events;
values: Values;
};
export const QUIXOS_MEDIA_JSON_SCHEMA_KEY = "x-quixos-media";
export type QuixosDataUrlAnnotation = {
transport: "data-url";
mimeType: string;
extension?: string;
};
type QuixosSchemaMetaRecord = Record<string, unknown>;
const readSchemaMeta = (schema: ZodTypeAny): QuixosSchemaMetaRecord => {
try {
const meta = schema.meta();
if (meta && typeof meta === "object") {
return { ...(meta as QuixosSchemaMetaRecord) };
}
} catch {
// ignore
}
return {};
};
export const annotateDataUrlSchema = <Schema extends ZodTypeAny>(
schema: Schema,
params: { mimeType: string; extension?: string },
): Schema =>
schema.meta({
...readSchemaMeta(schema),
[QUIXOS_MEDIA_JSON_SCHEMA_KEY]: {
transport: "data-url",
mimeType: params.mimeType,
...(params.extension ? { extension: params.extension } : {}),
} satisfies QuixosDataUrlAnnotation,
}) as Schema;
export const quixosMedia = {
dataUrlString: (params: { mimeType: string; extension?: string }) =>
annotateDataUrlSchema(z.string(), params),
annotateDataUrl: annotateDataUrlSchema,
};
type FunctionInput<Schema extends PackageFunctionSchema> = ZodOutput<
Schema["inputSchema"]
>;
type FunctionOutput<Schema extends PackageFunctionSchema> = ZodOutput<
Schema["outputSchema"]
>;
type SchemaParams<
Schema extends StaticResourceSchema | ParameterizedResourceSchema,
> = Schema extends { paramsSchema: infer ParamsSchema extends ZodTypeAny }
? ZodOutput<ParamsSchema>
: undefined;
type EventParams<Schema extends EventSchema> = SchemaParams<Schema>;
type EventData<Schema extends EventSchema> = ZodOutput<Schema["dataSchema"]>;
type ValueParams<Schema extends ValueSchema> = SchemaParams<Schema>;
type ValueData<Schema extends ValueSchema> = ZodOutput<Schema["dataSchema"]>;
export type SubscriptionOptions = {
signal?: AbortSignal;
subscriptionNamespace?: string;
};
export type SubscriptionHandle = {
unsubscribe: () => Promise<void>;
};
export type UsePackageOptions = {
contextNamespace?: string;
stateContextId?: string;
};
type ScopedResource<T> = T & {
withStateContext: (stateContextId: string) => T;
withContextNamespace: (contextNamespace?: string) => T;
};
type EventConsumer<Schema extends EventSchema> = (
data: EventData<Schema>,
) => void | Promise<void>;
type ValueConsumer<Schema extends ValueSchema> = (
data: ValueData<Schema>,
) => void | Promise<void>;
type StaticEventClientBase<Schema extends StaticEventSchema> = {
consume: (
handler: EventConsumer<Schema>,
options?: SubscriptionOptions,
) => Promise<SubscriptionHandle>;
};
type ParameterizedEventClientBase<Schema extends ParameterizedEventSchema> = {
consume: (
params: EventParams<Schema>,
handler: EventConsumer<Schema>,
options?: SubscriptionOptions,
) => Promise<SubscriptionHandle>;
};
export type EventClient<Schema extends EventSchema> =
Schema extends ParameterizedEventSchema
? ScopedResource<ParameterizedEventClientBase<Schema>>
: ScopedResource<StaticEventClientBase<Schema & StaticEventSchema>>;
export type EventClients<Schema extends EventSchemaMap> = {
[Key in keyof Schema]: EventClient<Schema[Key]>;
};
type StaticValueClientBase<Schema extends StaticValueSchema> = {
get: () => Promise<ValueData<Schema>>;
watch: (
handler: ValueConsumer<Schema>,
options?: SubscriptionOptions,
) => Promise<SubscriptionHandle>;
};
type ParameterizedValueClientBase<Schema extends ParameterizedValueSchema> = {
get: (params: ValueParams<Schema>) => Promise<ValueData<Schema>>;
watch: (
params: ValueParams<Schema>,
handler: ValueConsumer<Schema>,
options?: SubscriptionOptions,
) => Promise<SubscriptionHandle>;
};
export type ValueClient<Schema extends ValueSchema> =
Schema extends ParameterizedValueSchema
? ScopedResource<ParameterizedValueClientBase<Schema>>
: ScopedResource<StaticValueClientBase<Schema & StaticValueSchema>>;
export type ValueClients<Schema extends ValueSchemaMap> = {
[Key in keyof Schema]: ValueClient<Schema[Key]>;
};
export type PackageContext = {
stateContext: string;
stateDirectory: string;
usePackage: <Schema extends PackageSchema>(
schema: Schema,
options?: UsePackageOptions,
) => PackageClient<Schema>;
};
export type PackageFunction<
Schema extends PackageFunctionSchema,
Context = any,
> = (
ctx: Context,
params: FunctionInput<Schema>,
) => FunctionOutput<Schema> | Promise<FunctionOutput<Schema>>;
export type PackageFunctions<
Functions extends Record<string, PackageFunctionSchema>,
Context = any,
> = {
[Key in keyof Functions]: PackageFunction<Functions[Key], Context>;
};
export type EventSink<Schema extends EventSchema> = {
subscriptionId: string;
emit: (data: EventData<Schema>) => Promise<void>;
};
type EventHandler<Schema extends EventSchema, Context = any> = {
subscribe: (
ctx: Context,
params: EventParams<Schema>,
sink: EventSink<Schema>,
) =>
| void
| (() => void | Promise<void>)
| Promise<void | (() => void | Promise<void>)>;
};
export type PackageEvents<
Events extends EventSchemaMap,
Context = any,
> = {
[Key in keyof Events]: EventHandler<Events[Key], Context>;
};
export type ValueSink<Schema extends ValueSchema> = {
subscriptionId: string;
set: (data: ValueData<Schema>) => Promise<void>;
};
type ValueHandler<Schema extends ValueSchema, Context = any> = {
get: (
ctx: Context,
params: ValueParams<Schema>,
) => ValueData<Schema> | Promise<ValueData<Schema>>;
watch?: (
ctx: Context,
params: ValueParams<Schema>,
sink: ValueSink<Schema>,
) =>
| void
| (() => void | Promise<void>)
| Promise<void | (() => void | Promise<void>)>;
};
export type PackageValues<
Values extends ValueSchemaMap,
Context = any,
> = {
[Key in keyof Values]: ValueHandler<Values[Key], Context>;
};
type InternalUsePackageOptions = UsePackageOptions & {
callerStateContextId?: string;
};
export type PackageFunctionCaller<Schema extends PackageFunctionSchema> =
ScopedResource<(
params: FunctionInput<Schema>,
) => Promise<FunctionOutput<Schema>>>;
type PackageFunctionCallers<
Functions extends Record<string, PackageFunctionSchema>,
> = {
[Key in keyof Functions]: PackageFunctionCaller<Functions[Key]>;
};
export type PackageClient<Schema extends PackageSchema> = {
functions: PackageFunctionCallers<Schema["functions"]>;
events: EventClients<Schema["events"]>;
values: ValueClients<Schema["values"]>;
withStateContext: (stateContextId: string) => PackageClient<Schema>;
withContextNamespace: (
contextNamespace?: string,
) => PackageClient<Schema>;
};
export type CreatePackageOptions<
Schema extends PackageSchema,
Context extends PackageContext = PackageContext,
> = {
schema: Schema;
functions: PackageFunctions<Schema["functions"], Context>;
events: PackageEvents<Schema["events"], Context>;
values: PackageValues<Schema["values"], Context>;
onCreate?: () => void | Promise<void>;
onContextOpen?: (ctx: Context) => void | Promise<void>;
onContextClose?: (ctx: Context) => void | Promise<void>;
onDestroy?: () => void | Promise<void>;
};
type LifecycleManager = {};
type PendingRequest = {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
};
type ClientSubscriptionEntry = {
deliver: (data: unknown) => void;
closeLocal: (error?: Error) => void;
};
type ServerSubscriptionEntry = {
stateContextId: string;
kind: "event" | "value";
resourceName: string;
closeRemote: () => Promise<void>;
};
type QuixosSchemaMeta = {
name?: string | null;
type?: string | null;
url?: string | null;
rev?: string | null;
flakeRef?: string | null;
};
type QuixosSchema = PackageSchema & { __quixos?: QuixosSchemaMeta };
type RuntimeExecutionContext = PackageContext;
export type RuntimeErrorPhase = RuntimeErrorRequestParams["phase"];
export type RuntimeErrorReportParams = {
phase: RuntimeErrorPhase;
error: unknown;
stateContextId?: string;
functionName?: string;
resourceKind?: "event" | "value";
resourceName?: string;
surfaceId?: string;
hostSessionId?: string;
stack?: string;
};
const pendingRequests = new Map<string, PendingRequest>();
const clientSubscriptions = new Map<string, ClientSubscriptionEntry>();
const serverSubscriptions = new Map<string, ServerSubscriptionEntry>();
const runtimeContextStorage = new AsyncLocalStorage<RuntimeExecutionContext>();
const openedRuntimeStateContexts = new Set<string>();
const DEFAULT_REQUEST_TIMEOUT_MS = 60_000;
const PACKAGE_BOOT_TIMEOUT_MS = 300_000;
const TARGET_CONTEXT_OPEN_TIMEOUT_MS = 300_000;
let socketPromise: Promise<WebSocket> | null = null;
let socketResolve: ((socket: WebSocket) => void) | null = null;
let socketRef: WebSocket | null = null;
let socketAuthenticated = false;
let packageOptions: CreatePackageOptions<any, any> | null = null;
let runtimeExitTimer: NodeJS.Timeout | null = null;
let runtimeExitRequested = false;
const clearRuntimeExitTimer = () => {
if (runtimeExitTimer) {
clearTimeout(runtimeExitTimer);
runtimeExitTimer = null;
}
};
const scheduleRuntimeExit = (code: number, reason: string) => {
if (runtimeExitRequested) {
return;
}
runtimeExitRequested = true;
clearRuntimeExitTimer();
runtimeExitTimer = setTimeout(() => {
console.warn(`[quixos-runtime] exiting (${reason})`);
process.exit(code);
}, 100);
};
const closeServerSubscription = async (subscriptionId: string) => {
const subscription = serverSubscriptions.get(subscriptionId);
if (!subscription) {
return;
}
serverSubscriptions.delete(subscriptionId);
try {
await subscription.closeRemote();
} catch (error) {
await reportPackageRuntimeError({
phase: subscription.kind === "event" ? "event-cleanup" : "value-cleanup",
error,
stateContextId: subscription.stateContextId,
resourceKind: subscription.kind,
resourceName: subscription.resourceName,
});
throw error;
}
};
const normalizeError = (error: unknown) =>
error instanceof Error ? error : new Error(String(error));
const createRuntimeErrorPayload = (params: RuntimeErrorReportParams) => {
const normalized = normalizeError(params.error);
return {
phase: params.phase,
stateContextId: params.stateContextId,
functionName: params.functionName,
resourceKind: params.resourceKind,
resourceName: params.resourceName,
surfaceId: params.surfaceId,
hostSessionId: params.hostSessionId,
message: normalized.message || "Unknown error",
stack: params.stack ?? normalized.stack,
};
};
export const reportPackageRuntimeError = async (
params: RuntimeErrorReportParams,
) => {
try {
await sendRequest("runtime-error", createRuntimeErrorPayload(params), 5_000);
} catch {
// Best-effort reporting only.
}
};
const resetSocket = () => {
const shouldExitForSocketClose =
socketAuthenticated &&
packageOptions !== null &&
Boolean(process.env.QUIXOS_ORCH_SECRET);
socketRef = null;
socketPromise = null;
socketResolve = null;
socketAuthenticated = false;
for (const pending of pendingRequests.values()) {
clearTimeout(pending.timeout);
pending.reject(new Error("Socket closed"));
}
pendingRequests.clear();
for (const subscription of clientSubscriptions.values()) {
subscription.closeLocal(new Error("Socket closed"));
}
clientSubscriptions.clear();
const activeServerSubscriptionIds = [...serverSubscriptions.keys()];
for (const subscriptionId of activeServerSubscriptionIds) {
void closeServerSubscription(subscriptionId).catch(() => {});
}
if (shouldExitForSocketClose) {
scheduleRuntimeExit(0, "orchestrator-socket-closed");
}
};
const getCurrentStateContextId = () =>
runtimeContextStorage.getStore()?.stateContext;
const getStateDirectoryRoot = () => {
const xdgDataHome = process.env.XDG_DATA_HOME;
if (xdgDataHome && xdgDataHome.length > 0) {
return path.join(xdgDataHome, "quixos", "state-contexts");
}
return path.join(os.homedir(), ".local", "share", "quixos", "state-contexts");
};
const sanitizePackageNameForPath = (packageName: string) =>
encodeURIComponent(packageName);
const resolveCurrentPackageName = () => {
const meta = (packageOptions?.schema as QuixosSchema | undefined)?.__quixos;
if (meta) {
return resolveTargetPackageName(meta);
}
const packageRef = process.env.QUIXOS_PACKAGE_REF;
if (packageRef) {
return normalizePackageName(packageNameFromFlakeRef(packageRef));
}
throw new Error("Unable to determine current package name for runtime context.");
};
const getStateDirectoryForContext = (
packageName: string,
stateContextId: string,
) =>
path.join(
getStateDirectoryRoot(),
sanitizePackageNameForPath(packageName),
stateContextId,
);
const withRuntimeContext = async <T>(
ctx: RuntimeExecutionContext,
fn: () => T | Promise<T>,
) => await runtimeContextStorage.run(ctx, fn);
export const runWithStateContext = async <T>(
stateContextId: string,
fn: () => T | Promise<T>,
) =>
await withRuntimeContext(
createRuntimeContext(stateContextId),
fn,
);
function createRuntimeContext(stateContextId: string): PackageContext {
const packageName = resolveCurrentPackageName();
const stateDirectory = getStateDirectoryForContext(packageName, stateContextId);
fs.mkdirSync(stateDirectory, { recursive: true });
return {
stateContext: stateContextId,
stateDirectory,
usePackage: <Schema extends PackageSchema>(
schema: Schema,
options: UsePackageOptions = {},
) =>
createPackageClient(schema, {
...options,
callerStateContextId: stateContextId,
}),
};
}
const closeRuntimeContext = async (stateContextId: string) => {
const hasOpenedContext = openedRuntimeStateContexts.has(stateContextId);
const activeServerSubscriptionIds = [...serverSubscriptions.entries()]
.filter(([, subscription]) => subscription.stateContextId === stateContextId)
.map(([subscriptionId]) => subscriptionId);
if (!hasOpenedContext && activeServerSubscriptionIds.length === 0) {
return;
}
for (const subscriptionId of activeServerSubscriptionIds) {
await closeServerSubscription(subscriptionId);
}
if (!hasOpenedContext) {
return;
}
const ctx = createRuntimeContext(stateContextId);
try {
await withRuntimeContext(ctx, async () => {
await packageOptions?.onContextClose?.(ctx as any);
});
} finally {
openedRuntimeStateContexts.delete(stateContextId);
}
};
const hasParamsSchema = (
schema: StaticResourceSchema | ParameterizedResourceSchema,
): schema is ParameterizedResourceSchema => "paramsSchema" in schema;
const parseResourceParams = <Schema extends EventSchema | ValueSchema>(
resourceKind: "event" | "value",
resourceName: string,
schema: Schema,
params: unknown,
) => {
if (!hasParamsSchema(schema)) {
if (params !== undefined) {
throw new Error(`${resourceKind} ${resourceName} does not take params`);
}
return undefined as SchemaParams<Schema>;
}
const parsed = schema.paramsSchema.safeParse(params);
if (!parsed.success) {
throw new Error(`Invalid params for ${resourceKind} ${resourceName}`);
}
return parsed.data as SchemaParams<Schema>;
};
const parseResourceData = <Schema extends EventSchema | ValueSchema>(
resourceKind: "event" | "value",
resourceName: string,
schema: Schema,
data: unknown,
) => {
const parsed = schema.dataSchema.safeParse(data);
if (!parsed.success) {
throw new Error(`Invalid data for ${resourceKind} ${resourceName}`);
}
return parsed.data as ZodOutput<Schema["dataSchema"]>;
};
const normalizePackageName = (value: string) => {
const normalized = value.startsWith("@quixos-package-schemas/")
? value.slice("@quixos-package-schemas/".length)
: value;
return normalized.endsWith(".git")
? normalized.slice(0, -".git".length)
: normalized;
};
const packageNameFromFlakeRef = (flakeRef: string) => {
const withoutQuery = flakeRef.split("?")[0] ?? flakeRef;
const withoutGitPrefix = withoutQuery.startsWith("git+")
? withoutQuery.slice(4)
: withoutQuery;
try {
const parsed = new URL(withoutGitPrefix);
const segments = parsed.pathname.split("/").filter(Boolean);
const packageName = segments.at(-1);
if (packageName) {
return normalizePackageName(packageName);
}
} catch {
// Fall through to a simple path split for non-URL flake refs.
}
const segments = withoutGitPrefix.split("/").filter(Boolean);
const packageName = segments.at(-1);
if (!packageName) {
throw new Error(`Unable to determine package name for ${flakeRef}`);
}
return normalizePackageName(packageName);
};
const resolveTargetPackageName = (meta: QuixosSchemaMeta) => {
if (meta.name) {
return normalizePackageName(meta.name);
}
if (meta.flakeRef) {
return packageNameFromFlakeRef(meta.flakeRef);
}
throw new Error("Package schema is missing package name metadata.");
};
const sendErrorResponse = (
socket: WebSocket,
requestId: string,
error: unknown,
) => {
const normalized = normalizeError(error);
sendClientMessage(socket, {
type: "response",
requestId,
ok: false,
error: normalized.message || "Unknown error",
});
};
const reportAndSendErrorResponse = async (
socket: WebSocket,
requestId: string,
params: {
phase: RuntimeErrorPhase;
error: unknown;
stateContextId?: string;
functionName?: string;
resourceKind?: "event" | "value";
resourceName?: string;
},
) => {
void reportPackageRuntimeError(params);
sendErrorResponse(socket, requestId, params.error);
};
const handleRequest = async (socket: WebSocket, message: any) => {
if (!packageOptions) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Package not initialized",
});
return;
}
if (message.method === "stop") {
try {
const activeStateContextIds = [...openedRuntimeStateContexts];
for (const stateContextId of activeStateContextIds) {
await closeRuntimeContext(stateContextId);
}
await packageOptions.onDestroy?.();
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
});
scheduleRuntimeExit(0, "stop-request");
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "on-destroy",
error,
});
}
return;
}
if (message.method === "context-open") {
const parsedParams = ContextOpenRequestParamsSchema.safeParse(
message.params,
);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid context open params",
});
return;
}
const ctx = createRuntimeContext(parsedParams.data.stateContextId);
try {
await withRuntimeContext(ctx, async () => {
await packageOptions?.onContextOpen?.(ctx);
});
openedRuntimeStateContexts.add(ctx.stateContext);
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
});
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "on-context-open",
error,
stateContextId: ctx.stateContext,
});
}
return;
}
if (message.method === "context-close") {
const parsedParams = ContextCloseRequestParamsSchema.safeParse(
message.params,
);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid context close params",
});
return;
}
try {
await closeRuntimeContext(parsedParams.data.stateContextId);
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
});
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "on-context-close",
error,
stateContextId: parsedParams.data.stateContextId,
});
}
return;
}
if (message.method === "call") {
const parsedParams = CallRequestParamsSchema.safeParse(message.params);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid call params",
});
return;
}
const { functionName, params, stateContextId } = parsedParams.data;
if (!stateContextId) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Missing state context",
});
return;
}
const fn = packageOptions.functions?.[functionName];
if (!fn) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: `Unknown function: ${functionName}`,
});
return;
}
const ctx = createRuntimeContext(stateContextId);
try {
const result = await withRuntimeContext(
ctx,
async () => await fn(ctx, params),
);
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
result,
});
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "function",
error,
stateContextId,
functionName,
});
}
return;
}
if (message.method === "boot") {
const parsedParams = BootRequestParamsSchema.safeParse(message.params);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid boot params",
});
return;
}
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
});
return;
}
if (message.method === "event-subscribe") {
const parsedParams = EventSubscribeRequestParamsSchema.safeParse(
message.params,
);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid event subscribe params",
});
return;
}
const { eventName, params, stateContextId } = parsedParams.data;
if (!stateContextId) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Missing state context",
});
return;
}
const eventSchema = packageOptions.schema.events?.[eventName];
const eventHandler = packageOptions.events?.[eventName];
if (!eventSchema || !eventHandler) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: `Unknown event: ${eventName}`,
});
return;
}
const ctx = createRuntimeContext(stateContextId);
const subscriptionId = crypto.randomUUID();
try {
const parsedResourceParams = parseResourceParams(
"event",
eventName,
eventSchema,
params,
);
const cleanup = await withRuntimeContext(ctx, async () =>
await eventHandler.subscribe(ctx, parsedResourceParams, {
subscriptionId,
emit: async (data) => {
const parsedData = parseResourceData(
"event",
eventName,
eventSchema,
data,
);
await publishToSubscription(subscriptionId, parsedData);
},
}),
);
serverSubscriptions.set(subscriptionId, {
stateContextId: ctx.stateContext,
kind: "event",
resourceName: eventName,
closeRemote: async () => {
if (typeof cleanup === "function") {
await runWithStateContext(ctx.stateContext, cleanup);
}
},
});
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
result: {
subscriptionId,
},
});
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "event-subscribe",
error,
stateContextId,
resourceKind: "event",
resourceName: eventName,
});
}
return;
}
if (message.method === "event-unsubscribe") {
const parsedParams = EventUnsubscribeRequestParamsSchema.safeParse(
message.params,
);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid event unsubscribe params",
});
return;
}
try {
await closeServerSubscription(parsedParams.data.subscriptionId);
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
});
} catch (error) {
sendErrorResponse(socket, message.requestId, error);
}
return;
}
if (message.method === "value-get") {
const parsedParams = ValueGetRequestParamsSchema.safeParse(message.params);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid value get params",
});
return;
}
const { valueName, params, stateContextId } = parsedParams.data;
if (!stateContextId) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Missing state context",
});
return;
}
const valueSchema = packageOptions.schema.values?.[valueName];
const valueHandler = packageOptions.values?.[valueName];
if (!valueSchema || !valueHandler) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: `Unknown value: ${valueName}`,
});
return;
}
const ctx = createRuntimeContext(stateContextId);
try {
const parsedResourceParams = parseResourceParams(
"value",
valueName,
valueSchema,
params,
);
const result = await withRuntimeContext(ctx, async () =>
await valueHandler.get(ctx, parsedResourceParams),
);
const parsedResult = parseResourceData(
"value",
valueName,
valueSchema,
result,
);
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
result: parsedResult,
});
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "value-get",
error,
stateContextId,
resourceKind: "value",
resourceName: valueName,
});
}
return;
}
if (message.method === "value-watch") {
const parsedParams = ValueWatchRequestParamsSchema.safeParse(
message.params,
);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid value watch params",
});
return;
}
const { valueName, params, stateContextId } = parsedParams.data;
if (!stateContextId) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Missing state context",
});
return;
}
const valueSchema = packageOptions.schema.values?.[valueName];
const valueHandler = packageOptions.values?.[valueName];
if (!valueSchema || !valueHandler?.watch) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: `Unknown watchable value: ${valueName}`,
});
return;
}
const ctx = createRuntimeContext(stateContextId);
const subscriptionId = crypto.randomUUID();
try {
const parsedResourceParams = parseResourceParams(
"value",
valueName,
valueSchema,
params,
);
const cleanup = await withRuntimeContext(ctx, async () =>
await valueHandler.watch?.(ctx, parsedResourceParams, {
subscriptionId,
set: async (data) => {
const parsedData = parseResourceData(
"value",
valueName,
valueSchema,
data,
);
await publishToSubscription(subscriptionId, parsedData);
},
}),
);
serverSubscriptions.set(subscriptionId, {
stateContextId: ctx.stateContext,
kind: "value",
resourceName: valueName,
closeRemote: async () => {
if (typeof cleanup === "function") {
await runWithStateContext(ctx.stateContext, cleanup);
}
},
});
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
result: {
subscriptionId,
},
});
} catch (error) {
await reportAndSendErrorResponse(socket, message.requestId, {
phase: "value-watch",
error,
stateContextId,
resourceKind: "value",
resourceName: valueName,
});
}
return;
}
if (message.method === "value-unwatch") {
const parsedParams = ValueUnwatchRequestParamsSchema.safeParse(
message.params,
);
if (!parsedParams.success) {
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Invalid value unwatch params",
});
return;
}
try {
await closeServerSubscription(parsedParams.data.subscriptionId);
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: true,
});
} catch (error) {
sendErrorResponse(socket, message.requestId, error);
}
return;
}
sendClientMessage(socket, {
type: "response",
requestId: message.requestId,
ok: false,
error: "Unknown method",
});
};
const handleMessage = (socket: WebSocket, data: WebSocket.RawData) => {
const parsed = parseJson(data.toString());
if (!parsed.ok) {
return;
}
const messageResult = ServerMessageSchema.safeParse(parsed.value);
if (!messageResult.success) {
return;
}
const message = messageResult.data;
if (message.type === "auth-ack") {
socketAuthenticated = true;
runtimeExitRequested = false;
clearRuntimeExitTimer();
if (socketResolve) {
socketResolve(socket);
socketResolve = null;
}
return;
}
if (message.type === "subscription-data") {
const subscription = clientSubscriptions.get(message.subscriptionId);
if (!subscription) {
return;
}
subscription.deliver(message.data);
return;
}
if (message.type === "response") {
const pending = pendingRequests.get(message.requestId);
if (!pending) {
return;
}
pendingRequests.delete(message.requestId);
clearTimeout(pending.timeout);
if (message.ok === false) {
pending.reject(new Error(message.error ?? "Unknown error"));
} else {
pending.resolve(message.result);
}
return;
}
if (message.type === "request") {
void handleRequest(socket, message);
}
};
const ensureSocket = async (requireAuth: boolean) => {
if (requireAuth && !process.env.QUIXOS_ORCH_SECRET) {
throw new Error("QUIXOS_ORCH_SECRET is not set; cannot use orchestrator");
}
if (!socketPromise) {
socketPromise = new Promise((resolve) => {
socketResolve = resolve;
});
const socket = new WebSocket("ws://127.0.0.1:6245");
socketRef = socket;
socket.on("open", () => {
const secret = process.env.QUIXOS_ORCH_SECRET;
if (secret) {
sendClientMessage(socket, {
type: "auth",
secret,
pid: process.pid,
});
} else if (socketResolve) {
socketResolve(socket);
socketResolve = null;
}
});
socket.on("message", (eventData) => handleMessage(socket, eventData));
socket.on("close", resetSocket);
socket.on("error", resetSocket);
}
const socket = await socketPromise;
if (requireAuth && !socketAuthenticated) {
return await socketPromise;
}
return socket;
};
const sendRequest = async (
method:
| "stop"
| "call"
| "boot"
| "target-context-open"
| "runtime-error"
| "context-open"
| "context-close"
| "event-subscribe"
| "subscription-ready"
| "event-unsubscribe"
| "value-get"
| "value-watch"
| "value-unwatch"
| "subscription-publish",
params: Record<string, unknown>,
timeoutMs?: number,
) => {
const socket = await ensureSocket(true);
const requestId = crypto.randomUUID();
const effectiveTimeoutMs =
timeoutMs ??
(method === "boot"
? PACKAGE_BOOT_TIMEOUT_MS
: method === "target-context-open" || method === "context-open"
? TARGET_CONTEXT_OPEN_TIMEOUT_MS
: DEFAULT_REQUEST_TIMEOUT_MS);
return await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
pendingRequests.delete(requestId);
reject(new Error("Request timed out"));
}, effectiveTimeoutMs);
pendingRequests.set(requestId, { resolve, reject, timeout });
sendClientMessage(socket, {
type: "request",
requestId,
method,
params,
});
});
};
export const publishToSubscription = async (
subscriptionId: string,
data: unknown,
) => {
await sendRequest("subscription-publish", {
subscriptionId,
data,
});
};
const createPackageClient = <Schema extends PackageSchema>(
schema: Schema,
baseOptions: InternalUsePackageOptions = {},
): PackageClient<Schema> => {
const meta = (schema as QuixosSchema).__quixos;
if (!meta?.flakeRef) {
throw new Error(
"Package schema is missing __quixos metadata. Rebuild the schema with quixos helpers.",
);
}
const targetPackageName = resolveTargetPackageName(meta);
let bootError: Error | null = null;
let bootPromise: Promise<void> | null = null;
const ensureBooted = async () => {
if (!bootPromise) {
bootPromise = (async () => {
try {
await sendRequest("boot", {
target: meta.flakeRef,
});
} catch (error) {
bootError = error instanceof Error ? error : new Error(String(error));
}
})();
}
await bootPromise;
if (bootError) {
throw bootError;
}
};
const createClient = (
scope: InternalUsePackageOptions,
): PackageClient<Schema> => {
let readyError: Error | null = null;
let readyPromise: Promise<void> | null = null;
const resolveRouting = (callerStateContextId?: string) => ({
callerStateContextId:
scope.callerStateContextId ?? callerStateContextId,
targetStateContextId: scope.stateContextId,
contextNamespace: scope.stateContextId ? undefined : scope.contextNamespace,
});
const missingStateContextError = () =>
new Error(
`No active state context for package ${targetPackageName}. Move this call into \`onContextOpen(ctx)\` or another handler with \`ctx.usePackage(...)\`, or pass \`stateContextId\` explicitly.`,
);
const ensurePackageReady = async (callerStateContextId?: string) => {
if (!readyPromise) {
readyPromise = (async () => {
try {
await ensureBooted();
const routing = resolveRouting(callerStateContextId);
if (!routing.callerStateContextId && !routing.targetStateContextId) {
return;
}
await sendRequest("target-context-open", {
target: meta.flakeRef,
targetPackageName,
callerStateContextId: routing.callerStateContextId,
stateContextId: routing.targetStateContextId,
contextNamespace: routing.contextNamespace,
});
} catch (error) {
readyError =
error instanceof Error ? error : new Error(String(error));
}
})();
}
await readyPromise;
if (readyError) {
throw readyError;
}
};
const scopedResourceMethods = <T,>(
lookup: (client: PackageClient<Schema>) => T,
) => ({
withStateContext: (stateContextId: string) =>
lookup(
createClient({
...scope,
stateContextId,
}),
),
withContextNamespace: (contextNamespace?: string) =>
lookup(
createClient({
...scope,
contextNamespace,
}),
),
});
const createManagedSubscription = <SchemaType extends EventSchema | ValueSchema>(
subscriptionId: string,
resourceKind: "event" | "value",
resourceName: string,
schemaType: SchemaType,
handler: (
data: ZodOutput<SchemaType["dataSchema"]>,
) => void | Promise<void>,
unsubscribeMethod: "event-unsubscribe" | "value-unwatch",
subscriberStateContextId?: string,
options?: SubscriptionOptions,
): SubscriptionHandle => {
let closed = false;
let abortHandler: (() => void) | null = null;
let deliveryChain: Promise<void> = Promise.resolve();
if (clientSubscriptions.has(subscriptionId)) {
throw new Error(
`Subscription ${subscriptionId} is already attached. Pass \`subscriptionNamespace\` to create a distinct logical subscription.`,
);
}
const closeLocal = (_error?: Error) => {
if (closed) {
return;
}
closed = true;
clientSubscriptions.delete(subscriptionId);
if (options?.signal && abortHandler) {
options.signal.removeEventListener("abort", abortHandler);
}
};
const sendUnsubscribe = async () => {
try {
await sendRequest(unsubscribeMethod, {
subscriptionId,
});
} catch {
// Ignore unsubscribe errors; local close already happened.
}
};
const unsubscribe = async () => {
if (closed) {
return;
}
closeLocal();
await sendUnsubscribe();
};
const fail = (error: Error) => {
closeLocal(error);
void sendUnsubscribe();
};
const deliver = (data: unknown) => {
if (closed) {
return;
}
let parsedData: ZodOutput<SchemaType["dataSchema"]>;
try {
parsedData = parseResourceData(
resourceKind,
resourceName,
schemaType,
data,
);
} catch (error) {
fail(error instanceof Error ? error : new Error(String(error)));
return;
}
deliveryChain = deliveryChain
.then(async () => {
if (closed) {
return;
}
const invoke = async () => {
await handler(parsedData);
};
if (subscriberStateContextId) {
await runWithStateContext(subscriberStateContextId, invoke);
return;
}
await invoke();
})
.catch((error) => {
fail(error instanceof Error ? error : new Error(String(error)));
});
};
clientSubscriptions.set(subscriptionId, {
deliver,
closeLocal,
});
if (options?.signal) {
abortHandler = () => {
void unsubscribe();
};
if (options.signal.aborted) {
void unsubscribe();
} else {
options.signal.addEventListener("abort", abortHandler);
}
}
return {
unsubscribe,
};
};
const consumeEvent = async (
eventName: string,
eventSchema: EventSchema,
params: unknown,
handler: (data: unknown) => void | Promise<void>,
options?: SubscriptionOptions,
) => {
const callerStateContextId = getCurrentStateContextId();
await ensurePackageReady(callerStateContextId);
const routing = resolveRouting(callerStateContextId);
if (!routing.callerStateContextId && !routing.targetStateContextId) {
throw missingStateContextError();
}
const result = await sendRequest("event-subscribe", {
target: meta.flakeRef,
targetPackageName,
eventName,
params,
callerStateContextId: routing.callerStateContextId,
stateContextId: routing.targetStateContextId,
contextNamespace: routing.contextNamespace,
subscriptionNamespace: options?.subscriptionNamespace,
});
const subscriptionId = (result as { subscriptionId?: string })
?.subscriptionId;
if (!subscriptionId) {
throw new Error(`Subscription failed for event ${eventName}`);
}
const handle = createManagedSubscription(
subscriptionId,
"event",
eventName,
eventSchema,
handler,
"event-unsubscribe",
callerStateContextId,
options,
);
try {
await sendRequest("subscription-ready", {
subscriptionId,
});
} catch (error) {
await handle.unsubscribe();
throw error;
}
return handle;
};
const getValue = async (
valueName: string,
valueSchema: ValueSchema,
params: unknown,
) => {
const callerStateContextId = getCurrentStateContextId();
await ensurePackageReady(callerStateContextId);
const routing = resolveRouting(callerStateContextId);
if (!routing.callerStateContextId && !routing.targetStateContextId) {
throw missingStateContextError();
}
const result = await sendRequest("value-get", {
target: meta.flakeRef,
targetPackageName,
valueName,
params,
callerStateContextId: routing.callerStateContextId,
stateContextId: routing.targetStateContextId,
contextNamespace: routing.contextNamespace,
});
return parseResourceData("value", valueName, valueSchema, result);
};
const watchValue = async (
valueName: string,
valueSchema: ValueSchema,
params: unknown,
handler: (data: unknown) => void | Promise<void>,
options?: SubscriptionOptions,
) => {
const callerStateContextId = getCurrentStateContextId();
await ensurePackageReady(callerStateContextId);
const routing = resolveRouting(callerStateContextId);
if (!routing.callerStateContextId && !routing.targetStateContextId) {
throw missingStateContextError();
}
const result = await sendRequest("value-watch", {
target: meta.flakeRef,
targetPackageName,
valueName,
params,
callerStateContextId: routing.callerStateContextId,
stateContextId: routing.targetStateContextId,
contextNamespace: routing.contextNamespace,
subscriptionNamespace: options?.subscriptionNamespace,
});
const subscriptionId = (result as { subscriptionId?: string })
?.subscriptionId;
if (!subscriptionId) {
throw new Error(`Watch failed for value ${valueName}`);
}
const handle = createManagedSubscription(
subscriptionId,
"value",
valueName,
valueSchema,
handler,
"value-unwatch",
callerStateContextId,
options,
);
try {
await sendRequest("subscription-ready", {
subscriptionId,
});
} catch (error) {
await handle.unsubscribe();
throw error;
}
return handle;
};
const functions = new Proxy(
{},
{
get: (_target, prop) => {
if (typeof prop !== "string") {
return undefined;
}
const call = async (params: unknown) => {
const callerStateContextId = getCurrentStateContextId();
await ensurePackageReady(callerStateContextId);
const routing = resolveRouting(callerStateContextId);
if (!routing.callerStateContextId && !routing.targetStateContextId) {
throw missingStateContextError();
}
return await sendRequest("call", {
target: meta.flakeRef,
targetPackageName,
functionName: prop,
params,
callerStateContextId: routing.callerStateContextId,
stateContextId: routing.targetStateContextId,
contextNamespace: routing.contextNamespace,
});
};
return Object.assign(
call,
scopedResourceMethods((client) => client.functions[prop]),
);
},
},
) as PackageFunctionCallers<Schema["functions"]>;
const events = {} as EventClients<Schema["events"]>;
const eventEntries = Object.entries(
(schema.events ?? {}) as EventSchemaMap,
);
for (const [eventName, eventSchema] of eventEntries) {
if (hasParamsSchema(eventSchema)) {
(events as Record<string, unknown>)[eventName] = Object.assign({
consume: async (
params: unknown,
handler: (data: unknown) => void | Promise<void>,
options?: SubscriptionOptions,
) => {
const parsedParams = eventSchema.paramsSchema.safeParse(params);
if (!parsedParams.success) {
throw new Error(`Invalid params for event ${eventName}`);
}
return await consumeEvent(
eventName,
eventSchema,
parsedParams.data,
handler,
options,
);
},
}, scopedResourceMethods((client) => client.events[eventName]));
} else {
(events as Record<string, unknown>)[eventName] = Object.assign({
consume: async (
handler: (data: unknown) => void | Promise<void>,
options?: SubscriptionOptions,
) =>
await consumeEvent(
eventName,
eventSchema,
undefined,
handler,
options,
),
}, scopedResourceMethods((client) => client.events[eventName]));
}
}
const values = {} as ValueClients<Schema["values"]>;
const valueEntries = Object.entries(
(schema.values ?? {}) as ValueSchemaMap,
);
for (const [valueName, valueSchema] of valueEntries) {
if (hasParamsSchema(valueSchema)) {
(values as Record<string, unknown>)[valueName] = Object.assign({
get: async (params: unknown) => {
const parsedParams = valueSchema.paramsSchema.safeParse(params);
if (!parsedParams.success) {
throw new Error(`Invalid params for value ${valueName}`);
}
return await getValue(valueName, valueSchema, parsedParams.data);
},
watch: async (
params: unknown,
handler: (data: unknown) => void | Promise<void>,
options?: SubscriptionOptions,
) => {
const parsedParams = valueSchema.paramsSchema.safeParse(params);
if (!parsedParams.success) {
throw new Error(`Invalid params for value ${valueName}`);
}
return await watchValue(
valueName,
valueSchema,
parsedParams.data,
handler,
options,
);
},
}, scopedResourceMethods((client) => client.values[valueName]));
} else {
(values as Record<string, unknown>)[valueName] = Object.assign({
get: async () => await getValue(valueName, valueSchema, undefined),
watch: async (
handler: (data: unknown) => void | Promise<void>,
options?: SubscriptionOptions,
) =>
await watchValue(
valueName,
valueSchema,
undefined,
handler,
options,
),
}, scopedResourceMethods((client) => client.values[valueName]));
}
}
const eagerCallerStateContextId =
scope.callerStateContextId ?? getCurrentStateContextId();
if (eagerCallerStateContextId || scope.stateContextId) {
void ensurePackageReady(eagerCallerStateContextId).catch((error) => {
console.warn(
`Failed to eagerly open child context for ${targetPackageName}: ${
error instanceof Error ? error.message : String(error)
}`,
);
});
}
return {
functions,
events,
values,
withStateContext: (stateContextId: string) =>
createClient({
...scope,
stateContextId,
}),
withContextNamespace: (contextNamespace?: string) =>
createClient({
...scope,
contextNamespace,
}),
};
};
return createClient(baseOptions);
};
export const createPackage = <
Schema extends PackageSchema,
Context extends PackageContext = PackageContext,
>(
options: CreatePackageOptions<Schema, Context>,
): LifecycleManager => {
packageOptions = options;
void ensureSocket(false);
void (async () => {
try {
await options.onCreate?.();
} catch (error) {
await reportPackageRuntimeError({
phase: "on-create",
error,
});
console.error(
"[quixos-runtime] onCreate failed:",
normalizeError(error).stack ?? normalizeError(error).message,
);
}
})();
return {};
};