Dynamic Workflows 不是更聪明的 if-else
Dynamic Workflows 不是更聪明的 if-else:用一个端口检测案例,拆解它和写死流程的 3 个本质差异
我照着最近热传的 Dynamic Workflows 案例跑了一遍,发现大多数人理解错了一件事。
不是说那些文章写错了——而是大家普遍把 Dynamic Workflows 理解成「更聪明的条件判断」。好像只要把 if port == 8080 换成让 AI 来判断,就算用上了动态流程。
这个理解差了一个维度。
写死的 if-else,决策逻辑在你写代码的时候就定死了。Dynamic Workflows 的核心,是让 Agent 在运行时根据真实环境状态自己决定下一步——包括你没预料到的状态。
这篇文章用「检测端口是否占用」这个最小案例,把两者的差异拆清楚。先跑代码,再看对比,最后给你一个判断框架。
---
第一章:先跑起来——复现动态流程版本
先把能跑的代码放出来,建立直觉比讲原理更重要。
这个案例的任务很简单:检测本机某个端口的状态,然后根据状态决定下一步操作。听起来平无奇,但它包含了 Dynamic Workflows 最核心的结构:工具调用 + 运行时决策 + 循环执行。
import anthropic
import socket
import subprocess
import json
client = anthropic.Anthropic(
base_url="https://api.884819.xyz/v1",
api_key="YOUR_API_KEY"
)
工具定义:检测端口状态
tools = [
{
"name": "check_port",
"description": "检测指定端口是否被占用,返回占用状态和进程信息",
"input_schema": {
"type": "object",
"properties": {
"port": {
"type": "integer",
"description": "要检测的端口号"
}
},
"required": ["port"]
}
},
{
"name": "kill_process",
"description": "终止占用指定端口的进程",
"input_schema": {
"type": "object",
"properties": {
"port": {
"type": "integer",
"description": "要释放的端口号"
}
},
"required": ["port"]
}
},
{
"name": "start_service",
"description": "在指定端口启动服务(模拟)",
"input_schema": {
"type": "object",
"properties": {
"port": {
"type": "integer",
"description": "要启动服务的端口号"
}
},
"required": ["port"]
}
}
]
def check_port(port: int) -> dict:
"""实际检测端口状态"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('localhost', port))
sock.close()
if result == 0:
# 端口被占用,尝试获取进程信息
try:
output = subprocess.check_output(
['lsof', '-i', f':{port}', '-t'],
stderr=subprocess.DEVNULL
).decode().strip()
pid = output.split('\n')[0] if output else "unknown"
return {"occupied": True, "pid": pid, "port": port}
except Exception:
return {"occupied": True, "pid": "unknown", "port": port}
else:
return {"occupied": False, "pid": None, "port": port}
def kill_process(port: int) -> dict:
"""终止占用端口的进程"""
try:
output = subprocess.check_output(
['lsof', '-i', f':{port}', '-t'],
stderr=subprocess.DEVNULL
).decode().strip()
if output:
pid = output.split('\n')[0]
subprocess.run(['kill', '-9', pid], check=True)
return {"success": True, "killed_pid": pid}
return {"success": False, "reason": "no process found"}
except Exception as e:
return {"success": False, "reason": str(e)}
def start_service(port: int) -> dict:
"""模拟启动服务"""
return {"success": True, "message": f"服务已在端口 {port} 启动(模拟)"}
def run_tool(tool_name: str, tool_input: dict) -> str:
"""执行工具调用"""
if tool_name == "check_port":
result = check_port(tool_input["port"])
elif tool_name == "kill_process":
result = kill_process(tool_input["port"])
elif tool_name == "start_service":
result = start_service(tool_input["port"])
else:
result = {"error": f"未知工具: {tool_name}"}
return json.dumps(result, ensure_ascii=False)
def dynamic_port_workflow(port: int):
"""Dynamic Workflow 核心循环"""
print(f"\n== 开始动态流程:处理端口 {port} ===\n")
messages = [
{
"role": "user",
"content": f"请检测端口 {port} 的状态,如果被占用就释放它,然后启动我们的服务。根据实际情况灵活处理,遇到问题要重新规划。"
}
]
# Agentic Loop:持续运行直到任务完成
while True:
response = client.messages.create(
model="claude-haiku-4.5",
max_tokens=1024,
tools=tools,
messages=messages
)
print(f"[Agent 状态] stop_reason: {response.stop_reason}")
# 如果 Agent 决定调用工具
if response.stop_reason == "tool_use":
# 把 Agent 的回复加入对话历史
messages.append({"role": "assistant", "content": response.content})
# 执行所有工具调用
tool_results = []
for block in response.content:
if block.type == "tool_use":
print(f"[工具调用] {block.name}({block.input})")
result = run_tool(block.name, block.input)
print(f"[工具结果] {result}\n")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result
})
# 把工具结果反馈给 Agent,让它继续决策
messages.append({"role": "user", "content": tool_results})
# 如果 Agent 认为任务完成
elif response.stop_reason == "end_turn":
for block in response.content:
if hasattr(block, 'text'):
print(f"\n[Agent 最终结论]\n{block.text}")
break
else:
print(f"[未预期的 stop_reason]: {response.stop_reason}")
break
if __name__ == "__main__":
dynamic_port_workflow(8080)
跑起来之后,你会看到类似这样的执行日志:
== 开始动态流程:处理端口 8080 ===
[Agent 状态] stop_reason: tool_use
[工具调用] check_port({'port': 8080})
[工具结果] {"occupied": true, "pid": "12453", "port": 8080}
[Agent 状态] stop_reason: tool_use
[工具调用] kill_process({'port': 8080})
[工具结果] {"success": true, "killed_pid": "12453"}
[Agent 状态] stop_reason: tool_use
[工具调用] check_port({'port': 8080})
[工具结果] {"occupied": false, "pid": null, "port": 8080}
[Agent 状态] stop_reason: tool_use
[工具调用] start_service({'port': 8080})
[工具结果] {"success": true, "message": "服务已在端口 8080 启动(模拟)"}
[Agent 状态] stop_reason: end_turn
[Agent 最终结论]
端口 8080 原本被进程 12453 占用,已成功终止该进程并确认端口释放,服务现已启动。
注意那个 kill_process 之后又调用了一次 check_port——这不是我写的逻辑,是 Agent 自己决定「杀完进程要验证一下」。这个细节很重要,后面会用到。
---
第二章:对照组——写死流程长什么样
同一个任务,硬编码版本:
import socket
import subprocess
def hardcoded_port_workflow(port: int):
"""硬编码版本:流程写死,分支固定"""
print(f"\n=== 硬编码流程:处理端口 {port} ===\n")
# 步骤 1:检测端口
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', port))
sock.close()
if result == 0:
print(f"端口 {port} 被占用")
# 步骤 2:写死的处理逻辑——直接杀进程
try:
output = subprocess.check_output(['lsof', '-i', f':{port}', '-t']).decode().strip()
pid = output.split('\n')[0]
subprocess.run(['kill', '-9', pid], check=True)
print(f"已终止进程 {pid}")
# 步骤 3:启动服务(假设杀完就能用,不验证)
print(f"服务已在端口 {port} 启动")
except subprocess.CalledProcessError:
# 遇到权限问题或进程已消失:直接报错退出
raise RuntimeError(f"无法释放端口 {port},流程终止")
else:
# 端口空闲:直接启动
print(f"端口 {port} 空闲,直接启动服务")
if __name__ == "__main__":
hardcoded_port_workflow(8080)
这段代码看起来也能用。大多数情况下它确实能跑通。
但现在给它一个「意外」:端口被占用,但 lsof 返回了两个 PID(比如父子进程),或者杀完进程之后端口没有立刻释放(TIME_WAIT 状态)。
== 硬编码流程:处理端口 8080 ===
端口 8080 被占用
Traceback (most recent call last):
...
subprocess.CalledProcessError: Command '['kill', '-9', '12453']' returned non-zero exit status 1
RuntimeError: 无法释放端口 8080,流程终止
流程直接崩了。没有重试,没有重新规划,没有任何「感知到异常后的应对」。
所以问题不是「硬编码写得不够好」,而是它的决策逻辑在运行前就已经封死了。
---
第三章:3 个真正不一样的地方
差异 1:决策时机
硬编码流程:
编写时 → [if 占用] → kill → start
[if 空闲] → start
[if 异常] → raise Error ← 所有分支在写代码时就定死
Dynamic Workflow:
运行时 → check_port → [Agent 看到结果] → 决定下一步
[Agent 看到新结果] → 再决定下一步
[Agent 感知到异常] → 重新规划
用 ASCII 图表示两者的结构差异:
硬编码(线性决策树) Dynamic Workflow(运行时决策)
──────────────────── ────────────────────────────
开始 开始
│ │
▼ ▼
检测端口 检测端口
│ │
├─[占用]─► 杀进程 ▼
│ [Agent 读取结果]
│ ▼ │
│ 启动服务 [Agent 决定:杀进程?等待?换端口?]
│ │
├─[空闲]─► 启动服务 ▼
│ 执行工具
└─[异常]─► 报错退出 │
▼
[Agent 读取新结果]
│
[Agent 决定:验证?继续?重试?]
│
...
大白话总结:硬编码的决策在你脑子里,Dynamic Workflow 的决策在运行时的真实数据里。
差异 2:错误处理方式
硬编码遇到异常的处理方式只有两种:崩溃,或者静默跳过。两种都很危险。
Dynamic Workflow 遇到异常的处理方式是:感知 → 重新规划 → 继续执行。
对比一下同一个场景(kill 失败)的两种表现:
# 硬编码版本:kill 失败 = 整个流程终止
try:
subprocess.run(['kill', '-9', pid], check=True)
except subprocess.CalledProcessError:
raise RuntimeError("无法释放端口,流程终止") # 死在这里
Dynamic Workflow 版本:kill 失败 = Agent 收到失败信息,重新决策
工具返回:{"success": false, "reason": "permission denied"}
Agent 下一步可能是:
- 尝试用 sudo(如果有权限)
- 等待几秒后重试
- 报告给用户并请求人工介入
- 尝试其他端口
这些逻辑不需要你提前写,Agent 根据上下文自己判断
这里有一个关键细节:Dynamic Workflow 的「错误处理」不是你写的 try-catch,而是 Agent 在对话上下文里看到工具返回的失败信息后,自主决定下一步。Anthropic 官方文档把这个循环叫做 agentic loop——工具结果会作为新的 user 消息反馈给模型,模型基于完整上下文重新生成决策。
差异 3:可维护性成本
现在需求变了:除了「端口被占用」和「端口空闲」,还要处理第三种状态——「端口被占用,但是我们自己的服务,不需要杀,直接复用」。
# 硬编码版本:必须改代码
if result == 0:
pid = get_pid(port)
process_name = get_process_name(pid)
if process_name == "our_service": # 新增分支
print("是我们自己的服务,直接复用")
elif process_name == "nginx": # 再新增一个
print("是 nginx,需要特殊处理")
else:
kill_process(pid)
start_service(port)
每新增一种情况,就要改这里的代码
# Dynamic Workflow 版本:只需要调整工具描述或 prompt
把 check_port 工具的描述改成:
"description": "检测端口状态,返回占用状态、进程名称和进程类型(our_service/third_party/system)"
或者在 prompt 里加一句:
"如果端口被我们自己的服务占用(进程名包含 our_service),直接复用,不要杀进程"
Agent 会根据新的工具信息和 prompt 自动调整决策逻辑
不需要改任何 if-else
大白话总结:硬编码的维护成本随分支数量线性增长,Dynamic Workflow 的维护成本主要在工具描述和 prompt 的质量上。
---
第四章:什么时候用,什么时候别用
Dynamic Workflows 不是银弹。用错了比硬编码更麻烦。
适合用的场景:- 环境状态不可预测:比如运维自动化、系统健康检查、多步骤部署流程——你不知道目标机器上会遇到什么状态
- 分支逻辑复杂且频繁变化:业务规则经常调整,每次改代码成本高
- 需要「感知-决策」循环:任务本身就是「看到结果再决定下一步」的结构
- 流程完全固定:比如「读文件 → 解析 → 写数据库」,步骤不会变,用 Dynamic Workflow 只是增加延迟和成本
- 对延迟敏感:每次 Agent 决策都要调用一次 LM,比直接执行代码慢一个数量级
- 需要严格审计:金融交易、医疗操作等场景,每一步必须有确定性的执行记录,Agent 的「自主决策」反而是风险
判断标准很简单:如果你能在写代码时把所有分支都列清楚,就用硬编码。如果你列不清楚,或者列清楚了但三个月后就会过时,才考虑 Dynamic Workflow。
---
第五章:自己动手改——3 个可以立刻练手的变体任务
同一个框架,换一个检测目标,逻辑完全可以复用。以下三个任务不给答案,鼓励你自己跑:
练习 1:检测文件是否存在任务描述:给定一个文件路径,检测文件是否存在。如果不存在,尝试从备用路径复制;如果备用路径也没有,创建一个默认配置文件;如果文件存在但内容为空,写入默认内容。
提示:工具需要 check_file、copy_file、create_default、write_content。
任务描述:检测一个 HTTP 接口是否正常响应。如果超时,等待 5 秒后重试;如果返回 5xx,记录错误并尝试备用接口;如果备用接口也不通,发送告警通知。
提示:重点在于让 Agent 自己决定重试次数和切换时机,不要在代码里写死。
练习 3:检测磁盘空间是否充足任务描述:检测指定目录的磁盘使用率。如果超过 80%,找出最大的 5 个文件并列出;如果超过 90%,自动清理 30 天前的日志文件;如果清理后仍然不足,发出告警。
提示:这个任务有明确的阈值逻辑,但「清理哪些文件」的判断可以交给 Agent。
---
文中所有代码示例调用的是 claude-haiku-4.5,速度快、成本低,适合本地调试。如果你想直接跑起来,可以在 [api.884819.xyz](https://api.884819.xyz) 获取 API 访问权限——支持白名单内的全部模型,按量计费,注册即送体验 token,不需要绑定信用卡。
---
写在最后
回到开头那个问题:Dynamic Workflows 和写死流程的本质差异,不是「AI 更聪明」,而是决策发生的时机不同。
硬编码把决策锁在了编写时,Dynamic Workflow 把决策推迟到了运行时。这一个时机的差异,决定了它面对「未知状态」时的表现完全不同。
现在你手上有哪类任务可以用这个思路改造?从最小的那个开始,跑通一个,比看十篇文章都有用。
---
这篇只拆了「单任务内的动态分支」。下一篇我想聊一个更复杂的情况:多个 Agent 协作时,动态流程怎么传递上下文、怎么避免互相打架——比如一个 Agent 刚杀了某个进程,另一个 Agent 同时在等那个进程的响应,这种冲突在生产环境里比你想象的更常见。如果你在用 Multi-Agent 架构,那篇可能比这篇更值得看。
---
本文由848AI原创,转载请注明出处。关注8848AI,带你从零开始学AI。#AI教程 #Claude #动态工作流 #Agent开发 #8848AI #AI编程 #LM应用 #工具调用