Fork me on GitHub

Activiti(7)--加签功能的实现

加签的概念

思路:

  1. 直接需改模板, 在模板中添加节点以及连线, 并修改实例的走向;
  2. 直接修改路程定义对应的缓存数据, 不修改模板, 新增节点与当前需要加签的实例挂钩.

1. 方案一

1.1. 实现方式

  1. 找到当前实例对象的模板数据
  2. 在模板数据的基础上添加新节点以及修改连线, 并更新数据库中的模板.
  3. 更新模板对应的流程定义缓存, 必须更新缓存否则加签的节点不会生效. 因为 Activiti 在查找流程定义的时候会先尝试从缓存中进行获取.
  4. 完成新增节点额任务后, 再把新增节点以及连线删除, 即还原模板.

1.2. 优缺点

  • 模板是共享的, 因此修改模板就会将所有运行实例对象的模板修改.
  • 实例间应该相互独立, 不能让针对某个实例的加签影响到其他实例.
  • 修改模板容易导致当前实例影响其他实例, 因此该方案不可取;

2. 方案二

  • 模板是共享的, 因此不能修改模板, 否则会影响其他实例.
  • 也不需要修改原有流程的入线即出线, 不修改原有的走向.

由于流程运转的过程中, 需要实时的获取该实例对应的模板数据才能知道应该如何运转.

  1. 从流程定义缓存中获取模板数据
  2. 如果流程定义缓存丢失, 则需要重新执行模板的解析工作并补充到流程定义缓存中.

因此可以直接修改流程定义缓存数据.

2.1. 思路

image

  1. 在流程缓存中添加一个任务节点, 并未任务节点添加出线信息, 出线连接的是需要到达的目标节点.
  2. 添加的目标节点并没有入线, 并不会影响其他实例, 因此其他流程没有机会走到该节点.
  3. 加签完成后触发执行实例走到新增的任务节点, 这样当前实例就按照最新的路线进行运转;
  4. 如果当前节点在加签后不想直接运转到最新节点, 则可以复制一个当前节点, 继续让流程运转.
  5. 加签的最终目的是让实例按照最新的路线走, 与模板中规划的路线脱离关系.

image

2.2. 引入的问题

  1. 新增的任务节点及连线如何存储
  2. 流程定义缓存如何修改
  3. 加签的节点以及连线信息如何持久化
  4. 如果我们重新修改的流程定义缓存丢失, 引擎依然会解析数据库中保存的原有定义, 新增的节点并没有持久化到 DB
  5. 流程实例结束后, 当前加签的节点以及连线如何删除.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void testAddOneTask(String taskId, String targetActivityId) {
// 获取当前的任务
TaskEntity taskEntity = (TaskEntity) activitiRule.getTaskService().createTaskQuery().taskId(taskId).singleResult();
log.info("taskEntity: {}", taskEntity);
String processDefinitionId = taskEntity.getProcessDefinitionId();
ManagementService managementService = activitiRule.getManagementService();
Process process = managementService.executeCommand(new GetProcessCmd(processDefinitionId));
log.info("process: {}", process);

// 创建新节点
UserTask userTask = new UserTask();
userTask.setId("destinyD");
userTask.setName("加签节点 destinyD");
userTask.setAssignee("destiny-d");
userTask.setBehavior(createUserTaskBehavior(userTask));

// 新节点的目标连线
SequenceFlow sequenceFlow = new SequenceFlow();
sequenceFlow.setId("extra");
userTask.setOutgoingFlows(Arrays.asList(sequenceFlow));
sequenceFlow.setTargetFlowElement(process.getFlowElement(targetActivityId));
sequenceFlow.setTargetRef(targetActivityId);

process.addFlowElement(userTask);
process.addFlowElement(sequenceFlow);

// 更新缓存
ProcessDefinitionCacheEntry processDefinitionCacheEntry = managementService.executeCommand(new GetProcessDefinitionCacheEntryCmd(processDefinitionId));
processDefinitionCacheEntry.setProcess(process);
Process processCache = managementService.executeCommand(new GetProcessDefinitionCacheEntryCmd(processDefinitionId)).getProcess();

log.info("processCache: {}", processCache);

// 跳转
managementService.executeCommand(new JumpCmd(taskId, userTask.getId()));
}

但该方法产生跳转后的新 task 仍然无法提交, 会报一下错误:

17:59:18.971 [main] [ERROR] Error while closing command context  o.a.e.i.i.CommandContext.logException:122
org.activiti.engine.ActivitiException: Programmatic error: no current flow element found or invalid type: null. Halting.
at org.activiti.engine.impl.agenda.TriggerExecutionOperation.run(TriggerExecutionOperation.java:49)
at org.activiti.engine.impl.interceptor.CommandInvoker.executeOperation(CommandInvoker.java:73)
at org.activiti.engine.impl.interceptor.CommandInvoker.executeOperations(CommandInvoker.java:57)
at org.activiti.engine.impl.interceptor.CommandInvoker.execute(CommandInvoker.java:42)
at org.activiti.engine.impl.interceptor.TransactionContextInterceptor.execute(TransactionContextInterceptor.java:48)
at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:63)
at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:29)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:44)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:39)
at org.activiti.engine.impl.TaskServiceImpl.complete(TaskServiceImpl.java:182)
at org.destiny.activiti.addsign1.ClientTest.complete(ClientTest.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.activiti.engine.test.ActivitiRule$1.evaluate(ActivitiRule.java:116)

出线错误的原因是加签方法执行完成后, 缓存中的数据已经被释放, complete 的时候无法继续, 需要在 complete 之前重新向缓存中添加之前的节点和连线

在修改流程定义缓存而不修改模板的实现中, 我们需要一个额外的持久化方式去实现加签部分的持久化

2.3. 持久化加签现场数据

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `ACT_ADD_SIGN` (
`ID_` bigint(20) NOT NULL AUTO_INCREMENT,
`PROCESS_DEFINITION_ID_` varchar(255) NOT NULL COMMENT '流程定义 ID',
`ASSIGNEE_` varchar(32) NOT NULL COMMENT '操作人 ID',
`ACT_ID_` varchar(64) NOT NULL COMMENT '活动 ID',
`PROCESS_INSTANCE_` varchar(255) NOT NULL COMMENT '流程实例 ID',
`PROPERTIES_TEXT_` varchar(2000) DEFAULT NULL COMMENT '参数',
`STATE_` int(11) DEFAULT NULL COMMENT '状态位, 0-有效, 1-无效',
`CREATE_TIME` bigint(20) DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`ID_`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

以及对应的 Mapper 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface AddSignMapper {

@Select("select * from ACT_ADD_SIGN where STATE_ = 0 AND PROCESS_INSTANCE_ID_ = #{processInstanceId}")
@Results({
@Result(property = "id", column = "ID_"),
@Result(property = "processDefinitionId", column = "PROCESS_DEFINITION_ID_"),
@Result(property = "assignee", column = "ASSIGNEE_"),
@Result(property = "processInstanceId", column = "PROCESS_INSTANCE_ID_"),
@Result(property = "propertiesText", column = "PROPERTIES_TEXT_"),
@Result(property = "state", column = "STATE_"),
@Result(property = "createTime", column = "CREATE_TIME_"),
})
List<AddSign> find(String processInstanceId);

@Insert("insert into act_creation(PROCESS_DEFINITION_ID_, PROCESS_INSTANCE_ID_, PROPERTIES_TEXT_, CREATE_TIME_) values(#{processDefinitionId}, #{processInstanceId}, #{propertiesText}, #{createTime})")
int insert(AddSign addSign);
}

2.4. 模型定义

2.4.1. AddSign

1
2
3
4
5
6
7
8
9
10
11
12
13
@Data
public class AddSign {

private long id;
private String processDefinitionId; // 流程定义 id
private String assignee; // 加签用户
private String activityId; // 节点 id
private String processInstanceId; // 流程实例 id
private String propertiesText; // 参数(复合字段)
private int state; // 状态 0-可用, 1-不可用
private long createTime; // 创建时间

}

2.4.2. TaskModel

1
2
3
4
5
6
7
8
@Data
public class TaskModel implements Serializable {

private String id;
private String name;
private String assignee; // 处理人
private int type = 1; // 任务类型, 1-任务节点
}

2.4.3. TmpActivityModel

1
2
3
4
5
6
7
@Data
public class TmpActivityModel implements Serializable {
private String activityIds; // 加签的节点id, 多个的话逗号分隔
private String firstId;
private String lastId;
private List<TaskModel> activityList;
}

2.5. 加签功能实现类

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@Slf4j
public class AddSignService {

/**
* @param procDefId 流程定义 ID
* @param procInstId 流程实例 ID
* @param processEngine 流程引擎
* @param taskModelList 加签节点列表
* @param firstNodeId 加签开始节点 ID
* @param lastNodeId 加签结束节点 ID
* @param persistence 是否持久化
* @param onset 是否需要立即跳转
* @param taskId taskID
* @param targetNodeId 跳转的目标节点
*/
public void addUserTask(String procDefId, String procInstId, ProcessEngine processEngine, List<TaskModel> taskModelList,
String firstNodeId, String lastNodeId, boolean persistence, boolean onset, String taskId, String targetNodeId) {
ManagementService managementService = processEngine.getManagementService();
ProcessDefinitionCacheEntry processDefinitionCacheEntry = managementService.executeCommand(new GetProcessDefinitionCacheEntryCmd(procDefId));
// 通过缓存获取
Process process = processDefinitionCacheEntry.getProcess();
// 批量生成任务, 循环遍历 TaskModel
List<UserTask> userTaskList = Lists.newArrayList();
taskModelList.forEach(taskModel -> {
UserTask userTask = ActivityUtils.convertToUserTask(taskModel, processEngine);
userTaskList.add(userTask);
process.addFlowElement(userTask);
});
// 构造并添加连线
for (int i = 0; i < userTaskList.size(); ++i) {
UserTask userTask = userTaskList.get(i);
SequenceFlow sequenceFlow = null;
if (i == userTaskList.size() - 1) {
// 如果是最后一个节点
sequenceFlow = ActivityUtils.buildSequenceFlow(userTask.getId() + "-->" + lastNodeId,
userTask.getId() + "-->" + lastNodeId, userTask.getId(), lastNodeId);
sequenceFlow.setTargetRef(lastNodeId);
} else {
// 如果不是最后一个
ActivityUtils.buildSequenceFlow(userTask.getId() + "-->" + userTaskList.get(i + 1).getId(),
userTask.getId() + "-->" + userTaskList.get(i + 1).getId(),
userTask.getId(), userTaskList.get(i + 1).getId());
sequenceFlow.setTargetFlowElement(userTaskList.get(i + 1));
}
userTask.setOutgoingFlows(Arrays.asList());
process.addFlowElement(sequenceFlow);
}
log.info("process: {}", process);
// 更新缓存
processDefinitionCacheEntry.setProcess(process);
// 如果需要立即生效(直接跳转)
if (onset) {
managementService.executeCommand(new JumpCmd(taskId, targetNodeId));
}
// 如果需要持久化
if (persistence) {
persistenceToDB(procDefId, procInstId, firstNodeId, lastNodeId, taskModelList, processEngine);
}
}

/**
* 将加签的任务节点添加到数据库
* @param procDefId
* @param procInstId
* @param firstNodeId
* @param lastNodeId
* @param taskModelList
* @param processEngine
*/
private void persistenceToDB(String procDefId, String procInstId, String firstNodeId, String lastNodeId, List<TaskModel> taskModelList, ProcessEngine processEngine) {
ProcessEngineConfigurationImpl processEngineConfiguration = (ProcessEngineConfigurationImpl) processEngine.getProcessEngineConfiguration();
SqlSession sqlSession = processEngineConfiguration.getSqlSessionFactory().openSession();
AddSignMapper mapper = sqlSession.getMapper(AddSignMapper.class);
TmpActivityModel tmpActivityModel = new TmpActivityModel();
tmpActivityModel.setFirstId(firstNodeId);
tmpActivityModel.setLastId(lastNodeId);
tmpActivityModel.setActivityList(taskModelList);
StringBuilder stringBuilder = new StringBuilder();
for (TaskModel taskModel : taskModelList) {
stringBuilder.append(taskModel.getId() + ",");
}
tmpActivityModel.setActivityIds(stringBuilder.toString());

AddSign addSign = new AddSign();
addSign.setProcessDefinitionId(procDefId);
addSign.setProcessInstanceId(procInstId);
addSign.setPropertiesText(JSON.toJSONString(tmpActivityModel));
addSign.setCreateTime(System.currentTimeMillis());
int insert = mapper.insert(addSign);
log.info("insert 结果: {}", insert);

sqlSession.commit();
sqlSession.close();
}
}

2.6. 测试代码

部署流程后, 通过测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void addSignTest() {
String taskId = "17508";
TaskEntity taskEntity = (TaskEntity) activitiRule.getTaskService().createTaskQuery()
.taskId(taskId)
.singleResult();
log.info("taskEntity: {}", taskEntity);
String firstNodeId = "destinyA";
String lastNodeId = "destinyB";
List<TaskModel> taskModelList = Lists.newArrayList();

TaskModel taskModel1 = ActivityUtils.buildTaskModel("destinyD", "destinyD", "destiny-d");
TaskModel taskModel2 = ActivityUtils.buildTaskModel("destinyD", "destinyD", "destiny-d");

taskModelList.add(taskModel1);
taskModelList.add(taskModel2);

AddSignService addSignService = new AddSignService();
addSignService.addUserTask(taskEntity.getProcessDefinitionId(), taskEntity.getProcessInstanceId(),
activitiRule.getProcessEngine(), taskModelList, firstNodeId, lastNodeId, true, true,
taskEntity.getId(), taskModelList.get(0).getId());
}

执行结束后我们就已经将 destinyD -> destinyE 两个节点加签到 destinyA 之后, destinyB 之前.

select NAME_ from ACT_RU_TASK where PROC_INST_ID_ = '17504'; 的执行结果已经变成: destinyD

现在已经完成了加签的一部分代码, 但此时的任务是不能被正确提交的, 会报如下异常:

org.activiti.engine.ActivitiException: Programmatic error: no current flow element found or invalid type: null. Halting.
at org.activiti.engine.impl.agenda.TriggerExecutionOperation.run(TriggerExecutionOperation.java:49)
at org.activiti.engine.impl.interceptor.CommandInvoker.executeOperation(CommandInvoker.java:73)
at org.activiti.engine.impl.interceptor.CommandInvoker.executeOperations(CommandInvoker.java:57)
at org.activiti.engine.impl.interceptor.CommandInvoker.execute(CommandInvoker.java:42)
at org.activiti.engine.impl.interceptor.TransactionContextInterceptor.execute(TransactionContextInterceptor.java:48)
at org.activiti.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:63)
at org.activiti.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:29)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:44)
at org.activiti.engine.impl.cfg.CommandExecutorImpl.execute(CommandExecutorImpl.java:39)
at org.activiti.engine.impl.TaskServiceImpl.complete(TaskServiceImpl.java:182)
at org.destiny.activiti.addsign1.ClientTest.complete(ClientTest.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.activiti.engine.test.ActivitiRule$1.evaluate(ActivitiRule.java:116)

2.7. 流程引擎启动时从 DB 加载流程定义信息

当流程引擎启动的时候, 如果 ACT_ADD_SIGN 表有数据, 就需要将对应的加签现场数据保存并添加到缓存中.

>