XStream Python-Workflow Build Tool

In XStream, a Workflow is described by a JSON configuration file. As a Workflow grows more complex, the JSON file becomes long and harder to maintain, in the following ways:

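  • The input and output names that connect nodes must be filled in by hand, which is error-prone and can cause naming conflicts.

  • The order of the nodes must be kept correct. Developers unfamiliar with a Workflow have to spend time working out the logical relationships between Nodes before they can add a new one.

  • Nodes on redundant paths cannot be found and removed quickly.

  • The graph may contain cycles.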

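Practice shows that building a Workflow is much like programming, with JSON playing the role of the programming language. Rather than invent a new language, this tool uses Python, which is already widely used, to build Workflows quickly and to address the problems above.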
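The ordering concern can be made concrete. Below is a minimal sketch that checks a hand-written configuration for inputs that are consumed before they are produced. It assumes the configuration layout used by the JSON examples in this document (`inputs`, `outputs`, and a `workflow` list whose nodes have `unique_name`, `inputs`, and `outputs`). `find_unresolved_inputs` is a hypothetical helper and not part of XStream.

```python
import json


def find_unresolved_inputs(config: dict) -> list:
    """Return (unique_name, input_name) pairs whose data is not yet available.

    A name counts as available if it is a workflow input or an output of a
    node that appears earlier in the "workflow" list.
    """
    available = set(config["inputs"])
    problems = []
    for node in config["workflow"]:
        for name in node["inputs"]:
            if name not in available:
                problems.append((node["unique_name"], name))
        available.update(node["outputs"])
    # Every workflow output must also be produced somewhere.
    for name in config["outputs"]:
        if name not in available:
            problems.append(("<workflow outputs>", name))
    return problems


# A small hand-written configuration in the assumed layout.
example = json.loads("""
{
  "inputs": ["image"],
  "outputs": ["image", "face_bbox_list"],
  "workflow": [
    {"unique_name": "multi_task", "inputs": ["image"],
     "outputs": ["multi_task_face_box"]},
    {"unique_name": "face_mot", "inputs": ["multi_task_face_box"],
     "outputs": ["face_bbox_list"]}
  ]
}
""")

print(find_unresolved_inputs(example))
```

Swapping the two nodes in `example` makes the helper report that `face_mot` reads `multi_task_face_box` before any node has produced it, which is the kind of mistake that is easy to make when editing the JSON by hand.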

Installation

Install the Python-Workflow build tool with the following commands:

cd source/common/xstream/python_api
pip install -e .

Once the installation succeeds, the tool is ready to use.
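As a quick sanity check after installation, the following sketch tests whether the interpreter can locate the package. It assumes the package is importable under the name `xstream`, as the examples in this document suggest. `is_installed` is a hypothetical helper, not part of the tool.

```python
import importlib.util


def is_installed(package_name: str) -> bool:
    """Return True if a top-level package can be found on the import path."""
    return importlib.util.find_spec(package_name) is not None


if __name__ == "__main__":
    # "xstream" is the import name used by the examples in this document.
    print("xstream importable:", is_installed("xstream"))
```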

Usage

1. Building a simple Workflow

import xstream

# Declare FasterRCNNMethod and specify its input and output data lists.
# These act as the Method's predefined parameters and can be overridden later for different scenarios.
frcnn_method = xstream.Method("FasterRCNNMethod").inputs(["image"]).outputs(
    ["face_box", "head_box", "body_box", "kps"]).thread_count(1).config_file("multitask_config.json")

# Declare MOTMethod
mot_method = xstream.Method("MOTMethod").inputs(["face_box"]).outputs(
    ["face_bbox_list", "face_disappeared_track_id_list"]).config_file("iou_method_param.json")

# Define a workflow
def my_workflow(image):
    face_box, head_box, body_box, kps = frcnn_method(
        image, unique_name="multi_task")
    face_bbox_list, face_disappeared_track_id_list = mot_method(
        face_box, unique_name="face_mot")

    return image, face_bbox_list, head_box, body_box, kps

# Export to JSON
json = xstream.Serialize(my_workflow)
print(json)

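In Python, a Workflow takes the form of a function: **the function's parameters are the workflow's inputs, and its return values are the workflow's outputs.** The relationships between Nodes are expressed as calls to those Nodes in Python, which makes the structure quick to describe. The Workflow described by the code above has only two Nodes, of types FasterRCNNMethod and MOTMethod.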

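The variables in the Workflow above, such as image, face_box, head_box, body_box, and kps, are only name placeholders. In general, a variable name matches the corresponding name in the Method's input or output list. When the Workflow is loaded into the XStream Framework, actual data structures are bound to these names.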

Output of the code above:

{
    "inputs":[
        "image"
    ],
    "outputs":[
        "image",
        "face_bbox_list",
        "head_box",
        "body_box",
        "kps"
    ],
    "workflow":[
        {
            "inputs":[
                "image"
            ],
            "method_config_file":"multitask_config.json",
            "method_type":"FasterRCNNMethod",
            "outputs":[
                "multi_task_face_box",
                "head_box",
                "body_box",
                "kps"
            ],
            "thread_count":1,
            "unique_name":"multi_task"
        },
        {
            "inputs":[
                "multi_task_face_box"
            ],
            "method_config_file":"iou_method_param.json",
            "method_type":"MOTMethod",
            "outputs":[
                "face_bbox_list",
                "face_mot_face_disappeared_track_id_list"
            ],
            "thread_count":1,
            "unique_name":"face_mot"
        }
    ]
}
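The idea that workflow variables are only name placeholders, and that calling a Node merely records it, can be illustrated with a toy tracer. This is a minimal sketch of the general technique and not the actual xstream implementation: `Placeholder`, `ToyMethod`, and `trace` are hypothetical names introduced only for illustration, and the sketch does not rename intermediate outputs the way the JSON above does.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Placeholder:
    """Stands in for a piece of data; it carries only a name."""
    name: str


class ToyMethod:
    """Hypothetical stand-in for a Method: calling it records a node."""

    def __init__(self, method_type, outputs, recorder):
        self.method_type = method_type
        self.outputs = list(outputs)
        self.recorder = recorder

    def __call__(self, *inputs, unique_name):
        # Record the call instead of processing any real data.
        self.recorder.append({
            "method_type": self.method_type,
            "unique_name": unique_name,
            "inputs": [p.name for p in inputs],
            "outputs": list(self.outputs),
        })
        results = tuple(Placeholder(name) for name in self.outputs)
        return results if len(results) > 1 else results[0]


def trace(workflow, input_names, recorder):
    """Run the workflow function on placeholders and return the graph."""
    recorder.clear()
    returned = workflow(*(Placeholder(name) for name in input_names))
    if isinstance(returned, Placeholder):
        returned = (returned,)
    return {
        "inputs": list(input_names),
        "outputs": [p.name for p in returned],
        "workflow": list(recorder),
    }


nodes = []
detect = ToyMethod("Detect", ["face_box", "head_box"], nodes)
track = ToyMethod("Track", ["track_list"], nodes)


def toy_workflow(image):
    face_box, head_box = detect(image, unique_name="det")
    track_list = track(face_box, unique_name="mot")
    return image, track_list, head_box


graph = trace(toy_workflow, ["image"], nodes)
print(graph["outputs"])
```

Because the function body is ordinary Python, the order in which nodes are recorded follows the order of the calls, so a node can only consume names that already exist.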

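2. Building a sub-Workflow

With XStream, Workflows can be reused and combined very easily. The code below defines a sub_workflow and a my_workflow. sub_workflow contains many nodes and has one input parameter and five output parameters. It can be treated as a single Node with the same inputs and outputs.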

import xstream

node1 = xstream.Method("Node_1").inputs(["data1"]).outputs(
    ["data1", "data2", "data3", "data4"])
node1.config_file("Node1_cfg.json").thread_count(2)
node2 = xstream.Method("Node_2").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
node3 = xstream.Method("Node_3").inputs(
    ["data3", "data4"]).outputs(["data1", "data2", "data3"])

node5 = xstream.Method("Node_5").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])
node6 = xstream.Method("Node_6").inputs(
    ["data1", "data3"]).outputs(["data1", "data2", "data3"])
node7 = xstream.Method("Node_7").inputs(
    ["data1", "data3"]).outputs(["data1", "data2", "data3"])
node8 = xstream.Method("Node_8").inputs(
    ["data1", "data2", "data2", "data3"]).outputs(["data1", "data2", "data3"])
node9 = xstream.Method("Node_9").inputs(
    ["data1", "data2"]).outputs(["data1", "data2", "data3"])

cnn_method = xstream.Method("CNNMethod").inputs(["image"])

# Create the workflow
def sub_workflow(data1):
    data1_1, data2_1, data3_1, data4_1 = node1(data1)

    with xstream.Scope("pre1"):
        # Next level after node1
        data1_2, data2_2, data3_2 = node2(data1_1, data2_1)
        data1_3, data2_3, data3_3 = node3(data3_1, data4_1)

        with xstream.Scope("pre2"):
            # Next level after node2
            data1_5, data2_5, data3_5 = node5(data1_2, data2_2)
            data1_6, data2_6, data3_6 = node6(data1_2, data3_2)

            # Next level after node3
            data1_7, data2_7, data3_7 = node7(
                data1_3, data3_3,
                config={
                    "a": 1,
                    "b": 2
                },
                thread_list=[1, 2, 3, 4]
            )

        # Next level after node5 and node6
        data1_8, data2_8, data3_8 = node8(data1_5, data2_5, data2_6, data3_6)

    # Next level after node7
    data1_9, data2_9, data3_9 = node9(data1_7, data2_7)

    # Return the results of this workflow
    return data1_8, data2_8, data1_6, data1_9, data2_9

def my_workflow(image):
    lmk = cnn_method(image, outputs=["data1"], config_file="cnn_cfg.json")
    out1, out2, out3, out4, out5 = sub_workflow(lmk)
    eyes = cnn_method(out2, out3, inputs=["data2", "data1"], outputs=[
                      "eyes"], config_file="cnn_cfg2.json")

    # return eyes
    return eyes, out4

# Serialize the workflow to JSON
jsondata = xstream.Serialize(my_workflow)  # the result can then be written to a file, etc.
print(jsondata)

The code above builds the following workflow:

[Figure: a multi-node workflow]

Because a Workflow is expressed as a function in Python, combining Workflows reduces to calling functions, so multiple Workflows can be composed freely through function calls.

Output of the code above:

{
    "inputs":[
        "image"
    ],
    "outputs":[
        "eyes",
        "data1"
    ],
    "workflow":[
        {
            "inputs":[
                "image"
            ],
            "method_config_file":"cnn_cfg.json",
            "method_type":"CNNMethod",
            "outputs":[
                "CNNMethod_0_data1"
            ],
            "thread_count":1,
            "unique_name":"CNNMethod_0"
        },
        {
            "inputs":[
                "CNNMethod_0_data1"
            ],
            "method_config_file":"Node1_cfg.json",
            "method_type":"Node_1",
            "outputs":[
                "Node_1_0_data1",
                "Node_1_0_data2",
                "Node_1_0_data3",
                "Node_1_0_data4"
            ],
            "thread_count":2,
            "unique_name":"Node_1_0"
        },
        {
            "inputs":[
                "Node_1_0_data3",
                "Node_1_0_data4"
            ],
            "method_config_file":"null",
            "method_type":"Node_3",
            "outputs":[
                "pre1_Node_3_0_data1",
                "pre1_Node_3_0_data2",
                "pre1_Node_3_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_Node_3_0"
        },
        {
            "inputs":[
                "pre1_Node_3_0_data1",
                "pre1_Node_3_0_data3"
            ],
            "method_config":{
                "a":1,
                "b":2
            },
            "method_config_file":"null",
            "method_type":"Node_7",
            "outputs":[
                "pre1_pre2_Node_7_0_data1",
                "pre1_pre2_Node_7_0_data2",
                "pre1_pre2_Node_7_0_data3"
            ],
            "thread_list":[
                1,
                2,
                3,
                4
            ],
            "unique_name":"pre1_pre2_Node_7_0"
        },
        {
            "inputs":[
                "Node_1_0_data1",
                "Node_1_0_data2"
            ],
            "method_config_file":"null",
            "method_type":"Node_2",
            "outputs":[
                "pre1_Node_2_0_data1",
                "pre1_Node_2_0_data2",
                "pre1_Node_2_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_Node_2_0"
        },
        {
            "inputs":[
                "pre1_pre2_Node_7_0_data1",
                "pre1_pre2_Node_7_0_data2"
            ],
            "method_config_file":"null",
            "method_type":"Node_9",
            "outputs":[
                "data1",
                "Node_9_0_data2",
                "Node_9_0_data3"
            ],
            "thread_count":1,
            "unique_name":"Node_9_0"
        },
        {
            "inputs":[
                "pre1_Node_2_0_data1",
                "pre1_Node_2_0_data2"
            ],
            "method_config_file":"null",
            "method_type":"Node_5",
            "outputs":[
                "pre1_pre2_Node_5_0_data1",
                "pre1_pre2_Node_5_0_data2",
                "pre1_pre2_Node_5_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_pre2_Node_5_0"
        },
        {
            "inputs":[
                "pre1_Node_2_0_data1",
                "pre1_Node_2_0_data3"
            ],
            "method_config_file":"null",
            "method_type":"Node_6",
            "outputs":[
                "pre1_pre2_Node_6_0_data1",
                "pre1_pre2_Node_6_0_data2",
                "pre1_pre2_Node_6_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_pre2_Node_6_0"
        },
        {
            "inputs":[
                "pre1_pre2_Node_5_0_data1",
                "pre1_pre2_Node_5_0_data2",
                "pre1_pre2_Node_6_0_data2",
                "pre1_pre2_Node_6_0_data3"
            ],
            "method_config_file":"null",
            "method_type":"Node_8",
            "outputs":[
                "pre1_Node_8_0_data1",
                "pre1_Node_8_0_data2",
                "pre1_Node_8_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_Node_8_0"
        },
        {
            "inputs":[
                "pre1_Node_8_0_data2",
                "pre1_pre2_Node_6_0_data1"
            ],
            "method_config_file":"cnn_cfg2.json",
            "method_type":"CNNMethod",
            "outputs":[
                "eyes"
            ],
            "thread_count":1,
            "unique_name":"CNNMethod_1"
        }
    ]
}

3. Removing redundant paths

Returning to the previous example:

def my_workflow(image):
    lmk = cnn_method(image, outputs=["data1"], config_file="cnn_cfg.json")
    out1, out2, out3, out4, out5 = sub_workflow(lmk)
    eyes = cnn_method(out2, out3, inputs=["data2", "data1"], outputs=[
                      "eyes"], config_file="cnn_cfg2.json")

    # Only eyes is returned here
    return eyes
    # return eyes, out4

out4 is in fact data1 from Node9, the result of the input data passing through the path CNNMethod_0→Node1→Node3→Node7→Node9.

If only eyes is returned, the following JSON is generated:

{
    "inputs":[
        "image"
    ],
    "outputs":[
        "eyes"
    ],
    "workflow":[
        {
            "inputs":[
                "image"
            ],
            "method_config_file":"cnn_cfg.json",
            "method_type":"CNNMethod",
            "outputs":[
                "CNNMethod_0_data1"
            ],
            "thread_count":1,
            "unique_name":"CNNMethod_0"
        },
        {
            "inputs":[
                "CNNMethod_0_data1"
            ],
            "method_config_file":"Node1_cfg.json",
            "method_type":"Node_1",
            "outputs":[
                "Node_1_0_data1",
                "Node_1_0_data2",
                "Node_1_0_data3",
                "Node_1_0_data4"
            ],
            "thread_count":2,
            "unique_name":"Node_1_0"
        },
        {
            "inputs":[
                "Node_1_0_data1",
                "Node_1_0_data2"
            ],
            "method_config_file":"null",
            "method_type":"Node_2",
            "outputs":[
                "pre1_Node_2_0_data1",
                "pre1_Node_2_0_data2",
                "pre1_Node_2_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_Node_2_0"
        },
        {
            "inputs":[
                "pre1_Node_2_0_data1",
                "pre1_Node_2_0_data2"
            ],
            "method_config_file":"null",
            "method_type":"Node_5",
            "outputs":[
                "pre1_pre2_Node_5_0_data1",
                "pre1_pre2_Node_5_0_data2",
                "pre1_pre2_Node_5_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_pre2_Node_5_0"
        },
        {
            "inputs":[
                "pre1_Node_2_0_data1",
                "pre1_Node_2_0_data3"
            ],
            "method_config_file":"null",
            "method_type":"Node_6",
            "outputs":[
                "pre1_pre2_Node_6_0_data1",
                "pre1_pre2_Node_6_0_data2",
                "pre1_pre2_Node_6_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_pre2_Node_6_0"
        },
        {
            "inputs":[
                "pre1_pre2_Node_5_0_data1",
                "pre1_pre2_Node_5_0_data2",
                "pre1_pre2_Node_6_0_data2",
                "pre1_pre2_Node_6_0_data3"
            ],
            "method_config_file":"null",
            "method_type":"Node_8",
            "outputs":[
                "pre1_Node_8_0_data1",
                "pre1_Node_8_0_data2",
                "pre1_Node_8_0_data3"
            ],
            "thread_count":1,
            "unique_name":"pre1_Node_8_0"
        },
        {
            "inputs":[
                "pre1_Node_8_0_data2",
                "pre1_pre2_Node_6_0_data1"
            ],
            "method_config_file":"cnn_cfg2.json",
            "method_type":"CNNMethod",
            "outputs":[
                "eyes"
            ],
            "thread_count":1,
            "unique_name":"CNNMethod_1"
        }
    ]
}

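Note that Node3, Node7, and Node9 do not appear in the Workflow. Because Node9's output is no longer needed, the Node3→Node7→Node9 segment of the path CNNMethod_0→Node1→Node3→Node7→Node9 is redundant and is optimized away automatically. CNNMethod_0 and Node1 remain because other nodes still depend on their outputs.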
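The pruning behaviour described above can be reproduced on a parsed configuration with a backward pass from the workflow outputs. This is a minimal sketch of that general technique, not the tool's own code. It assumes the JSON layout shown in this document and a node list that is already in execution order; `prune_unused_nodes` is a hypothetical helper.

```python
def prune_unused_nodes(config: dict) -> dict:
    """Keep only the nodes that contribute to the workflow outputs."""
    needed = set(config["outputs"])
    kept = []
    # Walk backwards so that consumers are visited before their producers.
    for node in reversed(config["workflow"]):
        if needed.intersection(node["outputs"]):
            kept.append(node)
            needed.update(node["inputs"])
    kept.reverse()
    return {**config, "workflow": kept}


# A small configuration with one useful branch (A -> B -> E) and one
# branch (C -> D) whose result is never returned.
config = {
    "inputs": ["image"],
    "outputs": ["eyes"],
    "workflow": [
        {"unique_name": "A", "inputs": ["image"], "outputs": ["a1", "a2"]},
        {"unique_name": "B", "inputs": ["a1"], "outputs": ["b"]},
        {"unique_name": "C", "inputs": ["a2"], "outputs": ["c"]},
        {"unique_name": "D", "inputs": ["c"], "outputs": ["d"]},
        {"unique_name": "E", "inputs": ["b"], "outputs": ["eyes"]},
    ],
}

pruned = prune_unused_nodes(config)
print([node["unique_name"] for node in pruned["workflow"]])
```

As in the example above, the shared upstream node (`A` here, CNNMethod_0 and Node1 in the document) survives because a kept node still consumes one of its outputs.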