Cloudflare Workflows は、ステップをチェーンし、失敗時にリトライし、長時間実行されるプロセス間で状態を永続化できる耐久性のある実行エンジンです。開発者は Workflows を使って、バックグラウンドエージェントの実行、データパイプラインの管理、ヒューマンインザループの承認システムの構築などを行います。先月、Cloudflare にデプロイされたすべてのワークフローがダッシュボード上で完全なビジュアル図として表示されるようになったことを発表しました。アプリケーションを可視化できることは、今やこれまで以上に重要です。エージェントがコードを書き、そのコードをあなたが読んでいるかどうかは別として、出来上がるものの「形」は依然として重要です:ステップがどのようにつながっているか、どこで分岐するか、実際に何が起きているか。これまでに可視ワークフロービルダーの図を見たことがあるなら、それらは通常宣言型(JSON、YAML、ドラッグアンドドロップ)から生成されます。しかし、Cloudflare Workflows は単なるコードです。Promises、Promise.all、ループ、条件分岐を含めることができ、関数やクラス内にネストされることもあります。この動的な実行モデルは、図を描画する際に複雑さを増します。私たちは Abstract Syntax Trees(AST)を使って静的にグラフを導出し、Promise と await の関係を追跡してどれが並列に実行され、どれがブロックされ、どのように要素が接続されるかを理解しています。以下で、これらの図をどのように作ったかを解説します。あるいは、最初のワークフローをデプロイして、実際に図を確認してください。
Dynamic workflow execution
一般に、ワークフローエンジンは動的(dynamic)または逐次(static)実行順のいずれかに従って実行できます。逐次実行は直感的に思えるかもしれません:トリガー → ステップA → ステップB → ステップC、エンジンがステップAを完了したらすぐにステップBが開始、という具合です。Cloudflare Workflows は動的実行モデルに従います。ワークフローはコードであるため、ランタイムがステップを検出した順にステップが実行されます。ランタイムがステップを発見すると、そのステップはワークフローエンジンに渡され、エンジンがその実行を管理します。ステップは await されない限り本質的に逐次的ではなく、エンジンはすべての未 await のステップを並列に実行します。これにより、追加のラッパーやディレクティブなしでフロー制御としてワークフローコードを書くことができます。
ハンドオフの流れは次のとおりです:
- エンジン(そのインスタンスの "supervisor" である Durable Object)が立ち上がる。エンジンは実際のワークフロー実行ロジックを担当する。
- エンジンは dynamic dispatch によりユーザー Worker をトリガーし、Workers ランタイムに制御を渡す。
- Runtime が
step.do に遭遇すると、実行は再びエンジンに渡される。
- エンジンはステップを実行し、結果を永続化(またはエラーを投げる)し、再びユーザー Worker をトリガーする。
このアーキテクチャでは、エンジンは実行しているステップの順序を本質的に "知っている" わけではありません。しかし、図にするためにはステップの順序が重要な情報になります。このチャレンジは、診断に有用なグラフへ大多数のワークフローを正確に翻訳する点にあります。現在はベータ段階の図表表現であり、今後も改善を続けます。
Parsing the code
デプロイ時にスクリプトを取得(run time ではなく)することで、ワークフロー全体を解析し、静的に図を生成できます。ワークフローのデプロイの流れを振り返ると:
- 図を作るために、内部の Workers をデプロイする構成サービスによってバンドル(ステップ 2)された後のスクリプトをフェッチする。
- パーサーで抽象構文木(AST)を作成し、内部サービスがすべての WorkflowEntrypoints と workflow ステップへの呼び出しを含む中間グラフを生成・トラバースする。
- 最終結果に基づいて API 上で図をレンダリングする。
Worker がデプロイされると、構成サービスは(デフォルトで esbuild を使って)コードをバンドルし、指定がない限りコードを minify します。ここにも別の課題があります—TypeScript の Workflows は直感的なパターンを持ちますが、minified な JavaScript は密で解読困難になり得ます。さらに、バンドラによって minify の方法が異なる場合があります。
以下は、エージェントが並列に実行される Workflow コードの例です:
const summaryPromise = step.do( `summary agent (loop ${loop})`, async () => { return runAgentPrompt( this.env, SUMMARY_SYSTEM, buildReviewPrompt(
const correctnessPromise = step.do( `correctness agent (loop ${loop})`, async () => { return runAgentPrompt( this.env, CORRECTNESS_SYSTEM, buildReviewPrompt(
const clarityPromise = step.do( `clarity agent (loop ${loop})`, async () => { return runAgentPrompt( this.env, CLARITY_SYSTEM, buildReviewPrompt(
rspack でバンドルした場合、minified なコードのスニペットは次のようになります:
class pe extends e{async run(e,t){de("workflow.run.start" ,{instanceId:e.instanceId});const r=await t.do("validate payload",async() =>{if(!e.payload.r2Key)throw new Error("r2Key is required");if(!e.payload.telegramChatId)throw new Error("telegramChatId is required");return{r2Key:e.payload.r2Key,telegramChatId:e.payload.telegramChatId,context:e.payload.context?.trim()}}),s=await t.do("load source document from r2",async() =>{const e=await this.env.REVIEW_DOCUMENTS.get(r.r2Key);if(!e)throw new Error(`R2 object not found: ${r.r2Key}`);const t=(await e.text()).trim();if(!t)throw new Error("R2 object is empty");return t}),n=Number(this.env.MAX_REVIEW_LOOPS??"5"),o=this.env.RESPONSE_TIMEOUT??"7 days",a=async(s,i,c)=>{if(s>n)return le("workflow.loop.max_reached",{instanceId:e.instanceId,maxLoops:n}),await t.do("notify max loop reached",async() =>{await se(this.env,r.telegramChatId,`Review stopped after ${n} loops for ${e.instanceId}. Start again if you still need revisions.`)}),{approved:!1,loops:n,finalText:i};const h=t.do(`summary agent (loop ${s})`,async() =>te(this.env,"You summarize documents. Keep the output short, concrete, and factual.",ue("Summarize this text in 5 bullet points.",i,r.context)))... }
あるいは、vite でバンドルした場合の minified スニペットは次のようになります:
class ht extends pe { async run(e, r) { b("workflow.run.start", { instanceId: e.instanceId }); const s = await r.do("validate payload", async () => { if (!e.payload.r2Key) throw new Error("r2Key is required"); if (!e.payload.telegramChatId) throw new Error("telegramChatId is required"); return { r2Key: e.payload.r2Key, telegramChatId: e.payload.telegramChatId, context: e.payload.context?.trim() }; }), n = await r.do( "load source document from r2", async () => { const i = await this.env.REVIEW_DOCUMENTS.get(s.r2Key); if (!i) throw new Error(`R2 object not found: ${s.r2Key}`); const c = (await i.text()).trim(); if (!c) throw new Error("R2 object is empty"); return c; } ), o = Number(this.env.MAX_REVIEW_LOOPS ?? "5"), l = this.env.RESPONSE_TIMEOUT ?? "7 days", a = async (i, c, u) => { if (i > o) return H("workflow.loop.max_reached", { instanceId: e.instanceId, maxLoops: o }), await r.do("notify max loop reached", async () => { await J( this.env, s.telegramChatId, `Review stopped after ${o} loops for ${e.instanceId}. Start again if you still need revisions.` ); }), { approved: !1, loops: o, finalText: c }; const h = r.do( `summary agent (loop ${i})`, async () => _( this.env, et, K( "Summarize this text in 5 bullet points.", c, s.context ) ) )... }
minified なコードは非常にややこしくなり得ますし、バンドラによって異なる方向で難読化されます。さまざまな形式の minified コードを迅速かつ正確に解析する方法が必要でした。私たちは oxc-parser(JavaScript Oxidation Compiler、OXC の一部)がこの仕事に最適だと判断しました。
まずこのアイデアをテストするために Rust を動かすコンテナを用意しました。すべての script ID は Cloudflare Queue に送られ、その後メッセージがポップされてコンテナで処理されました。このアプローチが動作することが確認できたため、Rust で書かれた Worker に移行しました。Workers は WebAssembly 経由で Rust を実行することをサポートしており、パッケージも小さくて扱いやすかったため、この移行は容易でした。
Rust Worker の責務は、まず minified な JS を AST ノード型に変換し、次に AST ノード型をダッシュボード上にレンダリングされるワークフローのグラフィカルバージョンに変換することです。これを行うために、各ワークフローに対してあらかじめ定義されたノード型のグラフを生成し、一連のノードマッピングを通じて当社のグラフ表現に翻訳します。
Rendering the diagram
図をレンダリングするにあたり、2つの課題がありました:ステップと関数の関係を正しく追跡すること、そしてワークフローのノード型をできるだけ単純に定義しつつ表面積を網羅することです。
ステップと関数の関係を正確に追跡するために、関数名とステップ名の両方を収集する必要がありました。前述の通り、エンジンはステップに関する情報のみを持ちますが、あるステップは関数に依存し得ますし、その逆もあり得ます。例えば、開発者がステップを関数でラップしたり、関数をステップとして定義したり、関数内で別モジュールのステップを呼んだり、ステップ名をリネームしたりすることがあります。
ライブラリが AST を提供してくれることで最初のハードルは越えましたが、どのように解析するかの判断は残ります。いくつかのコードパターンは追加の工夫を要します。例えば、WorkflowEntrypoint 内にはステップを直接、間接的、またはまったく呼ばない関数が存在し得ます。仮に functionA が console.log(await functionB(), await functionC()) を含み、functionB が step.do() を呼ぶ場合、functionA と functionB の両方をワークフローダイアグラムに含めるべきですが、functionC は含めるべきではありません。直接・間接のステップ呼び出しを含むすべての関数を捕捉するために、私たちは各関数に対してサブグラフを作成し、その関数が自分自身でステップ呼び出しを含むか、あるいはステップを呼ぶ可能性のある別の関数を呼んでいるかをチェックします。これらのサブグラフは関数ノードとして表現され、その中に関連ノードをすべて持ちます。もし関数ノードがグラフの葉(つまり直接・間接のワークフローステップを持たない)であれば、最終出力からトリミングされます。
また、静的ステップのリストからワークフローダイアグラムを推測できるパターンや、最大で 10 種類の方法で定義された変数など、他にもチェックするパターンがあります。スクリプトに複数のワークフローが含まれる場合は、関数のために作成したサブグラフと同様のパターンを一段上の抽象化として適用します。
すべての AST ノード型について、ワークフロー内で使われ得るあらゆる方法(ループ、分岐、promises、parallels、await、アロー関数…)を考慮する必要がありました。これらの経路の中にも何十通りもの可能性があります。例えばループの一例だけでも次のような書き方があります:
// for...of
for (const item of items) {
await step.do(`process ${item}`, async () => item);
}
// while
while (shouldContinue) {
await step.do(
}
// map
await Promise.all(
items.map((item) => step.do(`map ${item}`, async () => item)),
);
// forEach
await items.forEach(async (item) => {
await step.do(`each ${item}`, async () => item);
});
ループに加えて、分岐の扱いも考慮する必要があります:
switch (action.type) {
case 'create':
await step.do('handle create', async () => {});
break;
default:
await step.do('handle unknown', async () => {});
break;
}
if (status === 'pending') {
await step.do('pending p