热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

详解大数据处理引擎Flink内存管理

Flink是jvm之上的大数据处理引擎,jvm存在java对象存储密度低、fullgc时消耗性能,gc存在stw的问题,同时omm时会影响稳定性。针对频繁序列化和反序列化问题flink使用堆内堆外内存可以直接在一些场景下操作二进制数据,减少序列化反序列化消耗。本文带你详细理解其原理。

内存模型

Flink可以使用堆内和堆外内存,内存模型如图所示:

flink使用内存划分为堆内内存和堆外内存。按照用途可以划分为task所用内存,network memory、managed memory、以及framework所用内存,其中task network managed所用内存计入slot内存。framework为taskmanager公用。

堆内内存包含用户代码所用内存、heapstatebackend、框架执行所用内存。

堆外内存是未经jvm虚拟化的内存,直接映射到操作系统的内存地址,堆外内存包含框架执行所用内存,jvm堆外内存、Direct、native等。

Direct memory内存可用于网络传输缓冲。network memory属于direct memory的范畴,flink可以借助于此进行zero copy,从而减少内核态到用户态copy次数,从而进行更高效的io操作。

jvm metaspace存放jvm加载的类的元数据,加载的类越多,需要的空间越大,overhead用于jvm的其他开销,如native memory、code cache、thread stack等。

Managed Memory主要用于RocksDBStateBackend和批处理算子,也属于native memory的范畴,其中rocksdbstatebackend对应rocksdb,rocksdb基于lsm数据结构实现,每个state对应一个列族,占有独立的writebuffer,rocksdb占用native内存大小为 blockCahe + writebufferNum * writeBuffer + index ,同时堆外内存是进程之间共享的,jvm虚拟化大量heap内存耗时较久,使用堆外内存的话可以有效的避免该环节。但堆外内存也有一定的弊端,即监控调试使用相对复杂,对于生命周期较短的segment使用堆内内存开销更低,flink在一些情况下,直接操作二进制数据,避免一些反序列化带来的开销。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。

内存管理

类似于OS中的page机制,flink模拟了操作系统的机制,通过page来管理内存,flink对应page的数据结构为dataview和MemorySegment,memorysegment是flink内存分配的最小单位,默认32kb,其可以在堆上也可以在堆外,flink通过MemorySegment的数据结构来访问堆内堆外内存,借助于flink序列化机制(序列化机制会在下一小节讲解),memorysegment提供了对二进制数据的读取和写入的方法,flink使用datainputview和dataoutputview进行memorysegment的二进制的读取和写入,flink可以通过HeapMemorySegment 管理堆内内存,通过HybridMemorySegment来管理堆内和堆外内存,MemorySegment管理jvm堆内存时,其定义一个字节数组的引用指向内存端,基于该内部字节数组的引用进行操作的HeapMemorySegment。

public abstract class MemorySegment {
    /**
     * The heap byte array object relative to which we access the memory.
     *  如果为堆内存,则指向访问的内存的引用,否则若内存为非堆内存,则为null
     * 

Is non-null if the memory is on the heap, and is null, if the memory is * off the heap. If we have this buffer, we must never void this reference, or the memory * segment will point to undefined addresses outside the heap and may in out-of-order execution * cases cause segmentation faults. */ protected final byte[] heapMemory; /** * The address to the data, relative to the heap memory byte array. If the heap memory byte * array is null, this becomes an absolute memory address outside the heap. * 字节数组对应的相对地址 */ protected long address; }

HeapMemorySegment用来分配堆上内存。

public final class HeapMemorySegment extends MemorySegment {
    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the built-in
     * checks automatically without extra checks.
     * 字节数组的引用指向该内存段
     */
    private byte[] memory;
    public void free() {
        super.free();
        this.memory = null;
    }
 
    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}

HybridMemorySegment即支持onheap和offheap内存,flink通过jvm的unsafe操作,如果对象o不为null,为onheap的场景,并且后面的地址或者位置是相对位置,那么会直接对当前对象(比如数组)的相对位置进行操作。如果对象o为null,操作的内存块不是JVM堆内存,为off-heap的场景,并且后面的地址是某个内存块的绝对地址,那么这些方法的调用也相当于对该内存块进行操作。

public final class HybridMemorySegment extends MemorySegment {
  @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            }
            else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                }
                catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}

flink通过MemorySegmentFactory来创建memorySegment,memorySegment是flink内存分配的最小单位。对于跨memorysegment的数据方位,flink抽象出一个访问视图,数据读取datainputView,数据写入dataoutputview。

/**
 * This interface defines a view over some memory that can be used to sequentially read the contents of the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataInputView extends DataInput {
private MemorySegment[] memorySegments; // view持有的MemorySegment的引用, 该组memorysegment可以视为一个内存页,
flink可以顺序读取memorysegmet中的数据
/**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
     * It returns the number of read bytes or -1 if there is no more data left.
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes of -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}

dataoutputview是数据写入的视图,outputview持有多个memorysegment的引用,flink可以顺序的写入segment。

/**
 * This interface defines a view over some memory that can be used to sequentially write contents to the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {
private final List memory; // memorysegment的引用
/**
     * Copies {@code numBytes} bytes from the source to this view.
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
    void write(DataInputView source, int numBytes) throws IOException;
}

上一小节中讲到的managedmemory内存部分,flink使用memorymanager来管理该内存,managedmemory只使用堆外内存,主要用于批处理中的sorting、hashing、以及caching(社区消息,未来流处理也会使用到该部分),在流计算中作为rocksdbstatebackend的部分内存。memeorymanager通过memorypool来管理memorysegment。

/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends
 * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain
 * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
 * Any allocated memory has to be released to be reused later.
 * 

The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}). * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately * releases the underlying memory. */ public class MemoryManager { /** * Allocates a set of memory segments from this memory manager. *

The total allocated memory will not exceed its size limit, announced in the constructor. * @param owner The owner to associate with the memory segment, for the fallback release. * @param target The list into which to put the allocated memory pages. * @param numberOfPages The number of pages to allocate. * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount * of memory pages any more. */ public void allocatePages( Object owner, Collection target, int numberOfPages) throws MemoryAllocationException { } private static void freeSegment(MemorySegment segment, @Nullable Collection segments) { segment.free(); if (segments != null) { segments.remove(segment); } } /** * Frees this memory segment. *

After this operation has been called, no further operations are possible on the memory * segment and will fail. The actual memory (heap or off-heap) will only be released after this * memory segment object has become garbage collected. */ public void free() { // this ensures we can place no more data and trigger // the checks for the freed segment address = addressLimit + 1; } }

对于上一小节中提到的NetWorkMemory的内存,flink使用networkbuffer做了一层buffer封装。buffer的底层也是memorysegment,flink通过bufferpool来管理buffer,每个taskmanager都有一个netwokbufferpool,该tm上的各个task共享该networkbufferpool,同时task对应的localbufferpool所需的内存需要从networkbufferpool申请而来,它们都是flink申请的堆外内存。

上游算子向resultpartition写入数据时,申请buffer资源,使用bufferbuilder将数据写入memorysegment,下游算子从resultsubpartition消费数据时,利用bufferconsumer从memorysegment中读取数据,bufferbuilder与bufferconsumer一一对应。同时这一流程也和flink的反压机制相关。如图

/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 * 

Buffer requests are mediated to the network buffer pool to ensure dead-lock * free operation of the network stack by limiting the number of buffers per * local buffer pool. It also implements the default mechanism for buffer * recycling, which ensures that every buffer is ultimately returned to the * network buffer pool. *

The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to * match its new size. */ class LocalBufferPool implements BufferPool { @Nullable private MemorySegment requestMemorySegment(int targetChannel) throws IOException { MemorySegment segment = null; synchronized (availableMemorySegments) { returnExcessMemorySegments(); if (availableMemorySegments.isEmpty()) { segment = requestMemorySegmentFromGlobal(); } // segment may have been released by buffer pool owner if (segment == null) { segment = availableMemorySegments.poll(); } if (segment == null) { availabilityHelper.resetUnavailable(); } if (segment != null && targetChannel != UNKNOWN_CHANNEL) { if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) { unavailableSubpartitionsCount++; availabilityHelper.resetUnavailable(); } } } return segment; } } /** * A result partition for data produced by a single task. * *

This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially, * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one * or more {@link ResultSubpartition} instances, which further partition the data depending on the * number of consuming tasks and the data {@link DistributionPattern}. *

Tasks, which consume a result partition have to request one of its subpartitions. The request * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}) The life-cycle of each result partition has three (possibly overlapping) phases: Produce Consume Release Buffer management State management */ public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { @Override public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { checkInProduceState(); return bufferPool.requestBufferBuilderBlocking(targetChannel); } } }

自定义序列化框架

flink对自身支持的基本数据类型,实现了定制的序列化机制,flink数据集对象相对固定,可以只保存一份schema信息,从而节省存储空间,数据序列化就是java对象和二进制数据之间的数据转换,flink使用TypeInformation的createSerializer接口负责创建每种类型的序列化器,进行数据的序列化反序列化,类型信息在构建streamtransformation时通过typeextractor根据方法签名类信息等提取类型信息并存储在streamconfig中。

/**
     * Creates a serializer for the type. The serializer may use the ExecutionConfig
     * for parameterization.
     * 创建出对应类型的序列化器
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type.
     */
    @PublicEvolving
    public abstract TypeSerializer createSerializer(ExecutionConfig config);
/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
 * functions.
 */
@Public
public class TypeExtractor {
/**
 * Creates a {@link TypeInformation} from the given parameters.
     * If the given {@code instance} implements {@link ResultTypeQueryable}, its information
     * is used to determine the type information. Otherwise, the type information is derived
     * based on the given class information.
     * @param instance            instance to determine type information for
     * @param baseClass            base class of {@code instance}
     * @param clazz                class of {@code instance}
     * @param returnParamPos    index of the return type in the type arguments of {@code clazz}
     * @param                 output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static  TypeInformation createTypeInfo(Object instance, Class<&#63;> baseClass, Class<&#63;> clazz,
 int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}

对于嵌套的数据类型,flink从最内层的字段开始序列化,内层序列化的结果将组成外层序列化结果,反序列时,从内存中顺序读取二进制数据,根据偏移量反序列化为java对象。flink自带序列化机制存储密度很高,序列化对应的类型值即可。

flink中的table模块在memorysegment的基础上使用了BinaryRow的数据结构,可以更好地减少反序列化开销,需要反序列化是可以只序列化相应的字段,而无需序列化整个对象。

同时你也可以注册子类型和自定义序列化器,对于flink无法序列化的类型,会交给kryo进行处理,如果kryo也无法处理,将强制使用avro来序列化,kryo序列化性能相对flink自带序列化机制较低,开发时可以使用env.getConfig().disableGenericTypes()来禁用kryo,尽量使用flink框架自带的序列化器对应的数据类型。

缓存友好的数据结构

cpu中L1、L2、L3的缓存读取速度比从内存中读取数据快很多,高速缓存的访问速度是主存的访问速度的很多倍。另外一个重要的程序特性是局部性原理,程序常常使用它们最近使用的数据和指令,其中两种局部性类型,时间局部性指最近访问的内容很可能短期内被再次访问,空间局部性是指地址相互临近的项目很可能短时间内被再次访问。

结合这两个特性设计缓存友好的数据结构可以有效的提升缓存命中率和本地化特性,该特性主要用于排序操作中,常规情况下一个指针指向一个对象,排序时需要根据指针pointer获取到实际数据,然后再进行比较,这个环节涉及到内存的随机访问,缓存本地化会很低,使用序列化的定长key + pointer,这样key就会连续存储到内存中,避免的内存的随机访问,还可以提升cpu缓存命中率。对两条记录进行排序时首先比较key,如果大小不同直接返回结果,只需交换指针即可,不用交换实际数据,如果相同,则比较指针实际指向的数据。

以上就是详解大数据处理引擎Flink内存管理的详细内容,更多关于大数据处理引擎Flink内存管理的资料请关注其它相关文章!


推荐阅读
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • Android日历提醒软件开源项目分享及使用教程
    本文介绍了一款名为Android日历提醒软件的开源项目,作者分享了该项目的代码和使用教程,并提供了GitHub项目地址。文章详细介绍了该软件的主界面风格、日程信息的分类查看功能,以及添加日程提醒和查看详情的界面。同时,作者还提醒了读者在使用过程中可能遇到的Android6.0权限问题,并提供了解决方法。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • java.lang.Class.getDeclaredMethod()方法java.lang.Class.getDeclaredMethod()方法用法实例教程-方法返回一个Met ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • 前言本篇为大家总结社区多人合作常见的场景和对应的git操作命令。本篇非新手教程,阅读本篇前需具备Git基础知识。Git入门教程请参考https://www ... [详细]
  • Flink(三)IDEA开发Flink环境搭建与测试
    一.IDEA开发环境1.pom文件设置1.8 ... [详细]
  • 数据库异常智能分析与诊断
    数据库,异常, ... [详细]
  • flink 本地_Flink本地安装和创建Flink应用
    本篇介绍一下Flink本地环境搭建和创建Flink应用。本地安装Flink可以在Linux、MacOSX和Windows上运行,要求安装Java8.x。java-ve ... [详细]
  • 在计算机领域,数据仓库(DW或DWH),是一个用于报告和数据分析的零碎,被认为是商业智能的一个外围组成部分。它将以后和历史数据存储在一个中央,为整个企 ... [详细]
  • 目录摘要SQL的现在NoSQL,NotOnlySQL要分布式,也要SQL总结引用摘要毫不夸张的说,关系数据库是企业软件系统的核心,企业形形色色信息行为的背后,都有关系数据库的支撑。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
author-avatar
陈雅杰昱宏
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有