DirectByteBuffer和文件IO的作用分别是什么

本篇文章为大家展示了DirectByteBuffer和文件IO的作用分别是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

专注于为中小企业提供做网站、网站制作服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业衡南免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上千余家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

a. 传统的IO操作(就是使用java.io包的api)访问磁盘文件,数据需要copy的次数:

    1. 磁盘文件的数据 copy 内核page cache 

    2. 内核的数据 copy  应用程序空间(即:jvm 堆外内存)

    3. jvm堆外内存  copy  jvm堆内 内存

为什么2、和3 不合并,将内核数据 copy jvm堆内内存。 因为jvm进行系统调用进行读文件时候,此时发生gc,那么堆内存的对应地址就会移动,所以直接copy到堆内是有问题的。

b. 使用DirectByteBuffer访问磁盘文件,数据需要copy的次数:

   1. 磁盘文件的数据 copy 内核page cache 

    2. 内核的数据 copy  应用程序空间(即:DirectByteBuffer)

所以DirectByteBuffer减少了内存copy次数。

1.传统文件IO解析

文件读取示例:

FileInputStream input = new FileInputStream("/data");

byte[] b = new byte[SIZE];

input.read(b);

byte数组示堆内存对象,此处将数据copy 到jvm堆内存。我们看一下read函数内部实现

public int read(byte b[]) throws IOException { 

    return readBytes(b, 0, b.length);

}

private native int readBytes(byte b[], int off, int len) throws IOException;

我们看到 read函数最终调用 native函数 readBytes。

jintreadBytes(JNIEnv *env, jobject this, jbyteArray bytes,          jint off, jint len, jfieldID fid){

    jint nread;

    char stackBuf[BUF_SIZE];

    char *buf = NULL;

    FD fd;

    if (IS_NULL(bytes)) {

        JNU_ThrowNullPointerException(env, NULL);

        return -1;

    }

    if (outOfBounds(env, off, len, bytes)) {

        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);

        return -1;

    }

    if (len == 0) {

        return 0;

    } else if (len > BUF_SIZE) {

        buf = malloc(len);

        if (buf == NULL) {

            JNU_ThrowOutOfMemoryError(env, NULL);

            return 0;

        }

    } else {

        buf = stackBuf;

    }

    fd = GET_FD(this, fid);

    if (fd == -1) {

        JNU_ThrowIOException(env, "Stream Closed");

        nread = -1;

    } else {

        nread = IO_Read(fd, buf, len);

        if (nread > 0) {

            (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);

        } else if (nread == -1) {

            JNU_ThrowIOExceptionWithLastError(env, "Read error");

        } else { /* EOF */

            nread = -1;

        }

    }

    if (buf != stackBuf) {

        free(buf);   

}

    return nread;

}

我们看到最终通过IO_Read将缓冲数据读到buf中去,这个IO_Read其实是一个宏定义:

#define IO_Read handleRead

handleRead函数实现如下,这里你可以看到这里进行了read系统调用:

ssize_t

handleRead(FD fd, void *buf, jint len)

{

    ssize_t result;

    RESTARTABLE(read(fd, buf, len), result);

    return result;

}

buf返回之后,由SetByteArrayRegion这个JNI函数拷贝到了bytes,它的具体实现如下(下面定义了一个通用的宏函数来表示各种数据类型数组区域的设置,可以将Result宏替换成Byte即可理解):

JNI_ENTRY(void, \

jni_Set##Result##ArrayRegion(JNIEnv *env, ElementType##Array array, jsize start, \

             jsize len, const ElementType *buf)) \

  JNIWrapper("Set" XSTR(Result) "ArrayRegion"); \

  DTRACE_PROBE5(hotspot_jni, Set##Result##ArrayRegion__entry, env, array, start, len, buf);\

  DT_VOID_RETURN_MARK(Set##Result##ArrayRegion); \

  typeArrayOop dst = typeArrayOop(JNIHandles::resolve_non_null(array)); \

  if (start < 0 || len < 0 || ((unsigned int)start + (unsigned int)len > (unsigned int)dst->length())) { \

    THROW(vmSymbols::java_lang_ArrayIndexOutOfBoundsException()); \

  } else { \

    if (len > 0) { \

      int sc = TypeArrayKlass::cast(dst->klass())->log2_element_size(); \

      memcpy((u_char*) dst->Tag##_at_addr(start), \

             (u_char*) buf, \

             len << sc);    \

    } \

  } \

JNI_END

(以上内容部门来源:https://www.zhihu.com/question/65415926)

由此可见,nativ方法,readBytes而采用了C Heap - JVM Heap进行内存拷贝的方式进行数据传递。

而readBytes 通过调用 handleRead 进行读写。handleRead就是读取内核缓存区数据。内核数据来源文件。

2. DirectByteBuffer

DirectByteBuffer 是构建在堆外的内存的对象。

DirectByteBuffer是包级别可访问的,通过 ByteBuffer.allocateDirect(int capacity) 进行构造。

public static ByteBuffer allocateDirect(int capacity) {

return new DirectByteBuffer(capacity);

}

我们看一下DirectByteBuffer 构造函数实现

DirectByteBuffer(int cap) {// package-private

    super(-1,0, cap, cap);

    boolean pa = VM.isDirectMemoryPageAligned();

    int ps = Bits.pageSize();

    long size = Math.max(1L, (long)cap + (pa ? ps :0));

    Bits.reserveMemory(size, cap);

    long base =0;

    try {

        base =unsafe.allocateMemory(size);

    }catch (OutOfMemoryError x) {

        Bits.unreserveMemory(size, cap);

        throw x;

    }

    unsafe.setMemory(base, size, (byte)0);

    if (pa && (base % ps !=0)) {

        // Round up to page boundary

        address = base + ps - (base & (ps -1));

    }else {

        address = base;

    }

    cleaner = Cleaner.create(this,new Deallocator(base, size, cap));   

    att =null;

}

这里我们主要关注这几个地方:

1.unsafe.allocateMemory(size);

利用 unsafe 类在堆外内存(C_HEAP)中分配了一块空间,这是一个 native 函数,转到进行堆外内存分配的 C/C++ 代码

inline char* AllocateHeap( size_t size, MEMFLAGS flags, address pc = 0, AllocFailType alloc_failmode = AllocFailStrategy::EXIT_OOM){

// ... 省略

char*p=(char*)os::malloc(size, flags, pc);

// 分配在 C_HEAP 上并返回指向内存区域的指针

// ... 省略

return p;

}

2.cleaner = Cleaner.create(this,new Deallocator(base, size, cap)); 

cleaner对象是对DirectByteBuffer占用对堆外内存进行清理。DirectByteBuffer.cleaner().clean() 进行手动清理。我们看一下clean() 函数

public void clean() {

//....省略

this.thunk.run();

//....省略

}

其中 thunk就是我们 Cleaner.create(this,new Deallocator(base, size, cap)); 中的Deallocator。看一下Deallocator。

private static class Deallocator implements Runnable

{

//。。。省略

    public void run() {

        if (address ==0) {

            // Paranoia

            return;

           }

        unsafe.freeMemory(address);

        address =0;

        Bits.unreserveMemory(size,capacity);

    }

}

可以看到其是一个线程进行 堆外内存的释放动作。

cleaner是PhantomReference的子类。

PhantomReference它其实主要是用来跟踪对象何时被回收的,它不能影响gc决策,但是gc过程中如果发现某个对象除了只有PhantomReference引用它之外,并没有其他的地方引用它了,那将会把这个引用放到java.lang.ref.Reference.pending队列里,在gc完毕的时候通知ReferenceHandler这个守护线程去执行一些后置处理。这个处理方法中,就会判断是否是cleaner对象,如果是,就性质clean()函数。

因此DirectByteBuffer并不需要我们手动清理内存。当jvm进行gc(oldgc)的时候,就会清理没有引用的 dirctByteBuffer。

当我们一直申请DirectByteBuffer。其实占用的是堆外内存,堆内内存只是占用一个引用。如果一直触发不了gc,纳闷堆外内存就不会回收,导致jvm进程占用内存很大。我们可以通过-XX:MaxDirectMemorySize限制DirecByteBuffer占用堆外内存的大小

3.Bits.reserveMemory(size, cap);

static void reserveMemory(long size,int cap) {

    synchronized (Bits.class) {

        if (!memoryLimitSet && VM.isBooted()) {

            maxMemory = VM.maxDirectMemory();

            memoryLimitSet =true;

        }

        // -XX:MaxDirectMemorySize limits the total capacity rather than the

        // actual memory usage, which will differ when buffers are page

        // aligned.

        if (cap <=maxMemory -totalCapacity) {

            reservedMemory += size;

            totalCapacity += cap;

            count++;

            return;

        }

    }

    System.gc();

    try {

        Thread.sleep(100);

    }catch (InterruptedException x) {

        // Restore interrupt status

        Thread.currentThread().interrupt();

    }

    synchronized (Bits.class) {

        if (totalCapacity + cap >maxMemory)

            throw new OutOfMemoryError("Direct buffer memory");

        reservedMemory += size;

        totalCapacity += cap;

        count++;

        }

}

该函数用于统计DirectByteBuffer占用的大小。VM.maxDirectMemory()是jvm允许申请的最大DirectBuffer的大小(XX:MaxDirectMemorySize 通过这个参数设置)

如果发现当前申请的空间,大于限制的空间,就会触发一次gc,上面说过gc会回收哪些之前不使用的directBuffer。然后再次申请。

VM.maxDirectMemory() 大小是如何设置的内,在VM类有这样一段代码

public static void saveAndRemoveProperties(Properties var0) {

    //....

    String var1 = (String)var0.remove("sun.nio.MaxDirectMemorySize");

    if (var1 !=null) {

        if (var1.equals("-1")) {

            directMemory = Runtime.getRuntime().maxMemory();

        }else {

            long var2 = Long.parseLong(var1);

            if (var2 > -1L) {

            directMemory = var2;

            }

    }

    //...

}

"sun.nio.MaxDirectMemorySize"  这个属性就是通过 -XX:MaxDirectMemorySize 这个参数设置的。如果我们不指定这个jvm参数,笔者在jdk8中测试了一下,默认是-1,这样就导致directBufffer内存限制为进程最大内存。当然这也是一个潜在风险。

风险案例:

笔者曾在线上运行一个应用。该应用就是从消息队列中消费数据,然后将数据处理后存到Hbase中。但是应用运行每次运行2周左右,机器就会出现swap占用过大。经过分析,是jvm进程占用内存太大,但是分析jvm相关参数(堆、线程大小),并没有设置的很大。最后发现原来是directBuffer占用达到了10G。后面通过-XX:MaxDirectMemorySize=2048m 限制directbuffer使用量,解决了问题。每次directBuffer占用达到2G,就会触发一次fullgc,将之前的无用directbuffer回收掉。hbase一个坑,有时间笔者会整理这个案例。

3.DirectByteBuffer文件IO

文件读取示例:

FileChannel filechannel=new RandomAccessFile("/data/appdatas/cat/mmm","rw").getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SIZE);

filechannel.read(byteBuffer)

我们看一下read函数

public int read(ByteBuffer var1)throws IOException {

//。。。。

var3 = IOUtil.read(this.fd, var1, -1L,this.nd);

//。。。。

}

主要逻辑调用IOUtil.read。我们看一下这个函数

static int read(FileDescriptor var0, ByteBuffer var1,long var2, NativeDispatcher var4)throws IOException {

    if (var1.isReadOnly()) {

        throw new IllegalArgumentException("Read-only buffer");

    }else if (var1instanceof DirectBuffer) {

        return readIntoNativeBuffer(var0, var1, var2, var4);

    }else {

        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;

    try {

        int var6 = readIntoNativeBuffer(var0, var5, var2, var4);

        var5.flip();

        if (var6 >0) {

            var1.put(var5);

        }

        var7 = var6;

    }finally {

        Util.offerFirstTemporaryDirectBuffer(var5);

    }

    return var7;

    }

}

主要方法就是通过 readIntoNativeBuffer 这个函数将数据读入 directBuffer中,其中readIntoNativeBuffer也是调用一个native方法。

通过上面的代码,我们会看到,如果fielchannel.read(ByteBuffer) 也可以传入一个HeapByteBuffer,这个类是堆中。如果是这个类,那么内部读取的时候,会把数据先读到DirectByteBuffer中,然后在copy到HeapByteBuffer中。Util.getTemporaryDirectBuffer(var1.remaining());就是获取一个DirectBuffer对像。因为DirectBuffer创建的时候,开销比较大,所以使用的时候一般会用一个池子来管理。有兴趣可以看一下Util这个类里面的实现。

上述内容就是DirectByteBuffer和文件IO的作用分别是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。


本文标题:DirectByteBuffer和文件IO的作用分别是什么
链接地址:http://hbruida.cn/article/ipohoi.html