# XStream Python-Workflow Builder

In XStream, a workflow is described by a JSON configuration file. As a workflow grows more complex, its JSON configuration becomes long and hard to maintain, for the following reasons:

* The input and output names that connect the nodes must be filled in by hand. This is error-prone and can cause naming conflicts.
* The ordering of the nodes must be preserved. A developer who is unfamiliar with the workflow has to spend time working out the logical relationships between nodes before adding a new one.
* There is no quick way to find and remove nodes that lie on redundant paths.
* The graph may accidentally contain cycles.

In practice, we found that building a workflow is very similar to programming, with JSON effectively playing the role of a programming language. Rather than inventing a new language, the builder uses the widely adopted Python language to construct workflows quickly and to solve the problems above.

## Installation

Install the Python-Workflow builder with the following commands:

```bash
cd source/common/xstream/python_api
pip install -e .
```

Once the installation succeeds, the tool is ready to use.

## Usage

### 1. Building a Simple Workflow

```python
import xstream

# Declare FasterRCNNMethod and specify its input and output data lists. These lists act as
# the Method's predefined parameters and can be overridden later for different scenarios.
frcnn_method = xstream.Method("FasterRCNNMethod").inputs(["image"]).outputs(
    ["face_box", "head_box", "body_box", "kps"]).thread_count(1).config_file("multitask_config.json")

# Declare MOTMethod
mot_method = xstream.Method("MOTMethod").inputs(["face_box"]).outputs(
    ["face_bbox_list", "face_disappeared_track_id_list"]).config_file("iou_method_param.json")


# Define a workflow
def my_workflow(image):
    face_box, head_box, body_box, kps = frcnn_method(
        image, unique_name="multi_task")
    face_bbox_list, face_disappeared_track_id_list = mot_method(
        face_box, unique_name="face_mot")
    return image, face_bbox_list, head_box, body_box, kps


# Export to JSON (the variable is not named `json` to avoid shadowing the standard module)
workflow_json = xstream.Serialize(my_workflow)
print(workflow_json)
```

In Python, a workflow is expressed as a function: **the function's parameters are the workflow's inputs, and its return values are the workflow's outputs.** The way the nodes are organized is expressed by calling them in Python, which makes it quick to describe how nodes relate to one another. The workflow described above contains only two nodes, of types `FasterRCNNMethod` and `MOTMethod`.

The variables in the workflow above, such as `image`, `face_box`, `head_box`, `body_box` and `kps`, are only name placeholders. Usually a variable name matches the corresponding name in the Method's input or output list. When the workflow is loaded into the XStream Framework, actual data structures are bound to these names. In the output below, intermediate data that is not a workflow output is renamed with the node's `unique_name` as a prefix (for example `multi_task_face_box`), while workflow outputs keep their original names.

Output of the code above:

```json
{
  "inputs": [
    "image"
  ],
  "outputs": [
    "image",
    "face_bbox_list",
    "head_box",
    "body_box",
    "kps"
  ],
  "workflow": [
    {
      "inputs": [
        "image"
      ],
      "method_config_file": "multitask_config.json",
      "method_type": "FasterRCNNMethod",
      "outputs": [
        "multi_task_face_box",
        "head_box",
        "body_box",
        "kps"
      ],
      "thread_count": 1,
      "unique_name": "multi_task"
    },
    {
      "inputs": [
        "multi_task_face_box"
      ],
      "method_config_file": "iou_method_param.json",
      "method_type": "MOTMethod",
      "outputs": [
        "face_bbox_list",
        "face_mot_face_disappeared_track_id_list"
      ],
      "thread_count": 1,
      "unique_name": "face_mot"
    }
  ]
}
```

### 2. Building Sub-Workflows

XStream makes it easy to reuse and compose workflows. The code below defines `sub_workflow` and `my_workflow`. `sub_workflow` contains multiple nodes, one input parameter and five output parameters, and it can be treated as a single node with the same inputs and outputs.

```python
import xstream

node1 = xstream.Method("Node_1").inputs(["data1"]).outputs(
    ["data1", "data2", "data3", "data4"])
node1.config_file("Node1_cfg.json").thread_count(2)
node2 = xstream.Method("Node_2").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
node3 = xstream.Method("Node_3").inputs(
    ["data3", "data4"]).outputs(["data1", "data2", "data3"])
node5 = xstream.Method("Node_5").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
node6 = xstream.Method("Node_6").inputs(
    ["data1", "data3"]).outputs(["data1", "data2", "data3"])
node7 = xstream.Method("Node_7").inputs(
    ["data1", "data3"]).outputs(["data1", "data2", "data3"])
node8 = xstream.Method("Node_8").inputs(
    ["data1", "data2", "data2", "data3"]).outputs(["data1", "data2", "data3"])
node9 = xstream.Method("Node_9").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
cnn_method = xstream.Method("CNNMethod").inputs(["image"])


# Create the workflow
def sub_workflow(data1):
    data1_1, data2_1, data3_1, data4_1 = node1(data1)
    with xstream.Scope("pre1"):
        # Next level after node1
        data1_2, data2_2, data3_2 = node2(data1_1, data2_1)
        data1_3, data2_3, data3_3 = node3(data3_1, data4_1)
        with xstream.Scope("pre2"):
            # Next level after node2
            data1_5, data2_5, data3_5 = node5(data1_2, data2_2)
            data1_6, data2_6, data3_6 = node6(data1_2, data3_2)
            # Next level after node3
            data1_7, data2_7, data3_7 = node7(
                data1_3, data3_3,
                config={
                    "a": 1,
                    "b": 2
                },
                thread_list=[1, 2, 3, 4]
            )
        # Next level after node5 and node6
        data1_8, data2_8, data3_8 = node8(data1_5, data2_5, data2_6, data3_6)
    # Next level after node7
    data1_9, data2_9, data3_9 = node9(data1_7, data2_7)
    # Return the results of this workflow
    return data1_8, data2_8, data1_6, data1_9, data2_9


def my_workflow(image):
    lmk = cnn_method(image, outputs=["data1"], config_file="cnn_cfg.json")
    out1, out2, out3, out4, out5 = sub_workflow(lmk)
    eyes = cnn_method(out2, out3, inputs=["data2", "data1"], outputs=[
        "eyes"], config_file="cnn_cfg2.json")
    # return eyes
    return eyes, out4


# Serialize the workflow to JSON
jsondata = xstream.Serialize(my_workflow)
# The result can then be written to a file, etc.
print(jsondata)
```

The code above builds the following workflow:

![Multi-node workflow](image/xstream_pythonapi_multi_workflow_connected.png)

Because a workflow is expressed as a function in Python, composing workflows becomes a matter of calling functions, so workflows can be combined freely in the same way functions are.

Output of the code above:

```json
{
  "inputs": [
    "image"
  ],
  "outputs": [
    "eyes",
    "data1"
  ],
  "workflow": [
    {
      "inputs": [
        "image"
      ],
      "method_config_file": "cnn_cfg.json",
      "method_type": "CNNMethod",
      "outputs": [
        "CNNMethod_0_data1"
      ],
      "thread_count": 1,
      "unique_name": "CNNMethod_0"
    },
    {
      "inputs": [
        "CNNMethod_0_data1"
      ],
      "method_config_file": "Node1_cfg.json",
      "method_type": "Node_1",
      "outputs": [
        "Node_1_0_data1",
        "Node_1_0_data2",
        "Node_1_0_data3",
        "Node_1_0_data4"
      ],
      "thread_count": 2,
      "unique_name": "Node_1_0"
    },
    {
      "inputs": [
        "Node_1_0_data3",
        "Node_1_0_data4"
      ],
      "method_config_file": "null",
      "method_type": "Node_3",
      "outputs": [
        "pre1_Node_3_0_data1",
        "pre1_Node_3_0_data2",
        "pre1_Node_3_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_Node_3_0"
    },
    {
      "inputs": [
        "pre1_Node_3_0_data1",
        "pre1_Node_3_0_data3"
      ],
      "method_config": {
        "a": 1,
        "b": 2
      },
      "method_config_file": "null",
      "method_type": "Node_7",
      "outputs": [
        "pre1_pre2_Node_7_0_data1",
        "pre1_pre2_Node_7_0_data2",
        "pre1_pre2_Node_7_0_data3"
      ],
      "thread_list": [
        1,
        2,
        3,
        4
      ],
      "unique_name": "pre1_pre2_Node_7_0"
    },
    {
      "inputs": [
        "Node_1_0_data1",
        "Node_1_0_data2"
      ],
      "method_config_file": "null",
      "method_type": "Node_2",
      "outputs": [
        "pre1_Node_2_0_data1",
        "pre1_Node_2_0_data2",
        "pre1_Node_2_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_Node_2_0"
    },
    {
      "inputs": [
        "pre1_pre2_Node_7_0_data1",
        "pre1_pre2_Node_7_0_data2"
      ],
      "method_config_file": "null",
      "method_type": "Node_9",
      "outputs": [
        "data1",
        "Node_9_0_data2",
        "Node_9_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "Node_9_0"
    },
    {
      "inputs": [
        "pre1_Node_2_0_data1",
        "pre1_Node_2_0_data2"
      ],
      "method_config_file": "null",
      "method_type": "Node_5",
      "outputs": [
        "pre1_pre2_Node_5_0_data1",
        "pre1_pre2_Node_5_0_data2",
        "pre1_pre2_Node_5_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_pre2_Node_5_0"
    },
    {
      "inputs": [
        "pre1_Node_2_0_data1",
        "pre1_Node_2_0_data3"
      ],
      "method_config_file": "null",
      "method_type": "Node_6",
      "outputs": [
        "pre1_pre2_Node_6_0_data1",
        "pre1_pre2_Node_6_0_data2",
        "pre1_pre2_Node_6_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_pre2_Node_6_0"
    },
    {
      "inputs": [
        "pre1_pre2_Node_5_0_data1",
        "pre1_pre2_Node_5_0_data2",
        "pre1_pre2_Node_6_0_data2",
        "pre1_pre2_Node_6_0_data3"
      ],
      "method_config_file": "null",
      "method_type": "Node_8",
      "outputs": [
        "pre1_Node_8_0_data1",
        "pre1_Node_8_0_data2",
        "pre1_Node_8_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_Node_8_0"
    },
    {
      "inputs": [
        "pre1_Node_8_0_data2",
        "pre1_pre2_Node_6_0_data1"
      ],
      "method_config_file": "cnn_cfg2.json",
      "method_type": "CNNMethod",
      "outputs": [
        "eyes"
      ],
      "thread_count": 1,
      "unique_name": "CNNMethod_1"
    }
  ]
}
```

### 3. Redundant Path Removal

Returning to the previous example:

```python
def my_workflow(image):
    lmk = cnn_method(image, outputs=["data1"], config_file="cnn_cfg.json")
    out1, out2, out3, out4, out5 = sub_workflow(lmk)
    eyes = cnn_method(out2, out3, inputs=["data2", "data1"], outputs=[
        "eyes"], config_file="cnn_cfg2.json")
    # Return only eyes here
    return eyes
    # return eyes, out4
```

`out4` is actually the `data1` output of Node9, the result produced from the input data along the path CNNMethod_0→Node1→Node3→Node7→Node9.

If only `eyes` is returned, the following JSON is generated:

```json
{
  "inputs": [
    "image"
  ],
  "outputs": [
    "eyes"
  ],
  "workflow": [
    {
      "inputs": [
        "image"
      ],
      "method_config_file": "cnn_cfg.json",
      "method_type": "CNNMethod",
      "outputs": [
        "CNNMethod_0_data1"
      ],
      "thread_count": 1,
      "unique_name": "CNNMethod_0"
    },
    {
      "inputs": [
        "CNNMethod_0_data1"
      ],
      "method_config_file": "Node1_cfg.json",
      "method_type": "Node_1",
      "outputs": [
        "Node_1_0_data1",
        "Node_1_0_data2",
        "Node_1_0_data3",
        "Node_1_0_data4"
      ],
      "thread_count": 2,
      "unique_name": "Node_1_0"
    },
    {
      "inputs": [
        "Node_1_0_data1",
        "Node_1_0_data2"
      ],
      "method_config_file": "null",
      "method_type": "Node_2",
      "outputs": [
        "pre1_Node_2_0_data1",
        "pre1_Node_2_0_data2",
        "pre1_Node_2_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_Node_2_0"
    },
    {
      "inputs": [
        "pre1_Node_2_0_data1",
        "pre1_Node_2_0_data2"
      ],
      "method_config_file": "null",
      "method_type": "Node_5",
      "outputs": [
        "pre1_pre2_Node_5_0_data1",
        "pre1_pre2_Node_5_0_data2",
        "pre1_pre2_Node_5_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_pre2_Node_5_0"
    },
    {
      "inputs": [
        "pre1_Node_2_0_data1",
        "pre1_Node_2_0_data3"
      ],
      "method_config_file": "null",
      "method_type": "Node_6",
      "outputs": [
        "pre1_pre2_Node_6_0_data1",
        "pre1_pre2_Node_6_0_data2",
        "pre1_pre2_Node_6_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_pre2_Node_6_0"
    },
    {
      "inputs": [
        "pre1_pre2_Node_5_0_data1",
        "pre1_pre2_Node_5_0_data2",
        "pre1_pre2_Node_6_0_data2",
        "pre1_pre2_Node_6_0_data3"
      ],
      "method_config_file": "null",
      "method_type": "Node_8",
      "outputs": [
        "pre1_Node_8_0_data1",
        "pre1_Node_8_0_data2",
        "pre1_Node_8_0_data3"
      ],
      "thread_count": 1,
      "unique_name": "pre1_Node_8_0"
    },
    {
      "inputs": [
        "pre1_Node_8_0_data2",
        "pre1_pre2_Node_6_0_data1"
      ],
      "method_config_file": "cnn_cfg2.json",
      "method_type": "CNNMethod",
      "outputs": [
        "eyes"
      ],
      "thread_count": 1,
      "unique_name": "CNNMethod_1"
    }
  ]
}
```

As you can see, Node3, Node7 and Node9 no longer appear in the workflow. Because Node9's output is not needed, the branch of the CNNMethod_0→Node1→Node3→Node7→Node9 path that serves only Node9 is redundant and is optimized away automatically. CNNMethod_0 and Node1 remain, because the path that produces `eyes` still needs them.