博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[编织消息框架][netty源码分析]11 ByteBuf 实现类UnpooledHeapByteBuf职责与实现
阅读量:5099 次
发布时间:2019-06-13

本文共 12097 字,大约阅读时间需要 40 分钟。

 

每种ByteBuf都有相应的分配器ByteBufAllocator,类似工厂模式。我们先学习UnpooledHeapByteBuf与其对应的分配器UnpooledByteBufAllocator

如何知道alloc分配器那是个?

可以从官方下载的TimeServer 例子来学习,本项目已有源码可在 TestChannelHandler.class里断点追踪

从图可以看出netty 4.1.8默认的ByteBufAllocator是PooledByteBufAllocator,可以参过启动参数-Dio.netty.allocator.type unpooled/pooled 设置

细心的读者可以看出分配ByteBuf只有pool跟unpool,但ByteBuf有很多类型,可能出于使用方面考虑,有时不一定设计太死板,太规范反而使学习成本很大

public final class ByteBufUtil {    static final ByteBufAllocator DEFAULT_ALLOCATOR;    static {        String allocType = SystemPropertyUtil.get(                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");        allocType = allocType.toLowerCase(Locale.US).trim();        ByteBufAllocator alloc;        if ("unpooled".equals(allocType)) {            alloc = UnpooledByteBufAllocator.DEFAULT;        } else if ("pooled".equals(allocType)) {            alloc = PooledByteBufAllocator.DEFAULT;        } else {            alloc = PooledByteBufAllocator.DEFAULT;        }        DEFAULT_ALLOCATOR = alloc;    }}

 AbstractReferenceCountedByteBuf是统计引用总数处理,用到Atomic*技术。

refCnt是从1开始,每引用一次加1,释放引用减1,当refCnt变成1时执行deallocate由子类实现

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {    private static final AtomicIntegerFieldUpdater
refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); private volatile int refCnt = 1; @Override public ByteBuf retain() { return retain0(1); } private ByteBuf retain0(int increment) { for (;;) { int refCnt = this.refCnt; final int nextCnt = refCnt + increment; if (nextCnt <= increment) { throw new IllegalReferenceCountException(refCnt, increment); } if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) { break; } } return this; } @Override public boolean release() { return release0(1); } private boolean release0(int decrement) { for (;;) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true; } return false; } } } protected abstract void deallocate();}

 

对于ByteBuf I/O 操作经常用的是 writeByte readByte两种

由于ByteBuf支持多种bytes对象,如 OutputStream、GatheringByteChannel、ByteBuffer、ByteBuf等,
我们只拿两三种常用的API来做分析,其它逻辑大同小异
如果读者有印象的话,通常底层只负责流程控制,实现交给应用层/子类处理,AbstractByteBuf.class writeByte/readByte 也是这种处理方式

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {    //分配器    private final ByteBufAllocator alloc;    //数据    byte[] array;    //临时ByteBuffer,用于内部缓存    private ByteBuffer tmpNioBuf;        private UnpooledHeapByteBuf(            ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {        //省去部分代码同边界处理        super(maxCapacity);        this.alloc = alloc;        array = initialArray;        this.readerIndex = readerIndex;        this.writerIndex = writerIndex;    }    //获取ByteBuffer容量    @Override    public int capacity() {        ensureAccessible();        return array.length;    }    @Override    public boolean hasArray() {        return true;    }    //获取原始数据    @Override    public byte[] array() {        ensureAccessible();        return array;    }    //扩容/缩容    @Override    public ByteBuf capacity(int newCapacity) {        ensureAccessible();        //newCapacity参数边界判断        if (newCapacity < 0 || newCapacity > maxCapacity()) {            throw new IllegalArgumentException("newCapacity: " + newCapacity);        }        int oldCapacity = array.length;        //扩容处理,直接cp到新的array        if (newCapacity > oldCapacity) {            byte[] newArray = new byte[newCapacity];            System.arraycopy(array, 0, newArray, 0, array.length);            setArray(newArray);        } else if (newCapacity < oldCapacity) {            //减容处理            //这里有两种处理情况             //1.readerIndex > newCapacity 说明还有数据未处理直接将 readerIndex,writerIndex相等 newCapacity            //2.否则 writerIndex =Math.min(writerIndex,newCapacity),取最少值,然后直接复制数据                        //可以看出netty处理超出readerIndex、writerIndex 限界直接丢弃数据。。。。。。                        byte[] newArray = new byte[newCapacity];            int readerIndex = readerIndex();            if (readerIndex < newCapacity) {                int writerIndex = writerIndex();                if (writerIndex > newCapacity) {                    writerIndex = newCapacity                    this.writerIndex = writerIndex;                }                System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);              //System.arraycopy(复制来源数组, 来源组起始坐标, 目标数组, 目标数组起始坐标, 复制数据长度);            } else {                this.readerIndex = newCapacity;                this.writerIndex = newCapacity;            }            setArray(newArray);        }        return this;    }}

 

AbstractByteBuf.class readBytes 调用子类实现 getBytes方法,区别是调用readBytes会改变readerIndex记录

public abstract class AbstractByteBuf extends ByteBuf {    @Override    public ByteBuf readBytes(ByteBuffer dst) {        int length = dst.remaining();        //checkReadableBytes(length);         if (readerIndex > (writerIndex - length)) {            throw new IndexOutOfBoundsException(String.format(                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",                    readerIndex, length, writerIndex, this));        }        //调用子类实现        getBytes(readerIndex, dst);        //记录已读长度        readerIndex += length;        return this;    }    @Override    public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {        checkReadableBytes(length);        getBytes(readerIndex, dst, dstIndex, length);        readerIndex += length;        return this;    }        //这里如果index不为负的话只需要 capacity - (index + length) < 0 判断就可以    //用到 | 运算 如果 index为-1的话 index | length 还是负数 第二个 | (index + length)运算有可能 index + length相加为负    public static boolean isOutOfBounds(int index, int length, int capacity) {        return (index | length | (index + length) | (capacity - (index + length))) < 0;    }}
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {    //支持ByteBuffer读取    @Override    public ByteBuf getBytes(int index, ByteBuffer dst) {        //checkIndex(index, dst.remaining());        if (isOutOfBounds(index,  dst.remaining(), capacity())) {            throw new IndexOutOfBoundsException(String.format(                    "index: %d, length: %d (expected: range(0, %d))", index, dst.remaining(), capacity()));        }        dst.put(array, index, dst.remaining());        return this;    }    //支持ByteBuf读取    @Override    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {        checkDstIndex(index, length, dstIndex, dst.capacity());        //是unsafe类型,要调用jdk unsafe方法复制        if (dst.hasMemoryAddress()) {            PlatformDependent.copyMemory(array, index, dst.memoryAddress() + dstIndex, length);        } else if (dst.hasArray()) { //如果是数组即 heap类型,直接复制过去            getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);        } else {            dst.setBytes(dstIndex, array, index, length);        }        return this;    }        //支持数组读取    @Override    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {        checkDstIndex(index, length, dstIndex, dst.length);        System.arraycopy(array, index, dst, dstIndex, length);        return this;    }}

 AbstractByteBuf.class writeBytes 调用子类实现 setBytes方法,区别是调用writeBytes会改变writerIndex记录

public abstract class AbstractByteBuf extends ByteBuf {    @Override    public ByteBuf writeBytes(ByteBuf src) {        writeBytes(src, src.readableBytes());        return this;    }    @Override    public ByteBuf writeBytes(ByteBuf src, int length) {        if (length > src.readableBytes()) {            throw new IndexOutOfBoundsException(String.format(                    "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));        }        writeBytes(src, src.readerIndex(), length);        //读取src数据到this.ByteBuf 所以要更改src readerIndex        src.readerIndex(src.readerIndex() + length);        return this;    }    @Override    public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {        ensureAccessible();        //是否扩容处理        ensureWritable(length);        //调用子类实现        setBytes(writerIndex, src, srcIndex, length);        //记录已写长度        writerIndex += length;        return this;    }        private void ensureWritable0(int minWritableBytes) {        if (minWritableBytes <= writableBytes()) {            return;        }        //写入数据长度大于最大空间剩余长度抛异常        if (minWritableBytes > maxCapacity - writerIndex) {            throw new IndexOutOfBoundsException(String.format(                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",                    writerIndex, minWritableBytes, maxCapacity, this));        }                //通过分配器计算,参数1写完后的writerIndex记录,参数2最大容量长度        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);        //子类实现        capacity(newCapacity);    }    //AbstractByteBufAllocator.class//    @Override    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {        if (minNewCapacity < 0) {            throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");        }        if (minNewCapacity > maxCapacity) {            throw new IllegalArgumentException(String.format(                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",                    minNewCapacity, maxCapacity));        }                final int threshold = 1048576 * 4; // 4 MiB page        if (minNewCapacity == threshold) {            return threshold;        }        //如果新容量大于4M,不走双倍扩大算法,数值范围取 minNewCapacity <= maxCapacity        if (minNewCapacity > threshold) {            // 除以threshold再乘以threshold得出的结果是 threshold的倍数,可以理解是去掉余数            int newCapacity = minNewCapacity / threshold * threshold;            //如果剩余容量不够4M直接给maxCapacity,否则自增4M            if (newCapacity > maxCapacity - threshold) {                newCapacity = maxCapacity;            } else {                newCapacity += threshold;            }            return newCapacity;        }        //newCapacity <<= 1 意思是 newCapacity*2,双倍自增        int newCapacity = 64;        while (newCapacity < minNewCapacity) {            newCapacity <<= 1;        }        return Math.min(newCapacity, maxCapacity);    }}

 

//setBytes逻辑跟getBytes一样public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {    @Override    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {        checkSrcIndex(index, length, srcIndex, src.capacity());        if (src.hasMemoryAddress()) {            PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);        } else  if (src.hasArray()) {            setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);        } else {            src.getBytes(srcIndex, array, index, length);        }        return this;    }    @Override    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {        checkSrcIndex(index, length, srcIndex, src.length);        System.arraycopy(src, srcIndex, array, index, length);        return this;    }}

 

总结:

1.writeBytes跟setBytes、readBytes跟getBytes区别是前者有记录,后者没有,而后者是子类的实现

2.扩容算法是两种策略:

  2.1.大于4M时不走double自增,数值范围取 minNewCapacity <= maxCapacity

  2.2.少于4M时从64开始double自增

3.更改容量也是每个子类实现,要考虑两种情况

  3.1.大于当前容量

  3.2.小于当前容量,当小于的时候要考虑 readerIndex、writerIndex边界,当超过 readerIndex、writerIndex边界heap的策略是丢去原来的数据

4.heap是继承 AbstractReferenceCountedByteBuf的,当refCnt记录为1时释放数据

    

转载于:https://www.cnblogs.com/solq111/p/7099327.html

你可能感兴趣的文章
第三方支付架构设计之—帐户体系
查看>>
诸城项目-开发日志
查看>>
fdisk (二) 详解(转)
查看>>
hdu 2768 Cat vs. Dog 最大独立集 巧妙的建图
查看>>
简单将集合的内容转为字符串
查看>>
Python pandas 0.19.1 Intro to Data Structures 数据结构介绍 文档翻译
查看>>
《寿康宝鉴》
查看>>
Mongodb
查看>>
软工个人总结
查看>>
如何将u盘、移动硬盘转化为活动分区--绝招
查看>>
MYSQL 5.7 修改密码、登录问题
查看>>
linux 同步时间 调试core内核
查看>>
PAT Basic 1085
查看>>
springMVC传递一组对象的接受方式
查看>>
收藏一个虚函数表以及虚表指针介绍的文章
查看>>
POJ---2492 A Bug's Life[并查集]
查看>>
[BZOJ1195] [HNOI2006]最短母串
查看>>
final阶段140字评论
查看>>
zookeeper集群搭建
查看>>
Jenkins-在windows上配置自动化部署(Jenkins+Gitblit)
查看>>