Skip to content

Latest commit

 

History

History
147 lines (106 loc) · 5.52 KB

File metadata and controls

147 lines (106 loc) · 5.52 KB
title 通信机制与设计模式
description Shared Store 全局共享与 Params 局部参数,以及 PocketFlow 的六大设计模式。

3. 通信机制:Shared Store 与 Params

PocketFlow 提供两种节点间通信方式,各有分工。

3.1 Shared Store —— 全局共享字典

Shared Store:所有节点通过共享字典读写数据

节点之间不直接通信,而是通过一个共享的 shared 字典来传递数据。

shared = {}

# Node A 的 post() 写入数据
shared["question"] = "什么是 PocketFlow?"

# Node B 的 prep() 读取数据
question = shared["question"]

::: warning 设计哲学 为什么不让节点直接传参?因为 PocketFlow 追求最小抽象

  • shared 是一个普通 Python 字典,没有任何封装
  • 所有节点都能读写,足够灵活
  • 你完全清楚数据从哪来、到哪去,没有"黑魔法" :::

3.2 Params —— 局部参数传递

除了 shared 这个"全局共享"机制,PocketFlow 还提供了一套局部参数机制 —— params

# 设置节点参数
node.set_params({"filename": "doc1.txt"})

# 在节点中读取参数
class ProcessFile(Node):
    def prep(self, shared):
        filename = self.params["filename"]  # 读取局部参数
        return shared["files"][filename]

Params 的值由父 Flow 传入:当 Flow 执行子节点时,会自动调用 node.set_params(params) 将参数传递给每个节点。这在 BatchFlow(见第 5.5 节)中尤为关键 —— BatchFlow 每次迭代会将不同的 params 传递给子节点,让同一条 Flow 用不同参数运行多次。

::: tip Shared 与 Params 的类比 可以用编程中的内存模型来理解这两种通信方式:

  • Shared Store堆(Heap)—— 所有函数(节点)共享同一块内存,任意读写
  • Params栈(Stack)—— 由调用者(父 Flow)赋值,节点内只读
特性 Shared Store Params
作用域 全局,所有节点共享 局部,由父 Flow 传入
用途 存储数据、结果、大对象 传递任务标识(文件名、ID)
可变性 节点可读写 节点内只读
典型场景 对话历史、检索结果 BatchFlow 的迭代参数
:::

4. 六大设计模式

掌握了 Node 和 Flow,你就可以构建 LLM 应用中几乎所有的主流模式。它们不是框架提供的"功能类",而是 Node + Flow 自然组合出的图拓扑

模式 图形态 关键技巧 对应案例
链式调用 A → B → C 顺序执行 a >> b >> c 写作工作流
条件分支 根据结果走不同路径 node - "action" >> target 搜索智能体
循环/重试 不满意则重做 post() 返回 action 指回前序节点 聊天机器人
嵌套子流程 Flow 中包含子 Flow Flow 继承自 BaseNode 多智能体协作
批量处理 列表中的每个元素独立处理 BatchNode / BatchFlow Map-Reduce
并行执行 多个任务同时运行 AsyncParallelBatchNode 并行处理

每种模式的代码骨架:

链式调用:节点按顺序依次执行

# 链式:顺序执行
a >> b >> c
flow = Flow(start=a)

条件分支:根据 action 走不同路径

# 条件分支:根据 action 走不同路径
check - "yes" >> handle_yes
check - "no"  >> handle_no
flow = Flow(start=check)

循环/重试:不满足条件则持续推理

# 循环:post() 返回 action 指回前序节点
step >> verify
verify - "retry" >> step        # 不满意则重做
verify - "done"  >> output      # 满意则输出

嵌套子流程:Flow 作为可复用的步骤

# 嵌套子流程:Flow 本身也是 BaseNode
sub_flow = Flow(start=sub_a)
main_a >> sub_flow >> main_b    # Flow 当节点用

批量处理:对列表中每个元素重复执行

# 批量处理:BatchNode 自动遍历列表
class ProcessAll(BatchNode):
    def prep(self, shared): return shared["items"]  # 返回列表
    def exec(self, item): return process(item)       # 逐一执行

并行执行:多个任务并发处理

# 并行执行:asyncio.gather 并发
class ParallelProcess(AsyncParallelBatchNode):
    async def prep_async(self, shared): return shared["items"]
    async def exec_async(self, item): return await async_process(item)
PocketFlow 没有为每种模式创建专门的类 —— 它们都是 Node + Flow + 操作符重载的自然组合。这就是 100 行代码能覆盖这么多场景的原因。