秒级查询之开源分布式SQL查询引擎Presto实操-上

@

概述

定义

Presto 官网地址 https://prestodb.io/

Presto 官网文档 https://prestodb.io/docs/current/

Presto GitHub源码地址 https://github.com/prestodb/presto

Presto是一个开源的分布式SQL查询引擎,特点是快速、可靠、高效,也可便捷快速的集成到湖仓一体架构中。最新版本为0.278

Presto是 Facebook 开源的 MPP (Massive Parallel Processing) SQL 引擎,数据量支持GB到PB字节;常用于大数据交互式查询场景,其支持并行查询分布在一个或多个异构数据源上的大型数据集,可实现亚秒级响应性能。Presto旨在处理数据仓库和数据分析,聚合大量数据并生成报告。也即是Presto适合于在线分析处理(OLAP)。虽然其查询性能稍逊于Impala,但是Presto功能则更加强大,支持丰富数据源包含Hive、图数据库、传统关系型数据库、Redis等。

注意:Presto不是一个通用的关系数据库;它不是MySQL、PostgreSQL或Oracle等数据库的替代品,其主要为处理联机事务处理(OLTP)而设计的。

概念

  • 服务器类型:resource manager(资源管理器), coordinators(协调器)、 workers(工作者)
    • Resource Manager:聚合来自所有coordinators和workers的数据的服务器,并构建集群的全局视图。立即安装与分解coordinators必须需要资源管理器。集群支持多个资源管理器,每个资源管理器充当一个主资源管理器。coordinators和workers使用thrift API与资源管理器通信。
    • Coordinators:负责解析语句、规划查询和管理Presto工作节点的服务器。它是Presto安装的“大脑”,也是客户端连接到的节点,以提交语句以执行。每个Presto安装必须有一个Presto coordinators和一个或多个worker工人。跟踪每个worker上的活动,并协调查询的执行。coordinators创建一个包含一系列阶段的查询逻辑模型,然后将其转换为在Presto worker集群上运行的一系列连接任务。coordinators使用REST API与worker和客户机通信。如果是仅用于开发或测试的目的,可以配置一个Presto实例来执行这两个角色。
    • Workers:是Presto安装中的服务器,负责执行任务和处理数据。workers节点从连接器获取数据,并彼此交换中间数据。coordinators负责从worker获取结果,并将最终结果返回给客户机。当一个Presto工作进程启动时,它会在coordinators中将自己发布给发现服务器,这使得它可以供Presto协调器执行任务。workers使用REST API与其他workers和Presto coordinators进行通信。
  • 数据源
    • Connector :连接器使Presto适应数据源,如Hive或关系数据库,是Presto SPI的实现,使用标准API与资源交互。Presto内置连接器有用于JMX的连接器、提供对内置系统表访问的系统连接器、Hive连接器和用于提供TPC-H基准测试数据的TPCH连接器。也支持第三方开发连接器,以便Presto可以访问各种数据源中的数据。
    • Catalog:包含模式并通过连接器引用数据源。例如配置一个JMX目录,以通过JMX连接器提供对JMX信息的访问。catalog定义在存储在Presto配置目录中的属性文件中。
    • Schema:是一种组织表的方法。和schema一起定义了一组可以查询的表。使用Presto访问Hive或关系数据库(如MySQL)时,模式在目标数据库中转换为相同的概念。其他类型的连接器可能选择以对底层数据源有意义的方式将表组织到模式中。
    • Table:表是一组无序的行,它们被组织成具有类型的命名列。这与任何关系数据库中的情况相同。从源数据到表的映射由连接器定义。
  • 查询执行模型:Presto执行SQL语句,并将这些语句转换为coordinators和workers的分布式集群执行的查询。
    • Statement:Presto执行ansi兼容的SQL语句,该标准由子句、表达式和谓词组成。
    • Query:解析一条语句时,它将其转换为一个查询,并创建一个分布式查询计划,然后将其实现为在Presto worker上运行的一系列相互连接的阶段。语句和查询之间的区别很简单。一条语句可以被认为是传递给Presto的SQL文本,而查询则是指为执行该语句而实例化的配置和组件。查询包括阶段、任务、分段、连接器以及协同工作以产生结果的其他组件和数据源。
    • Stage:Presto执行查询时,通过将执行分解为阶段层次结构来执行。例如需要聚合Hive中存储的十亿行的数据,它会创建一个根阶段来聚合其他几个阶段的输出,所有这些阶段都是为了实现分布式查询计划的不同部分而设计的。组成查询的阶段层次结构类似于树。每个查询都有一个根阶段,负责聚合来自其他阶段的输出。阶段是协调器用来建模分布式查询计划,但是阶段本身并不在Presto worker上运行。
    • Task:stage对分布式查询计划的特定部分建模,但stage本身并不在Presto worker上执行。任务是Presto体系结构中的工作项,因为分布式查询计划被分解为一系列阶段,然后转换为任务,然后这些任务作用于或处理分割。Presto任务有输入和输出,就像一个阶段可以由一系列任务并行执行一样,一个任务也可以与一系列驱动程序并行执行。
    • Split:任务在分片上操作,分片是更大数据集的部分。分布式查询计划的最低级别的阶段通过连接器的分割检索数据,分布式查询计划较高级别的中间阶段从其他阶段检索数据。当Presto调度查询时,协调器将查询一个连接器,以获得一个表中可用的所有分割的列表。协调器跟踪哪些机器正在运行哪些任务,以及哪些任务正在处理哪些分割。
    • Driver:任务包含一个或多个并行驱动程序。驱动程序作用于数据并结合操作符以产生输出,然后由一个任务聚合,然后交付给另一个阶段的另一个任务。驱动程序是操作符实例的序列,它是Presto体系结构中并行度的最低级别。驱动程序有一个输入和一个输出。
    • Operator:操作符消费、转换和生成数据。例如,表扫描从连接器获取数据并生成可被其他操作符使用的数据,筛选操作符使用数据并通过对输入数据应用谓词来生成子集。
    • Exchange:交换在Presto节点之间为查询的不同阶段传输数据。任务将数据生成到输出缓冲区,并使用交换客户机使用来自其他任务的数据。

架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-puiIUWlb-1670510239411)(image-20221208091733068.png)]

  • 先从Presto CLI提交到Coordinator,也即是由客户端来提交查询请求。
  • Coordinator通过SQL解析生成查询计划并把任务分发给一个或多个worker去执行。
  • Worker负责执行具体任务和处理数据。
  • Catelog表示数据源,一个catelog包括connector和schema、table。
    • Connector是连接适配器,用于Presto和数据源(如hive,redis)的连接,类似于JDBC。也可以自定义编程实现连接器
    • Schema类似于mysql中数据库,table类似于mysql中表。
  • Coordinator是负责从worker获取结果并返回最终结果给client。

Presto查询请求是分Stage阶段执行,示例如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WeSqiwpp-1670510239413)(image-20221208111107526.png)]

优缺点

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VCnUmXmQ-1670510239414)(image-20221208111752965.png)]

  • 优点
    • Presto基于内存运算,减少了磁盘IO,因此计算更快。
    • Presto 支持多数据源,且能够同时连接多个数据源进行跨数据源连表查询;比如可以从Hive查询大量APP网站访问记录然后从Mysql中关联匹配出对应的设备信息。
    • 部署比Hive更简单(Hive是依赖于HDFS)。
  • 缺点
    • Presto 能够处理PB级别的海量数据分析,但Presto并不是把PB级数据都放在内存中计算的。而是根据场景,如Count、AVG等聚合hanshu ,是边读数据边计算,再清内存然后重复读数据和计算,这种耗的内存并不高。但是连表查就可能出现大量的临时数据,因此速度会变慢。

连接器

支持连接器很多,从关系数据库、NoSQL数据库、Hive等,还包括对支持目前主流三大数据湖技术Delta Lake、Hudi、Iceberg的连接器

  • Accumulo Connector
  • BigQuery Connector
  • Black Hole Connector
  • Cassandra Connector
  • ClickHouse connector
  • Delta Lake Connector
  • Druid Connector
  • Elasticsearch Connector
  • Hive Connector
  • Hive Security Configuration
  • Hudi connector
  • Iceberg Connector
  • JMX Connector
  • Kafka Connector
  • Kafka Connector Tutorial
  • Kudu Connector
  • Lark Sheets connector
  • Local File Connector
  • Memory Connector
  • MongoDB Connector
  • MySQL Connector
  • Oracle Connector
  • Apache Pinot Connector
  • PostgreSQL Connector
  • Prometheus Connector
  • Redis Connector
  • Redshift Connector
  • SQL Server Connector
  • System Connector
  • Thrift Connector
  • TPCDS Connector
  • TPCH Connector

部署

集群安装

# 创建presto的数据目录, mkdir presto-data # 下载最新版的presto二进制包 wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.278/presto-server-0.278.tar.gz # 解压 tar -xvf presto-server-0.278.tar.gz # 进入根目录 cd presto-server-0.278 # 创建配置目录 mkdir etc # 创建catalog mkdir etc/catalog 

添加节点配置文件,vim etc/node.properties

# 环境的名称,集群中的所有Presto节点必须具有相同的环境名称。 node.environment=production # 节点的id,此Presto安装的唯一标识符。这对于每个节点都必须是唯一的。这个标识符应该在重启或升级Presto时保持一致。如果在一台机器上运行多个Presto安装(即同一台机器上的多个节点),每个安装必须有唯一的标识符。 node.id=ffffffff-ffff-ffff-ffff-fffffffffff1 # 节点数据目录的位置(文件系统路径),Presto将在这里存储日志和其他数据。 node.data-dir=/home/commons/presto-data 

添加JVM参数配置,vim etc/jvm.config

-server -Xmx16G -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:+ExitOnOutOfMemoryError 

如果规划为coordinator节点,添加的主配置文件,vim etc/config.properties

coordinator=true node-scheduler.include-coordinator=false http-server.http.port=8084 query.max-memory=50GB query.max-memory-per-node=1GB discovery-server.enabled=true discovery.uri=http://hadoop1:8084 

如果规划为worker节点,添加的主配置文件,vim etc/config.properties

coordinator=false http-server.http.port=8084 query.max-memory=50GB query.max-memory-per-node=1GB discovery.uri=http://hadoop1:8084 

配置日志级别,vim etc/log.properties

# 日志级别有四种,DEBUG, INFO, WARN and ERROR com.facebook.presto=INFO 

在etc/catalog/目录配置Presto Hive连接器,vim etc/catalog/hive.properties

connector.name=hive-hadoop2 hive.metastore.uri=thrift://hadoop2:9083 #如果hive metastore的引用文件存放在一个存在联邦的HDFS上,或者你是通过其他非标准的客户端来访问HDFS集群的,请添加以下配置信息来指向你的HDFS配置文件: hive.config.resources=/home/commons/hadoop/etc/hadoop/core-site.xml,/home/commons/hadoop/etc/hadoop/hdfs-site.xml 
# 将数据目录和安装目录presto-server-0.278拷贝到其他节点,修改node.id每台唯一,这里以一个协调节点和3个worker节点为例 # 启动每个节点,安装目录中包含bin/launcher中的启动器脚本,Presto可以作为一个守护进程启动,运行命令如下 bin/launcher start # 可以在前台运行,日志和其他输出被写入stdout/stderr bin/launcher run 

常用配置说明

  • coordinator:指定是否运维Presto实例作为一个coordinator(接收来自客户端的查询情切管理每个查询的执行过程)。
  • node-scheduler.include-coordinator:是否允许在coordinator服务中进行调度工作。对于大型的集群,在一个节点上的Presto server即作为coordinator又作为worke将会降低查询性能。因为如果一个服务器作为worker使用,那么大部分的资源都不会被worker占用,那么就不会有足够的资源进行关键任务调度、管理和监控查询执行。
  • http-server.http.port:指定HTTP server的端口。Presto 使用 HTTP进行内部和外部的所有通讯。
  • task.max-memory=1GB:一个单独的任务使用的最大内存 (一个查询计划的某个执行部分会在一个特定的节点上执行)。 这个配置参数限制的GROUP BY语句中的Group的数目、JOIN关联中的右关联表的大小、ORDER BY语句中的行数和一个窗口函数中处理的行数。 该参数应该根据并发查询的数量和查询的复杂度进行调整。如果该参数设置的太低,很多查询将不能执行;但是如果设置的太高将会导致JVM把内存耗光。
  • discovery-server.enabled:Presto 通过Discovery 服务来找到集群中所有的节点。为了能够找到集群中所有的节点,每一个Presto实例都会在启动的时候将自己注册到discovery服务。Presto为了简化部署,并且也不想再增加一个新的服务进程,Presto coordinator 可以运行一个内嵌在coordinator 里面的Discovery 服务。这个内嵌的Discovery 服务和Presto共享HTTP server并且使用同样的端口。
  • discovery.uri:Discovery server的URI。由于启用了Presto coordinator内嵌的Discovery 服务,因此这个uri就是Presto coordinator的uri。修改example.net:8080,根据你的实际环境设置该URI。注意:这个URI一定不能以“/“结尾。

资源管理安装模式

如果规模大可以部署为资源管理器、协调器池、worker池的集群模式。

  • 一个集群至少需要1个资源管理器,可以向集群中添加更多资源管理器,每个资源管理器都充当主资源管理器。资源管理器的配置节点,vim etc/config.properties
resource-manager=true resource-manager-enabled=true coordinator=false node-scheduler.include-coordinator=false http-server.http.port=8080 thrift.server.port=8081 query.max-memory=50GB query.max-memory-per-node=1GB discovery-server.enabled=true discovery.uri=http://hadoop1:8080 thrift.server.ssl.enabled=true 
  • 集群支持协调器池。每个协调器将运行集群中的查询子集。协调器的配置节点,vim etc/config.properties
coordinator=true node-scheduler.include-coordinator=false http-server.http.port=8080 query.max-memory=50GB query.max-memory-per-node=1GB discovery.uri=http://hadoop1:8080 resource-manager-enabled=true 
  • 集群支持worker池,把自己的心跳发给资源管理器。worker的配置节点,vim etc/config.properties
coordinator=false http-server.http.port=8080 query.max-memory=50GB query.max-memory-per-node=1GB discovery.uri=http://hadoop1:8080 resource-manager-enabled=true 

安装命令行界面

# 下载最新版本0.278 wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.278/presto-cli-0.278-executable.jar # 赋可执行权限 chmod a+x ./presto-cli-0.278-executable.jar # 进入命令行界面 ./presto-cli-0.278-executable.jar --server hadoop1:8084 --catalog hive --schema default # 执行sql presto:default> show schemas; presto:default> use test; presto:test> show tables; presto:test> select * from emp_mid; 

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XPSx33KD-1670510239416)(image-20221208181548599.png)]

基于Tableau Web 连接器

Tableau的Presto web连接器允许用户从Tableau对Presto运行查询。它实现了Tableau web连接器API中的函数。

直接访问http://hadoop1:8084/tableau/presto-connector.html

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y4ZdXkuq-1670510239417)(image-20221208182302170.png)]

使用优化

数据存储

  • 合理设置分区:与Hive类似,Presto会根据元信息读取分区数据,合理的分区能减少Presto数据读取量,提升查询性能。
  • 使用列式存储:Presto对ORC文件读取做了特定优化,因此在Hive中创建Presto使用的表时,建议采用ORC格式存储;相对于Parquet,Presto对ORC支持更好。
  • 使用压缩:数据压缩可以减少节点间数据传输对IO带宽压力,对于即席查询需要快速解压,建议采用Snappy压缩。
  • 预先排序:对于已经排序的数据,在查询的数据过滤阶段,ORC格式支持跳过读取不必要的数据,比如对于经常需要过滤的字段可以预先排序。

查询SQL优化

  • 只选择使用必要的字段:由于采用列式存储,选择需要的字段可加快字段的读取、减少数据量。避免采用*读取所有字段。
  • 过滤条件必须加上分区字段:对于有分区的表,where语句中优先使用分区字段进行过滤。acct_day是分区字段,visit_time是具体访问时间。
  • Group By语句优化:合理安排Group by语句中字段顺序对性能有一定提升。将Group By语句中字段按照每个字段distinct数据多少进行降序排列。
  • Order by时使用Limit:Order by需要扫描数据到单个worker节点进行排序,导致单个worker需要大量内存。如果是查询Top N或者Bottom N,使用limit可减少排序计算和内存压力。
  • 使用近似聚合函数Presto有一些近似聚合函数,对于允许有少量误差的查询场景,使用这些函数对查询性能有大幅提升。比如使用approx_distinct() 函数比Count(distinct x)有大概2.3%的误差。SELECT approx_distinct(user_id) FROM access
  • 用regexp_like代替多个like语句:Presto查询优化器没有对多个like语句进行优化,使用regexp_like对性能有较大提升。
  • 使用Join语句时将大表放在左边:Presto中join的默认算法是broadcast join,即将join左边的表分割到多个worker,然后将join右边的表数据整个复制一份发送到每个worker进行计算。如果右边的表数据量太大,则可能会报内存溢出错误。
  • 使用Rank函数代替row_number函数来获取TopN。

无缝替换Hive表

  • 建立对应的orc表
  • 先将数据灌入orc表,然后更换表名
  • 其中原表不要删除,若线上运行一段时间后没有出现问题,则可以删除该源表。

建表格式的选择

  • ORC和Parquet都支持列式存储,但是ORC对Presto支持更好(Parquet对Impala支持更好)
  • 对于列式存储而言,存储文件为二进制的,对于经常增删字段的表,建议不要使用列式存储(修改文件元数据代价大)。对比数据仓库,dwd层建议不要使用ORC,而dm层则建议使用。

本人博客网站IT小神 www.itxiaoshen.com

发表评论

相关文章