[toc]
1.什么是数据倾斜
简单的讲,数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了集群中的一台或者几台机器上计算,而集群中的其他节点空闲。这些倾斜了的数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。
不是数据集中导致倾斜了,++而是计算太过集中++导致倾斜
2. 数据倾斜发生时的现象:
- 绝大多数task执行得都非常快,但个别task执行的极慢。
- 原本能正常执行的Spark作业,某天突然爆出OOM(内存溢出)异常。观察异常栈,是我们写的业务代码造成的
3. 数据倾斜发生的原理 :
在进行shuffle的时候,必须将各个节点上相同的Key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或者join操作。如果某个key对应的数据量特别大的话,会发生数据倾斜。比如大部分key对应的10条数据,但个别key却对应了100万条数据,那么大部分task会只分配到10条数据,而个别task可能会分配了100万数据。整个spark作业的运行进度是由运行时间最长的那个task决定的。 因此出现数据倾斜的时候,spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致OOM。
反正就是过多的key分配到同一个reduce去执行了,而那台机器无法处理那么多,解决方案基本往减轻压力方向靠
原文链接:https://blog.csdn.net/weixin_35353187/article/details/84303518
4. 产生原因
数据倾斜原因有以下几方面:
- key分布不均匀
- 业务数据本身的特性 (比如北京/上海数据量比长沙数据要多)
- 建表时考虑不周 (用户表和日志表,共同拥有访问ip字段,但是类型却不一样或者默认值不同之类)
- 某些SQL语句本身就有数据倾斜 (出现shuffle操作)
5. 解决方案
hive和MR/spark的方案不一样,毕竟一个是写sql,一个是写程序,但是方向是一样的
hive的sql写好一点会有效避免数据倾斜
1. 空值产生的数据倾斜
场景说明
在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。
解决方案 1:user_id 为空的不参与关联
解决方案 2:赋予空值新的 key 值
方法 2 比方法 1 效率更好,不但 IO 少了,而且作业数也少了,方案 1 中,log 表 读了两次,jobs 肯定是 2,而方案 2 是 1。这个优化适合无效 id(比如-99,’’,null)产 生的数据倾斜,把空值的 key 变成一个字符串加上一个随机数,就能把造成数据倾斜的 数据分到不同的 reduce 上解决数据倾斜的问题。
改变之处:使本身为 null 的所有记录不会拥挤在同一个 reduceTask 了,会由于有替代的 随机字符串值,而分散到了多个 reduceTask 中了,由于 null 值关联不上,处理后并不影响最终结果。
1.1 特殊值分开处理
我们已经知道user_id = 0是一个特殊key,那么可以把特殊值隔离开来单独做join,这样特殊值肯定会转化成map join,非特殊值就是没有倾斜的普通join了:
select
*
from
(
select * from logs where user_id = 0
)
a
join
(
select * from users where user_id = 0
)
b
on
a.user_id = b.user_id
union all
select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
2. 不同数据类型关联产生数据倾斜
场景说明
用户表中 user_id 字段为 int,log 表中 user_id 为既有 string 也有 int 的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的 hash 操作会按照 int 类型的 id 进 行分配,这样就会导致所有的 string 类型的 id 就被分到同一个 reducer 当中
解决方案: 把数字类型 id 转换成 string 类型的 id
3. 大小表关联查询产生数据倾斜
注意:使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。 map join 概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从 而避免了 reduceTask(因为没有reduce了),前提要求是内存足以装下该全量数据
以大表 a 和小表 b 为例,所有的 maptask 节点都装载小表 b 的所有数据,然后大表 a 的 一个数据块数据比如说是 a1 去跟 b 全量数据做链接,就省去了 reduce 做汇总的过程。 所以相对来说,在内存允许的条件下使用 map join 比直接使用 MapReduce 效率还高些, 当然这只限于做 join 查询的时候。
在 hive 中,直接提供了能够在 HQL 语句指定该次查询使用 map join,map join 的用法是 在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为map join(早期的 Hive 版本的优化器是不能自动优化 map join 的)。
具体用法
select /* +mapjoin(a) */ a.id aid, name, age from a join b on a.id = b.id;
select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movieid;
在 hive0.11 版本以后会自动开启 map join 优化,由两个参数控制:
set hive.auto.convert.join=true; //设置 MapJoin 优化自动开启
set hive.mapjoin.smalltable.filesize=25000000 //设置小表不超过多大时开启 mapjoin 优化
https://www.cnblogs.com/qingyunzong/p/8847597.html https://blog.csdn.net/baichoufei90/article/details/86554840 https://blog.csdn.net/weixin_35353187/article/details/84303518