通讯录同步服务的演进
Table of Contents
背景 #
一家企业往往会使用钉钉或者企业微信或者飞书进行日常的沟通交流,同时HR也要使用其他平台处理工作,比如招聘、考勤、审批等等,这时候将不同平台的通讯录进行同步就成为了一种“刚需”。
前言 #
在这篇博客里,我会专注于记录通讯录同步服务的技术架构上的演进,因此不会展示业务上的细节。
同步流程 #
同步逻辑大概是这样:
- 对需要新增或者更新的部门进行新增或更新。
- 对需要新增或者更新的员工进行新增或更新。
- 对需要删除的员工进行删除。
- 同步部门leader。
- 对需要删除的部门进行删除。
这个流程的其他部分的顺序是不能变的,这是因为:
- 新增或者更新员工时,需要修改员工身上的部门信息,因此新增/更新部门要在新增/更新员工之前。
- 删除部门时要保证部门下的员工已删除,因此删除部门要在删除员工之后。
- 部门需要同步负责人信息,因此需要在同步完员工之后才能同步部门leader。
标识 #
第一次同步时只能通过名称或手机号进行关联,同步完后需要记录ID标识的映射关系,那么下次同步就可以直接使用ID进行同步。
对于部门来说,第一次同步会根据部门名称进行关联,以后再次同步就可以使用ID作为关联。
对于员工来说,第一次同步会使用员工手机号进行关联,以后再次同步就可以使用ID作为关联。
简称 #
在这篇博客中,我会使用以下简称来描述某种信息:
- 源数据:被同步的平台的通讯录数据。
- 目的数据:需要同步到的平台的通讯录数据。
了解完以上信息之后我们就可以开始查看技术架构的演进了。
第一版:平平无奇的CRUD #
在第一版,我们主要需要做这些:
- 对接两个平台的通讯录组件。
- 支持手动同步、定时同步和事件同步。
因为只需要对接两个平台,因此就没有考虑通用性——避免过早优化,因为业务规则是需要逐步完善的,过早的考虑整体很容易造成过度设计,以至于浪费大量的精力。
全量同步 #
手动同步和定时同步的逻辑是一样的,只是触发条件不同。基本流程是这样:
- 准备数据,包括用户的配置数据、两个平台的部门和员工数据以及关联的ID。
- 对需要新增或者更新的部门进行新增或更新
- 按层级遍历源数据的部门树(bfs),令当前遍历的部门为A。
- 判断A部门是否已存在映射关系
- 若存在,令目的部门为a, 判断目的数据是否存在a
- 若存在,判断是否需要更新,若需要,则进行更新
- 若不存在,则需要新建部门,并更新ID映射
- 若不存在,则获取对应服务门下的部门列表,查看是否有相同名称的部门
- 若存在,则判断是否需要更新,若需要,则进行更新,最后新建ID映射
- 若不存在,则需要新建部门,并新建ID映射
- 若存在,令目的部门为a, 判断目的数据是否存在a
- 对需要新增或者更新的员工进行新增或更新。
- 遍历源数据的员工列表,令当前遍历的员工为E
- 判断员工E是否已存在映射关系
- 若存在,令目的员工为e,判断目的数据是否存在e
- 若存在,判断是否需要更新,若需要,则进行更新
- 若不存在,则需要新建员工,并更新ID映射
- 若不存在,则根据手机号判断目的数据中是否存在该员工
- 若存在,则判断是否需要更新,若需要,则进行更新,最后新建ID映射
- 若不存在,则需要新建员工,并新建ID映射
- 若存在,令目的员工为e,判断目的数据是否存在e
- 对需要删除的员工进行删除:在上述步骤中,我们已经得知同步了哪些员工,将这些员工构成一个哈希表m,那么在目的数据中,不在哈希表m中的员工就是我们需要删除的员工。
- 同步部门leader:我们已经有了部门和员工的映射关系,因此直接同步部门leader即可。
- 对需要删除的部门进行删除:在上述步骤中,我们已经得知同步了哪些部门,讲这些部门构成一个哈希表m2,那么在目的数据中,不在哈希表m2中的部门就是我们需要删除的部门。
增量同步 #
通过回调事件进行同步的方式可以理解为增量同步。
增量同步和全量同步的区别在于:前者是同步一个员工或者部门,后者是同步全量的员工和部门。
部门的增量同步的基本流程 #
- 准备数据:获取回调事件对应的源数据的部门信息A
- 判断映射关系汇中是否存在A的映射关系,令目的数据中的对应部门为a
- 若已存在并且目的数据中是否仍存在a,则判断父部门以及部门的基础信息是否相同,如果父部门不匹配,则先同步父部门(进入步骤1),然后再同步a
- 若不存在映射关系或者目的数据中已不存在a,则先同步父部门(进入步骤1),再根据部门名称匹配父部门下的子部门,若存在匹配的部门,则更新,否则创建。
需要特别说明同步部门负责人,因为存在循环创建问题:假设员工e存在于部门a,并且e是部门a的负责人,那么在创建部门a时需要先创建员工e,而创建员工e时因为需要同步员工的部门信息,因此又需要先创建部门a,所以进入了一个死循环。这个问题可以通过先创建部门,再同步部门负责人来解决。
员工的增量同步的基本流程 #
- 准备数据:获取回调事件对应的源数据的员工信息E
- 判断映射关系汇中是否存在E的映射关系,令目的数据中的对应员工为e
- 先同步员工的部门(流程见“部门的增量同步的基本流程”)
- 若存在员工的映射关系并且目的数据中仍存在e,则判断员工e的信息是否需要更新,需要则更新
- 若不存在员工的映射关系或者目的数据中已不存在e,则根据手机号来匹配,若能够匹配,则进行更新(进入步骤2),否则进行创建
问题 #
在增量同步员工和部门时,同步员工和部门的逻辑彼此嵌套,因此对员工和部门的同步逻辑进行了封装,实现了代码的解耦和复用。但是全量同步由于在一开始就获取了全量数据,不需要像增量同步那样一个数据一个数据的获取,因此全量同步没有复用增量同步的代码。
这造成了同一种逻辑的代码在全量同步和增量同步中分别写了一套,这无疑加大了维护的成本。
第二版:统一员工和部门的实现 #
在第一版中,同步员工和部门在增量同步和全量同步中分别实现了一套,这个问题在第二版中得到了解决。
之所以分别实现是因为获取数据的方式不同:全量同步一次性获取了所有数据,而增量同步则是一个数据一个数据的获取。所以我们将业务逻辑和数据获取分开即可。
我们在第二版中构建了一个数据缓存池,每次获取数据都要从这个数据缓存池中读取,如果缓存池中不存在,则会请求接口。当然,更新或者删除数据也会同步缓存池。
在增量同步中,直接使用这个缓存池即可;而在全量同步时,需要先获取全量的数据来填充缓存池。这样两者的实现就得到了统一!
需要说明,这个缓存池是在内存中实现,在同步时创建,同步结束后释放。
第三版:抽象员工和部门的实现 #
随着业务的发展,我们从一开始的两个平台的通讯录同步,发展到了多个平台的通讯录同步,这些同步的核心逻辑是一样的,但是却没有复用。这会导致修改同步逻辑时往往需要对多个场景进行修改,很容易漏掉。
抽象的逻辑是这样的:
- 将核心的不变的同步规则封装成方法,假设为F
- 将可变的同步逻辑通过接口I进行抽象
- 每个平台都去适配抽象出来的接口I
- 于是每次同步都去调用方法F即可
这种抽象的好处是:
- 核心的同步逻辑得到了复用
- 每个平台只需要关注自己特殊的地方,这种地方往往很少,因此能够节省大量的开发时间
举例:部门同步实现 #
以部门同步为例来看下如何实现。
type IDeptSync interface {
// Pre 用于处理同步之前的逻辑,比如准备配置数据、对同步加锁等
Pre() (after func(), err error)
// IsNeedSync 判断来源部门id是否需要同步
IsNeedSync(sourceDeptID interface{}) (bool, error)
// GetSourceDept 获取来源数据的部门信息
GetSourceDept(sourceDeptID interface{}) (ISourceDept, error)
GetTargetDept(targetDeptID interface{}) (ITargetDept, bool, error)
// GetMatchedChild 在没有关联ID的情况下,通过父部门id和来源部门信息获取目的数据中匹配的部门
GetMatchedChild(targetParentID interface{}, child ISourceDept) (ITargetDept, bool, error)
// BindDeptMapping 绑定部门映射
BindDeptMapping(targetID, sourceID interface{})
// GetTargetDeptIDInMapping 获取已有的关联映射
GetTargetDeptIDInMapping(sourceID interface{}) (targetID interface{}, exist bool, err error)
// CreateDept 创建目的部门
CreateDept(targetParentID string, dept ISourceDept) (id interface{}, err error)
// NeedUpdate 部门是否需要更新
NeedUpdate(targetParentID string, targetDept ITargetDept, sourceDept ISourceDept) (bool, error)
// UpdateDept 更新目的部门
UpdateDept(targetDept ITargetDept, sourceDept ISourceDept) error
// DeleteDept 删除目的部门
DeleteDept(target interface{}) error
}
type DeptSyncer struct {
IDeptSync
}
func (c *DeptSyncer) SaveDept(deptID interface{}) (id interface{}, err error) {
after, err := c.Pre()
if err != nil {
return nil, err
}
defer after()
isNeedSync, err := c.IsNeedSync(deptID)
if err != nil || !isNeedSync {
return nil, err
}
return c.syncDept(deptID)
}
func (c *DeptSyncer) syncDept(deptID interface{}) (interface{}, error) {
sourceDept, err := c.GetSourceDept(deptID)
if err != nil {
return nil, err
}
// 递归同步父部门
targetParentID, err := c.syncDept(sourceDept.GetParentID())
if err != nil {
return nil, err
}
// 在已有的映射中获取目的部门id
targetDeptID, exist, err := c.GetTargetDeptIDInMapping(deptID)
if err != nil {
return nil, err
}
if exist {
// 获取目的部门
targetDept, exist, err := c.GetTargetDept(targetDeptID)
if err != nil {
return nil, err
}
// 如果已存在目的部门,则直接更新
if exist {
needUpdate, err := c.NeedUpdate(targetParentID, targetDept, sourceDept)
if err != nil || !needUpdate {
return targetDept.GetID(), err
}
if err = c.UpdateDept(targetDept, sourceDept); err != nil {
return nil, err
}
return targetDept.GetID(), nil
}
}
// 根据父部门来获取匹配部门
targetDept, exist, err := c.GetMatchedChild(targetParentID, sourceDept)
if exist {
c.BindDeptMapping(targetDept.GetID(), sourceDept.GetID())
needUpdate, err := c.NeedUpdate(targetParentID, targetDept, sourceDept)
if err != nil || !needUpdate {
return nil, err
}
if err = c.UpdateDept(targetDept, sourceDept); err != nil {
return nil, err
}
return targetDept.GetID(), nil
}
// 已有部门中匹配不到,则直接创建
targetID, err := c.CreateDept(targetParentID, sourceDept)
if err != nil {
return nil, nil
}
c.BindDeptMapping(targetID, sourceDept.GetID())
return targetID, nil
}
DeptSyncer
中的SaveDept
方法中处理了部门同步的核心逻辑,这个方法中不涉及具体的平台信息,每个平台的都需要实现IDeptSyncer
接口,这样就实现了多个平台使用同一套核心同步逻辑!
延伸:模版方法模式 #
上述代码使用了模版方法模式,这个设计模式的好处是使用者(平台)无需关心底层的实现,只需实现少量的抽象接口即可;缺点是底层的实现无法修改,一旦修改就会影响所有使用者,即底层实现和使用者耦合了。
go代码中经常用做排序的sort包就是用的这一设计模式:
// sort.go
type Interface interface {
Len() int
Less(i, j int) bool
Swap(i, j int)
}
当我们对一组对象自定义排序时,实现这接口的三个方法即可:
type SortInt []int
func (s SortInt) Len() int {
return len(s)
}
func (s SortInt) Less(i, j int) bool {
return s[i] < s[j]
}
func (s SortInt) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
其他问题 #
Merkle Tree的应用 #
由于定时同步的存在,导致访问其他平台的接口会十分频繁。而大部分平台为了保护自己的服务,往往会对接口进行限流,这时我们就需要减少对这些接口的访问。
解决方案是:在定时同步时,如果源数据没有做任何修改,则无需同步。
那么如何判断数据没有做修改呢?一种被动的方式是通过事件来判断,另外一种主动的方式是将数据编码为一个哈希值,每次同步前判断这个哈希值是否有改变,这个哈希的过程就是组件Merkle Tree的过程。
对部门构建哈希树 #
对部门树从下到上按层级递归构建:
- 对同级部门排序
- 将每个部门的信息哈希得到一个值
- 连接相邻两个部门的哈希值之后再进行哈希得到哈希值
- 重复上个步骤,直到得到一个层级的部门哈希值
- 将层级部门的哈希值与父部门的哈希值连接之后进行哈希得到一个哈希值
- 重复以上步骤,直到得到整个部门树的哈希值
对员工构建哈希树 #
- 获取所有员工的信息,并排序
- 对每个员工的信息进行哈希
- 将哈希值两两连接,然后再进行哈希
- 重复上个步骤,直到得到最终的哈希值
每次同步完后,都将部门和员工的默克尔哈希值保存,再下次同步时,判断哈希值是否相同,如果相同,则无需再同步。
分布式同步 #
耗时分析 #
上述代码实现都是单线程同步的,假设一家企业有10万个员工和1万个部门,并且这些员工和部门都需要创建或者更新,那么一次同步所需时间可以大致估算为:
- 每个员工/部门都需要查询一次,每次访问时间为20ms:11w*20ms = 36.6min
- 每个员工/部门都要创建或者更新,每次执行时间为200ms:11w*200ms=366min
- 两个平台,一个平台只读一次,另一个则要读+写,因此总时间:(366min+36.6*2min)≈7h
估算时间是按照每个员工和部门都要调一次接口来计算的,实际上有些平台也确实是这样,而且有些平台的更新操作确实需要上百毫秒。
显然我们没办法忍受同步一家企业一次需要7个小时!
解决办法是使用MapReduce的方式对任务进行拆分、同步进行。需要明确的是“同步流程”不会变,也就是部门和员工之间的同步顺序不会变(否则会造成错误的同步),变的是对每步的执行进行了拆分。
节点划分 #
将节点分为1个主节点与多个子节点:
- 主节点将花费时间的任务分发给子节点(实际上是主节点将任务放到队列中,由子节点主动消费)
- 子节点执行“子任务”并将任务结果传输给主节点
- 主任务整合任务结果,并执行对应的逻辑操作
- 重复上述步骤,直至整体同步结束
主节点会耗费大量内存,因此在一个同步中的主节点,同时也是其他同步中的子节点。每次同步的角色划分可以通过etcd或zookeeper进行同步,这其实就是一个主节点竞选的过程,除了主节点,其他节点都是子节点。
主节点竞选 #
理想情况下,主节点应该选择CPU和内存使用率较低的节点,但是这样做会将代码实现与运维实现耦合在一起,不是很好的选择,因此主节点竞选采用的是同步任务最少的节点。
任务拆分 #
- 并行获取部门树:每次获取完部门信息,将子部门ID列表放到共享队列中,由各个节点消费共享队列,获取对应的部门信息 ,然后再将子部门ID列表放到共享队列中。重复这个步骤,直到获取完所有的部门信息。
- 并行获取员工信息:员工往往也是通过部门ID进行获取,因此逻辑同“并行获取部门树”。
- 更新部门和员工等流程同理。
完成通知 #
如何判断已经获取了所有的部门树呢?如何判断已经获取了所有员工的信息呢?也就是说主节点如何知道所有子节点已经完成了任务呢?
子节点可以在执行完任务后通知主节点,但是即使在一个主步骤中,子节点也会多次执行子任务,比如说会从任务队列中多次获取任务并执行,所以子节点只能判断自己当前是否还有在执行任务,但没办法判断是否不会有新任务了。
所以可行的办法是当主节点一段时间内没有接收到子节点上报的任务结果后,由主节点发起状态上报通知,如果所有子节点都汇报自身已无可执行任务,那么说明当前阶段的MapReduce完成。
内存分析 #
员工的字段要远多于部门的字段并且员工的数量也要多余部门的数量,所以我们简单分析下10w员工所需多少内存即可。
一个员工信息占用内存大概在500个字节,其中比较占用内存的是头像、邮箱这种字段。那么10w员工所需内存就是50MB,一次同步中需要两个平台,因此一次同步大概需要100MB内存,如果单节点的内存是8GB,那么一个节点(作为主节点)能够最多支持80个同步同时运行。
考虑到拥有10w员工的企业很少,大部分企业的员工都在1k以下,因此不用过多考虑内存问题。
不同场景的限流 #
每个平台都有自己的访问频率限制,针对不同的限制也应采用不同的措施:
- 无脑访问:直到被限制访问后等待解除限制后再继续访问。这种策略适用于那种不太可能超过上限的访问限制,比如企业微信对每个第三方应用提供商每ip限制调用次数为4万次/分,这个上限在我们的系统中就不太可能会超过,万一在某个时间点超过了,那么1分钟之后也会解除限制,影响不大。
- 令牌桶限流:通过redis构造一个全局的令牌桶限流。这种策略适用于很容易就超过上限的访问限制,比如钉钉要求每个企业的访问接口的频率不能超过20次/秒,在这种情况下,如果还继续使用“无脑访问”的策略,会导致大量的时间浪费,因此只能通过令牌桶进行限流。