网站首页 > 技术教程 正文
Workflow 引擎主要应用于平台的运维作业之作业编排以及工单系统的审批流程,本次分享的workflow引擎为极简版,仅与被执行的作业任务有关,方便根据实际需求进行扩展或者二次开发。
Workflow业务逻辑如下:假设有 a、b、c、d 这 4 个作业任务,运行条件为:在 a 执行成功之后执行 b,b 执行成功之后执行 c,c 执行成功之后执行 d,d 执行成功则任务结束(当然,任务的执行顺序是能够随意编排的)。倘若 a、b、c 中的任何一个执行失败,那么后面的任务究竟是否要继续执行,又或者全局设置作用于所有任务之上,执行失败时是停止还是继续执行,亦皆可。
import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"sync"
)
// DFlow 分布式workflow
type DFlow struct {
RC *redis.Client
Ctx context.Context
LockKey string
Func map[string]Func
Depend map[string][]string
FuncForce map[string]bool
Force bool
}
// workflow引擎
type flowCore struct {
fu map[string]*flowStruct
}
type Func func(interface{}) (interface{}, error)
type flowStruct struct {
Deps []string
Ctr int
Fn Func
C chan error
Res interface{}
force bool //是否强制
once sync.Once
}
// workflow节点已执行
func (fs *flowStruct) done(e error) {
defer func() {
if r := recover(); r != nil {
Log.Error(errors.New(fmt.Sprint(r)))
}
}()
for i := 0; i < fs.Ctr; i++ {
fs.C <- e
}
}
// 关闭workflow节点channel
func (fs *flowStruct) close() {
defer func() {
if r := recover(); r != nil {
Log.Error(errors.New(fmt.Sprint(r)))
}
}()
fs.once.Do(func() {
close(fs.C)
})
}
// 创建workflow
func create() *flowCore {
return &flowCore{
fu: make(map[string]*flowStruct),
}
}
// 增加workflow节点
func (flw *flowCore) add(name string, d []string, fn Func, fc bool) *flowCore {
flw.fu[name] = &flowStruct{
Deps: d,
Fn: fn,
Ctr: 1,
force: fc,
}
return flw
}
// 执行workflow节点
func (flw *flowCore) start(ctx context.Context) map[string]error {
defer func() {
if r := recover(); r != nil {
Log.Error(errors.New(fmt.Sprint(r)))
}
}()
result := map[string]error{}
for name, fn := range flw.fu {
for _, dep := range fn.Deps {
// prevent self depends
if dep == name {
return map[string]error{name: errors.New(name + " not depends of it self")}
}
// prevent no existing dependencies
if _, exists := flw.fu[dep]; exists == false {
return map[string]error{name: errors.New(dep + " not exists")}
}
flw.fu[dep].Ctr++
}
}
for name, fs := range flw.fu {
fs.C = make(chan error)
func(ctx context.Context, name string, fs *flowStruct) {
do := true
defer func() {
if r := recover(); r != nil {
fmt.Println(r)
}
select {
case <-ctx.Done():
fs.close()
}
}()
if len(fs.Deps) > 0 {
for _, dep := range fs.Deps {
err, ok := <-flw.fu[dep].C
if !fs.force && (err != nil || !ok) {
do = false
}
}
}
if do {
//匹配pipeline条件
if len(fs.Deps) == 1 {
fs.Res, err = fs.Fn(flw.fu[fs.Deps[0]].Res)
result[name] = err
} else {
fs.Res, err = fs.Fn(nil)
result[name] = err
}
fs.done(result[name])
}
}(ctx, name, fs)
}
return result
}
// Run workflow
func (df *DFlow) Run() map[string]error {
lock := SyncMutex{LockKey: df.LockKey}
//加锁
if lock.Lock() {
defer func() {
// 释放锁
lock.UnLock(true)
}()
defer func() {
if r := recover(); r != nil {
Log.Error(errors.New(fmt.Sprint(r)))
}
}()
var force bool
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fl := create()
for k, v := range df.Depend {
//默认使用全局配置
force = df.Force
if df.FuncForce != nil {
_, ok := df.FuncForce[k]
if ok {
// 单独配置优先
force = df.FuncForce[k]
}
}
fl.add(k, v, df.Func[k], force)
}
return fl.start(ctx)
}
return nil
}
测试用例
import (
"errors"
"fmt"
"inner/modules/common"
)
type test struct {
}
func (t *test) a(interface{}) (interface{}, error) {
fmt.Println("a")
fmt.Println("==========")
return "a ok", nil
}
func (t *test) b(i interface{}) (interface{}, error) {
fmt.Println(i)
fmt.Println("b")
fmt.Println("==========")
return "b ok", nil
}
func (t *test) c(i interface{}) (interface{}, error) {
fmt.Println(i)
fmt.Println("c")
fmt.Println("==========")
return nil, errors.New("c error")
}
func (t *test) d(i interface{}) (interface{}, error) {
fmt.Println(i)
fmt.Println("d")
fmt.Println("==========")
return "d ok", nil
}
func init() {
rc, ctx := common.RedisConnect() //使用redis做分布式锁
t := test{}
Func := map[string]common.Func{"a": t.a, "b": t.b, "c": t.c, "d": t.d} //作业任务
Depend := map[string][]string{"a": {}, "b": {"a"}, "c": {"b"}, "d": {"c"}} //作业依赖关系
FuncForce := map[string]bool{"a": true, "b": true, "c": false, "d": false} //作业执行失败是否继续执行
df := common.DFlow{RC: rc, Ctx: ctx, LockKey: "workflow_test", Func: Func, Depend: Depend, FuncForce: FuncForce}
result := df.Run()
fmt.Println(result)
}
猜你喜欢
- 2025-05-10 DevExpress v15.1新版亮点XAF控件升级
- 2025-05-10 m4 mac mini部署ComfyUI,测试Flux-dev-GGUF的workflow模型10步出图
- 2025-05-10 DocuWare Workflow Manager(工作流管理器)
- 2025-05-10 技巧:利用 Launcher 或 Workflow 快速开启微信扫一扫
- 2025-05-10 搜狗开源srpc:自研高性能通用RPC框架
- 2025-05-10 一种基于编排的Workflow工作流设计方案
- 2025-05-10 Apple Watch适配应用Workflow已上架
- 2025-05-10 技巧:用 Workflow 来做带壳截图(带壳截图工具)
- 2025-05-10 一个集合若干 Workflow 实例的非官方 Workflow Gallery 网站
- 2025-05-10 Workflow 1.3 通知中心小部件体验与简单实例分享
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- sd分区 (65)
- raid5数据恢复 (81)
- 地址转换 (73)
- 手机存储卡根目录 (55)
- tcp端口 (74)
- project server (59)
- 双击ctrl (55)
- 鼠标 单击变双击 (67)
- debugview (59)
- 字符动画 (65)
- flushdns (57)
- ps复制快捷键 (57)
- 清除系统垃圾代码 (58)
- web服务器的架设 (67)
- 16进制转换 (69)
- xclient (55)
- ps源文件 (67)
- filezilla server (59)
- 句柄无效 (56)
- word页眉页脚设置 (59)
- ansys实例 (56)
- 6 1 3固件 (59)
- sqlserver2000挂起 (59)
- vm虚拟主机 (55)
- config (61)
本文暂时没有评论,来添加一个吧(●'◡'●)