XStream Python-Workflow构建工具
XStream中的Workflow是通过Json配置文件来描述的。当Workflow变得复杂时,Json配置文件会变得冗长,维护难度随之增加,主要体现在以下几个方面:
串联各个节点的输入和输出名字需要手动填写,容易出错或者造成命名冲突。
各个节点的顺序关系要保证,对于不熟悉的开发者来说,需要花时间去梳理Node的逻辑关系之后才能添加新的Node。
无法快速地发现并去除冗余路径上的节点。
图可能出现环路。
经过实践发现,构建Workflow的过程其实和编程类似,Json在这里起到的就是编程语言的作用。与其发明一种新的语言,不如使用如今非常流行的Python语言来快速构建Workflow,并解决上面的问题。
安装
可以通过下面的命令安装Python-Workflow构建工具:
cd source/common/xstream/python_api
pip install -e .
安装成功之后便可以使用了。
使用
1. 构建一个简单的Workflow
import xstream

# Declare FasterRCNNMethod and its input/output data lists. These lists are
# the method's predefined parameters; they can be overridden per call later
# to fit a specific scenario.
frcnn_method = (
    xstream.Method("FasterRCNNMethod")
    .inputs(["image"])
    .outputs(["face_box", "head_box", "body_box", "kps"])
    .thread_count(1)
    .config_file("multitask_config.json")
)

# Declare MOTMethod.
mot_method = (
    xstream.Method("MOTMethod")
    .inputs(["face_box"])
    .outputs(["face_bbox_list", "face_disappeared_track_id_list"])
    .config_file("iou_method_param.json")
)


def my_workflow(image):
    """Define a workflow: parameters are its inputs, return values its outputs.

    ``image`` is fed to FasterRCNNMethod (node "multi_task"); its face boxes
    are then fed to MOTMethod (node "face_mot").
    """
    face_box, head_box, body_box, kps = frcnn_method(
        image, unique_name="multi_task")
    face_bbox_list, face_disappeared_track_id_list = mot_method(
        face_box, unique_name="face_mot")
    return image, face_bbox_list, head_box, body_box, kps


# Export as json. Named json_str so it does not shadow the stdlib json module.
json_str = xstream.Serialize(my_workflow)
print(json_str)
Workflow在Python中是以函数的形式存在的:**函数的参数就是这个workflow的输入,返回值是这个workflow的输出。**各个Node之间的组织关系就是Python中对Node的调用,通过这种方式可以快速地描述Node之间的组织关系。上述代码描述的Workflow中只有两个Node,类型分别为`FasterRCNNMethod`和`MOTMethod`。
上面Workflow中的变量,比如`image`、`face_box`、`head_box`、`body_box`和`kps`等,只是名字占位符,一般情况下变量名与Method对应的输入和输出列表里的名字相同。在将Workflow加载到XStream Framework中时,会有实际的数据结构与之对应。
上面代码执行结果如下。可以看到,不属于Workflow输出的中间数据会被自动加上所属节点`unique_name`的前缀(如`multi_task_face_box`),而Workflow的输入和输出则保持原名:
{
"inputs":[
"image"
],
"outputs":[
"image",
"face_bbox_list",
"head_box",
"body_box",
"kps"
],
"workflow":[
{
"inputs":[
"image"
],
"method_config_file":"multitask_config.json",
"method_type":"FasterRCNNMethod",
"outputs":[
"multi_task_face_box",
"head_box",
"body_box",
"kps"
],
"thread_count":1,
"unique_name":"multi_task"
},
{
"inputs":[
"multi_task_face_box"
],
"method_config_file":"iou_method_param.json",
"method_type":"MOTMethod",
"outputs":[
"face_bbox_list",
"face_mot_face_disappeared_track_id_list"
],
"thread_count":1,
"unique_name":"face_mot"
}
]
}
2. 构建子Workflow
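通过XStream,可以非常简单地实现Workflow之间的复用和组合。下面的代码定义了sub_workflow和my_workflow两个Workflow。sub_workflow包含多个节点、1个输入参数和5个输出参数,在my_workflow中它可以被看作一个具有相同输入输出的Node。此外,在`with xstream.Scope(...)`作用域内创建的节点,其`unique_name`和输出数据名会被加上作用域名前缀(如`pre1_`、嵌套时为`pre1_pre2_`)。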
import xstream

# Node_1: the config file and thread count are part of the builder chain, so
# they take effect whether the builder mutates itself or returns a new object.
node1 = (
    xstream.Method("Node_1")
    .inputs(["data1"])
    .outputs(["data1", "data2", "data3", "data4"])
    .thread_count(2)
    .config_file("Node1_cfg.json")
)
node2 = xstream.Method("Node_2").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
node3 = xstream.Method("Node_3").inputs(
    ["data3", "data4"]).outputs(["data1", "data2", "data3"])
node5 = xstream.Method("Node_5").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
node6 = xstream.Method("Node_6").inputs(
    ["data1", "data3"]).outputs(["data1", "data2", "data3"])
node7 = xstream.Method("Node_7").inputs(
    ["data1", "data3"]).outputs(["data1", "data2", "data3"])
node8 = xstream.Method("Node_8").inputs(
    ["data1", "data2", "data2", "data3"]).outputs(["data1", "data2", "data3"])
node9 = xstream.Method("Node_9").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
cnn_method = xstream.Method("CNNMethod").inputs(["image"])


def sub_workflow(data1):
    """Sub-workflow with one input and five outputs.

    Nodes called inside ``xstream.Scope("pre1")`` (and the nested
    ``"pre2"``) get the scope names as prefixes of their generated names,
    e.g. ``pre1_Node_2_0`` and ``pre1_pre2_Node_5_0``.
    """
    data1_1, data2_1, data3_1, data4_1 = node1(data1)
    with xstream.Scope("pre1"):
        # Next level after node1.
        data1_2, data2_2, data3_2 = node2(data1_1, data2_1)
        data1_3, data2_3, data3_3 = node3(data3_1, data4_1)
        with xstream.Scope("pre2"):
            # Next level after node2.
            data1_5, data2_5, data3_5 = node5(data1_2, data2_2)
            data1_6, data2_6, data3_6 = node6(data1_2, data3_2)
            # Next level after node3; config/thread_list are per-call settings.
            data1_7, data2_7, data3_7 = node7(
                data1_3, data3_3,
                config={
                    "a": 1,
                    "b": 2,
                },
                thread_list=[1, 2, 3, 4],
            )
        # Next level after node5 and node6 (still inside scope "pre1").
        data1_8, data2_8, data3_8 = node8(data1_5, data2_5, data2_6, data3_6)
    # Next level after node7 (outside every scope).
    data1_9, data2_9, data3_9 = node9(data1_7, data2_7)
    # Return the results of this workflow.
    return data1_8, data2_8, data1_6, data1_9, data2_9


def my_workflow(image):
    """Run CNNMethod, feed its output through sub_workflow, then run CNNMethod again.

    Call-time ``inputs``/``outputs``/``config_file`` keyword arguments
    override the predefined parameters of ``cnn_method``.
    Returns ``eyes`` and the fourth output of ``sub_workflow``.
    """
    lmk = cnn_method(image, outputs=["data1"], config_file="cnn_cfg.json")
    out1, out2, out3, out4, out5 = sub_workflow(lmk)
    eyes = cnn_method(out2, out3, inputs=["data2", "data1"], outputs=[
        "eyes"], config_file="cnn_cfg2.json")
    # Alternative: return only eyes (see section 3).
    return eyes, out4


# Serialize the workflow to json (the result can be written to a file, etc.).
jsondata = xstream.Serialize(my_workflow)
print(jsondata)
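上面的代码会构建如下的workflow,各节点的连接关系为:image → CNNMethod_0 → Node_1;Node_1 → Node_2 → Node_5、Node_6 → Node_8;Node_1 → Node_3 → Node_7 → Node_9;Node_8和Node_6的输出再作为CNNMethod_1的输入产生eyes。Workflow的最终输出为eyes和Node_9的data1。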
由于Workflow在Python中以函数的形式体现,多个Workflow之间的组合就转化为了函数调用,因此可以像调用普通函数一样随意地复用和组合Workflow。
上面代码执行结果:
{
"inputs":[
"image"
],
"outputs":[
"eyes",
"data1"
],
"workflow":[
{
"inputs":[
"image"
],
"method_config_file":"cnn_cfg.json",
"method_type":"CNNMethod",
"outputs":[
"CNNMethod_0_data1"
],
"thread_count":1,
"unique_name":"CNNMethod_0"
},
{
"inputs":[
"CNNMethod_0_data1"
],
"method_config_file":"Node1_cfg.json",
"method_type":"Node_1",
"outputs":[
"Node_1_0_data1",
"Node_1_0_data2",
"Node_1_0_data3",
"Node_1_0_data4"
],
"thread_count":2,
"unique_name":"Node_1_0"
},
{
"inputs":[
"Node_1_0_data3",
"Node_1_0_data4"
],
"method_config_file":"null",
"method_type":"Node_3",
"outputs":[
"pre1_Node_3_0_data1",
"pre1_Node_3_0_data2",
"pre1_Node_3_0_data3"
],
"thread_count":1,
"unique_name":"pre1_Node_3_0"
},
{
"inputs":[
"pre1_Node_3_0_data1",
"pre1_Node_3_0_data3"
],
"method_config":{
"a":1,
"b":2
},
"method_config_file":"null",
"method_type":"Node_7",
"outputs":[
"pre1_pre2_Node_7_0_data1",
"pre1_pre2_Node_7_0_data2",
"pre1_pre2_Node_7_0_data3"
],
"thread_list":[
1,
2,
3,
4
],
"unique_name":"pre1_pre2_Node_7_0"
},
{
"inputs":[
"Node_1_0_data1",
"Node_1_0_data2"
],
"method_config_file":"null",
"method_type":"Node_2",
"outputs":[
"pre1_Node_2_0_data1",
"pre1_Node_2_0_data2",
"pre1_Node_2_0_data3"
],
"thread_count":1,
"unique_name":"pre1_Node_2_0"
},
{
"inputs":[
"pre1_pre2_Node_7_0_data1",
"pre1_pre2_Node_7_0_data2"
],
"method_config_file":"null",
"method_type":"Node_9",
"outputs":[
"data1",
"Node_9_0_data2",
"Node_9_0_data3"
],
"thread_count":1,
"unique_name":"Node_9_0"
},
{
"inputs":[
"pre1_Node_2_0_data1",
"pre1_Node_2_0_data2"
],
"method_config_file":"null",
"method_type":"Node_5",
"outputs":[
"pre1_pre2_Node_5_0_data1",
"pre1_pre2_Node_5_0_data2",
"pre1_pre2_Node_5_0_data3"
],
"thread_count":1,
"unique_name":"pre1_pre2_Node_5_0"
},
{
"inputs":[
"pre1_Node_2_0_data1",
"pre1_Node_2_0_data3"
],
"method_config_file":"null",
"method_type":"Node_6",
"outputs":[
"pre1_pre2_Node_6_0_data1",
"pre1_pre2_Node_6_0_data2",
"pre1_pre2_Node_6_0_data3"
],
"thread_count":1,
"unique_name":"pre1_pre2_Node_6_0"
},
{
"inputs":[
"pre1_pre2_Node_5_0_data1",
"pre1_pre2_Node_5_0_data2",
"pre1_pre2_Node_6_0_data2",
"pre1_pre2_Node_6_0_data3"
],
"method_config_file":"null",
"method_type":"Node_8",
"outputs":[
"pre1_Node_8_0_data1",
"pre1_Node_8_0_data2",
"pre1_Node_8_0_data3"
],
"thread_count":1,
"unique_name":"pre1_Node_8_0"
},
{
"inputs":[
"pre1_Node_8_0_data2",
"pre1_pre2_Node_6_0_data1"
],
"method_config_file":"cnn_cfg2.json",
"method_type":"CNNMethod",
"outputs":[
"eyes"
],
"thread_count":1,
"unique_name":"CNNMethod_1"
}
]
}
3. 冗余路径删除
回到前面的例子:
def my_workflow(image):
    """Same workflow as in section 2, but only ``eyes`` is returned.

    Uses the same lowercase ``inputs``/``outputs``/``config_file`` keyword
    arguments as the section 2 example it repeats.
    """
    lmk = cnn_method(image, outputs=["data1"], config_file="cnn_cfg.json")
    out1, out2, out3, out4, out5 = sub_workflow(lmk)
    eyes = cnn_method(out2, out3, inputs=["data2", "data1"], outputs=[
        "eyes"], config_file="cnn_cfg2.json")
    # Only eyes is returned here (section 2 returned `eyes, out4`), so out4
    # is no longer a workflow output.
    return eyes
`out4`其实是Node9的`data1`,是输入数据经由CNNMethod_0→Node1→Node3→Node7→Node9这条路径产生的结果。
如果只返回`eyes`,那么将会生成如下Json:
{
"inputs":[
"image"
],
"outputs":[
"eyes"
],
"workflow":[
{
"inputs":[
"image"
],
"method_config_file":"cnn_cfg.json",
"method_type":"CNNMethod",
"outputs":[
"CNNMethod_0_data1"
],
"thread_count":1,
"unique_name":"CNNMethod_0"
},
{
"inputs":[
"CNNMethod_0_data1"
],
"method_config_file":"Node1_cfg.json",
"method_type":"Node_1",
"outputs":[
"Node_1_0_data1",
"Node_1_0_data2",
"Node_1_0_data3",
"Node_1_0_data4"
],
"thread_count":2,
"unique_name":"Node_1_0"
},
{
"inputs":[
"Node_1_0_data1",
"Node_1_0_data2"
],
"method_config_file":"null",
"method_type":"Node_2",
"outputs":[
"pre1_Node_2_0_data1",
"pre1_Node_2_0_data2",
"pre1_Node_2_0_data3"
],
"thread_count":1,
"unique_name":"pre1_Node_2_0"
},
{
"inputs":[
"pre1_Node_2_0_data1",
"pre1_Node_2_0_data2"
],
"method_config_file":"null",
"method_type":"Node_5",
"outputs":[
"pre1_pre2_Node_5_0_data1",
"pre1_pre2_Node_5_0_data2",
"pre1_pre2_Node_5_0_data3"
],
"thread_count":1,
"unique_name":"pre1_pre2_Node_5_0"
},
{
"inputs":[
"pre1_Node_2_0_data1",
"pre1_Node_2_0_data3"
],
"method_config_file":"null",
"method_type":"Node_6",
"outputs":[
"pre1_pre2_Node_6_0_data1",
"pre1_pre2_Node_6_0_data2",
"pre1_pre2_Node_6_0_data3"
],
"thread_count":1,
"unique_name":"pre1_pre2_Node_6_0"
},
{
"inputs":[
"pre1_pre2_Node_5_0_data1",
"pre1_pre2_Node_5_0_data2",
"pre1_pre2_Node_6_0_data2",
"pre1_pre2_Node_6_0_data3"
],
"method_config_file":"null",
"method_type":"Node_8",
"outputs":[
"pre1_Node_8_0_data1",
"pre1_Node_8_0_data2",
"pre1_Node_8_0_data3"
],
"thread_count":1,
"unique_name":"pre1_Node_8_0"
},
{
"inputs":[
"pre1_Node_8_0_data2",
"pre1_pre2_Node_6_0_data1"
],
"method_config_file":"cnn_cfg2.json",
"method_type":"CNNMethod",
"outputs":[
"eyes"
],
"thread_count":1,
"unique_name":"CNNMethod_1"
}
]
}
可以看到Node3、Node7、Node9并没有出现在Workflow中。这是因为Node9的输出不再被需要,CNNMethod_0→Node1→Node3→Node7→Node9这条路径中Node3→Node7→Node9这一段成为冗余(CNNMethod_0和Node1仍为计算eyes所需,因此被保留),构建工具自动将其优化掉了。