15# Apache HIVE 的学习笔记

 目录

[HIVE](#hive)
      1. [内部表和外部表的区别](#内部表和外部表的区别)
      2. [分区和分桶的区别](#分区和分桶的区别)
         1. [分区](#分区)
         2. [分桶](#分桶)
            1. [上传到分区目录, 令分区表和数据关联](#上传到分区目录-令分区表和数据关联)
      3. [order/sort/distribute/cluster by 的区别](#ordersortdistributecluster-by-的区别)
      4. [HIVE 的数据倾斜](#hive-的数据倾斜)
         1. [针对数据内容设置合理的 Map 数量](#针对数据内容设置合理的-map-数量)
         2. [小文件合并](#小文件合并)
         3. [复杂文件增加 Map 数](#复杂文件增加-map-数)
         4. [合理设置 Reduce 数](#合理设置-reduce-数)
      5. [HIVE 的 UDF 怎么实现](#hive-的-udf-怎么实现)
      6. [HIVE 的工作流](#hive-的工作流)
      7. [HIVE 分区是否越多越好](#hive-分区是否越多越好)
      8. [HIVE 调优](#hive-调优)
         1. [hive-site.xml](#hive-sitexml)
         2. [HIVE CLI 调整](#hive-cli-调整)
      9. [HIVE 压缩](#hive-压缩)
         1. [HIVE 数据压缩](#hive-数据压缩)
            1. [压缩配置参数](#压缩配置参数)
         2. [HIVE 文件压缩](#hive-文件压缩)
## HIVE

HIVE 是基于 Hadoop 的一个数据仓库工具, 可以将结构化的数据文件映射为一张数据库表, 并提供类 SQL 查询功能.

### 内部表和外部表的区别

- 内部表是 HIVE 管理的, 外部表是 HDFS 管理的.
- 创建表时: 内部表会将数据移动到数仓指向的路径; 外表仅记录所在的路径, 不对数据位置做改变.
- 删除表时: 内部表会删除元数据与储存数据; 删除外部表只会删除元数据. 这样外部表相对安全, 组织和数据共享也更灵活.

> 通常情况下使用外部表保证数据安全, 中间表, 结果表则使用内部表 (管理表).

### 分区和分桶的区别

#### 分区

指按照数据表的某列或某几列进行分区, 区域从形式上可以理解为文件目录, 若大量的数据保存在一个目录下, 查询的时候会很慢而且占用大量资源.

这个时候就可以按照数据中具有特征和共同性的字段作为分区字段, 不同特征存在不同分区, 这时查询只需要按照字段名就能在特定分区下查询.

> 简单理解分区就是 HDFS 上分目录, 分桶就是分成单独文件.

#### 分桶

分桶则是对分区更细粒度的划分.

分桶将整个数据内容安装某列属性值的哈希值进行分区. 按照某一属性分为 N 个桶, 就是对该属性值的哈希值对 N 取模, 按照结果对数据分桶.

##### 上传到分区目录, 令分区表和数据关联

- 上传数据后修复表:

```shell
dfs -mkdir -p path
dfs -put path
msck repair table table_name
```

- 上传数据后添加分区

```shell
dfs -mkdir -p path
dfs -put path
alter table table_name add partition (partition_colN=partition_valueN);
```

> 直接将新的分区文件上传到 HDFS, HIVE 没有对应元数据所以无法查询到.

### order/sort/distribute/cluster by 的区别

- order by 会对所给的全部数据进行全局排序, 不管数据量, 只启动一个 Reducer 来处理.
- sort by 会根据数据量的大小启动数个 Reducer 来处理, 并且进入 Reducer 前为每个 Reducer 产生一个排序文件.
    - sort by 不是全局排序, 其在数据进入 reducer 前完成排序; 如果进行排序, 并且设置 `mapred.reduce.Task, 每个 Task 都是一个 JVM 实例, JVM 的开启与销毁会降低系统运行效率s>1`, 则只保证每个 Reducer 的输出有序, 不保证全局有序.
    - sort by 不受 `hive.mapred.mode` 是否为 strict, nostrict 的影响.
    - sort by 的数据只能保证在同一个 Reduce 中的数据可以按指定字段排序.
    - 使用 sort by 可以指定执行的 Reduce 个数 `set mapred.reduce.Task, 每个 Task 都是一个 JVM 实例, JVM 的开启与销毁会降低系统运行效率 s= N`; 对输出的数据执行归并排序, 可以得到全部结果.
        > 注意: 可以用 Limit 子句减少数据量. 使用 Limit N 后, 传输到 Reduce 端 (单机) 的数据记录减少到 $N\times{MapNumber}$ . 否则由于数据过大可能出不了结果.
- distribute by 控制 Map 结果的分发, 将相同字段的 Map 输出分发到一个 Reducer 上处理.
- cluster by 是前两者的结合版, 当 distribute by 和 sort by 后面所跟的列名相同时, 就等同于直接使用 cluster by 该列名, 但是 cluster by 指定的列, 最终的排序结果是降序排列, 且无法指定 asc / desc.

### HIVE 的数据倾斜

> 通过 YARN 监控平台的 Task 的运行状态, 超时和失败的都可能是发生数据倾斜的地方.

数据倾斜的根源是 Key 分布不均匀, 不让数据分区, 直接在 map 端搞定, 或者在分区时清洗集中无效的 Key, 或打乱 Key 使其进入到不同的 Reduce 中.

#### 针对数据内容设置合理的 Map 数量

主要的决定因素有:input 的文件总个数, input 的文件大小, 集群设置的文件块大小.

通常情况下, 作业会通过 input 的目录产生一个或者多个 map 任务.

> map 数越多越好?

不. 如果一个任务有很多小文件 (远远小于块大小 128m), 则每个小文件也会被当做一个块, 用一个 map 任务来完成, 而一个 map 任务启动和初始化的时间远远大于逻辑处理的时间, 就会造成很大的资源浪费. 而且, 同时可执行的 map 数是受限的.

> 保证每个 map 都是 128 mb?

不. 比如有一个 127m 的文件, 正常会用一个 map 去完成, 但这个文件只有一个或者两个小字段, 却有大量记录, 如果 map 处理的逻辑比较复杂, 用一个 map 任务去做, 肯定也比较耗时.

#### 小文件合并

在 Map 前合并小文件. 例如合并小于 iAmSize 的文件:

```sql
set mapred.max.split.size = iAmSize;
set mapred.min.split.size.per.node = iAmSize;
set mapred.min.split.size.per.rack = iAmSize;
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
```

#### 复杂文件增加 Map 数

当 Input 的文件都很大, 任务逻辑复杂, Map 执行非常慢的时候, 可以考虑增加 Map 数, 来使得每个 Map 处理的数据量减少, 从而提高任务的执行效率.

> 根据 `computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))` 公式调整 maxSize 最大值. 让 maxSize 最大值低于 blocksize 就可以增加 map 的个数

```sql
-- 默认值为 1 --
set mapreduce.input.fileinputformat.split.minsize = 1

-- 默认值 Long.MAXValue, 默认情况下, 切片大小 = blocksize --
set mapreduce.input.fileinputformat.split.maxsize = Long.MAXValue

-- 切片最大值, 参数如果比 blocksize 小, 则会让切片变小, 且等于配置的参数值. --
set maxsize:
-- 切片最小值, 参数调的比 blockSize 大, 则让切片变得比 blocksize 大. --
set minsize:
-- 设置 maxsize 大小为 N mb, 即单个 fileSplit 的大小为 N mb. --
set mapreduce.input.fileinputformat.split.maxsize = N*1024*1024;
```

#### 合理设置 Reduce 数

设置每一个 job 中 reduce 个数 `set mapreduce.job.reduces = N;`.

### HIVE 的 UDF 怎么实现

UDF 是 Hive 提供的自定义函数工具.

### HIVE 的工作流

1. 查询: HIVE 接口, 命令行或 Web UI 发送查询驱动程序.
2. get Plan: 驱动程序查询编译器.
3. 语法分析.
4. 语义分析.
5. 逻辑计划产生.
6. 逻辑计划优化.
7. 物理计划生成.
8. 物理计划优化.
9. 物理计划执行.
10. 返回查询结果.

### HIVE 分区是否越多越好

- HIVE 如果拥有过多的分区, 由于底层储存是在 HDFS 上的, HDFS 只存储大文件, 过多的分区会加重 NameNode 的负担.

- HIVE 会转化为 MapReduce, 再转化为多个 Task, 每个 Task 都是一个 JVM 实例, JVM 的开启与销毁会降低系统运行效率.

> 合理的分区不应该有过多的分区和文件目录, 并且每个目录下的文件应该足够大.

### HIVE 调优

> 不根据业务内容的调优都是整蛊.

#### hive-site.xml

```xml
<configuration>
    <!-- 合并 block 减少 task 数量 -->
    <property>
        <name>ngmr.partition.automerge</name>
        <value>true</value>
    </property>
    <!-- 将 n 个 block 排给单个线程处理 -->
    <property>
        <name>ngmr.partition.mergesize.mb</name>
        <value>n</value>
    </property>
    <!-- 小文件合并 -->
    <property>
        <name>hive.merge.sparkfiles</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.map.agg</name>
        <value>true</value>
    </property>
    <!-- 使用向量化查询 -->
    <property>
        <name>hive.vectorized.execution.enabled</name>
        <value>true</value>
    </property>
    <!-- cbo 优化 HIVE 查询 -->
    <property>
        <name>hive.cbo.enable</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.stats.fetch.column.stats</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.stats.fetch.partition.stats</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.compute.query.using.stats</name>
        <value>true</value>
    </property>
    <!-- 数据压缩 -->
    <property>
        <name>hive.exec.compress.intermediate</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.exec.compress.output</name>
        <value>true</value>
    </property>
    <!-- 简单 SQL 不使用 Spark -->
    <property>
        <name>hive.fetch.task.conversion</name>
        <value>more</value>
    </property>
    <!--
    有数据倾斜的时候进行负载均衡;
    group by 操作是否允许数据倾斜, 默认是false, 当设置为true时, 执行计划会生成两个 Map / Reduce 作业, 第一个 MapReduce 会将 Map 的结果随机分发到 Reduce 中, 达到负载均衡的目的来解决数据倾斜.
    -->
    <property>
        <name>hive.groupby.skewindata</name>
        <value>true</value>
    </property>
    <!-- 列裁剪, 默认为 true, 在做查询时只读取用到的列. -->
    <property>
        <name>hive.optimize.cp</name>
        <value>true</value>
    </property>
    <!-- JVM 重用 -->
    <property>
        <name>mapreduce.job.jvm.numtasks</name>
        <value>n</value>
        <description>${n} tasks to run per JVM. -1 for unlimited.</description>
    </property>
</configuration>
```

#### HIVE CLI 调整

```shell
// 合并 Block 减少 Task 数量
set ngmr.partition.automerge = true;
// JVM 重用
set mapreduce.job.jvm.numtasks=n;  
// 将 n 个 Block 排给单个线程处理
set ngmr.partition.mergesize.mb = n;
// 小文件合并
set hive.merge.sparkfiles = true;
set hive.map.agg = true;
// 使用向量化查询
set hive.vectorized.execution.enabled = true;
// cbo 优化 HIVE 查询
set hive.cbo.enable = true;
set hive.stats.fetch.column.stats = true;
set hive.stats.fetch.partition.stats = true;
set hive.compute.query.using.stats = true;
// 数据压缩
set hive.exec.compress.intermediate = true;
set hive.exec.compress.output = true;
// 有数据倾斜的时候进行负载均衡; group by 操作是否允许数据倾斜, 默认是false, 当设置为true时, 执行计划会生成两个 Map / Reduce 作业, 第一个 MapReduce 会将 Map 的结果随机分发到 Reduce 中, 达到负载均衡的目的来解决数据倾斜.
set hive.groupby.skewindata =  true;
// 列裁剪, 默认为 true, 在做查询时只读取用到的列.
set hive.optimize.cp = true;
```

### HIVE 压缩

#### HIVE 数据压缩

- 空间优先 bzip2

- 速度优先 snappy

##### 压缩配置参数

要在 Hadoop 中使用压缩, 在 mapred-site.xml 中配置如下:

```xml
<configuration>
<!-- something else... -->
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
        <description>Compress when Mapper output, true for enable.</description>
    </property>
    <property>
        <name>mapreduce.map.output.compress.codec</name>
        <value>org.apache.hadoop.io.compress.${iAmCodecName}</value>
        <description>Codec with LZO, LZ4, snappy.</description>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress</name>
        <value>true</value>
        <description>Compress when Reducer output, true for enable.</description>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.codec</name>
        <value>org.apache.hadoop.io.compress.${iAmCodecName}</value>
        <description>Codec with LZO, LZ4, snappy.</description>
    </property>
    <property>
        <name>mapreduce.output.fileoutputformat.compress.type</name>
        <value>RECORD</value>
        <description>Compress logger</description>
    </property>
</configuration>
```

在 core-site.xml 中配置如下:

```xml
<configuration>
    <property>
        <name>io.compression.codecs</name>
        <!-- <value>org.apache.hadoop.io.compress.DefaultCodec</value> -->
        <!-- <value>org.apache.hadoop.io.compress.GzipCodec</value> -->
        <!-- <value>org.apache.hadoop.io.compress.BZip2Codec</value> -->
        <value>org.apache.hadoop.io.compress.Lz4Codec</value>
        <description>输入压缩阶段; Hadoop 使用文件扩展名判断是否支持某种编解码器</description>
    </property>
</configuration>
```

#### HIVE 文件压缩

HIVE 支持的存储数的格式主要有 TEXTFILE (行式存储), SEQUENCEFILE (行式存储), ORC (列式存储), PARQUET (列式存储).

- TEXTFILE 格式:
    - 默认格式, 数据不做压缩, 磁盘空间开销大, 数据解析开销大. 若使用 Bzip2 等压缩, HIVE 不会对数据进行切分, 从而无法进行并行操作.
- Optimized Row Columnar 格式:
    - HIVE 0.11 引入的储存格式, ORC 文件由数个 Stripe 组成, Stripe 实际相当于 RowGroup 概念, 大小由 4mb 到 250mb 不等, 这样做可以提升顺序读的性能.


评论

此博客中的热门博文

17# Apache Spark 的学习笔记 (WIP)

20# Apache Kafka 的学习笔记

1# 失败的赛博空间匿名技术的思考