Dynamic Workflows 不是更聪明的 if-else:用一个端口检测案例,拆解它和写死流程的 3 个本质差异

我照着最近热传的 Dynamic Workflows 案例跑了一遍,发现大多数人理解错了一件事。

不是说那些文章写错了——而是大家普遍把 Dynamic Workflows 理解成「更聪明的条件判断」。好像只要把 if port == 8080 换成让 AI 来判断,就算用上了动态流程。

这个理解差了一个维度。

写死的 if-else,决策逻辑在你写代码的时候就定死了。Dynamic Workflows 的核心,是让 Agent 在运行时根据真实环境状态自己决定下一步——包括你没预料到的状态。

这篇文章用「检测端口是否占用」这个最小案例,把两者的差异拆清楚。先跑代码,再看对比,最后给你一个判断框架。

---

第一章:先跑起来——复现动态流程版本

先把能跑的代码放出来,建立直觉比讲原理更重要。

这个案例的任务很简单:检测本机某个端口的状态,然后根据状态决定下一步操作。听起来平无奇,但它包含了 Dynamic Workflows 最核心的结构:工具调用 + 运行时决策 + 循环执行

import anthropic

import socket

import subprocess

import json

client = anthropic.Anthropic(

base_url="https://api.884819.xyz/v1",

api_key="YOUR_API_KEY"

)

工具定义:检测端口状态

tools = [

{

"name": "check_port",

"description": "检测指定端口是否被占用,返回占用状态和进程信息",

"input_schema": {

"type": "object",

"properties": {

"port": {

"type": "integer",

"description": "要检测的端口号"

}

},

"required": ["port"]

}

},

{

"name": "kill_process",

"description": "终止占用指定端口的进程",

"input_schema": {

"type": "object",

"properties": {

"port": {

"type": "integer",

"description": "要释放的端口号"

}

},

"required": ["port"]

}

},

{

"name": "start_service",

"description": "在指定端口启动服务(模拟)",

"input_schema": {

"type": "object",

"properties": {

"port": {

"type": "integer",

"description": "要启动服务的端口号"

}

},

"required": ["port"]

}

}

]

def check_port(port: int) -> dict:

"""实际检测端口状态"""

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sock.settimeout(1)

result = sock.connect_ex(('localhost', port))

sock.close()

if result == 0:

# 端口被占用,尝试获取进程信息

try:

output = subprocess.check_output(

['lsof', '-i', f':{port}', '-t'],

stderr=subprocess.DEVNULL

).decode().strip()

pid = output.split('\n')[0] if output else "unknown"

return {"occupied": True, "pid": pid, "port": port}

except Exception:

return {"occupied": True, "pid": "unknown", "port": port}

else:

return {"occupied": False, "pid": None, "port": port}

def kill_process(port: int) -> dict:

"""终止占用端口的进程"""

try:

output = subprocess.check_output(

['lsof', '-i', f':{port}', '-t'],

stderr=subprocess.DEVNULL

).decode().strip()

if output:

pid = output.split('\n')[0]

subprocess.run(['kill', '-9', pid], check=True)

return {"success": True, "killed_pid": pid}

return {"success": False, "reason": "no process found"}

except Exception as e:

return {"success": False, "reason": str(e)}

def start_service(port: int) -> dict:

"""模拟启动服务"""

return {"success": True, "message": f"服务已在端口 {port} 启动(模拟)"}

def run_tool(tool_name: str, tool_input: dict) -> str:

"""执行工具调用"""

if tool_name == "check_port":

result = check_port(tool_input["port"])

elif tool_name == "kill_process":

result = kill_process(tool_input["port"])

elif tool_name == "start_service":

result = start_service(tool_input["port"])

else:

result = {"error": f"未知工具: {tool_name}"}

return json.dumps(result, ensure_ascii=False)

def dynamic_port_workflow(port: int):

"""Dynamic Workflow 核心循环"""

print(f"\n== 开始动态流程:处理端口 {port} ===\n")

messages = [

{

"role": "user",

"content": f"请检测端口 {port} 的状态,如果被占用就释放它,然后启动我们的服务。根据实际情况灵活处理,遇到问题要重新规划。"

}

]

# Agentic Loop:持续运行直到任务完成

while True:

response = client.messages.create(

model="claude-haiku-4.5",

max_tokens=1024,

tools=tools,

messages=messages

)

print(f"[Agent 状态] stop_reason: {response.stop_reason}")

# 如果 Agent 决定调用工具

if response.stop_reason == "tool_use":

# 把 Agent 的回复加入对话历史

messages.append({"role": "assistant", "content": response.content})

# 执行所有工具调用

tool_results = []

for block in response.content:

if block.type == "tool_use":

print(f"[工具调用] {block.name}({block.input})")

result = run_tool(block.name, block.input)

print(f"[工具结果] {result}\n")

tool_results.append({

"type": "tool_result",

"tool_use_id": block.id,

"content": result

})

# 把工具结果反馈给 Agent,让它继续决策

messages.append({"role": "user", "content": tool_results})

# 如果 Agent 认为任务完成

elif response.stop_reason == "end_turn":

for block in response.content:

if hasattr(block, 'text'):

print(f"\n[Agent 最终结论]\n{block.text}")

break

else:

print(f"[未预期的 stop_reason]: {response.stop_reason}")

break

if __name__ == "__main__":

dynamic_port_workflow(8080)

跑起来之后,你会看到类似这样的执行日志:

== 开始动态流程:处理端口 8080 ===

[Agent 状态] stop_reason: tool_use

[工具调用] check_port({'port': 8080})

[工具结果] {"occupied": true, "pid": "12453", "port": 8080}

[Agent 状态] stop_reason: tool_use

[工具调用] kill_process({'port': 8080})

[工具结果] {"success": true, "killed_pid": "12453"}

[Agent 状态] stop_reason: tool_use

[工具调用] check_port({'port': 8080})

[工具结果] {"occupied": false, "pid": null, "port": 8080}

[Agent 状态] stop_reason: tool_use

[工具调用] start_service({'port': 8080})

[工具结果] {"success": true, "message": "服务已在端口 8080 启动(模拟)"}

[Agent 状态] stop_reason: end_turn

[Agent 最终结论]

端口 8080 原本被进程 12453 占用,已成功终止该进程并确认端口释放,服务现已启动。

注意那个 kill_process 之后又调用了一次 check_port——这不是我写的逻辑,是 Agent 自己决定「杀完进程要验证一下」。这个细节很重要,后面会用到。

---

第二章:对照组——写死流程长什么样

同一个任务,硬编码版本:

import socket

import subprocess

def hardcoded_port_workflow(port: int):

"""硬编码版本:流程写死,分支固定"""

print(f"\n=== 硬编码流程:处理端口 {port} ===\n")

# 步骤 1:检测端口

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

result = sock.connect_ex(('localhost', port))

sock.close()

if result == 0:

print(f"端口 {port} 被占用")

# 步骤 2:写死的处理逻辑——直接杀进程

try:

output = subprocess.check_output(['lsof', '-i', f':{port}', '-t']).decode().strip()

pid = output.split('\n')[0]

subprocess.run(['kill', '-9', pid], check=True)

print(f"已终止进程 {pid}")

# 步骤 3:启动服务(假设杀完就能用,不验证)

print(f"服务已在端口 {port} 启动")

except subprocess.CalledProcessError:

# 遇到权限问题或进程已消失:直接报错退出

raise RuntimeError(f"无法释放端口 {port},流程终止")

else:

# 端口空闲:直接启动

print(f"端口 {port} 空闲,直接启动服务")

if __name__ == "__main__":

hardcoded_port_workflow(8080)

这段代码看起来也能用。大多数情况下它确实能跑通。

但现在给它一个「意外」:端口被占用,但 lsof 返回了两个 PID(比如父子进程),或者杀完进程之后端口没有立刻释放(TIME_WAIT 状态)。

== 硬编码流程:处理端口 8080 ===

端口 8080 被占用

Traceback (most recent call last):

...

subprocess.CalledProcessError: Command '['kill', '-9', '12453']' returned non-zero exit status 1

RuntimeError: 无法释放端口 8080,流程终止

流程直接崩了。没有重试,没有重新规划,没有任何「感知到异常后的应对」。

所以问题不是「硬编码写得不够好」,而是它的决策逻辑在运行前就已经封死了。

---

第三章:3 个真正不一样的地方

差异 1:决策时机

硬编码流程:

编写时 → [if 占用] → kill → start

[if 空闲] → start

[if 异常] → raise Error ← 所有分支在写代码时就定死

Dynamic Workflow:

运行时 → check_port → [Agent 看到结果] → 决定下一步

[Agent 看到新结果] → 再决定下一步

[Agent 感知到异常] → 重新规划

用 ASCII 图表示两者的结构差异:

硬编码(线性决策树)          Dynamic Workflow(运行时决策)

──────────────────── ────────────────────────────

开始 开始

│ │

▼ ▼

检测端口 检测端口

│ │

├─[占用]─► 杀进程 ▼

│ [Agent 读取结果]

│ ▼ │

│ 启动服务 [Agent 决定:杀进程?等待?换端口?]

│ │

├─[空闲]─► 启动服务 ▼

│ 执行工具

└─[异常]─► 报错退出 │

[Agent 读取新结果]

[Agent 决定:验证?继续?重试?]

...

大白话总结:硬编码的决策在你脑子里,Dynamic Workflow 的决策在运行时的真实数据里。

差异 2:错误处理方式

硬编码遇到异常的处理方式只有两种:崩溃,或者静默跳过。两种都很危险。

Dynamic Workflow 遇到异常的处理方式是:感知 → 重新规划 → 继续执行

对比一下同一个场景(kill 失败)的两种表现:

# 硬编码版本:kill 失败 = 整个流程终止

try:

subprocess.run(['kill', '-9', pid], check=True)

except subprocess.CalledProcessError:

raise RuntimeError("无法释放端口,流程终止") # 死在这里

Dynamic Workflow 版本:kill 失败 = Agent 收到失败信息,重新决策

工具返回:{"success": false, "reason": "permission denied"}

Agent 下一步可能是:

- 尝试用 sudo(如果有权限)

- 等待几秒后重试

- 报告给用户并请求人工介入

- 尝试其他端口

这些逻辑不需要你提前写,Agent 根据上下文自己判断

这里有一个关键细节:Dynamic Workflow 的「错误处理」不是你写的 try-catch,而是 Agent 在对话上下文里看到工具返回的失败信息后,自主决定下一步。Anthropic 官方文档把这个循环叫做 agentic loop——工具结果会作为新的 user 消息反馈给模型,模型基于完整上下文重新生成决策。

大白话总结:硬编码的错误处理是你预设的,Dynamic Workflow 的错误处理是 Agent 临场发挥的。

差异 3:可维护性成本

现在需求变了:除了「端口被占用」和「端口空闲」,还要处理第三种状态——「端口被占用,但是我们自己的服务,不需要杀,直接复用」。

# 硬编码版本:必须改代码

if result == 0:

pid = get_pid(port)

process_name = get_process_name(pid)

if process_name == "our_service": # 新增分支

print("是我们自己的服务,直接复用")

elif process_name == "nginx": # 再新增一个

print("是 nginx,需要特殊处理")

else:

kill_process(pid)

start_service(port)

每新增一种情况,就要改这里的代码

# Dynamic Workflow 版本:只需要调整工具描述或 prompt

把 check_port 工具的描述改成:

"description": "检测端口状态,返回占用状态、进程名称和进程类型(our_service/third_party/system)"

或者在 prompt 里加一句:

"如果端口被我们自己的服务占用(进程名包含 our_service),直接复用,不要杀进程"

Agent 会根据新的工具信息和 prompt 自动调整决策逻辑

不需要改任何 if-else

大白话总结:硬编码的维护成本随分支数量线性增长,Dynamic Workflow 的维护成本主要在工具描述和 prompt 的质量上。

---

第四章:什么时候用,什么时候别用

Dynamic Workflows 不是银弹。用错了比硬编码更麻烦。

适合用的场景:
  • 环境状态不可预测:比如运维自动化、系统健康检查、多步骤部署流程——你不知道目标机器上会遇到什么状态
  • 分支逻辑复杂且频繁变化:业务规则经常调整,每次改代码成本高
  • 需要「感知-决策」循环:任务本身就是「看到结果再决定下一步」的结构
不适合用的场景:
  • 流程完全固定:比如「读文件 → 解析 → 写数据库」,步骤不会变,用 Dynamic Workflow 只是增加延迟和成本
  • 对延迟敏感:每次 Agent 决策都要调用一次 LM,比直接执行代码慢一个数量级
  • 需要严格审计:金融交易、医疗操作等场景,每一步必须有确定性的执行记录,Agent 的「自主决策」反而是风险
判断标准很简单:如果你能在写代码时把所有分支都列清楚,就用硬编码。如果你列不清楚,或者列清楚了但三个月后就会过时,才考虑 Dynamic Workflow。

---

第五章:自己动手改——3 个可以立刻练手的变体任务

同一个框架,换一个检测目标,逻辑完全可以复用。以下三个任务不给答案,鼓励你自己跑:

练习 1:检测文件是否存在

任务描述:给定一个文件路径,检测文件是否存在。如果不存在,尝试从备用路径复制;如果备用路径也没有,创建一个默认配置文件;如果文件存在但内容为空,写入默认内容。

提示:工具需要 check_filecopy_filecreate_defaultwrite_content

练习 2:检测外部 API 是否可达

任务描述:检测一个 HTTP 接口是否正常响应。如果超时,等待 5 秒后重试;如果返回 5xx,记录错误并尝试备用接口;如果备用接口也不通,发送告警通知。

提示:重点在于让 Agent 自己决定重试次数和切换时机,不要在代码里写死。

练习 3:检测磁盘空间是否充足

任务描述:检测指定目录的磁盘使用率。如果超过 80%,找出最大的 5 个文件并列出;如果超过 90%,自动清理 30 天前的日志文件;如果清理后仍然不足,发出告警。

提示:这个任务有明确的阈值逻辑,但「清理哪些文件」的判断可以交给 Agent。

---

文中所有代码示例调用的是 claude-haiku-4.5,速度快、成本低,适合本地调试。如果你想直接跑起来,可以在 [api.884819.xyz](https://api.884819.xyz) 获取 API 访问权限——支持白名单内的全部模型,按量计费,注册即送体验 token,不需要绑定信用卡。

---

写在最后

回到开头那个问题:Dynamic Workflows 和写死流程的本质差异,不是「AI 更聪明」,而是决策发生的时机不同

硬编码把决策锁在了编写时,Dynamic Workflow 把决策推迟到了运行时。这一个时机的差异,决定了它面对「未知状态」时的表现完全不同。

现在你手上有哪类任务可以用这个思路改造?从最小的那个开始,跑通一个,比看十篇文章都有用。

---

这篇只拆了「单任务内的动态分支」。下一篇我想聊一个更复杂的情况:多个 Agent 协作时,动态流程怎么传递上下文、怎么避免互相打架——比如一个 Agent 刚杀了某个进程,另一个 Agent 同时在等那个进程的响应,这种冲突在生产环境里比你想象的更常见。如果你在用 Multi-Agent 架构,那篇可能比这篇更值得看。

---

本文由848AI原创,转载请注明出处。关注8848AI,带你从零开始学AI。

#AI教程 #Claude #动态工作流 #Agent开发 #8848AI #AI编程 #LM应用 #工具调用