BlackChen'site

美团Leaf 源码阅读(一)

美团Leaf 分布式ID生成器源码分析(一)

There are no two identical leaves in the world.
世界上没有两片完全相同的树叶。
— 莱布尼茨

Leaf 最早期需求是各个业务线的订单ID生成需求。在美团早期,有的业务直接通过DB自增的方式生成ID,有的业务通过redis缓存来生成ID,也有的业务直接用UUID这种方式来生成ID。以上的方式各自有各自的问题,因此我们决定实现一套分布式ID生成服务来满足需求。具体Leaf 设计文档见: leaf 美团分布式ID生成服务

官方代码仓库: Leaf

工程目录结构

项目分为两个模块: leaf-serverleaf-core,下面分开进行介绍

leaf-server

leaf-server 主要作用是使用spring-boot框架对外提供服务接口.

leaf-server结构

leaf-core

leaf-core 是核心代码,提供两种生成的ID的方式,包括号段模式和snowflake模式.

leaf-core

源码分析

相关分析代码已上传到github: 美团 LEAF

核心代码都在leaf-core中.

SegmentIDGenImpl分析

  1. 查看IDGenServiceTest
    IDGenServiceTest
  2. Config ID Gen
  3. 执行init方法
    • 执行init方法,从数据库中获取所有的tag,并保留在内存中.
    • 定时从数据库中获取最新数据
  4. 获取Id
        @Override
    public Result get(final String key) {
        // 必须在 SegmentIDGenImpl 初始化后执行. init()方法
        if (!initOK) {
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        // 通过缓存获取SegmentBuffer
        if (cache.containsKey(key)) {
    
            // 从缓存中获取对应key的 SegmentBuffer
            SegmentBuffer buffer = cache.get(key);
    
            // SegmentBuffer 没有初始化,则先进行初始化.
            if (!buffer.isInitOk()) {
                synchronized (buffer) {
                    // 双重判断,避免重复执行SegmentBuffer的初始化操作.
                    if (!buffer.isInitOk()) {
                        try {
                            updateSegmentFromDb(key, buffer.getCurrent());
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            return getIdFromSegmentBuffer(cache.get(key));
        }
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }
    
  5. updateSegmentFromDb
     public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
    
        LeafAlloc leafAlloc;
    
        if (!buffer.isInitOk()) {
            // 第一次初始化
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
    
            //leafAlloc中的step为DB中的step
            buffer.setMinStep(leafAlloc.getStep());
        } else if (buffer.getUpdateTimestamp() == 0) {
            // 第二次,需要准备next Segment
            // 第二号段,设置updateTimestamp
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
    
            //leafAlloc中的step为DB中的step
            buffer.setMinStep(leafAlloc.getStep());
        } else {
            // 三次以上 动态设置 nextStep
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
    
            /**
             *  动态调整step
             *  1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP
             *  2) 15分钟 < duration < 30分钟 : nothing
             *  3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数
             */
            // 15分钟
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    // 步数 * 2
                    nextStep = nextStep * 2;
                }
                // 15分 < duration < 30
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数)
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
    
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
    
            temp.setKey(key);
            temp.setStep(nextStep);
            // 更新maxId by CustomStep (nextStep)
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
    
            // 更新 updateTimestamp
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            // 设置 buffer的step
            buffer.setStep(nextStep);
            //leafAlloc的step为DB中的step
            buffer.setMinStep(leafAlloc.getStep());
        }
    
        // must set value before set max TODO
        // 暂时还未想通,这里为什么这样写.
        // 已经向作者提交了issue.(https://github.com/Meituan-Dianping/Leaf/issues/16)
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
    
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }
    
  6. getIdFromSegmentBuffer
     public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
        while (true) {
            try {
                // 获取buffer的读锁
                buffer.rLock().lock();
                // 获取当前的号段
                final Segment segment = buffer.getCurrent();
    
                if (    // nextReady is false (下一个号段没有初始化.)
                        !buffer.isNextReady()
                        // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
                        && (segment.getIdle() < 0.9 * segment.getStep())
                        // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
                        // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
                        && buffer.getThreadRunning().compareAndSet(false, true)
                ) {
                    // 放入线程池进行异步更新.
                    service.execute(new Runnable() {
                        @Override
                        public void run() {
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            try {
                                updateSegmentFromDb(buffer.getKey(), next);
    
                                // 更新成功,设置标记位为true
                                updateOk = true;
                                logger.info("update segment {} from db {}", buffer.getKey(), next);
                            } catch (Exception e) {
                                logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            } finally {
                                if (updateOk) {
                                    // 获取buffer 的写锁
                                    buffer.wLock().lock();
                                    // next准备完成
                                    buffer.setNextReady(true);
                                    // next运行标记位设置为false
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();
                                } else {
                                    buffer.getThreadRunning().set(false);
                                }
                            }
                        }
                    });
                }
    
                // 获取value
                long value = segment.getValue().getAndIncrement();
    
                // value < 当前号段的最大值,则返回改值
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
            } finally {
                buffer.rLock().unlock();
            }
    
            // 等待下一个号段执行完成,执行代码在-> execute()
            // buffer.setNextReady(true);
            // buffer.getThreadRunning().set(false);
            waitAndSleep(buffer);
    
    
            try {
                // buffer 级别加写锁.
                buffer.wLock().lock();
                final Segment segment = buffer.getCurrent();
                // 获取value -> 为什么重复获取value, 多线程执行时,在进行waitAndSleep() 后,
                // 当前Segment可能已经被调换了.直接进行一次获取value的操作,可以提高id下发的速度(没必要再走一次循环),并且防止出错(在交换Segment前进行一次检查).
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
    
                // 执行到这里, 其他的线程没有进行号段的调换,并且当前号段所有号码已经下发完成.
                // 判断nextReady是否为true.
                if (buffer.isNextReady()) {
                    // 调换segment
                    buffer.switchPos();
                    // 调换完成后, 设置nextReady为false
                    buffer.setNextReady(false);
                } else {
                    // 进入这里的条件
                    // 1. 当前号段获取到的值大于maxValue
                    // 2. 另外一个号段还没有准备好
                    // 3. 等待时长大于waitAndSleep中的时间.
                    logger.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            } finally {
                // finally代码块中释放写锁.
                buffer.wLock().unlock();
            }
        }
    }
    
  7. waitAndSleep
        /**
     * 等待下一个号段执行完成
     * buffer.setNextReady(true);
     * buffer.getThreadRunning().set(false);
     * @param buffer
     */
    private void waitAndSleep(SegmentBuffer buffer) {
        int roll = 0;
        while (buffer.getThreadRunning().get()) {
            roll += 1;
            if(roll > 10000) {
                try {
                    Thread.sleep(10);
                    break;
                } catch (InterruptedException e) {
                    logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
                    break;
                }
            }
        }
    }
    

相关代码已上传到github: 美团 LEAF

技术重点解析

  1. volatile 修饰变量提升可见性
  2. 使用读写锁ReadWriteLock,提升并发读下的读取速度
  3. 使用Atomic变量 ,利用CAS机制保证原子性, 提高并发能力.
    if (    // nextReady is false (下一个号段没有初始化.)
                        !buffer.isNextReady()
                        // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
                        && (segment.getIdle() < 0.9 * segment.getStep())
                        // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
                        // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
                        && buffer.getThreadRunning().compareAndSet(false, true)
                ) {
                ...
                }
    
  4. 动态调整step来适应不同的请求速度.
     /**
             *  动态调整step
             *  1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP
             *  2) 15分钟 < duration < 30分钟 : nothing
             *  3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数
             */
            // 15分钟
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    // 步数 * 2
                    nextStep = nextStep * 2;
                }
                // 15分 < duration < 30
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数)
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
    
  5. 使用事务,保证update 操作和select 操作的原子性.
         /**
      * 使用事务保证这两步的原子性(事务的隔离机制)
      * 根据数据库中对应tag的step来更新max_value,同时获取 tag的信息
      * 1. UPDATE leaf_alloc SET max_id = max_id + step WHERE biz_tag = #{tag}
      * 2. SELECT biz_tag, max_id, step FROM leaf_alloc WHERE biz_tag = #{tag}
      * @param tag
      * @return
      */
     LeafAlloc updateMaxIdAndGetLeafAlloc(String tag);
    
        @Override
    public LeafAlloc updateMaxIdAndGetLeafAlloc(String tag) {
        SqlSession sqlSession = sqlSessionFactory.openSession();
        try {
            sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxId", tag);
            LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", tag);
            sqlSession.commit();
            return result;
        } finally {
            sqlSession.close();
        }
    }
    

文章链接: www.blackchen.site/meituan-leaf-1
作者: BlackChen

评论