我是如何构建一个“一劳永逸”的同步系统的
产品价格会在很多地方发生变化。它们可能在管理后台(admin panel)中更改,通过批量导入更改,或者通过 API webhooks 更改。
如果你想将这些更改同步到外部市场,你会面临一个问题。在每一个代码路径中都添加同步调用是一个错误。你会漏掉一个,或者搞坏一个。维护工作将变成一场噩梦。
Django signals 可以解决这个问题。你可以挂载(hook)到模型的保存事件(save event)上。这样就能在一个地方捕获所有的更改。
但 signals 有一个缺陷。如果你一次性更新 100 个价格,signal 会触发 100 次。这会引发 100 次 API 调用。你会触及频率限制(rate limits)或浪费资源。
我使用一种三部分构成的模式来解决这个问题:
• 一个不立即执行操作、而是收集 ID 的 signal handler。
• 一个用于去重的线程级集合(per-thread set)。
• 一个使用 transaction.on_commit 的 flush 回调,用于一次性处理所有内容。
以下是它的工作原理:
使用
threading.local()不要使用全局变量。全局变量会在不同请求之间共享状态,这会导致数据泄露。threading.local()可以将数据隔离在单个线程中。只记录,不执行 signal handler 只是简单地将产品 ID 添加到一个集合(set)中。然后它告诉 Django,只有在数据库事务成功后才运行 flush 函数。这可以防止同步那些保存失败的数据。
批量处理工作 当事务提交时,flush 函数会运行。它会复制该集合并将其清空。然后,它将整个 ID 列表发送到服务层(service layer)。
服务层执行一次批量查询(bulk query)来获取所有产品。它按商店对产品进行分组。最后,它为每个商店向 Celery 发送一个单一任务。
优势显而易见:
- 自动去重。集合(set)会为你处理好一切。
- 内置事务安全性。你永远不会同步已回滚的数据。
- 高效率。你避免了 N+1 查询问题。
- 高可靠性。如果 API 调用失败,Celery 会处理重试。
你只需构建一次系统。之后添加的每一个新功能都会自动与该同步系统协同工作。
你在 Django 中是如何处理外部 API 同步的?你是使用 signals 还是其他的模式?
来源:https://dev.to/acel/how-i-built-a-set-it-and-forget-it-sync-system-with-django-signals-2ld7