Skip to content
HeZzz
Go back

工作流引擎技术分享:从原理到实战

深入探讨工作流引擎的技术架构、实现原理及在企业级应用中的最佳实践。

一、工作流引擎技术概览

1.1 工作流引擎的本质

1.2 技术演进路线

第一代:硬编码流程 (if-else 地狱)

第二代:配置化流程 (XML/JSON 配置)

第三代:可视化设计器 + 动态引擎

第四代:AI 驱动的智能流程编排

1.3 业界主流方案对比

引擎架构特点适用场景学习成本
Activiti重量级、BPMN 2.0 标准复杂企业流程
FlowableActiviti 分支、功能增强企业级 BPM
Camunda微服务友好、事件驱动分布式系统
Warm Flow轻量级、国产化中小型业务

二、工作流引擎核心技术原理

2.1 状态机模型

// 状态转移核心逻辑
State currentState = getCurrentState(processInstance);
Event event = receiveEvent();
State nextState = stateTransitionTable.get(currentState, event);
executeActions(currentState, event, nextState);
updateState(processInstance, nextState);

2.2 BPMN 2.0 规范解析

2.3 数据库设计模式

-- 核心表结构
流程定义表 (wf_process_definition)
  - 存储流程模板的元数据
  
流程实例表 (wf_process_instance)
  - 运行时流程的状态追踪
  
任务表 (wf_task)
  - 待办任务队列
  
历史表 (wf_history_*)
  - 流程执行日志与审计
  
变量表 (wf_variable)
  - 流程上下文数据

三、Warm Flow 技术架构深度剖析

3.1 架构设计理念

3.2 核心模块拆解

warm-flow-core          // 核心引擎
├── runtime             // 运行时管理
│   ├── ProcessEngine   // 流程引擎入口
│   ├── TaskService     // 任务服务
│   └── RuntimeService  // 运行时服务
├── repository          // 流程仓库
│   └── DefinitionService
├── listener            // 事件监听器
└── expression          // 表达式引擎

warm-flow-spring-boot-starter  // Spring Boot 集成
warm-flow-mybatis-plus         // 持久层实现
warm-flow-ui                   // 流程设计器前端

3.3 关键技术实现

3.3.1 流程定义加载

// XML 流程定义解析
@Component
public class ProcessDefinitionParser {
    
    public ProcessDefinition parse(InputStream xml) {
        Document doc = parseXML(xml);
        ProcessDefinition def = new ProcessDefinition();
        
        // 解析节点
        List<FlowNode> nodes = parseNodes(doc);
        // 解析连线
        List<SequenceFlow> flows = parseFlows(doc);
        // 构建 DAG
        buildGraph(def, nodes, flows);
        
        return def;
    }
}

3.3.2 任务路由引擎

// 动态任务分配
@Service
public class TaskRouter {
    
    public List<String> getAssignees(Task task, FlowNode node) {
        if (node.hasAssignee()) {
            return Collections.singletonList(node.getAssignee());
        } else if (node.hasCandidateUsers()) {
            return node.getCandidateUsers();
        } else if (node.hasCandidateGroups()) {
            return userService.getUsersByGroups(node.getCandidateGroups());
        } else if (node.hasListenerClass()) {
            TaskListener listener = getListener(node.getListenerClass());
            return listener.getAssignees(task);
        }
        throw new RuntimeException("无法确定任务处理人");
    }
}

3.3.3 并行网关实现

// Fork/Join 模式
public class ParallelGateway {
    
    public void execute(ProcessInstance instance, Gateway gateway) {
        if (gateway.isFork()) {
            // 并行分支:创建多个子令牌
            for (SequenceFlow outgoing : gateway.getOutgoings()) {
                Token token = new Token(instance, outgoing.getTarget());
                tokenRepository.save(token);
                executeNode(instance, outgoing.getTarget());
            }
        } else {
            // 并行汇聚:等待所有分支完成
            List<Token> tokens = tokenRepository.findByNode(gateway);
            if (tokens.size() == gateway.getIncomings().size()) {
                // 所有分支已到达,继续执行
                mergeAndContinue(instance, gateway);
            }
        }
    }
}

四、实战案例:请假审批流程

4.1 业务需求分析

场景:员工请假审批
规则:
  - 1天以内 → 直接主管审批
  - 1-3天 → 主管 + 部门经理审批
  - 3天以上 → 主管 + 部门经理 + HR + 总经理审批
  - 支持会签、加签、转办、退回

4.2 流程设计

<process id="leave_process" name="请假流程">
  <startEvent id="start"/>
  
  <userTask id="apply" name="发起申请" assignee="${applicant}"/>
  
  <exclusiveGateway id="gateway1"/>
  <sequenceFlow sourceRef="apply" targetRef="gateway1"/>
  
  <!-- 1天以内 -->
  <sequenceFlow sourceRef="gateway1" targetRef="managerApprove">
    <conditionExpression>${days <= 1}</conditionExpression>
  </sequenceFlow>
  
  <!-- 1-3天 -->
  <sequenceFlow sourceRef="gateway1" targetRef="deptManagerApprove">
    <conditionExpression>${days > 1 && days <= 3}</conditionExpression>
  </sequenceFlow>
  
  <!-- 3天以上 -->
  <sequenceFlow sourceRef="gateway1" targetRef="hrApprove">
    <conditionExpression>${days > 3}</conditionExpression>
  </sequenceFlow>
  
  <!-- 审批节点 -->
  <userTask id="managerApprove" name="主管审批" 
            assignee="${applicant.manager}"/>
  <userTask id="deptManagerApprove" name="部门经理审批"/>
  <userTask id="hrApprove" name="HR审批"/>
  
  <endEvent id="end"/>
</process>

4.3 代码实现

@RestController
@RequestMapping("/leave")
public class LeaveController {
    
    @Autowired
    private RuntimeService runtimeService;
    
    @Autowired
    private TaskService taskService;
    
    /**
     * 发起请假申请
     */
    @PostMapping("/apply")
    public Result apply(@RequestBody LeaveRequest request) {
        // 构建流程变量
        Map<String, Object> variables = new HashMap<>();
        variables.put("applicant", getCurrentUser());
        variables.put("days", request.getDays());
        variables.put("reason", request.getReason());
        variables.put("startDate", request.getStartDate());
        
        // 启动流程实例
        ProcessInstance instance = runtimeService
            .startProcessInstanceByKey("leave_process", variables);
        
        return Result.success(instance.getId());
    }
    
    /**
     * 审批任务
     */
    @PostMapping("/approve/{taskId}")
    public Result approve(@PathVariable String taskId, 
                         @RequestBody ApprovalRequest request) {
        // 设置审批意见
        Map<String, Object> variables = new HashMap<>();
        variables.put("approved", request.isApproved());
        variables.put("comment", request.getComment());
        
        // 完成任务
        taskService.complete(taskId, variables);
        
        return Result.success();
    }
    
    /**
     * 查询我的待办
     */
    @GetMapping("/todo")
    public Result<List<Task>> getTodoList() {
        String userId = getCurrentUserId();
        List<Task> tasks = taskService
            .createTaskQuery()
            .taskAssignee(userId)
            .list();
        
        return Result.success(tasks);
    }
}

4.4 高级特性实现

4.4.1 会签功能

// 配置会签节点
<userTask id="multiInstanceTask" name="会签">
  <multiInstanceLoopCharacteristics isSequential="false">
    <loopCardinality>${assignees.size()}</loopCardinality>
    <completionCondition>${approvedCount >= assignees.size() * 0.5}</completionCondition>
  </multiInstanceLoopCharacteristics>
</userTask>

// 完成条件:半数通过即可

4.4.2 动态加签

@Service
public class TaskDelegateService {
    
    public void addSign(String taskId, String newAssignee) {
        Task task = taskService.getTask(taskId);
        
        // 创建新任务
        Task newTask = new Task();
        newTask.setProcessInstanceId(task.getProcessInstanceId());
        newTask.setAssignee(newAssignee);
        newTask.setName("加签任务");
        taskService.saveTask(newTask);
        
        // 挂起当前任务
        taskService.suspendTask(taskId);
    }
}

五、性能优化与监控

5.1 性能优化策略

5.1.1 数据库优化

-- 索引优化
CREATE INDEX idx_instance_status ON wf_process_instance(status);
CREATE INDEX idx_task_assignee ON wf_task(assignee, status);
CREATE INDEX idx_task_create_time ON wf_task(create_time);

-- 分表策略
wf_history_2024_q1
wf_history_2024_q2
...

-- 异步写入历史表
INSERT INTO wf_history_task SELECT * FROM wf_task WHERE id = ?;

5.1.2 缓存策略

@Configuration
public class CacheConfig {
    
    @Bean
    public CacheManager cacheManager() {
        return CacheManagerBuilder.newCacheManagerBuilder()
            .withCache("processDefinitions",
                CacheConfigurationBuilder.newCacheConfigurationBuilder(
                    String.class, ProcessDefinition.class,
                    ResourcePoolsBuilder.heap(1000))
                .withExpiry(ExpiryPolicyBuilder.timeToLiveExpiration(Duration.ofHours(1))))
            .build(true);
    }
}

5.1.3 异步处理

@Configuration
@EnableAsync
public class AsyncConfig {
    
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("workflow-");
        return executor;
    }
}

// 异步执行服务任务
@Async
public void executeServiceTask(ServiceTask task, ProcessInstance instance) {
    // 执行业务逻辑
}

5.2 监控体系

5.2.1 指标采集

@Component
public class WorkflowMetrics {
    
    private final MeterRegistry registry;
    
    public void recordTaskComplete(String taskName, long duration) {
        Timer.builder("workflow.task.duration")
             .tag("task", taskName)
             .register(registry)
             .record(duration, TimeUnit.MILLISECONDS);
    }
    
    public void recordProcessStart(String processKey) {
        Counter.builder("workflow.process.start")
               .tag("process", processKey)
               .register(registry)
               .increment();
    }
}

5.2.2 Grafana 大盘

指标:
- 流程启动 TPS
- 任务处理耗时 P99
- 待办任务积压数
- 流程异常率
- 节点通过率热力图

六、常见问题与解决方案

6.1 技术难点

6.1.1 流程版本管理

// 版本升级策略
public class ProcessVersionManager {
    
    public void deploy(ProcessDefinition newVersion) {
        // 1. 新版本部署
        newVersion.setVersion(getLatestVersion() + 1);
        definitionRepository.save(newVersion);
        
        // 2. 运行中实例处理
        List<ProcessInstance> runningInstances = 
            runtimeService.getRunningInstances(newVersion.getKey());
        
        // 选项A:继续使用旧版本(推荐)
        // 选项B:自动迁移到新版本(需谨慎)
        for (ProcessInstance instance : runningInstances) {
            if (shouldMigrate(instance, newVersion)) {
                migrateInstance(instance, newVersion);
            }
        }
    }
}

6.1.2 分布式事务

// 使用 Saga 模式
@Service
public class DistributedWorkflowService {
    
    @Transactional
    public void completeTaskWithCompensation(String taskId) {
        try {
            // 1. 完成任务
            taskService.complete(taskId);
            
            // 2. 调用外部服务
            externalService.doSomething();
            
        } catch (Exception e) {
            // 3. 补偿操作
            compensationService.compensate(taskId);
            throw e;
        }
    }
}

6.2 最佳实践

6.2.1 流程设计原则

6.2.2 安全加固

// 权限校验
@Aspect
@Component
public class TaskPermissionAspect {
    
    @Before("execution(* TaskService.complete(..))")
    public void checkPermission(JoinPoint joinPoint) {
        String taskId = (String) joinPoint.getArgs()[0];
        Task task = taskService.getTask(taskId);
        
        String currentUser = SecurityContextHolder.getContext()
            .getAuthentication().getName();
        
        if (!task.getAssignee().equals(currentUser)) {
            throw new AccessDeniedException("无权限操作此任务");
        }
    }
}

七、技术选型建议

7.1 决策树

需要完整 BPMN 2.0 支持?
├─ 是 → Flowable / Camunda
└─ 否 
    └─ 分布式部署?
        ├─ 是 → Camunda (事件驱动)
        └─ 否
            └─ 快速上手?
                ├─ 是 → Warm Flow
                └─ 否 → Activiti

7.2 成本对比

维度ActivitiFlowableWarm Flow
学习成本
运维成本
定制成本
性能

八、动手实践:搭建 Warm Flow 环境

8.1 快速开始

<!-- pom.xml -->
<dependency>
    <groupId>com.warm-flow</groupId>
    <artifactId>warm-flow-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>
# application.yml
warm-flow:
  enabled: true
  database: mysql
  table-prefix: wf_
  enable-logic-delete: true

8.2 第一个流程

@SpringBootTest
public class QuickStartTest {
    
    @Autowired
    private RuntimeService runtimeService;
    
    @Test
    public void testSimpleProcess() {
        // 1. 部署流程
        ProcessDefinition def = repositoryService
            .createDeployment()
            .addClasspathResource("simple-process.xml")
            .deploy();
        
        // 2. 启动实例
        ProcessInstance instance = runtimeService
            .startProcessInstanceByKey("simple_process");
        
        // 3. 完成任务
        Task task = taskService.createTaskQuery()
            .processInstanceId(instance.getId())
            .singleResult();
        taskService.complete(task.getId());
        
        // 4. 验证流程结束
        assertFalse(runtimeService.isActive(instance.getId()));
    }
}

九、总结与展望

9.1 核心要点

9.2 进阶方向

参考链接

Workflow | Wikipedia

What is a workflow? | IBM

Warm Flow | Github

Activiti | Github

Flowable | Github

Directed acyclic graph | Wikipedia


Share this post on:

上一篇
编译技术-2025fa-lb复习课
下一篇
云计算技术-25fa-zb最后一课-MapReduce