# Cloud API Reference

> Complete API reference and code examples for Comfy Cloud

<Warning>
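  **Experimental API:** This API is experimental and subject to change. Endpoints, request/response formats, and behavior may be modified without prior notice. Some endpoints are retained for compatibility with local ComfyUI but may have different semantics (for example, some fields are ignored).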
</Warning>

This page provides complete examples for common Comfy Cloud API operations.

<Note>
  **Subscription required:** Running workflows through the API requires an active Comfy Cloud subscription. See the [pricing plans](https://www.comfy.org/cloud/pricing?utm_source=docs\&utm_campaign=cloud-api) for details.
</Note>

## Setup

All examples share the following imports and configuration:

<CodeGroup>
  ```bash curl theme={null}
  export COMFY_CLOUD_API_KEY="your-api-key"
  export BASE_URL="https://cloud.comfy.org"
  ```

  ```typescript TypeScript theme={null}
  import { readFile, writeFile } from "fs/promises";

  const BASE_URL = "https://cloud.comfy.org";
  const API_KEY = process.env.COMFY_CLOUD_API_KEY!;

  function getHeaders(): HeadersInit {
    return {
      "X-API-Key": API_KEY,
      "Content-Type": "application/json",
    };
  }
  ```

  ```python Python theme={null}
  import os
  import requests
  import json
  import time
  import asyncio
  import aiohttp

  BASE_URL = "https://cloud.comfy.org"
  API_KEY = os.environ["COMFY_CLOUD_API_KEY"]

  def get_headers():
      return {
          "X-API-Key": API_KEY,
          "Content-Type": "application/json"
      }
  ```
</CodeGroup>

***

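## Object Info

Retrieve the available node definitions. This is useful for discovering which nodes exist and what inputs and outputs each one expects.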

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/object_info" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function getObjectInfo(): Promise<Record<string, any>> {
    const response = await fetch(`${BASE_URL}/api/object_info`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const objectInfo = await getObjectInfo();
  console.log(`Available nodes: ${Object.keys(objectInfo).length}`);

  const ksampler = objectInfo["KSampler"] ?? {};
  console.log(`KSampler inputs: ${Object.keys(ksampler.input?.required ?? {})}`);
  ```

  ```python Python theme={null}
  def get_object_info():
      """Fetch all available node definitions from cloud."""
      response = requests.get(
          f"{BASE_URL}/api/object_info",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()

  # Get all nodes
  object_info = get_object_info()
  print(f"Available nodes: {len(object_info)}")

  # Get a specific node's definition
  ksampler = object_info.get("KSampler", {})
  inputs = list(ksampler.get('input', {}).get('required', {}).keys())
  print(f"KSampler inputs: {inputs}")
  ```
</CodeGroup>
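
Since each node definition lists its mandatory inputs under `input.required`, the `object_info` response can also serve as a pre-flight check before a workflow is submitted. The following is a minimal sketch rather than an official utility: `find_missing_inputs` is a hypothetical helper, and it assumes the workflow is in API format, where each node has a `class_type` and an `inputs` dict. The sample data is a tiny stand-in for a real response.

```python
def find_missing_inputs(workflow: dict, object_info: dict) -> dict:
    """Return {node_id: [missing required input names]} (hypothetical helper)."""
    missing = {}
    for node_id, node in workflow.items():
        definition = object_info.get(node.get("class_type"))
        if definition is None:
            continue  # unknown node type, nothing to validate against
        required = definition.get("input", {}).get("required", {})
        absent = [name for name in required if name not in node.get("inputs", {})]
        if absent:
            missing[node_id] = absent
    return missing


# Illustrative stand-ins for a real object_info response and workflow
sample_object_info = {
    "KSampler": {"input": {"required": {"seed": ["INT"], "steps": ["INT"]}}}
}
sample_workflow = {"3": {"class_type": "KSampler", "inputs": {"seed": 42}}}

print(find_missing_inputs(sample_workflow, sample_object_info))  # {'3': ['steps']}
```

With real data, the second argument would be the dict returned by `get_object_info()`.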

***

## Uploading Inputs

Upload images, masks, or other files for use in workflows.

### Direct Upload (Multipart)

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/upload/image" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -F "image=@my_image.png" \
    -F "type=input" \
    -F "overwrite=true"
  ```

  ```typescript TypeScript theme={null}
  async function uploadInput(
    filePath: string,
    inputType: string = "input"
  ): Promise<{ name: string; subfolder: string }> {
    const file = await readFile(filePath);
    const formData = new FormData();
    formData.append("image", new Blob([file]), filePath.split("/").pop());
    formData.append("type", inputType);
    formData.append("overwrite", "true");

    const response = await fetch(`${BASE_URL}/api/upload/image`, {
      method: "POST",
      headers: { "X-API-Key": API_KEY },
      body: formData,
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const result = await uploadInput("my_image.png");
  console.log(`Uploaded: ${result.name} to ${result.subfolder}`);
  ```

  ```python Python theme={null}
  def upload_input(file_path: str, input_type: str = "input") -> dict:
      """Upload a file directly to cloud.
      
      Args:
          file_path: Path to the file to upload
          input_type: "input" for images, "temp" for temporary files
          
      Returns:
          Upload response with filename and subfolder
      """
      with open(file_path, "rb") as f:
          files = {"image": f}
          data = {"type": input_type, "overwrite": "true"}
          
          response = requests.post(
              f"{BASE_URL}/api/upload/image",
              headers={"X-API-Key": API_KEY},  # No Content-Type for multipart
              files=files,
              data=data
          )
      response.raise_for_status()
      return response.json()

  # Upload an image
  result = upload_input("my_image.png")
  print(f"Uploaded: {result['name']} to {result['subfolder']}")
  ```
</CodeGroup>

### Uploading Masks

<Note>
  The `subfolder` parameter is accepted for API compatibility but is ignored by cloud storage. All files are stored in a flat, content-addressed namespace.
</Note>

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/upload/mask" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -F "image=@mask.png" \
    -F "type=input" \
    -F "subfolder=clipspace" \
    -F 'original_ref={"filename":"my_image.png","subfolder":"","type":"input"}'
  ```

  ```typescript TypeScript theme={null}
  async function uploadMask(
    filePath: string,
    originalRef: { filename: string; subfolder: string; type: string }
  ): Promise<{ name: string; subfolder: string }> {
    const file = await readFile(filePath);
    const formData = new FormData();
    formData.append("image", new Blob([file]), filePath.split("/").pop());
    formData.append("type", "input");
    formData.append("subfolder", "clipspace");
    formData.append("original_ref", JSON.stringify(originalRef));

    const response = await fetch(`${BASE_URL}/api/upload/mask`, {
      method: "POST",
      headers: { "X-API-Key": API_KEY },
      body: formData,
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const maskResult = await uploadMask("mask.png", {
    filename: "my_image.png",
    subfolder: "",
    type: "input",
  });
  console.log(`Uploaded mask: ${maskResult.name}`);
  ```

  ```python Python theme={null}
  def upload_mask(file_path: str, original_ref: dict) -> dict:
      """Upload a mask associated with an original image.
      
      Args:
          file_path: Path to the mask file
          original_ref: Reference to the original image {"filename": "...", "subfolder": "...", "type": "..."}
      """
      with open(file_path, "rb") as f:
          files = {"image": f}
          data = {
              "type": "input",
              "subfolder": "clipspace",
              "original_ref": json.dumps(original_ref)
          }
          
          response = requests.post(
              f"{BASE_URL}/api/upload/mask",
              headers={"X-API-Key": API_KEY},
              files=files,
              data=data
          )
      response.raise_for_status()
      return response.json()
  ```
</CodeGroup>

***

## Running Workflows

Submit a workflow for execution.

### Submitting a Workflow

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/prompt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{"prompt": '"$(cat workflow_api.json)"'}'
  ```

  ```typescript TypeScript theme={null}
  async function submitWorkflow(workflow: Record<string, any>): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ prompt: workflow }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const result = await response.json();

    if (result.error) {
      throw new Error(`Workflow error: ${result.error}`);
    }
    return result.prompt_id;
  }

  const workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));
  const promptId = await submitWorkflow(workflow);
  console.log(`Submitted job: ${promptId}`);
  ```

  ```python Python theme={null}
  def submit_workflow(workflow: dict) -> str:
      """Submit a workflow and return the prompt_id (job ID).
      
      Args:
          workflow: ComfyUI workflow in API format
          
      Returns:
          prompt_id for tracking the job
      """
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={"prompt": workflow}
      )
      response.raise_for_status()
      result = response.json()
      
      if "error" in result:
          raise ValueError(f"Workflow error: {result['error']}")
      
      return result["prompt_id"]

  # Load and submit a workflow
  with open("workflow_api.json") as f:
      workflow = json.load(f)

  prompt_id = submit_workflow(workflow)
  print(f"Submitted job: {prompt_id}")
  ```
</CodeGroup>

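### Using Partner Nodes

If your workflow contains [partner nodes](/zh-CN/tutorials/api-nodes/overview) (nodes that call external AI services, such as Flux Pro or Ideogram), you must include your Comfy API key in the `extra_data` field of the request body.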

<Note>
  When you run a workflow in the browser, the ComfyUI frontend automatically packs your API key into `extra_data`. This section applies only when you call the API directly.
</Note>

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/prompt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{
      "prompt": '"$(cat workflow_api.json)"',
      "extra_data": {
        "api_key_comfy_org": "your-comfy-api-key"
      }
    }'
  ```

  ```typescript TypeScript theme={null}
  async function submitWorkflowWithPartnerNodes(
    workflow: Record<string, any>,
    apiKey: string
  ): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({
        prompt: workflow,
        extra_data: {
          api_key_comfy_org: apiKey,
        },
      }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    const result = await response.json();
    return result.prompt_id;
  }

  // Use when the workflow contains partner nodes (e.g. Flux Pro, Ideogram)
  const promptId = await submitWorkflowWithPartnerNodes(workflow, API_KEY);
  ```

  ```python Python theme={null}
  def submit_workflow_with_partner_nodes(workflow: dict, api_key: str) -> str:
      """Submit a workflow that uses partner nodes.

      Args:
          workflow: ComfyUI workflow in API format
          api_key: API key from platform.comfy.org

      Returns:
          prompt_id for tracking the job
      """
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={
              "prompt": workflow,
              "extra_data": {
                  "api_key_comfy_org": api_key
              }
          }
      )
      response.raise_for_status()
      return response.json()["prompt_id"]

  # Use when the workflow contains partner nodes
  prompt_id = submit_workflow_with_partner_nodes(workflow, API_KEY)
  ```
</CodeGroup>

<Info>
  Generate your API key at [platform.comfy.org](https://platform.comfy.org/login). This is the same key that is used for Cloud API authentication (the `X-API-Key` request header).
</Info>
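
Because a partner-node request differs from an ordinary one only by the `extra_data` field, a single request builder can cover both cases. A minimal sketch, assuming a hypothetical helper named `build_prompt_payload` (it is not part of any Comfy SDK); the workflow and key below are placeholders.

```python
from typing import Optional


def build_prompt_payload(workflow: dict, partner_api_key: Optional[str] = None) -> dict:
    """Build the JSON body for POST /api/prompt (hypothetical helper).

    `extra_data.api_key_comfy_org` is added only when a key is supplied.
    """
    payload = {"prompt": workflow}
    if partner_api_key:
        payload["extra_data"] = {"api_key_comfy_org": partner_api_key}
    return payload


# Placeholder workflow and key, for illustration only
example_workflow = {"1": {"class_type": "ExampleNode", "inputs": {}}}
print(build_prompt_payload(example_workflow))
print(build_prompt_payload(example_workflow, "your-comfy-api-key"))
```

The returned dict can be passed as the `json=` argument of `requests.post` in place of the literal dicts used in the functions above.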

### Modifying Workflow Inputs

<CodeGroup>
  ```typescript TypeScript theme={null}
  function setWorkflowInput(
    workflow: Record<string, any>,
    nodeId: string,
    inputName: string,
    value: any
  ): Record<string, any> {
    if (workflow[nodeId]) {
      workflow[nodeId].inputs[inputName] = value;
    }
    return workflow;
  }

  // Example: Set seed and prompt
  let workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));
  workflow = setWorkflowInput(workflow, "3", "seed", 12345);
  workflow = setWorkflowInput(workflow, "6", "text", "a beautiful landscape");
  ```

  ```python Python theme={null}
  def set_workflow_input(workflow: dict, node_id: str, input_name: str, value) -> dict:
      """Modify a workflow input value.
      
      Args:
          workflow: The workflow dict
          node_id: ID of the node to modify
          input_name: Name of the input field
          value: New value
          
      Returns:
          Modified workflow
      """
      if node_id in workflow:
          workflow[node_id]["inputs"][input_name] = value
      return workflow

  # Example: Set seed and prompt
  workflow = set_workflow_input(workflow, "3", "seed", 12345)
  workflow = set_workflow_input(workflow, "6", "text", "a beautiful landscape")
  ```
</CodeGroup>
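
The examples above hard-code node IDs such as `"3"` and `"6"`, which only match one particular exported workflow. One possible alternative is to look nodes up by type. This sketch assumes the API-format export stores each node's type under `class_type`; `find_nodes_by_class` is a hypothetical helper, and the workflow fragment is sample data.

```python
def find_nodes_by_class(workflow: dict, class_type: str) -> list:
    """Return the IDs of all nodes whose class_type matches (hypothetical helper)."""
    return [
        node_id
        for node_id, node in workflow.items()
        if isinstance(node, dict) and node.get("class_type") == class_type
    ]


# Sample API-format workflow fragment (illustrative data only)
sample_workflow = {
    "3": {"class_type": "KSampler", "inputs": {"seed": 0}},
    "6": {"class_type": "CLIPTextEncode", "inputs": {"text": "positive"}},
    "7": {"class_type": "CLIPTextEncode", "inputs": {"text": "negative"}},
}

# Set the seed on every sampler without knowing its ID in advance
for node_id in find_nodes_by_class(sample_workflow, "KSampler"):
    sample_workflow[node_id]["inputs"]["seed"] = 12345

print(find_nodes_by_class(sample_workflow, "CLIPTextEncode"))  # ['6', '7']
```

When several nodes share a type, as the two text encoders do here, the caller still has to decide which one to modify.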

***

## Checking Job Status

Poll until the job completes.

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/job/{prompt_id}/status" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  interface JobStatus {
    status: string;
  }

  async function getJobStatus(promptId: string): Promise<JobStatus> {
    const response = await fetch(`${BASE_URL}/api/job/${promptId}/status`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  async function pollForCompletion(
    promptId: string,
    timeout: number = 300,
    pollInterval: number = 2000
  ): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < timeout * 1000) {
      const { status } = await getJobStatus(promptId);

      if (status === "success") {
        return;
      } else if (["error", "failed", "cancelled"].includes(status)) {
        throw new Error(`Job failed with status: ${status}`);
      }

      await new Promise((resolve) => setTimeout(resolve, pollInterval));
    }

    throw new Error(`Job ${promptId} did not complete within ${timeout}s`);
  }

  await pollForCompletion(promptId);
  console.log("Job completed!");
  ```

  ```python Python theme={null}
  def get_job_status(prompt_id: str) -> str:
      """Get the current status of a job."""
      response = requests.get(
          f"{BASE_URL}/api/job/{prompt_id}/status",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()["status"]

  def poll_for_completion(prompt_id: str, timeout: int = 300, poll_interval: float = 2.0) -> None:
      """Poll until job completes or times out."""
      start_time = time.time()
      
      while time.time() - start_time < timeout:
          status = get_job_status(prompt_id)
          
          if status == "success":
              return
          elif status in ("error", "failed", "cancelled"):
              raise RuntimeError(f"Job failed with status: {status}")
          
          time.sleep(poll_interval)
      
      raise TimeoutError(f"Job {prompt_id} did not complete within {timeout}s")

  poll_for_completion(prompt_id)
  print("Job completed!")
  ```
</CodeGroup>
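
The polling loops above use a fixed interval. For long-running jobs, a client might prefer to wait progressively longer between requests. The following is a minimal sketch of exponential backoff under stated assumptions: `poll_with_backoff` and its parameters are hypothetical, the status lookup is injected as a callable so the logic can be exercised without network access, and `"pending"` in the simulation is a placeholder rather than a documented status value.

```python
import time
from typing import Callable

FAILURE_STATUSES = ("error", "failed", "cancelled")


def poll_with_backoff(
    get_status: Callable[[], str],
    timeout: float = 300.0,
    initial_delay: float = 1.0,
    max_delay: float = 15.0,
    factor: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status() until it returns "success", waiting longer after each attempt."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while time.monotonic() < deadline:
        status = get_status()
        if status == "success":
            return status
        if status in FAILURE_STATUSES:
            raise RuntimeError(f"Job failed with status: {status}")
        sleep(delay)
        delay = min(delay * factor, max_delay)  # grow the wait, but cap it
    raise TimeoutError(f"Job did not complete within {timeout}s")


# Simulated status sequence; the sleeps are recorded instead of performed
fake_statuses = iter(["pending", "pending", "success"])
waits = []
print(poll_with_backoff(lambda: next(fake_statuses), sleep=waits.append))  # success
print(waits)  # [1.0, 2.0]
```

With the functions defined above, a real call would look like `poll_with_backoff(lambda: get_job_status(prompt_id))`.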

***

## Real-Time Progress via WebSocket

Connect to the WebSocket to receive real-time execution updates.

<Note>
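  The `clientId` parameter is currently ignored: all connections that belong to the same user receive the same messages. Passing a unique `clientId` is still recommended for forward compatibility.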
</Note>

<CodeGroup>
  ```typescript TypeScript theme={null}
  async function listenForCompletion(
    promptId: string,
    timeout: number = 300000
  ): Promise<Record<string, any>> {
    const wsUrl = `wss://cloud.comfy.org/ws?clientId=${crypto.randomUUID()}&token=${API_KEY}`;
    const outputs: Record<string, any> = {};

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      const timer = setTimeout(() => {
        ws.close();
        reject(new Error(`Job did not complete within ${timeout / 1000}s`));
      }, timeout);

      ws.onmessage = (event) => {
        const data = JSON.parse(event.data);
        const msgType = data.type;
        const msgData = data.data ?? {};

        // Only handle messages for our job
        if (msgData.prompt_id !== promptId) return;

        if (msgType === "executing") {
          const node = msgData.node;
          if (node) {
            console.log(`Executing node: ${node}`);
          } else {
            console.log("Execution finished");
          }
        } else if (msgType === "progress") {
          console.log(`Progress: ${msgData.value}/${msgData.max}`);
        } else if (msgType === "executed" && msgData.output) {
          outputs[msgData.node] = msgData.output;
        } else if (msgType === "execution_success") {
          console.log("Job completed successfully!");
          clearTimeout(timer);
          ws.close();
          resolve(outputs);
        } else if (msgType === "execution_error") {
          const errorMsg = msgData.exception_message ?? "Unknown error";
          clearTimeout(timer);
          ws.close();
          reject(new Error(`Execution error: ${errorMsg}`));
        }
      };

      ws.onerror = (err) => {
        clearTimeout(timer);
        reject(err);
      };
    });
  }

  // Wait for completion and collect outputs
  const outputs = await listenForCompletion(promptId);
  ```

  ```python Python theme={null}
  import asyncio
  import aiohttp
  import json
  import uuid

  async def listen_for_completion(prompt_id: str, timeout: float = 300.0) -> dict:
      """Connect to the WebSocket and listen for job completion.

      Returns:
          The job's final outputs
      """
      ws_url = BASE_URL.replace("https://", "wss://")
      client_id = str(uuid.uuid4())
      ws_url = f"{ws_url}/ws?clientId={client_id}&token={API_KEY}"

      outputs = {}

      async with aiohttp.ClientSession() as session:
          async with session.ws_connect(ws_url) as ws:
              async def receive_messages():
                  async for msg in ws:
                      if msg.type == aiohttp.WSMsgType.TEXT:
                          data = json.loads(msg.data)
                          msg_type = data.get("type")
                          msg_data = data.get("data", {})

                          # Only handle messages for our job
                          if msg_data.get("prompt_id") != prompt_id:
                              continue

                          if msg_type == "executing":
                              node = msg_data.get("node")
                              if node:
                                  print(f"Executing node: {node}")

                          elif msg_type == "progress":
                              value = msg_data.get("value", 0)
                              max_val = msg_data.get("max", 100)
                              print(f"Progress: {value}/{max_val}")

                          elif msg_type == "executed":
                              node_id = msg_data.get("node")
                              output = msg_data.get("output", {})
                              if output:
                                  outputs[node_id] = output

                          elif msg_type == "execution_success":
                              print("Job completed successfully!")
                              return outputs

                          elif msg_type == "execution_error":
                              error_msg = msg_data.get("exception_message", "Unknown error")
                              raise RuntimeError(f"Execution error: {error_msg}")

                      elif msg.type == aiohttp.WSMsgType.ERROR:
                          raise RuntimeError(f"WebSocket error: {ws.exception()}")

              try:
                  return await asyncio.wait_for(receive_messages(), timeout=timeout)
              except asyncio.TimeoutError:
                  raise TimeoutError(f"Job did not complete within {timeout}s")

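  # Wait for completion and collect outputs
  # (asyncio.run is needed because `await` is not valid at module level in a script)
  outputs = asyncio.run(listen_for_completion(prompt_id))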
  ```
</CodeGroup>
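
Both WebSocket examples interpolate the API key into the URL as-is. If a key ever contained characters such as `+` or `&`, the query string would be malformed. The following is a minimal sketch of a safer construction that uses only the standard library. `build_ws_url` is a hypothetical helper; the `/ws` path and the `clientId` and `token` parameters are taken from the examples above.

```python
import uuid
from typing import Optional
from urllib.parse import urlencode, urlsplit, urlunsplit


def build_ws_url(base_url: str, api_key: str, client_id: Optional[str] = None) -> str:
    """Derive the WebSocket URL from the HTTP base URL, percent-encoding the query."""
    parts = urlsplit(base_url)
    scheme = "wss" if parts.scheme == "https" else "ws"
    query = urlencode({
        "clientId": client_id or str(uuid.uuid4()),  # random ID when none is given
        "token": api_key,
    })
    return urlunsplit((scheme, parts.netloc, "/ws", query, ""))


print(build_ws_url("https://cloud.comfy.org", "key+with/special=chars", "my-client"))
# wss://cloud.comfy.org/ws?clientId=my-client&token=key%2Bwith%2Fspecial%3Dchars
```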

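### WebSocket Message Types

Messages are sent as JSON text frames unless noted otherwise.

| Type                    | Description                                                                                     |
| ----------------------- | ----------------------------------------------------------------------------------------------- |
| `status`                | Queue status update, including the `queue_remaining` count                                      |
| `notification`          | User-friendly status message (the `value` field contains text such as "Executing workflow\...") |
| `execution_start`       | Workflow execution has started                                                                  |
| `executing`             | A specific node is executing (node ID in the `node` field)                                      |
| `progress`              | Step progress within a node (`value`/`max` for sampling steps)                                  |
| `progress_state`        | Extended progress state with node metadata (nested `nodes` object)                              |
| `executed`              | A node finished and produced output (images, video, etc. in the `output` field)                 |
| `execution_cached`      | Nodes skipped because their output was cached (`nodes` array)                                   |
| `execution_success`     | The entire workflow completed successfully                                                      |
| `execution_error`       | The workflow failed (includes `exception_type`, `exception_message`, `traceback`)               |
| `execution_interrupted` | The workflow was cancelled by the user                                                          |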
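
The listeners shown earlier resolve on `execution_success` and fail on `execution_error`, but they do not react to `execution_interrupted`, so a cancelled job would only surface once the timeout fires. The sketch below shows one way to classify terminal messages. `classify_terminal_message` is a hypothetical helper, and it assumes that `execution_interrupted` carries `prompt_id` inside `data` in the same way as the other execution messages.

```python
import json
from typing import Optional

# Message types that end a job, mapped to a short outcome label
TERMINAL_TYPES = {
    "execution_success": "success",
    "execution_error": "error",
    "execution_interrupted": "interrupted",
}


def classify_terminal_message(raw: str, prompt_id: str) -> Optional[str]:
    """Return the outcome label if `raw` ends the given job, otherwise None."""
    message = json.loads(raw)
    data = message.get("data") or {}
    if data.get("prompt_id") != prompt_id:
        return None  # belongs to another job, or carries no prompt_id
    return TERMINAL_TYPES.get(message.get("type"))


sample = json.dumps({"type": "execution_interrupted", "data": {"prompt_id": "abc-123"}})
print(classify_terminal_message(sample, "abc-123"))  # interrupted
```

A listener could call this on every text frame and stop as soon as the result is not `None`.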

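#### Binary Messages (Preview Images)

During image generation, ComfyUI sends **binary WebSocket frames** containing preview images. These frames are raw binary data, not JSON:

| Binary type                   | Value | Description                                |
| ----------------------------- | ----- | ------------------------------------------ |
| `PREVIEW_IMAGE`               | `1`   | Progress preview during diffusion sampling |
| `TEXT`                        | `3`   | Text output from a node (progress text)    |
| `PREVIEW_IMAGE_WITH_METADATA` | `4`   | Preview image with node context metadata   |

**Binary frame format** (all integers are big-endian):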

<Tabs>
  <Tab title="PREVIEW_IMAGE (1)">
    | Offset | Size     | Field        | Description                 |
    | ------ | -------- | ------------ | --------------------------- |
    | 0      | 4 bytes  | `type`       | `0x00000001`                |
    | 4      | 4 bytes  | `image_type` | Format code (1=JPEG, 2=PNG) |
    | 8      | Variable | `image_data` | Raw image bytes             |
  </Tab>

  <Tab title="TEXT (3)">
    | Offset | Size     | Field         | Description                   |
    | ------ | -------- | ------------- | ----------------------------- |
    | 0      | 4 bytes  | `type`        | `0x00000003`                  |
    | 4      | 4 bytes  | `node_id_len` | Length of the node\_id string |
    | 8      | N bytes  | `node_id`     | UTF-8 encoded node ID         |
    | 8+N    | Variable | `text`        | UTF-8 encoded progress text   |
  </Tab>

  <Tab title="PREVIEW_WITH_METADATA (4)">
    | Offset | Size     | Field          | Description                 |
    | ------ | -------- | -------------- | --------------------------- |
    | 0      | 4 bytes  | `type`         | `0x00000004`                |
    | 4      | 4 bytes  | `metadata_len` | Length of the metadata JSON |
    | 8      | N bytes  | `metadata`     | UTF-8 JSON (see below)      |
    | 8+N    | Variable | `image_data`   | Raw JPEG/PNG bytes          |

    **Metadata JSON structure:**

    ```json theme={null}
    {
      "node_id": "3",
      "display_node_id": "3",
      "real_node_id": "3",
      "prompt_id": "abc-123",
      "parent_node_id": null
    }
    ```
  </Tab>
</Tabs>
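
The frame layouts above map directly onto Python's `struct` module, where `">I"` reads a big-endian unsigned 32-bit integer. The following is a minimal sketch of a decoder that follows the three tables; `parse_binary_frame` is a hypothetical helper, and the sample frames are built by hand rather than captured from the service.

```python
import json
import struct

IMAGE_FORMATS = {1: "JPEG", 2: "PNG"}


def parse_binary_frame(frame: bytes) -> dict:
    """Decode one binary WebSocket frame according to the layouts above."""
    (frame_type,) = struct.unpack_from(">I", frame, 0)
    if frame_type == 1:  # PREVIEW_IMAGE: type, image_type, image bytes
        (image_type,) = struct.unpack_from(">I", frame, 4)
        return {
            "type": "PREVIEW_IMAGE",
            "image_format": IMAGE_FORMATS.get(image_type, "unknown"),
            "image_data": frame[8:],
        }
    if frame_type == 3:  # TEXT: type, node_id_len, node_id, text
        (node_id_len,) = struct.unpack_from(">I", frame, 4)
        return {
            "type": "TEXT",
            "node_id": frame[8:8 + node_id_len].decode("utf-8"),
            "text": frame[8 + node_id_len:].decode("utf-8"),
        }
    if frame_type == 4:  # PREVIEW_IMAGE_WITH_METADATA: type, metadata_len, metadata, image
        (metadata_len,) = struct.unpack_from(">I", frame, 4)
        return {
            "type": "PREVIEW_IMAGE_WITH_METADATA",
            "metadata": json.loads(frame[8:8 + metadata_len].decode("utf-8")),
            "image_data": frame[8 + metadata_len:],
        }
    raise ValueError(f"Unknown binary frame type: {frame_type}")


# Hand-built TEXT frame: type=3, node_id_len=1, node_id="3", text="50%"
text_frame = struct.pack(">II", 3, 1) + b"3" + "50%".encode("utf-8")
print(parse_binary_frame(text_frame))
# {'type': 'TEXT', 'node_id': '3', 'text': '50%'}
```

In the `aiohttp` listener, such a function could be called for messages whose type is `aiohttp.WSMsgType.BINARY`, passing `msg.data`.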

<Note>
  See the [OpenAPI specification](/zh-CN/development/cloud/openapi) for the complete schema definition of each JSON message type.
</Note>

***

## Downloading Outputs

Retrieve the generated files after the job completes.

<CodeGroup>
  ```bash curl theme={null}
  # Download a single output file (-L follows the 302 redirect)
  curl -L "$BASE_URL/api/view?filename=output.png&subfolder=&type=output" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -o output.png
  ```

  ```typescript TypeScript theme={null}
  async function downloadOutput(
    filename: string,
    subfolder: string = "",
    outputType: string = "output"
  ): Promise<ArrayBuffer> {
    const params = new URLSearchParams({ filename, subfolder, type: outputType });
    // Get the redirect URL without following it, so the API key is not sent to storage
    const response = await fetch(`${BASE_URL}/api/view?${params}`, {
      headers: { "X-API-Key": API_KEY },
      redirect: "manual",
    });
    if (response.status !== 302) throw new Error(`HTTP ${response.status}`);
    const signedUrl = response.headers.get("location")!;

    // Fetch the file from the signed URL
    const fileResponse = await fetch(signedUrl);
    if (!fileResponse.ok) throw new Error(`HTTP ${fileResponse.status}`);
    return fileResponse.arrayBuffer();
  }

  async function saveOutputs(
    outputs: Record<string, any>,
    outputDir: string = "."
  ): Promise<void> {
    for (const nodeOutputs of Object.values(outputs)) {
      for (const key of ["images", "video", "audio"]) {
        for (const fileInfo of (nodeOutputs as any)[key] ?? []) {
          const data = await downloadOutput(
            fileInfo.filename,
            fileInfo.subfolder ?? "",
            fileInfo.type ?? "output"
          );
          const path = `${outputDir}/${fileInfo.filename}`;
          await writeFile(path, Buffer.from(data));
          console.log(`Saved: ${path}`);
        }
      }
    }
  }

  // Download all outputs
  await saveOutputs(outputs, "./my_outputs");
  ```

  ```python Python theme={null}
  def download_output(filename: str, subfolder: str = "", output_type: str = "output") -> bytes:
      """Download an output file.

      Args:
          filename: Name of the file
          subfolder: Subfolder path (usually empty)
          output_type: "output" for final outputs, "temp" for previews

      Returns:
          The file contents as bytes
      """
      params = {
          "filename": filename,
          "subfolder": subfolder,
          "type": output_type
      }

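      response = requests.get(
          f"{BASE_URL}/api/view",
          headers={"X-API-Key": API_KEY},
          params=params,
          allow_redirects=False  # do not forward the API key to the storage host
      )
      response.raise_for_status()
      if response.is_redirect:
          # Fetch the file from the signed URL without auth headers
          response = requests.get(response.headers["Location"])
          response.raise_for_status()
      return response.content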

  def save_outputs(outputs: dict, output_dir: str = "."):
      """Save all outputs of a job to disk.

      Args:
          outputs: The job's output dict (node_id -> output_data)
          output_dir: Directory to save the files in
      """
      import os
      os.makedirs(output_dir, exist_ok=True)

      for node_id, node_outputs in outputs.items():
          for key in ("images", "video", "audio"):
              for file_info in node_outputs.get(key, []):
                  filename = file_info["filename"]
                  subfolder = file_info.get("subfolder", "")
                  output_type = file_info.get("type", "output")

                  data = download_output(filename, subfolder, output_type)

                  output_path = os.path.join(output_dir, filename)
                  with open(output_path, "wb") as f:
                      f.write(data)
                  print(f"Saved: {output_path}")

  # Download all outputs
  save_outputs(outputs, "./my_outputs")
  ```
</CodeGroup>
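
The `save_outputs` functions above join the server-supplied `filename` directly onto the output directory. As a defensive measure, a client might flatten the outputs into a list first and strip any directory components from each name before writing. A minimal sketch with two hypothetical helpers, `list_output_files` and `safe_output_path`; the `images`, `video`, and `audio` keys follow the examples above, and the sample `outputs` dict is made up for illustration.

```python
import os

OUTPUT_KEYS = ("images", "video", "audio")


def list_output_files(outputs: dict) -> list:
    """Flatten {node_id: {key: [file_info, ...]}} into a list of file records."""
    files = []
    for node_id, node_outputs in outputs.items():
        for key in OUTPUT_KEYS:
            for info in node_outputs.get(key, []):
                files.append({
                    "node_id": node_id,
                    "filename": info["filename"],
                    "subfolder": info.get("subfolder", ""),
                    "type": info.get("type", "output"),
                })
    return files


def safe_output_path(output_dir: str, filename: str) -> str:
    """Drop directory components so the file cannot land outside output_dir."""
    return os.path.join(output_dir, os.path.basename(filename))


# Made-up outputs dict in the shape collected by the WebSocket listeners
sample_outputs = {
    "9": {"images": [{"filename": "result_00001_.png", "subfolder": "", "type": "output"}]},
    "12": {"video": [{"filename": "clip.mp4"}]},
}

for record in list_output_files(sample_outputs):
    print(record["node_id"], safe_output_path("my_outputs", record["filename"]))
```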

***

## Complete End-to-End Example

The following complete example ties everything together:

<CodeGroup>
  ```typescript TypeScript theme={null}
  import { readFile, writeFile } from "fs/promises";

  const BASE_URL = "https://cloud.comfy.org";
  const API_KEY = process.env.COMFY_CLOUD_API_KEY!;

  function getHeaders(): HeadersInit {
    return { "X-API-Key": API_KEY, "Content-Type": "application/json" };
  }

  async function submitWorkflow(workflow: Record<string, any>): Promise<string> {
    const response = await fetch(`${BASE_URL}/api/prompt`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ prompt: workflow }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return (await response.json()).prompt_id;
  }

  async function waitForCompletion(
    promptId: string,
    timeout: number = 300000
  ): Promise<Record<string, any>> {
    const wsUrl = `wss://cloud.comfy.org/ws?clientId=${crypto.randomUUID()}&token=${API_KEY}`;
    const outputs: Record<string, any> = {};

    return new Promise((resolve, reject) => {
      const ws = new WebSocket(wsUrl);
      const timer = setTimeout(() => {
        ws.close();
        reject(new Error("Job timed out"));
      }, timeout);

      ws.onmessage = (event) => {
        const data = JSON.parse(event.data);
        if (data.data?.prompt_id !== promptId) return;

        const msgType = data.type;
        const msgData = data.data ?? {};

        if (msgType === "progress") {
          console.log(`Progress: ${msgData.value}/${msgData.max}`);
        } else if (msgType === "executed" && msgData.output) {
          outputs[msgData.node] = msgData.output;
        } else if (msgType === "execution_success") {
          clearTimeout(timer);
          ws.close();
          resolve(outputs);
        } else if (msgType === "execution_error") {
          clearTimeout(timer);
          ws.close();
          reject(new Error(msgData.exception_message ?? "Unknown error"));
        }
      };

      ws.onerror = (err) => {
        clearTimeout(timer);
        reject(err);
      };
    });
  }

  async function downloadOutputs(
    outputs: Record<string, any>,
    outputDir: string
  ): Promise<void> {
    for (const nodeOutputs of Object.values(outputs)) {
      for (const key of ["images", "video", "audio"]) {
        for (const fileInfo of (nodeOutputs as any)[key] ?? []) {
          const params = new URLSearchParams({
            filename: fileInfo.filename,
            subfolder: fileInfo.subfolder ?? "",
            type: fileInfo.type ?? "output",
          });
          // Get redirect URL (don't follow to avoid sending auth to storage)
          const response = await fetch(`${BASE_URL}/api/view?${params}`, {
            headers: { "X-API-Key": API_KEY },
            redirect: "manual",
          });
          if (response.status !== 302) throw new Error(`HTTP ${response.status}`);
          const signedUrl = response.headers.get("location")!;
          // Fetch from signed URL without auth headers
          const fileResponse = await fetch(signedUrl);
          if (!fileResponse.ok) throw new Error(`HTTP ${fileResponse.status}`);

          const path = `${outputDir}/${fileInfo.filename}`;
          await writeFile(path, Buffer.from(await fileResponse.arrayBuffer()));
          console.log(`Downloaded: ${path}`);
        }
      }
    }
  }

  async function main() {
    // 1. Load workflow
    const workflow = JSON.parse(await readFile("workflow_api.json", "utf-8"));

    // 2. Modify workflow parameters
    workflow["3"].inputs.seed = 42;
    workflow["6"].inputs.text = "a beautiful sunset over mountains";

    // 3. Submit workflow
    const promptId = await submitWorkflow(workflow);
    console.log(`Job submitted: ${promptId}`);

    // 4. Wait for completion with progress
    const outputs = await waitForCompletion(promptId);
    console.log(`Job completed! Found ${Object.keys(outputs).length} output nodes`);

    // 5. Download outputs
    await downloadOutputs(outputs, "./outputs");
    console.log("Done!");
  }

  main();
  ```

  ```python Python theme={null}
  import os
  import requests
  import json
  import asyncio
  import aiohttp
  import uuid

  BASE_URL = "https://cloud.comfy.org"
  API_KEY = os.environ["COMFY_CLOUD_API_KEY"]

  def get_headers():
      return {"X-API-Key": API_KEY, "Content-Type": "application/json"}

  def upload_image(file_path: str) -> dict:
      """Upload an image and return the reference for use in workflows."""
      with open(file_path, "rb") as f:
          response = requests.post(
              f"{BASE_URL}/api/upload/image",
              headers={"X-API-Key": API_KEY},
              files={"image": f},
              data={"type": "input", "overwrite": "true"}
          )
      response.raise_for_status()
      return response.json()

  def submit_workflow(workflow: dict) -> str:
      """Submit workflow and return prompt_id."""
      response = requests.post(
          f"{BASE_URL}/api/prompt",
          headers=get_headers(),
          json={"prompt": workflow}
      )
      response.raise_for_status()
      return response.json()["prompt_id"]

  async def wait_for_completion(prompt_id: str, timeout: float = 300.0) -> dict:
      """Wait for job completion via WebSocket."""
      ws_url = BASE_URL.replace("https://", "wss://") + f"/ws?clientId={uuid.uuid4()}&token={API_KEY}"
      outputs = {}
      
      async with aiohttp.ClientSession() as session:
          async with session.ws_connect(ws_url) as ws:
              start = asyncio.get_event_loop().time()
              async for msg in ws:
                  if asyncio.get_event_loop().time() - start > timeout:
                      raise TimeoutError("Job timed out")
                  
                  if msg.type != aiohttp.WSMsgType.TEXT:
                      continue
                      
                  data = json.loads(msg.data)
                  if data.get("data", {}).get("prompt_id") != prompt_id:
                      continue
                  
                  msg_type = data.get("type")
                  msg_data = data.get("data", {})
                  
                  if msg_type == "progress":
                      print(f"Progress: {msg_data.get('value')}/{msg_data.get('max')}")
                  elif msg_type == "executed":
                      if output := msg_data.get("output"):
                          outputs[msg_data["node"]] = output
                  elif msg_type == "execution_success":
                      return outputs
                  elif msg_type == "execution_error":
                      raise RuntimeError(msg_data.get("exception_message", "Unknown error"))
      
      return outputs

  def download_outputs(outputs: dict, output_dir: str):
      """Download all output files."""
      os.makedirs(output_dir, exist_ok=True)
      
      for node_outputs in outputs.values():
          for key in ["images", "video", "audio"]:
              for file_info in node_outputs.get(key, []):
                  params = {
                      "filename": file_info["filename"],
                      "subfolder": file_info.get("subfolder", ""),
                      "type": file_info.get("type", "output")
                  }
                  response = requests.get(f"{BASE_URL}/api/view", headers=get_headers(), params=params)
                  response.raise_for_status()
                  
                  path = os.path.join(output_dir, file_info["filename"])
                  with open(path, "wb") as f:
                      f.write(response.content)
                  print(f"Downloaded: {path}")

  async def main():
      # 1. Load workflow
      with open("workflow_api.json") as f:
          workflow = json.load(f)
      
      # 2. Optionally upload input images
      # image_ref = upload_image("input.png")
      # workflow["1"]["inputs"]["image"] = image_ref["name"]
      
      # 3. Modify workflow parameters
      workflow["3"]["inputs"]["seed"] = 42
      workflow["6"]["inputs"]["text"] = "a beautiful sunset over mountains"
      
      # 4. Submit workflow
      prompt_id = submit_workflow(workflow)
      print(f"Job submitted: {prompt_id}")
      
      # 5. Wait for completion with progress
      outputs = await wait_for_completion(prompt_id)
      print(f"Job completed! Found {len(outputs)} output nodes")
      
      # 6. Download outputs
      download_outputs(outputs, "./outputs")
      print("Done!")

  if __name__ == "__main__":
      asyncio.run(main())
  ```
</CodeGroup>
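
In the Python example above, the timeout inside `wait_for_completion` is evaluated only when a WebSocket message arrives, so a connection that goes silent could wait longer than intended. One possible way to enforce a hard deadline is to wrap the coroutine in `asyncio.wait_for`. The sketch below is illustrative only: `run_with_deadline` and the stand-in coroutine `fake_wait` are hypothetical names, not part of the Comfy Cloud API.

```python
import asyncio


async def run_with_deadline(coro, timeout: float):
    """Await `coro`, cancelling it if it does not finish within `timeout` seconds."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # asyncio.wait_for cancels the wrapped coroutine before raising.
        raise TimeoutError(f"Job did not finish within {timeout} seconds") from None


# Hypothetical stand-in for wait_for_completion(prompt_id), for illustration only.
async def fake_wait(delay: float) -> dict:
    await asyncio.sleep(delay)
    return {"9": {"images": []}}
```

With the functions from the complete example in scope, the call would look like `outputs = await run_with_deadline(wait_for_completion(prompt_id), timeout=300)`.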

***

## Queue Management

### Get Queue Status

<CodeGroup>
  ```bash curl theme={null}
  curl -X GET "$BASE_URL/api/queue" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function getQueue(): Promise<{
    queue_running: any[];
    queue_pending: any[];
  }> {
    const response = await fetch(`${BASE_URL}/api/queue`, {
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  const queue = await getQueue();
  console.log(`Running: ${queue.queue_running.length}`);
  console.log(`Pending: ${queue.queue_pending.length}`);
  ```

  ```python Python theme={null}
  def get_queue():
      """Get current queue status."""
      response = requests.get(
          f"{BASE_URL}/api/queue",
          headers=get_headers()
      )
      response.raise_for_status()
      return response.json()

  queue = get_queue()
  print(f"Running: {len(queue.get('queue_running', []))}")
  print(f"Pending: {len(queue.get('queue_pending', []))}")
  ```
</CodeGroup>
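
When WebSocket updates are not available, the queue endpoint can be polled instead. The following is a minimal sketch that relies only on the `queue_running` and `queue_pending` keys shown above. The helper name `wait_for_idle_queue` and its parameters are hypothetical, and it assumes the response lists only the jobs you care about, which this page does not state. The fetch function is passed in so the loop can be exercised without network access.

```python
import time
from typing import Callable


def wait_for_idle_queue(
    fetch_queue: Callable[[], dict],
    poll_interval: float = 2.0,
    timeout: float = 300.0,
) -> int:
    """Poll until both queue lists are empty; return the number of polls made."""
    deadline = time.monotonic() + timeout
    polls = 0
    while True:
        queue = fetch_queue()
        polls += 1
        running = len(queue.get("queue_running", []))
        pending = len(queue.get("queue_pending", []))
        if running == 0 and pending == 0:
            return polls
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Queue still busy: {running} running, {pending} pending"
            )
        time.sleep(poll_interval)
```

In practice the `get_queue` function defined above would be passed as `fetch_queue`, for example `wait_for_idle_queue(get_queue, poll_interval=5.0)`.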

### Cancel a Job

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/queue" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY" \
    -H "Content-Type: application/json" \
    -d '{"delete": ["PROMPT_ID_HERE"]}'
  ```

  ```typescript TypeScript theme={null}
  async function cancelJob(promptId: string): Promise<void> {
    const response = await fetch(`${BASE_URL}/api/queue`, {
      method: "POST",
      headers: getHeaders(),
      body: JSON.stringify({ delete: [promptId] }),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }
  ```

  ```python Python theme={null}
  def cancel_job(prompt_id: str):
      """Cancel a pending or running job."""
      response = requests.post(
          f"{BASE_URL}/api/queue",
          headers=get_headers(),
          json={"delete": [prompt_id]}
      )
      response.raise_for_status()
      return response.json()
  ```
</CodeGroup>

### Interrupt Current Execution

<CodeGroup>
  ```bash curl theme={null}
  curl -X POST "$BASE_URL/api/interrupt" \
    -H "X-API-Key: $COMFY_CLOUD_API_KEY"
  ```

  ```typescript TypeScript theme={null}
  async function interrupt(): Promise<void> {
    const response = await fetch(`${BASE_URL}/api/interrupt`, {
      method: "POST",
      headers: getHeaders(),
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
  }
  ```

  ```python Python theme={null}
  def interrupt():
      """Interrupt the currently running job."""
      response = requests.post(
          f"{BASE_URL}/api/interrupt",
          headers=get_headers()
      )
      response.raise_for_status()
  ```
</CodeGroup>

***

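## Error Handling

### HTTP Errors

REST API endpoints return standard HTTP status codes:

| Status Code | Description                                          |
| ----------- | ---------------------------------------------------- |
| `400`       | Invalid request (malformed workflow, missing fields) |
| `401`       | Unauthorized (invalid or missing API key)            |
| `402`       | Insufficient balance                                 |
| `429`       | Subscription inactive                                |
| `500`       | Internal server error                                |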
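
The status codes above can be turned into more descriptive exceptions on the client side. The sketch below is one possible approach, not an official client: `CloudAPIError` and `raise_for_cloud_status` are hypothetical names, and the descriptions are copied from the table rather than taken from actual response bodies.

```python
# Descriptions taken from the status code table on this page.
HTTP_ERROR_DESCRIPTIONS = {
    400: "Invalid request (malformed workflow, missing fields)",
    401: "Unauthorized (invalid or missing API key)",
    402: "Insufficient balance",
    429: "Subscription inactive",
    500: "Internal server error",
}


class CloudAPIError(Exception):
    """Hypothetical exception carrying the HTTP status and a readable description."""

    def __init__(self, status_code: int, detail: str = ""):
        self.status_code = status_code
        self.description = HTTP_ERROR_DESCRIPTIONS.get(status_code, "Unexpected error")
        message = f"HTTP {status_code}: {self.description}"
        if detail:
            message += f" ({detail})"
        super().__init__(message)


def raise_for_cloud_status(status_code: int, detail: str = "") -> None:
    """Raise CloudAPIError for 4xx/5xx status codes; do nothing otherwise."""
    if status_code >= 400:
        raise CloudAPIError(status_code, detail)
```

With `requests`, this could be called as `raise_for_cloud_status(response.status_code, response.text)` in place of `response.raise_for_status()`.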

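### Execution Errors

During workflow execution, errors are delivered through the `execution_error` WebSocket message. The `exception_type` field identifies the error category:

| Exception Type              | Description                                        |
| --------------------------- | -------------------------------------------------- |
| `ValidationError`           | Invalid workflow or inputs                         |
| `ModelDownloadError`        | Required model is unavailable or failed to download |
| `ImageDownloadError`        | Failed to download an input image from a URL       |
| `OOMError`                  | GPU out of memory                                  |
| `InsufficientFundsError`    | Insufficient account balance (for Partner Nodes)   |
| `InactiveSubscriptionError` | Subscription inactive                              |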
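
An `execution_error` message can be converted into a structured value before deciding how to react. The following is a minimal sketch under stated assumptions: it reads only the `exception_type` and `exception_message` fields used elsewhere on this page, and the names `ExecutionError` and `parse_execution_error` are hypothetical. Unlisted exception types are passed through with a generic description rather than rejected.

```python
from dataclasses import dataclass

# Descriptions taken from the exception type table on this page.
KNOWN_EXCEPTION_TYPES = {
    "ValidationError": "Invalid workflow or inputs",
    "ModelDownloadError": "Required model is unavailable or failed to download",
    "ImageDownloadError": "Failed to download an input image from a URL",
    "OOMError": "GPU out of memory",
    "InsufficientFundsError": "Insufficient account balance (for Partner Nodes)",
    "InactiveSubscriptionError": "Subscription inactive",
}


@dataclass
class ExecutionError:
    exception_type: str
    message: str
    description: str


def parse_execution_error(ws_message: dict) -> ExecutionError:
    """Build an ExecutionError from a decoded `execution_error` WebSocket message."""
    if ws_message.get("type") != "execution_error":
        raise ValueError("Not an execution_error message")
    data = ws_message.get("data", {})
    exc_type = data.get("exception_type", "Unknown")
    return ExecutionError(
        exception_type=exc_type,
        message=data.get("exception_message", "Unknown error"),
        description=KNOWN_EXCEPTION_TYPES.get(exc_type, "Unrecognized error category"),
    )
```

A caller might, for example, log `description` for the user and treat `OOMError` differently from `ValidationError`; which categories are worth retrying is not specified here and is left to the caller.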
