您现在的位置是:首页 > 正文

Go 异步任务

2023-11-06 15:35:00阅读 28

Go 异步任务

异步任务在开发中很常见,用来做解耦。本文介绍一下异步队列的实现的几个问题,并且结合三方库实现来分析。

有下面的几个关键点:

  1. 用户代码(任务)如何封装
  2. 数据的存放(数据存放在哪里?就是一个读取队列)
  3. worker的管理(worker的数量,worker执行是否支持超时时间,worker的异常恢复)

带着上面的问题,对比https://github.com/golang-queue/queue的实现,说明一下。

用户代码如何封装

对于任务来说,最重要的是 函数操作,也就是对应的代码逻辑。go中是可以将方法作为参数传递的,方法也是一种类型。所以我们定义下面的方法,方法签名如下:

type TaskFunc func(ctx context.Context) error

还可以配置方法的callback逻辑,比如重试次数,重试间隔,重试error的判断等等

抽象出一个结构体来表示

https://github.com/golang-queue/queue/blob/master/job/job.go#L15
在这里插入图片描述

数据的存放

这是很好拓展的地方,可以支持多种存储媒介和中间件,比如基于内存实现的循环队列,redis,rocketmq。

在实现上就是接口抽象功能,依赖倒转。接口有下面的两个功能

  1. 存数据
  2. 取数据

https://github.com/golang-queue/queue/blob/master/core/worker.go
在这里插入图片描述

解释一下QueuedMessage接口和Worker中的Run方法

  1. QueuedMessage

    用来做数据转换的。

  2. Run

    用来执行函数,表示执行的任务。

worker的管理

worker管理涉及到下面几个方面

  1. worker的数量限制
  2. worker执行时候的超时时间
  3. worker执行时候的异常panic
  4. workder从队列中获取需要处理的处理,并且支持请求超时操作
  5. 服务关闭之后worker也需要操作

我们来看golang-queue/queue中的实现是什么?

通过metric来记录queue在运行期间具体的情况

https://github.com/golang-queue/queue/blob/master/metric.go#L20

在这里插入图片描述

并且通过 channel 来做限制。

每次在goroutine启动和停止的时候通过metric来计数。并且会调用schedule来发信号,给ready发送信号。

goroutine在启动的时候会select ready。

在这里插入图片描述

work的异常情况,在调用task的处理函数的时候,肯定要用到defer来做error恢复,并且通过channel来通信,context来实现超时控制。

具体的原理,我们从下面的代码开始来分析。

https://github.com/golang-queue/queue/blob/master/queue.go#L285

// 对于start来说,是一个死循环,会启动一个goroutine从work中获取数据,当前goroutine等待结果,并且启动goroutine来执行,此Goroutine叫做worker。
func (q *Queue) start() {
	// QueuedMessage 表示message
	tasks := make(chan core.QueuedMessage, 1)
	// 启动一个goroutine来处理任务
	// 从work中获取任务,并且启动一个goroutine来处理任务
	for {
		// check worker number
    // 做调度的,就是检查work的数量
		q.schedule()
		
    // 数量不够,需要堵塞
		select {
		// wait worker ready
		case <-q.ready:
		case <-q.quit:
			return
		}

	// 启动一个goRoutine从 work中获取数据
		q.routineGroup.Run(func() {
			for {
				// 从队列中获取一个请求
				t, err := q.worker.Request()
				// 没有消息,或者有错误
				if t == nil || err != nil {
					// 有错误
					if err != nil {
						select {
              // 队列退出,关闭掉task,
						case <-q.quit:
							if !errors.Is(err, ErrNoTaskInQueue) {
								close(tasks)
								return
							}
              // 等待一秒再次从work中抓取新数据
						case <-time.After(time.Second):
							// sleep 1 second to fetch new task
						}
					}
				}
				if t != nil { // 说明取到了消息
					tasks <- t
					return
				}
				// 说明t为nil但是没有错误
				select {
				case <-q.quit:
					if !errors.Is(err, ErrNoTaskInQueue) {
						close(tasks)
						return
					}
				default:
				}
			}
		})
		// 这就是从queue中获取一个task,之后将此task提交给work来实现
		task, ok := <-tasks
		if !ok {
			return
		}
		// 所以,这里并没有维护所谓的goroutine池,因为go的编程是不需要这些玩意的。goroutine已经很轻量级的了,直接提交运行就好了
		// start new task
		q.metric.IncBusyWorker()
		q.routineGroup.Run(func() {
			q.work(task)
		})
	}
}

func (q *Queue) work(task core.QueuedMessage) {
	var err error
	// 来处理一些内部的错误,在这里会减去worker的数量,并且重新schedule
	defer func() {
		q.metric.DecBusyWorker()
		e := recover()
		if e != nil {
			q.logger.Errorf("panic error: %v", e)
		}
		q.schedule()

		// increase success or failure number
		if err == nil && e == nil {
			q.metric.IncSuccessTask()
		} else {
			q.metric.IncFailureTask()
		}
	}()
	// 运行任务,可以看到这里的代码就是为了包装一下
	if err = q.run(task); err != nil {
		q.logger.Errorf("runtime error: %s", err.Error())
	}
}

func (q *Queue) run(task core.QueuedMessage) error {
	data := task.(*job.Message)
	if data.Task == nil {
		data = job.Decode(task.Bytes())
		data.Data = data.Payload
	}

	return q.handle(data)
}

func (q *Queue) handle(m *job.Message) error {
	// create channel with buffer size 1 to avoid goroutine leak
	// 这是go中很创建的做法,一个channel中有数据,但并没有被其他的任何的goroutine操作的话,也是会被gc掉的
	done := make(chan error, 1) // 完成的信号channel
	panicChan := make(chan interface{}, 1) // panic的channel
	startTime := time.Now() 
	ctx, cancel := context.WithTimeout(context.Background(), m.Timeout)
	defer func() {
		cancel()
	}()

	// run the job 启动goroutine来运行一个job
	go func() {
		// handle panic issue
		defer func() {
			if p := recover(); p != nil {
				panicChan <- p
			}
		}()

		// run custom process function
		var err error
		// 做重试逻辑,这里的重试逻辑还可以指定重试的错误,比如as,基于那种类型的错误来做重试操作等。
		b := &backoff.Backoff{
			Min:    m.RetryMin,
			Max:    m.RetryMax,
			Factor: m.RetryFactor,
			Jitter: m.Jitter,
		}
		delay := m.RetryDelay
   // backoff都是通过for循环来做的
	loop:
		for {
      // 两种形式,一种是直接function,一直是通过message
			if m.Task != nil {
				err = m.Task(ctx)
			} else {
				err = q.worker.Run(ctx, m)
			}

 	    // 不需要重试就直接返回,如果有错误就开始重试,并且利用time来做重试时间的控制
			if err == nil || m.RetryCount == 0 {
				break
			}
			m.RetryCount--

			if m.RetryDelay == 0 {
				delay = b.Duration()
			}
			// 这里用select来做操作
			select {
			case <-time.After(delay): // retry delay
				q.logger.Infof("retry remaining times: %d, delay time: %s", m.RetryCount, delay)
			case <-ctx.Done(): // timeout reached // ctx完成就直接返回
				err = ctx.Err()
				break loop
			}
		}

		done <- err
	}()
	// 当前的goroutine在等待结果,
	select {
	case p := <-panicChan:
		panic(p)
	case <-ctx.Done(): // timeout reached
		return ctx.Err()
	case <-q.quit: // shutdown service
		// cancel job
		cancel()

		leftTime := m.Timeout - time.Since(startTime)
		// wait job
		select {
		case <-time.After(leftTime):
			return context.DeadlineExceeded
		case err := <-done: // job finish
			return err
		case p := <-panicChan:
			panic(p)
		}
	case err := <-done: // job finish
		return err
	}
}

有个问题,如何保证程序退出的时候这些work可以执行结束呢?利用waitGroup实现。

https://github.com/golang-queue/queue/blob/master/thread.go

在这里插入图片描述

文章来源:https://blog.csdn.net/daliucheng/article/details/132178765
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:https://www.dflian.com/946.html

网站文章

  • 【web前端】web前端设计入门到实战第一弹——html基础精华

    【web前端】web前端设计入门到实战第一弹——html基础精华

    一:图片属性 二:音频标签 三: 视频标签 四:链接标签 五:列表标签 5.1.无序列表 5.2.有序列表 5.3.自定义列表 六:表格 6.1合并单元格 七:input标签 八:select系列 九: 文本域标签 十:label标签十一:语义化标签 十二:字符实体

    2023-11-06 15:34:57
  • SpringBoot如何自动生成实体类和Dao层以及映射文件

    SpringBoot如何自动生成实体类和Dao层以及映射文件 一、首先添加自动生成代码插件 org.mybatis.generator ...

    2023-11-06 15:34:51
  • Java中的接口和抽象类有什么区别?

    Java中的接口和抽象类有什么区别?

    接口和抽象类是 Java 面向对象设计的两个基础机制。接口是对行为的抽象,它是抽象方法的集合,利用接口可以达到 API 定义和实现分离的目的。接口,不能实例化;不能包含任何非常量成员,任何 field...

    2023-11-06 15:34:45
  • Java的日期与时间java.time.Duration的简介说明

    Java的日期与时间java.time.Duration的简介说明

    2023-11-06 15:34:41
  • WEB编程:期末展示

    WEB编程:期末展示

    期末大作业展示:一、项目要求1、用户可注册登录网站,非注册用户不可登录查看数据2、用户注册、登录、查询等操作记入数据库中的日志3、爬虫数据查询结果列表支持分页和排序4、用Echarts或者D3实现3个...

    2023-11-06 15:34:39
  • sklearn 内置数据集-威斯康星州乳腺癌数据集

    威斯康星州乳腺癌数据集是scikit-learn(sklearn)库中一个常用的内置数据集,用于分类任务。该数据集包含了从乳腺癌患者收集的肿瘤特征的测量值,以及相应的良性(benign)或恶性(mal...

    2023-11-06 15:34:34
  • CentOS7配置阿里yum源 超详细!!!

    CentOS7配置阿里yum源 超详细!!!

    centos7配置阿里yum源(奥里给)

    2023-11-06 15:34:32
  • Java基础篇

    Java基础篇

    史上最全最细致最完整的Java基础八股文,全篇文章图文并茂,每个知识点都有深入讲述,致力于让读者深刻理解Java基础,更好地应用面试!

    2023-11-06 15:34:27
  • Java-数据结构-并查集<二>

    Java-数据结构-并查集<二>

    并查集是一种树型的数据结构,用于处理一些不相交集合的合并及查询问题。并查集跟树有些类似,只不过她跟树是相反的。在树这个数据结构里面,每个节点会记录它的子节点。在并查集里,每个节点会记录它的父节点【1】...

    2023-11-06 15:34:22
  • 100+个Java项目视频教程+源码+笔记,项目经验不用愁了!

    有很多朋友问我,说有没有项目可以分享,最近整理了一些项目,现在分享给大家,希望能帮助大家积累一些项目方面的经验。开源项目分享1、微信小程序开发【前端+后端(Java)】附完整源码地址:微信小程序开发【...

    2023-11-06 15:34:16