分享免费的编程资源和教程

网站首页 > 技术教程 正文

opsone智能运维平台分享系列之workflow引擎

goqiw 2025-05-10 21:53:07 技术教程 2 ℃ 0 评论

Workflow 引擎主要应用于平台的运维作业之作业编排以及工单系统的审批流程,本次分享的workflow引擎为极简版,仅与被执行的作业任务有关,方便根据实际需求进行扩展或者二次开发。

Workflow业务逻辑如下:假设有 a、b、c、d 这 4 个作业任务,运行条件为:在 a 执行成功之后执行 b,b 执行成功之后执行 c,c 执行成功之后执行 d,d 执行成功则任务结束(当然,任务的执行顺序是能够随意编排的)。倘若 a、b、c 中的任何一个执行失败,那么后面的任务究竟是否要继续执行,又或者全局设置作用于所有任务之上,执行失败时是停止还是继续执行,亦皆可。

import (
    "context"
    "errors"
    "fmt"
    "github.com/redis/go-redis/v9"
    "sync"
)

// DFlow 分布式workflow
type DFlow struct {
    RC        *redis.Client
    Ctx       context.Context
    LockKey   string
    Func      map[string]Func
    Depend    map[string][]string
    FuncForce map[string]bool
    Force     bool
}

// workflow引擎
type flowCore struct {
    fu map[string]*flowStruct
}

type Func func(interface{}) (interface{}, error)

type flowStruct struct {
    Deps  []string
    Ctr   int
    Fn    Func
    C     chan error
    Res   interface{}
    force bool //是否强制
    once  sync.Once
}

// workflow节点已执行
func (fs *flowStruct) done(e error) {
    defer func() {
       if r := recover(); r != nil {
          Log.Error(errors.New(fmt.Sprint(r)))
       }
    }()
    for i := 0; i < fs.Ctr; i++ {
       fs.C <- e
    }
}

// 关闭workflow节点channel
func (fs *flowStruct) close() {
    defer func() {
       if r := recover(); r != nil {
          Log.Error(errors.New(fmt.Sprint(r)))
       }
    }()
    fs.once.Do(func() {
       close(fs.C)
    })
}

// 创建workflow
func create() *flowCore {
    return &flowCore{
       fu: make(map[string]*flowStruct),
    }
}

// 增加workflow节点
func (flw *flowCore) add(name string, d []string, fn Func, fc bool) *flowCore {
    flw.fu[name] = &flowStruct{
       Deps:  d,
       Fn:    fn,
       Ctr:   1,
       force: fc,
    }
    return flw
}

// 执行workflow节点
func (flw *flowCore) start(ctx context.Context) map[string]error {
    defer func() {
       if r := recover(); r != nil {
          Log.Error(errors.New(fmt.Sprint(r)))
       }
    }()
    result := map[string]error{}
    for name, fn := range flw.fu {
       for _, dep := range fn.Deps {
          // prevent self depends
          if dep == name {
             return map[string]error{name: errors.New(name + " not depends of it self")}
          }
          // prevent no existing dependencies
          if _, exists := flw.fu[dep]; exists == false {
             return map[string]error{name: errors.New(dep + " not exists")}
          }
          flw.fu[dep].Ctr++
       }
    }
    for name, fs := range flw.fu {
       fs.C = make(chan error)
       func(ctx context.Context, name string, fs *flowStruct) {
          do := true
          defer func() {
             if r := recover(); r != nil {
                fmt.Println(r)
             }
             select {
             case <-ctx.Done():
                fs.close()
             }
          }()
          if len(fs.Deps) > 0 {
             for _, dep := range fs.Deps {
                err, ok := <-flw.fu[dep].C
                if !fs.force && (err != nil || !ok) {
                   do = false
                }
             }
          }
          if do {
             //匹配pipeline条件
             if len(fs.Deps) == 1 {
                fs.Res, err = fs.Fn(flw.fu[fs.Deps[0]].Res)
                result[name] = err
             } else {
                fs.Res, err = fs.Fn(nil)
                result[name] = err
             }
             fs.done(result[name])
          }
       }(ctx, name, fs)
    }
    return result
}

// Run workflow
func (df *DFlow) Run() map[string]error {
    lock := SyncMutex{LockKey: df.LockKey}
    //加锁
    if lock.Lock() {
       defer func() {
          // 释放锁
          lock.UnLock(true)
       }()
       defer func() {
          if r := recover(); r != nil {
             Log.Error(errors.New(fmt.Sprint(r)))
          }
       }()
       var force bool
       ctx, cancel := context.WithCancel(context.Background())
       defer cancel()
       fl := create()
       for k, v := range df.Depend {
          //默认使用全局配置
          force = df.Force
          if df.FuncForce != nil {
             _, ok := df.FuncForce[k]
             if ok {
                // 单独配置优先
                force = df.FuncForce[k]
             }
          }
          fl.add(k, v, df.Func[k], force)
       }
       return fl.start(ctx)
    }
    return nil
}

测试用例

import (
    "errors"
    "fmt"
    "inner/modules/common"
)

type test struct {
}

func (t *test) a(interface{}) (interface{}, error) {
    fmt.Println("a")
    fmt.Println("==========")
    return "a ok", nil
}
func (t *test) b(i interface{}) (interface{}, error) {
    fmt.Println(i)
    fmt.Println("b")
    fmt.Println("==========")
    return "b ok", nil
}
func (t *test) c(i interface{}) (interface{}, error) {
    fmt.Println(i)
    fmt.Println("c")
    fmt.Println("==========")
    return nil, errors.New("c error")
}
func (t *test) d(i interface{}) (interface{}, error) {
    fmt.Println(i)
    fmt.Println("d")
    fmt.Println("==========")
    return "d ok", nil
}
func init() {
    rc, ctx := common.RedisConnect() //使用redis做分布式锁
    t := test{}
    Func := map[string]common.Func{"a": t.a, "b": t.b, "c": t.c, "d": t.d}     //作业任务
    Depend := map[string][]string{"a": {}, "b": {"a"}, "c": {"b"}, "d": {"c"}} //作业依赖关系
    FuncForce := map[string]bool{"a": true, "b": true, "c": false, "d": false} //作业执行失败是否继续执行
    df := common.DFlow{RC: rc, Ctx: ctx, LockKey: "workflow_test", Func: Func, Depend: Depend, FuncForce: FuncForce}
    result := df.Run()
    fmt.Println(result)
}

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表