一、异步任务框架
以往在处理异步任务的时候,有过三种选择 :
第一种选择
- 描述 :子线程。
 - 劣势 :线程间协调不好处理。代码耦合。
 
第二种选择
- 描述 :把任务写进DB,由另一个进程每隔一段时间轮询DB并执行。(注,相比进程内部定期轮询,用crontab定期拉起消费者进程则更为可用。)
 - 劣势 :DB轮询压力大。实时性不高。
 
第三种选择
- 描述 :把任务写进MQ,由另一个进程收取消息并执行。(注,性能和实时性均高于第二种选择。)
 - 劣势 :一个任务往往是由多个步骤组成的,不同任务之间,往往会存在很多相同的子步骤,造成很多重复代码。
 
当我们把场景定位于服务端的多步骤的复杂耗时的异步任务时,上述三种选择均不可取。其实第三种选择只是不够优雅,如果能消除重复的子步骤代码,那就是比较好的一个方案了。讨论下图的模型(参考了一些服务端模型后总结) :

优雅的做法如上图
- 1、消费者被拆成多个模块,各模块只专注做好自己的事情。
 - 2、各模块监听 非同名的 队列,以免相互干扰。
 - 3、OSS 收到请求后,把整个任务写进DB,并且把任务的第一个子步骤写到 该子步骤对应的队列。
 - 4、第一个子步骤 对应的 执行模块,收到消息,执行第一步。然后查DB,把下一个子步骤写到MQ中,以此串起来。
 
下面重点论述 任务、消息的数据结构 以及 OSS、执行模块的概要逻辑。
二、DB 中的 整个任务
{
    "TaskId"      : 任务编号,
    "TaskStatus"  : 任务状态,
    "TaskMessage" : 任务报错,
    "TaskCursor"  : 游标编号(进行到了第几子步骤),
    "TimeCreate"  : 任务创建时间,
    "TimeStart"   : 任务开始时间,
    "TimeEnd"     : 任务结束时间,
    "Parameters"  : 子步骤共享参数,
    "Steps"       : [
        {
            "Code"            : 子步骤结果,
            "Message"         : 子步骤报错,
            "TimeStart"       : 子步骤开始时间,
            "TimeEnd"         : 子步骤结束时间,
            "NormalModule"    : 正常流程的处理模块,
            "NormalCommand"   : 正常流程的处理命令,
            "NormalTimeout"   : 正常流程的时间限制,
            "NormalRetry"     : 正常流程的重试限制,
            "RollbackModule"  : 回滚流程的处理模块,
            "RollbackCommand" : 回滚流程的处理命令,
            "RollbackTimeout" : 回滚流程的时间限制,
            "RollbackRetry"   : 回滚流程的重试限制
        },
        ...
        ...
    ]
}
任务状态
- 0 成功
 - 1 未开始
 - 2 执行中
 - 3 回滚中
 - other 失败
 
子步骤共享参数
- 包含 OSS 收到的参数 + 各子步骤均需要的公共参数 + 各子步骤的结果(以便后续子步骤能获取上游的执行结果)。
 
处理模块 & 处理命令
- 比如有一个叫 image 的执行模块,它可以处理的命令有 download_image,check_image,backup_image 等。
 
三、MQ 中的 步骤消息
{
    "TaskId"     : 所属任务,
    "Cursor"     : 当前游标,
    "Type"       : 当前类型,
    "Pamameters" : 共享数据,
    "Module"     : 执行模块,
    "Command"    : 执行命令,
    "Timeout"    : 执行时间限制,
    "Retry"      : 执行重试次数限制
}
当前游标
- 若一路的子步骤都是正常的,那么 :消息中的游标 = 任务中的游标。
 - 若在第n子步骤失败了,那么 :任务中的游标就固定在n不会变了(它表示任务最远进行到步骤n);而消息中的游标是会不断减一(它表示依次向前回滚)。
 
当前类型
- 0 正常流程
 - 1 回滚流程
 
四、OSS 创建任务的逻辑
例如,用户发起了一个 创建数据库实例 的任务。
1、OSS查询任务流程配置
{
    "modify_ntp" : [
        {
            "normal"      : {
                "module"  : "resource",
                "command" : "check_resource",   // 资源检查
                "timeout" : 300,
                "retry"   : 0
            },
            "rollback"    : {
                "module"  : "monitor",
                "command" : "report_event",     // 上报告警
                "timeout" : 300,
                "retry"   : 3
            },
        },
        {
            "normal"      : {
                "module"  : "mysql",
                "command" : "init_instance",    // 初始化实例
                "timeout" : 1800,
                "retry"   : 3
            },
            "rollback"    : {
                "module"  : "mysql",
                "command" : "clean_instance",   // 清除初始化实例过程中留下的东西
                "timeout" : 900,
                "retry"   : 3
            },
        },
        {
            "normal"      : {
                "module"  : "resource",
                "command" : "deduct_resource",  // 扣除资源
                "timeout" : 200,
                "retry"   : 2
            },
            "rollback"    : {
                "module"  : "resource",
                "command" : "restore_resource", // 恢复资源
                "timeout" : 200,
                "retry"   : 2
            },
        }
    ],
    ...
    ...
}
即有这样的一个逻辑 :

2、OSS生成任务,并写入DB
{
    "TaskId"      : 全局唯一的ID,
    "TaskStatus"  : 1,
    "TaskMessage" : null,
    "TaskCursor"  : 0,
    "TimeCreate"  : 当前秒级时间戳,
    "TimeStart"   : null,
    "TimeEnd"     : null,
    "Parameters"  : {
        "Cpu"     : 4,
        "Memory"  : 8,
        "Storage" : 500
    },
    "Steps"       : [
        {
            "Code"            : 0,
            "Message"         : null,
            "TimeStart"       : null,
            "TimeEnd"         : null,
            "NormalModule"    : "resource",
            "NormalCommand"   : "check_resource",
            "NormalTimeout"   : 300,
            "NormalRetry"     : 0,
            "RollbackModule"  : "monitor",
            "RollbackCommand" : "report_event",
            "RollbackTimeout" : 300,
            "RollbackRetry"   : 3
        },
        {
            "Code"            : 0,
            "Message"         : null,
            "TimeStart"       : null,
            "TimeEnd"         : null,
            "NormalModule"    : "mysql",
            "NormalCommand"   : "init_instance",
            "NormalTimeout"   : 1800,
            "NormalRetry"     : 3,
            "RollbackModule"  : "mysql",
            "RollbackCommand" : "clean_instance",
            "RollbackTimeout" : 900,
            "RollbackRetry"   : 3
        },
        {
            "Code"            : 0,
            "Message"         : null,
            "TimeStart"       : null,
            "TimeEnd"         : null,
            "NormalModule"    : "resource",
            "NormalCommand"   : "deduct_resource",
            "NormalTimeout"   : 200,
            "NormalRetry"     : 2,
            "RollbackModule"  : "resource",
            "RollbackCommand" : "restore_resource",
            "RollbackTimeout" : 200,
            "RollbackRetry"   : 2
        }
    ]
}
3、OSS生成第一个子步骤的消息,并写入MQ
{
    "TaskId"      : 任务ID,
    "Cursor"      : 0,
    "Type"        : 0,
    "Parameters"  : {
        "Cpu"     : 4,
        "Memory"  : 8,
        "Storage" : 500
    },
    "Module"      : "resource",
    "Command"     : "check_resource",
    "Timeout"     : 300,
    "Retry"       : 0
}
五、各执行模块的逻辑
记 ${msg} 为 本模块收到的消息;
step_success = False;
step_retry   = ${msg}.Retry;
do {
    res = execStep( ${msg}.Command, ${msg}.Parameters );
    
    if ( res 表明成功 ) {
        将 res 中的必要数据 写入 ${msg}.Parameters,以供后续子步骤使用;
        step_success = True;
    }
    
    step_retry--;
} while( step_success == False && step_retry >= 0 );
查询DB,记 ${task} 为所属的任务;
if ( step_success && ${msg}.Type == 0 ) {
    // 成功 & 本次执行是正常流程,则准备 :下一步 正常子步骤
}
if ( step_success && ${msg}.Type == 1 ) {
    // 成功 & 本次执行是回滚流程,则准备 :上一步 回滚子步骤
}
if ( !step_success && ${msg}.Type == 0 ) {
    // 失败 & 本次执行是正常流程,则准备 :这一步 回滚子步骤
}
if ( !step_success && ${msg}.Type == 1 ) {
    // 失败 & 本次执行是回滚流程,则准备 :<系统异常告警>
}
注,上述伪代码没有将 Timeout 考虑进去,它应是在最外面套一层。