我是如何构建一个“一劳永逸”的同步系统的

产品价格会在很多地方发生变化。它们可能在管理后台(admin panel)中更改,通过批量导入更改,或者通过 API webhooks 更改。

如果你想将这些更改同步到外部市场,你会面临一个问题。在每一个代码路径中都添加同步调用是一个错误。你会漏掉一个,或者搞坏一个。维护工作将变成一场噩梦。

Django signals 可以解决这个问题。你可以挂载(hook)到模型的保存事件(save event)上。这样就能在一个地方捕获所有的更改。

但 signals 有一个缺陷。如果你一次性更新 100 个价格,signal 会触发 100 次。这会引发 100 次 API 调用。你会触及频率限制(rate limits)或浪费资源。

我使用一种三部分构成的模式来解决这个问题:

• 一个不立即执行操作、而是收集 ID 的 signal handler。 • 一个用于去重的线程级集合(per-thread set)。 • 一个使用 transaction.on_commit 的 flush 回调,用于一次性处理所有内容。

以下是它的工作原理:

  1. 使用 threading.local() 不要使用全局变量。全局变量会在不同请求之间共享状态,这会导致数据泄露。threading.local() 可以将数据隔离在单个线程中。

  2. 只记录,不执行 signal handler 只是简单地将产品 ID 添加到一个集合(set)中。然后它告诉 Django,只有在数据库事务成功后才运行 flush 函数。这可以防止同步那些保存失败的数据。

  3. 批量处理工作 当事务提交时,flush 函数会运行。它会复制该集合并将其清空。然后,它将整个 ID 列表发送到服务层(service layer)。

服务层执行一次批量查询(bulk query)来获取所有产品。它按商店对产品进行分组。最后,它为每个商店向 Celery 发送一个单一任务。

优势显而易见:

你只需构建一次系统。之后添加的每一个新功能都会自动与该同步系统协同工作。

你在 Django 中是如何处理外部 API 同步的?你是使用 signals 还是其他的模式?

来源:https://dev.to/acel/how-i-built-a-set-it-and-forget-it-sync-system-with-django-signals-2ld7