牛宝体育新闻
牛宝体育从T+1到秒级爱奇艺大数据实时化建设与演进
看过去:针对以往数据,制作天级别、小时级别或实时的报表;负责内容、会员的运营工作,基于数据分析进行决策。
爱奇艺大数据实时应用包括实时广告系统、实时推荐和搜索、实时热度,以及实时风控。下方右图是应用搜索界面,搜索推荐基于用户及其他个性化信息实时计算生成。
以上提及的产品通过爱奇艺大数据服务体系实现。如上图,爱奇艺大数据服务体系主要由数据采集、数据处理、数据应用等方面构成。
数据处理包括大数据基础架构的存储和计算层,以及上层的数据分析引擎和平台;
我们的集群部署模式由通用集群和少量专用集群组成,总共有1万多台机器、500PB以上的存储,每天会运行50多万个批处理任务及4000多个流计算任务。
第一阶段是比较原始的Hive on MapReduce。在此阶段,我们借助Hive工具实现了SQL化的分析牛宝体育,通过SQL在Hadoop上构建离线数仓。SQL避免用户自己写MapReduce的Java代码,解决了大数据的初步问题之一。但随着业务发展、实时性需求增加,离线分析的处理延时难以满足业务需要。
第二阶段采用基于Spark SQL分析,大大加速离线数仓的构建进程,同时也探索基于Flink的实时数据处理。这个阶段我们初步引入Flink,用户直接写Flink任务代码,相比于基于SQL的离线分析,开发和运维的难度高了不少。同时,因为需要维护离线和实时两条链路,成本较高,且存在流与批数据不一致等问题。
第三阶段主要进行两项工作:一方面是实时计算SQL化,我们引入了统一元数据,简化了Flink SQL的开发,使得撰写实时计算的逻辑与Hive SQL一样简单;另一方面,引入数据湖Iceberg,初步实现流批一体。
爱奇艺的Spark SQL服务基于开源的Apache Kyuubi搭建,因为直接使用Spark Thrift Server服务有很多缺点,比如不支持多租户、资源隔离较难实现等。
引入Kyuubi后,整体架构如上图所示。我们在Hadoop集群上层搭建了Kyuubi Server集群,再上层通过Pilot统一SQL网关(自研服务)接入,最上层是离线计算的Gear工作流调度系统和魔镜即席查询平台,分别承接定时工作流以及Ad-hoc的即席查询。
除此之外,Kyuubi Server和Spark任务引擎会注册到ZooKeeper服务发现集群中,供其调用方进行服务发现,由此实现了高可用性,去除了单点故障。
多租户:企业内部包含各个业务线和不同的Hadoop user,只需部署一套服务即可支持不同用户的访问;
资源隔离:我们为每个来自定时工作流的例行化任务,启动独立的Spark引擎,各个任务间资源隔离,不会互相影响;
快速Ad-hoc支持:我们也为来自即席查询的任务,启动用户级别的共享Spark引擎,无需在每次查询时都花费1分钟去启动Spark,使得查询响应低至秒级;
高扩展性:单个Kyuubi实例支持上千个连接,且每个查询只需要花费Kyuubi实例很少的资源,因此每台Kyuubi服务器都可承担很高的工作负载;
高可用性:由于采用了ZooKeeper的服务注册与发现机制,单个Kyuubi实例故障不影响任务运行;
执行计划优化:为了加快SQL的运行速度,Kyuubi默认支持了一些查询优化规则,并且允许用户在此基础上扩展,添加新的优化规则。
爱奇艺在作为Apache Kyuubi用户的同时,也积极参与社区讨论,回馈社区,目前共有70多个patches被社区接受。
除了通过Kyuubi建立Spark SQL的服务之外,我们也对Spark本身进行了优化,使得计算速度更快,资源更节省。
由于我们平台上每天都会运行大量的任务,用户很难为每个任务配置一个合适的资源量,因此经常出现任务的并行度不足或资源浪费的问题。DRA是Spark提供的已经比较成熟的功能,开启之后能够根据并行度需求自动申请或释放资源,在避免资源浪费的同时,还能在一定程度上加快任务运行。
AQE的定义是根据Spark任务运行时的数据,动态修改查询计划。因此它是一个优化框架,而非特定功能,用户可以扩展各种优化规则。
社区版Spark自带的优化规则包括Shuffle分区合并、自动转换为广播Join、Join倾斜优化等。我们基于Kyuubi进行功能扩展,比如自动合并小文件、末级Stage配置隔离等。其中,末级Stage配置隔离是一个非常好用的优化规则,它允许在配置层面,为普通Stage和末级Stage分别配置处理并行度。
这样,我们可以在末级Stage上按目标文件大小设置并行度,以合并小文件;在普通Stages上配置较高的并行度,加速任务处理,达成两者兼顾的效果。
爱奇艺内部使用Apache Atlas管理数据血缘,为此我们将其元数据和血缘投递的逻辑集成到了Kyuubi中,使得Kyuubi在运行Spark SQL任务时,能够自动向Atlas投递血缘。我们已将这一功能贡献给了社区(KYUUBI #4814),将在即将发布的Kyuubi V1.8版本中可用。
在引入Apache Uniffle前,存在两种问题:一个是Shuffle不稳定,比如大数据量情况下,下载数据失败,出现fetch failure的报错;另一个是存算分离的云原生架构下,计算节点容量、IO性能不足。
增强大数据量下的稳定性:Shuffle 10TB以上不再失败,可平稳运行
爱奇艺作为Apache Uniffle的共同贡献者,深度参与了社区讨论和贡献。欢迎大家试用并提出反馈意见。
在支持Spark SQL后,已有的大量Hive任务需要迁移过来。迁移过程会遇到两种问题:
运行失败:Spark SQL的语法并不是100%兼容Hive,其语法更加严格;
数据不一致:相比于运行失败更加麻烦,因为用户无法立即发现,且对比数据一致性的工作也更加复杂。
为了解决迁移的问题,牛宝体育我们基于Pilot SQL网关开发了“双跑对数”的功能,在迁移前自动预测迁移结果,运行步骤如下:
结果一致性校验:行数、CRC32(浮点型保留小数点后4位,避免因精度不同导致的判断错误)
使用“双跑对数”功能之后,我们在迁移的过程中发现了一些问题,其中有部分可以通过优化Spark SQL的兼容性来解决,进一步降低用户迁移的工作量:
用户在Hive中设置很多参数,比如reduce的个数,但这些参数在Spark中原本无法被识别,我们通过参数映射,将其转化为Spark的相应参数,尽可能保留用户的SQL逻辑。
最后,使用“自动降级”功能令迁移顺利进行,即首次使用Spark运行失败后,重试时降级为Hive引擎。由此,迁移分为两个阶段:第一阶段开启自动降级,用户可以放心迁移,并通过降级的记录梳理出迁移失败的任务;第二阶段,将这些失败的任务修复后,再完全切换到Spark。
目前Hive迁移的总体进度已经达成90%,对于这些迁移的任务,平均性能提升了67%,资源(包括CPU、内存使用量)也降低了近一半。
在使用Spark SQL提高实时性的同时,我们也尝试引入Flink SQL,牛宝体育希望能够真正做到实时计算。但原生的Flink SQL如上示左图,比Hive SQL长很多,需要定义输入输出表,字段名称和类型,以及背后的数据源配置。应如何解决使用过程繁琐的问题?
我们引入了“统一元数据中心”的概念,类似于Hive的Metastore。因为Hive具有Metastore,所以无需反复定义输入输出表,写SQL非常简单,如上图中写三行语句即可。
我们将内部的各种数据,包括流式的Kafka和RocketMQ,传统数据库MySQL、Redis、HBase,以及数据湖产品,都集成到统一元数据中心,并开发了Flink Catalog、Flink Connectors与其对接。这样依赖,我们无需在每个任务中,牛宝体育重新定义表的结构以及连接串等信息,做到开箱即用,有效提升开发效率。
因为传统SQL(比如Hive、MySQL),输入是一个表,输出也是一个表,从表到表的SQL究竟能否表达流式的计算逻辑?我们认为是可以的。牛宝体育
这个观点具有理论支撑,一位来自Google的工程师Akidau在其著作《Streaming Systems》中,提出了流和表的“相对论”。他认为流和表本质上是数据的两种表现形式。他拆解了传统SQL表到表的过程,将其拆分为表到流、流到表、流到流三种操作的组合。
以上图右边的SQL举例,输入是一组用户得分,按照团队进行聚合,计算出每个团队的总分,输出到新的表中。它的输入表由4个字段组成:用户、团队、得分和时间。
让我们来拆解这个SQL的执行逻辑(假设这是离线计算)。首先,原始表并不是一次性加载到内存的,而是通过一个SCAN算子,一条一条地读入,变成内部的流。然后经过SELECT算子,去掉无用字段,保留team和score字段,得到了一个新的流。
最后,流的数据全部到齐后,一次性计算聚合的值,即把每个team的所有分数相加得到总分,再输出到目标表。由此看出,第一个操作SCAN是表到流,第二个操作SELECT是流到流,第三个操作GROUP BY是流到表。
从上面的SQL执行逻辑拆解可以看出,将传统SQL描述为表到表的操作,黑盒地看是对的,但在微观层面是不准确的,实际上是表到流、流到表、流到流三种操作的组合,唯一不存在的是直接的流到流的操作。
流计算的过程中包括很多要素,比如Map或Filter可以认为是一个流到流的操作,分组的聚合或窗口的聚合,就是流到表的过程;而通过定时的trigger或Watermark引起的trigger,是表到流的过程。
当把上面的SQL看成流计算时,会发现其拆解过程与离线计算一模一样,都是由SCAN(表到流)、SELECT(流到流)、GROUP BY(流到表)组成的。
因此,SQL对于流计算和离线计算来说,没有本质区别,所以它非常适合流计算的开发。SQL开发优势如下:
开发门槛低:相较于通过Java/Scala代码开发Flink任务,SQL的开发门槛较低。引入开箱即用的元数据后,SQL就更加简洁,用户无需学习Flink的API。
版本升级容易:Flink SQL对齐了SQL标准,语法相对稳定,跨版本升级改动较小。
运行效率高:因为Flink SQL具有一些参数,控制SQL执行的计划优化,无需复杂的代码逻辑实现这些功能。
在爱奇艺的实时计算平台上,目前SQL的任务占比已经达到2/3,已经能覆盖大部分的功能,所以较推荐内部用户使用SQL进行流计算的开发。
我们在存储侧也做了技术革新。传统方案使用Lambda架构,即离线一条通路、实时一条通路,在下游合并这两条通路。但这种架构存在明显问题:
我们通过引入数据湖技术,可以做到流批一体架构,即使用Flink与数据湖交互,实时写入、实时更新。数据湖技术解决了两条链路、实时性、以及实时通路容量不足的问题。由于无需维护两条通路,计算成本与存储成本比之前的模式更低。
爱奇艺选择的数据湖产品是Apache Iceberg,其具体好处将通过案例介绍。
上图是会员订单分析的应用场景。爱奇艺的会员业务有10多年的历史,每个会员订单都对应一条记录,订单表存储在MySQL中,这些表非常大。会员团队进行用户会员运营分析时,如果直接用MySQL对这些表进行查询,速度非常慢,因为MySQL对这种OLAP分析的场景支持不佳。
最原始的方案是通路1(上图标号1和2),先用内部数据集成工具BabelX将MySQL表全量导出到Hive,再使用Hive、Spark SQL或Impala查询。这条通路的问题是,MySQL的全量导出是一个天级别的任务,数据分析的时效性很差;每次导出的数据量很大,对MySQL产生很大压力;每天都在反复导出相同的数据,效率很低。
后来会员团队和我们合作了另外一条通路2(即上图标号3和4),通过内部工具,将MySQL的变更流实时导出到Kudu,用Impala进行查询。Kudu介于HDFS和HBase之间,既有实时写入的能力,又有分析型查询能力。这条通路的问题在于:
基于这些痛点,我们调研后发现Iceberg比较适合完成这种任务,我们选择了图中最下面的新通路:通过内部的RCP平台,使用Flink CDC技术实时导出到Iceberg中,在下游使用Spark SQL进行查询。
查询快:我们对查询性能进行了优化,使其达到了接近Impala+Kudu的查询性能
1)小文件智能合并:Iceberg表在写入过程中会产生很多小文件,积累到一定程度会严重影响查询性能;而合并小文件时,如果每次都全表合并,又会造成严重的写放大。为此我们开发了智能合并策略,基于分区下文件大小均方差,自动选择待合并的分区,最大程度地避免了写放大。
2)写Parquet文件开启BloomFilter:BloomFilter可以判断一组数据中是否不含指定数据,被Parquet等存储格式广泛使用,用来降低读取数据量。爱奇艺将这一特性集成到Iceberg中,在写Parquet文件时允许开启BloomFilter,在内部场景中取得了很好的效果。这一功能已贡献给社区(PR #4831)。
最终,查询的时间从900秒降低到10秒,达到了交互式查询的性能大数据,很好地满足了会员运营分析的需求。
爱奇艺的实时计算主要又两个平台:负责通用型计算任务的RCP实时计算平台、负责特定分析型需求的RAP实时分析平台。
基于原始数据,可通过RCP进行通用分析,将结果写入新的流、数据库或Iceberg,供线上服务和数据分析直接使用。如需根据事件流,制作实时报表等特定的复杂目标,可使用RAP平台。
支持多种开发模式:JAR/SQL/DAG,其中DAG模式是对SQL模式的进一步简化,通过图形化界面配置即可完成开发
如上图架构所示,Server层负责资源管理、任务管理、任务提交、监控报警等功能。Launcher层负责直接提交任务到运行集群,这一层包含内部的Flink版本和Spark版本,对于Flink,又包含了JAR/SQL/DAG引擎、接入统一元数据、以及各种数据源的connector。
RCP平台能结合各个数据库的Connnector,将传统数据库接入实时计算。
上图是针对广告库存计算的实时化改造。业务需要对多个MySQL表做Join,写入Redis中,供下游的实时任务查询。
原有方案是,使用Spark批处理作业,每10分钟全量拉取这些MySQL表,在Spark任务里进行Join。这个方案的问题是,每10分钟进行全量拉表,对MySQL压力较大,且整体写入Redis的数据时效性较差,至少延迟10分钟,这会导致业务数据的准确性下降。
改造后的方案见图中绿框,我们采用Flink CDC的方案。在Flink任务中配置三个CDC的source,由此实现对MySQL全量同步以及自动转增量拉取的过程;紧接着一个Join节点,负责实时计算Join结果。如此一来,Join的输出是实时更新的结果,上游MySQL表的更新会实时地反映到Redis中。
将以上三个实时的表Join数据写入Redis后,业务准确性明显提升,达到7个9;
RCP支持了各类CDC connector,降低了将数据库接入实时计算的门槛,主要的优势有:
相较其他实时同步方案,Flink方案可以实现读取和数据加工的一体性,比如无需借助Kafka实时队列,整个链路更加简单。
RCP平台支持故障诊断功能。针对单个任务,平台可自助排查故障原因,如下图所示:
如果需要分析整条链路,平台也提供了一键链路诊断的功能。只需点击一下,即可对链路上的所有实时计算作业,进行健康度情况分析,获取其最近的重启次数和消费延迟等信息大数据。
爱奇艺RAP(Real-time Analytics Platform)实时分析平台,提供一站式的大数据摄取、计算和分析能力,支持超大规模实时数据多维度的分析,并生成分钟级延时的可视化报表。主要特色是:
右图是直播期间每分钟的总UV值(同步在线人数),只需三个步骤就能完成该报表的配置:
首先选择接入数据源:此处选择一张Iceberg表(用户行为表),包含时间、设备ID、事件类型、app版本、运营商、省份等字段。
总体来说,只需少量的页面操作,即可配置一张实时报表,整个过程非常迅速。原先使用通用型工具开发此类报表,可能需要一周时间,但在RAP进行配置,仅需小时级别的时间,并且支持灵活的需求变更。目前,RAP平台已在爱奇艺的直播、会员监控等业务中广泛应用。
实时计算SQL化:进一步提升SQL化的比例,丰富 SQL化的配套支持,比如调试功能,增强使用SQL开发的便捷性。
实时化的数据集成平台:基于配置化的方式,完成同种数据源的不同集群、甚至不同种类数据源之间的数据同步。
流批一体新方案探索:跟进社区新动向,比如Hybrid Source和Apache Paimon等流批一体的存储和计算产品。