Celery 4.1.0 版本定时任务执行时间 BUG

文章目录

为什么使用celery代替crontab?

关于Celery,他是一个分布式的任务队列。http://docs.celeryproject.org/en/latest/index.html

项目中有较多的定时任务,开始是用crontab实现的,考虑到未来的拓展和管理,决定使用celery的beat来实现定时任务。

虽然对于简单的周期性定时任务使用crontab就能做到,但是一旦需要处理的任务变多需要分布式处理时crontab不便于管理。

celery通过消息队列来实现分布式可以保证定时任务不被重复执行,消息队列已经为你自动实现了同步机制。例外也可以打破crontab的每分钟才执行一次的速度限制,只要有任务消息到来我就能处理,可以使任务更加高效的被执行。使用消息队列还有一个好处是其监控和管理的工具都很多,可以更加方便的知道任务执行的情况。

BUG 描述

但是在使用celery beat做定时任务的过程中碰到了celery的bug,在此记录一下。

当前使用pip安装的celery,默认是安装的最新版本4.1.0,但是在这个版本中在获取当前时间的逻辑中存在bug,会导致定时任务配置后并不能在指定的时间被执行。

有如下定时任务配置:

timezone = 'Asia/Shanghai'

beat_schedule = {
    'save_into_influxdb': {
        'task': 'tasks.calc_busi_capacity.save',
        'schedule': crontab(hour=0, minute=30)
    }
}

会在每天的0点30分执行一个写入数据到influxdb的任务。但是发现到了0点30分任务并没有被执行,经过确认,在使用hour和minute参数时,任务并不会在指定时间被执行,但是如果设置为每N分钟执行一次却是正常的,到了小时级别无法运行排除是我们自己程序代码的问题,剩下能想到的就只有时区问题了,但是机器上的时间和代码里面的时区都没有问题,那可能是celery本身的问题了。

于是搜了一下celery beat execute time的关键字,看到https://github.com/celery/celery/issues/4177这个issue里讨论的就是我们遇到的问题,原来是celery打的最新的这个版本代码里面有bug。就是celery在获取当前时间时使用了错误的方法导致计算下一次任务执行的时间时错误的bug,所以在我们指定的时间任务才没有被执行,他会被执行,但不是在我们想要的那个时间。

当前默认pip install celery安装的是 4.1.0 版本,也是celery目前pypi上的最新版,github上看源码tag可以知道上一个版本是4.0.2,在最新的master分支的代码和4.0.2版本的代码里面是没有这个bug的。在内网环境里要避免触发这个bug最简单的办法就是使用celery 4.0.2 代替 4.1.0 ,所以只需在pip安装时指定版本(pip install celery==4.0.2)或者在requirements里面指定版本为4.0.2即可。

原因分析

因为是celery beat没有在正确的时间发送任务,所以从4.1.0的setup.py中我们可以看到

命令行启动是来自celery.__main__:main 的代码。

__main__.py中看到所有命令都来自bin目录下celery.py

celery会根据命令行参数来调用真正的beat命令

这个beat就是self.app.Beat 通过使用setup_app_from_commandline 读取命令行参数进行配置,我们代码中的Celery就是app.base.Celery ,我们将Celery实例化后就是这里的self.app

self.app.Beat在它的run方法中会调用start_schedule 方法来启动 beat.Service

启动后会进入循环通过tick方法计算执行时间

在tick方法中调用了reserve 通过next 生成到下一次需要执行的任务。

next调用_next_instance 返回实例,参数有last_run_at默认值为self.app.now()

终于找到根源了。查看app.base.Celery的now() ,在最新的master代码中使用的是将utc时间使用astimezone方法转换为datetime

而在4.1.0的代码中只是对datetime的tzinfo进行了replace

def now(self):
    """Return the current time and date as a datetime."""
    from datetime import datetime
    return datetime.utcnow().replace(tzinfo=self.timezone)

在4.0.2中是调用的loaders中的now()

def now(self, utc=True):
    if utc:
        return datetime.utcnow()
    return datetime.now()

在master最新代码中是使用的astimezone对utc时间转成datetime

def now(self):
    """Return the current time and date as a datetime."""
    now_in_utc = to_utc(datetime.utcnow())
    return now_in_utc.astimezone(self.timezone)

看一下astimezone和replace的区别

In [31]: from celery.utils.time import timezone, to_utc

In [32]: from datetime import datetime

In [33]: tz = timezone.get_timezone('Asia/Shanghai')

In [34]: un = datetime.utcnow()

In [35]: print un
2017-12-27 07:46:08.824000

In [36]: print datetime.now()
2017-12-27 15:46:36.228000

In [37]: print to_utc(un).astimezone(tz)
2017-12-27 15:46:08.824000+08:00

In [38]: print un.replace(tzinfo=tz)
2017-12-27 07:46:08.824000+08:06

In [39]: un
Out[39]: datetime.datetime(2017, 12, 27, 7, 46, 8, 824000)

In [40]: un.replace(tzinfo=tz)
Out[40]: datetime.datetime(2017, 12, 27, 7, 46, 8, 824000, tzinfo=<DstTzInfo 'Asia/Shanghai' LMT+8:06:00 STD>)

In [41]: to_utc(un).astimezone(tz)
Out[41]: datetime.datetime(2017, 12, 27, 15, 46, 8, 824000, tzinfo=<DstTzInfo 'Asia/Shanghai' CST+8:00:00 STD>)

In [42]: tz
Out[42]: <DstTzInfo 'Asia/Shanghai' LMT+8:06:00 STD>

为什么使用replace会得到错误的时间呢,因为celery在这里它做了对时区的处理, 但是 使用replace只会修改时区,但对时刻并没有修改为对应时区的时刻,所以他把utc的当前时间变成了我们当前所在时区的当前时间 , 所以慢了8小时,任务会等到第二天才会开始执行,所以我们在我们设置的时间看不到任务被执行。

而正确的将带有时区信息的对象转成datetime的方式应该使用astimezone,他会对utc的偏移时间进行处理。

def astimezone(self, tz):
    if self.tzinfo is tz:
        return self
    # Convert self to UTC, and attach the new time zone object.
    utc = (self - self.utcoffset()).replace(tzinfo=tz)
    # Convert from UTC to tz's local time.
    return tz.fromutc(utc)

Python官方文档也说了:

If you merely want to attach a time zone object tz to a datetime dt without adjustment of date and time data, use dt.replace(tzinfo=tz).


也可以看看


全国大流量卡免费领

19元月租ㆍ超值优惠ㆍ长期套餐ㆍ免费包邮ㆍ官方正品