今天带着学习的目的花了半天时间看了一下 python 广泛使用的任务队列 celery ,看完感觉不少地方挺奇怪。我们部门任务因为削峰一直用的是自己写的任务队列,所以说实话其实一直也没用过 rabbitmq 这些。
网上相关信息很少不提了,stackoverflow 能找到几个问题基本都是好多年前的。
这些都是小问题,我感觉其他几个需要吐槽的地方
1 、2202 年了,asyncio 居然不是被原生支持的。
追踪最新任务 issue , github.com/celery/celery/issues/6552,显然异步支持不在当前官方列表里,甚至不在官方发布计划中,可以说是遥遥无期。社区当然是有一些曲折迂回的方法的,比如我看到 so 上有人说用async_to_sync做奇怪的封装
from asgiref.sync import sync_to_async

def task_to_async(task):
async def wrapper(args, *kwargs):
delay = 0.1
async_result = await sync_to_async(task.delay)(args, *kwargs)
while not async_result.ready():
await asyncio.sleep(delay)
delay = min(delay * 1.5, 2)
return async_result.get()
return wrapper

只能缓缓打出一个问号?且不提这 while not ready: sleep & try ,看了一下依赖的第三方库里实现是基于一个最大线程数为 1 的线程池维护事件循环,这又带来更多衍生问题,比如我如何在 task 间共享连接状态,比如 mysql 连接池?如果我要在异步任务里使用多线程又该如何管理?总之是问题多多。
再者不提消费者不能异步消费,生产者也不能异步生产,添加任务到任务队列的过程是同步的,也许这带来的延迟非常短暂,但是也许设计者认为该延迟应该忽略?看了看 celery 基于的通信模块是基于 socket 自己搞了个 selector ,自行维护的事件循环没有享受到任何现有生态的好处,纯 py 编写无法享受社区 libuv 版的好处,性能和可靠性都让人质疑。。总之这一个设计让用户代码整个又被拖回同步宇宙,也是问题多多。
2 、生产者不支持任务完成回调
对于一些常见的短任务的需求(处理时间小于 1s ),短的任务就不需要加入任务队列了吗?我觉得显然不是的,但是同样的,段任务的处理规范是否应该是返回一个任务状态,再由前端轮询执行结果?我感觉也未必,毕竟任务很短。所以我们部门网关调用时可以选择用异步通知,await 等待执行结果。而 celery 目前看起来网关想获取任务成功状态的话只能轮询,或者是在 worker 那边定义任务完成后再向队列加入一个子任务,你可以在子任务里用自定义的方式给网关一个回调。。。

生产者不支持任务完成回调,这个应该有比较多的解决方法:生产者想要回调只能直接等任务完全完成然后同步进行下一个操作;如果想像 js 那样,那可以把函数名传进去消费者,由消费者结束后再调用函数执行

因为历史原因,celery 开发的时候没有 asyncio ,改成 asyncio 估计要推到重建,这样就变成了遥遥无期

celery 是个传销软件,作者早就弃坑了。连个任务硬超时都没作好

这是 python 的问题还是 celery 的问题呢?

celey 自己实现了一个异步库 所以改造比较难,至于第二点自己封装一下就可以实现了

“2202 年了,asyncio 居然不是被原生支持的。“

不被原生支持的 python 库还是大多数的吧。。。asyncio 生态好了一些,但是指望随手找一个库都是原生支持 asyncio ,真的还有太长的路要走。

说白了就是 python 半路加入协程带来的阵痛

我只是很质疑,事件以及回文通知需要自己封装。还有问题又回到 worker 不支持异步调用,所以我需要在异步网关那里 await channel ,然后 worker 这边同步 publish 。过程中间的可靠性又有一大票需要维护的,那为什么我需要一个第三方任务队列呢

看不懂……
我一直以为 celery 既然都是离线任务,其实同步写法也可以啊

绝大多数计算库不做支持完全可以理解,基于 asgi 的 web 框架不做异步支持也可以理解,毕竟没有必要重新发明轮子,当然也有一些偏就发明了的,比如 flask 。甚至 sqlalchemy 之类的 ORM 不支持也可以理解,但是现在好像也不需要我理解了。现在 py 唯一指定任务队列,文档里第一句话写我是为了处理大量分布式任务存在的,它解决分布式任务的方式是让大家都用同步代码这。。

*wsgi

草,看了一下 contribute 列表还真是,原作者提交显示 2016 年就弃坑了,之后一直是社区在维护。我看他最近版本是 22 天前发布的还以为更新非常频繁

celery 从来就没好感。。。。python 里有 3 大骗子营销软件

1 是 flask
2 是 celery
3 是 gunicorn

其实好多实现都是玩具级别。当然别人是开源项目,没法指责作者什么,但是哪些写教程的把吹上天就不对了。

我觉得也不能说是传销吧,毕竟简单的东西就是容易流行,也容易被各种自媒体拿来洗稿吸粉,这也是没办法的事情。

另外初学者通过这些简单的东西入门其实也没问题,至于后面怎么办,那只能说愿意钻研的人总有办法找到更好的替代品或者更可以贡献代码让这些东西不再只是一个玩具

celery 就是个玩具级别的项目,谁用谁被坑

gunicorn 有什么问题吗,因为现在 fastapi 炒的火,fastapi 里的推荐部署方式都是 gunicorn

所以现在建议用的任务队列是什么呢?

要不试试 github.com/long2ice/rearq

fastapi 我也在用,比如它 http 日志如何规范搜集?

github.com/encode/uvicorn/issues/527

gunicorn 这货只要指定了 work ,就甩手啥都不管了。日志日志不管,被 SIGTERM 杀了也不管,感情 gunicorn 就管了个 fd ,管了个寂寞啊。

flask 也是骗子啊?咋啦?

uvicorn 我还真不知道,gunicorn 以前倒是做过,记不太清了,印象里是 logging.getlogger("gunicorn.err")这种感觉的,然后重写 handler

是的,全链路异步生态太差,需要各种妥协,我受不了就切到 nodejs 了,你这个需求可以看看 bullmq

我怎么看 fastapi 文档里面使用 uvicorn 呢

又是回复改变认知的一帖

本质上其实还是 python 的 asyncio 生态问题

比如 celery 搭配最多的 django, asyncio 支持也是混搭, 尤其你要搭配 channels , 要硬编码一堆 a**,sync_to_async 各种混用, 割裂得一批

所以你猜 celery 的命令行里为啥给了--eventlet 和--gevent 就是没给--asyncio, 就是压根没建议你用 asyncio 啊.........

对 gunicorn 还有个 handler 可以写。uvicorn 直接无视你定义的 handler 。。。他丫的日志就是一个写死的格式。连时间都木有。

github.com/encode/uvicorn/blob/master/uvicorn/_logging.py#L97

早年间还直接 async 关键字冲突,报错一大堆。。。。

还好吧我改造成本就 20 行代码左右

老哥可以一发,我们学习一下

celery 应该用的 kumbo 把? mq 库不支持 asyncio 你指望上代码支持?

python 的海量库是优势也是累赘...

所以说,老老实实学 openstack,就用 eventlet,折腾啥 asyncio/gevent 的。
另外,任务对了 rpc 之类的抄 openstack 它不香么?折腾个啥子 celery 、flask 、gunicorn

flask 有啥问题啊?

python 异步生态极其别扭,有些框架为了适配异步,用各种魔法来搞。异步还是转 go 吧,go 写异步是真的省事

关键现实中很多项目只用玩具做就够了

没毛病。但是我比较反感把玩具吹上天。

希望实事求是,有更多的人贡献把玩具调教得更好用,更好玩。

拔凉拔凉,刚学会 flask+celery+vue ,用 gunicorn 部署到 nginx 上,给 pdf 加页码的小工具,原来都是玩具,玩具。。。。

不是吧,我觉得 fastapi 比 flask 骗子营销多了……至少 flask 没吹过自己压根不存在的“高性能”。

我前几天把 python3.6 上好好的软件挪到 3.8 去运行也遇到 async 关键字的问题了哈哈哈哈

回复直接改变我认知了,
用了五六年 python ,gunicorn celery flask 都是玩具...
反正 web 后台性能差 那就都归类玩具...

flask 满世界吹自己 lightweight 啊。。其实一个 werkzeug 设计就比 django 复杂了。。(不对这句话负责。逃。。。。。。。。。。。。。

别乱说。是很多地方的设计。。。我没有一竿子全打死啊。。(逃。。。。。

我印象里生产部署是 gunicorn 套 uvicorn.workers.UvicornWorker

没什么改变认知的,玩不玩具另说,起码那位老哥吐槽的几个问题确实存在,作为生产级引擎日志格式不能改确实令人惊讶,同时没有子进程守护也同样是痛点

uvicorn 那日志, 好几次想清都没清干净, 想改改了不对... 不过倒是学到了 propagate 这个参数, 以前居然从来没用过这东西. 好在 Python 的库就算再难搞, 基本都可以魔改一下解决问题, 遇到 Cython 的才真麻烦

至于 celery... 前同事挺喜欢配合 Django 用的, 然后遇到了几次 worker block 主线程的一个常见问题, 后来交接给我看了几天直接全丢掉了, 从那之后能解耦到消息队列的, 基本不用任务队列, 耦合的东西越多越复杂, 一出问题找不到就头大. 现在更是能不用框架就不用框架了

all-in 了两年多协程, 痛苦并快乐着. 最舒服的地方是 CPU 利用率高了不少, 内存也很友好, 最烦躁的地方是写了几百个 to_thread 代码长的太难看了..... 有次 DOM 解析纯计算那种本来觉得速度很快可以不放 run_in_executor, 结果遇到两三万 tag 的 HTML 直接把主线程堵了, 调试好久才找到. 唉, 真羡慕 go 那种方式

自己用 redis 实现吧,celery 已经被我们抛弃了

没错 这货我们上次项目差点用了 哦写了个 demo 着实被恶心到了,一个上下文到处传递 连个切面注入都没有 真正的营销库

详细聊聊?

通过这个库,可以写一些类似于 go 的调度的... github.com/omnilib/aiomultiprocess

task_async_result: AsyncResult = AsyncResult(task_id)
result = task_async_result.get(timeout=3)

自动回复了两条忽略下,最简单就是这样实现

 from celery.result import AsyncResult
 task_async_result: AsyncResult = AsyncResult(task_id)
 result = task_async_result.get(timeout=3)

这库几个月前看到过, 能解决无法利用多核问题, 也能节省一些套路代码, 不过还是用不习惯, 以前自己代码里已经做过类似的操作了.

主要还是羡慕 go 那种管它协程线程进程的异步直接丢给 CSP 的原生体验, 吐槽一下, 感觉 py 有不少设计尾大不掉地, 勉强期待一下 3.11 但对协程易用性暂时不抱太大期望

这个确实,新生语言没有那么多包袱

消息队列的话, 不是有一个国人写的分布式消息队列库吗
不是很熟悉但是看起来好像还可以?

看了这么多讨论,似乎都是一些边角 case ,不是“核心设计”级别的不合格啊?
这样就轻易的说: 是玩具?

#51 之前见过 celery 相关博客下发现有人留言“写的太好了。但是本质还是 celery 设计的太复杂,我推荐一个国产的高性能、支持人物语言、任何框架的分布式消息队列库”,

就是那个作者自己写的脚本刷的评论。。。博客园 celery 相关的博文,都快被他刷完了

回复标记一下,celery 居然有这么不堪? 目前没遇到什么问题,有代替品或者有自己实现的大概思路可以分享一下?

dropbox 分享过一个任务系统的设计思路。celery 玩具不至于,不过架构复杂,导致加什么功能都比较难吧。而且 celery 自身已经有任务编排聚合功能,一般的任务队列还真是不能跟它比较。
至于 asyncio ,理论上自己写一个 worker 类是完全可以支持的。只不过很多人也不看源码,也不看文档。所以就觉得不行。反正 IO 复用,我是用过 gevent 的 worker 类,效果还不错。

#48 问题在于你这是同步代码,转到异步里难道开线程池?

读了几遍回复感觉你说的挺乱的,你说的可以是 asyncio 可以还是 gevent 可以,社区维护者 issue 里标记说 aio 无支持,我觉得你有解决方案可以放出来,解决全网网友痛点。

python 的 asyncio 生态基本没有靠谱的库(特指 web 开发),建议远离。。回到任务队列,喜欢 redis 的试试 huey ,喜欢 rabbitmq 的试试 dramatiq ,当然这俩相比于 celery 都有一些功能缺失,不过都可以克服。。

这么多人觉得 celery 还不错,可能这几年的确改进不少了吧。这里 8 一下有个老外用 celery 的故事

早年间有个网站叫 backtype ,就是把你在全世界 blog/twitter 等各个地方留的评论聚合在一个地方,这样你可以整理你的互联网踪迹,它家当年就是用了大量 celery 任务来跑数据(不奇怪),celery 问题在哪里呢,数据处理本身可能代码不复杂,复杂的是数据怎么可靠的进去,可靠的出来,数据之间依赖怎么处理,Celery 当时 简直烂透了。就简单的说失败重试都是不可靠的。可能新手连它异常出在哪一步都不清楚。需要写大量的业务之外的保障性代码。这作者就自己轮了一个基于 jvm 的框架,把异步任务当成一个分布式 stream 来整体管控,他的这个设计得到了极大的成功,被推崇为「 realtime hadoop 」,后来他把这个框架开源了,交给了 apache 孵化,成了 apache storm

nathanmarz.com/blog/history-of-apache-storm-and-lessons-learned.html

吐槽 py 的东西烂不是为了秀优越感而贬低它,是真的希望有改进,或者更好的替代出现。

dramatiq 我老早就推荐过了。。t/418343 不知道现在许可证授权问题有解不。

另外 8 一嘴,backtype 被 twitter 收购了,收购过程中, 因为作者的这个 celery 轮子吹得太好,twitter 展示了极大的兴趣,极大提高 backtype 被收购估值

During acquisition talks I announced Storm to the world by writing a post on BackType's tech blog. The purpose of the post was actually just to raise our valuation in the negotiations with Twitter. And it worked: Twitter became extremely interested in the technology, and when they did their tech due-diligence on us, the entire due-diligence turned into a big demo of Storm.
The post had some surprising other effects. In the post I casually referred to Storm as "the Hadoop of realtime", and this phrase really caught on. To this day people still use it, and it even gets butchered into "realtime Hadoop" by many people. This accidental branding was really powerful and helped with adoption later on.

所以喷 celery 烂,光拿出替代品是不够的,还要得会包装,会营销。

#59 现在是 LGPL 了

是全部都可以,gevent 有比较好的支持,asyncio 没有。但理论上你都可以简单写一个 worker 类来支持。几十行代码的事情,全网网友的痛点就过了。
既然 gevent 都能做,asyncio 的支持显然不是什么问题。问题绝大多数来自于跟其他类型的 worker 结合不好。
有 IO loop 的 worker 是不能做 CPU-bound 的任务,所以此类 worker 的使用有限制,你是遇到不能扩展还是什么情况?

celery 不是还不错,是社区比较久。资料相对较多。说它是玩具过了,但是说它写得多么好,我觉得也不是。
毕竟我看过里面的代码,简直让人觉得头大。
不过它很早就写了,现在要搞一个跟它一样多功能的,怕是要下一番功夫。至于你说的失败重试不可靠是怎么一会事能说说吗?
根据我的使用经验,它的失败重试还是比较靠谱的,前提是你的消息后端一定要基于 rabbitmq 。其他 backend 有些问题,比如 redis 。这事 celery 的社区跟 redis 有过直接的联系,不知道修得如何。

不是 celery 开发者说不支持就是不行。他们不支持不代表你不能写。主要是你想要实现到什么程度,要花多大力气。你在对应任务里面开一个 io loop ,自己等待这样行不能,算不算支持?
还是说要 celery 支持对 coroutine 类任务的调度?还是说社区要做到封装好,使用者感知不到,可以随便用一个装饰器把函数直接转变为 task 才算支持?
你都没有说具体的需求,那怎么给你解决方案?

它的失败重试还是比较靠谱的,前提是你的消息后端一定要基于 rabbitmq

这就是我最大的疑惑了。都有 rabbitmq 了,还需要 celery 干啥?就为了额外做了一个序列化+日志操作+函数里用装饰符标记为任务的 helper ? LZ 所有的问题,如果基于裸 rabbitmq 自己 pub/sub ,恐怕压根都不会成为问题。

不就是为了这些?什么都裸写,新写的架构做好一点都比 celery 来得强。那个代码看到头疼

确实,werkzeug 设计实在是……我自己写了一个 github.com/abersheeran/baize 真正轻量,没有多余共功能又刚好能直接用的程度。

我觉得你有什么方案大可以发出来,不必要说我没有明确需求,我的需求很明确,在 aio 框架下使用消息队列,你的代码能解决到什么程度这是由你决定的,你可以发出来让大家评断。我之所以让你发代码,是因为整个楼里还是有不少网友提出了建设性意见,而你的回复中直接将网友们评价为“很多人也不看源码,也不看文档所以才有问题”,并且你觉得这些问题很简单就可以解决。我觉得既然对你来说这并不需要高昂成本,比起楼里回复很多字,不如直接用你所描述的几行代码解决问题来得实在。

gist.github.com/neoblackcap/98c98b3ea66553cdf77976c4f7f7b6d2  显示 Gist 代码 

赞一个~

如果你确实追踪了楼内讨论的话,你的代码没有解决上文提出的两个问题,

首先是消费方,按照你的逻辑该消费者会在各个 worker 子进程创建独立的事件循环并执行,

1 、我提出的如何在 task 间共享状态,这是使用异步很基本的需求,如果我不希望每次协程调用都执行一次创建和销毁后端连接池的话。
2 、你目前所谓的解决方式是,在单个 worker 进程内创建、执行、结束事件循环,然后在开启下一个循环。所以一个事件循环的意义是仅为一个协程服务,并不能并发调度协程任务,所以使用协程的意义在哪里?

其次像楼上已经有人提到的,你的生产者依然在执行同步逻辑请求任务,所以他们如何被事件循环管理?如何获取任务完成的回调?我觉得在 2022 年使用异步网关不是什么罕见需求。

  1. 我的代码只能说解决 celery 能不能用 asyncio 的问题。
  2. 既然你调研过 celery ,你就会明白你所说的并不是什么不可能的事情,就是需要额外提供一个基于 asyncio 的并发池,可以通过继承 celery.concurrency.base.BasePool ,并实现对应的接口。既然 gevent 能做到,这显然是可以做到的。但是你所说的共享连接池,等资源共享就未必。celery 是分布式的,worker 可以分布在多台机器上,你的需求本身就跟它的设计大方向矛盾。
  3. 我理解你想要的回调应该是 rpc 式的回调,而不是在 worker 里面调用你的回调接口。你的回调是需要生产者配合的,哪怕现在 Python 绝大多数的人还是在使用基于 wsgi 的 web 框架。支持 rpc 式的回调,基本上就得在框架上面动手,不改支持不好。
  4. 如果你所说的异步生产应该是指这个生产的动作会被你调用者的 IO loop 所管控的话,那么就是跟上面有着一样的问题,那么应该在很长一段时间内 celery 也不会改,这个同步异步需求可以使用线程池绕过。
  5. 你的需求很好,但是这不是 celery 能解决或者解决好。这不是 celery 的问题,任何一个开源工具都没有说要对某一个人的需求负责。你如果觉得需求重要,有通用性的,那么你可以提交你的解决方案,或者提思路,又或者提供资金。我觉得退一万步讲,哪怕 celery 是个垃圾它也没有强制你使用。你完全可以选择其他方案,而不是发一个贴将它批判一番。自由软件是它已经提供源代码给你,授权你使用,修改的权利。一些个人需求,不代表就是社区的需求。

    /(ㄒoㄒ)/~~你甚至都不愿意点个 Star 凑个整