Redis 面试题
# Cluster
Redis Cluster 将所有数据划分为 16384 个 slots(槽位),每个节点负责其中一部分槽位。槽位的信息存储于每个节点中。
槽位定位算法
Cluster 默认会对 key 值使用 crc16 算法进行 hash 得到一个整数值,然后用这个整数值对 16384 进行取模来得到具体槽位。
HASH_SLOT = CRC16(key) mod 16384
Redis集群对批量操作命令的支持
对于类似mset,mget这样的多个key的原生批量操作命令,redis集群只支持所有key落在同一slot的情况,如果有多个key一定要用mset命令在redis集群上操作,则可以在key的前面加上{XX},这样参数数据分片hash计算的只会是大括号里的值,这样能确保不同的key能落到同一slot里去,示例如下:
mset {user1}:1:name zhangsan {user1}:1:age 18
2
假设name和age计算的hash slot值不一样,但是这条命令在集群下执行,redis只会用大括号里的 user1 做hash slot计算,所以算出来的slot值肯定相同,最后都能落在同一slot。
扩容缩容
将新的节点加入到集群,并且手动给新的节点分配槽位。
#新增节点
/usr/local/redis‐5.0.3/src/redis‐cli ‐a password ‐‐cluster add‐node ip:port(新增节点) ip:port(已知存在节点)
#分配槽位
/usr/local/redis‐5.0.3/src/redis‐cli ‐a password ‐‐cluster reshard ip:port(已知存在节点)
2
3
4
# 缓存雪崩
缓存雪崩指的是缓存层支撑不住或宕掉后, 流量会像奔逃的野牛一样, 打向后端存储层。
由于缓存层承载着大量请求, 有效地保护了存储层, 但是如果缓存层由于某些原因不能提供服务(比如超大并发过来,缓存层支撑不住,或者由于缓存设计不好,类似大量请求访问bigkey,导致缓存能支撑的并发急剧下降), 于是大量请求都会打到存储层, 存储层的调用量会暴增, 造成存储层也会级联宕机的情况。预防和解决缓存雪崩问题, 可以从以下方面进行着手。
1) 保证缓存层服务高可用性,比如使用Redis Sentinel或Redis Cluster。
2) 依赖隔离组件为后端限流熔断并降级。比如使用Sentinel或Hystrix限流降级组件。
比如服务降级,我们可以针对不同的数据采取不同的处理方式。当业务应用访问的是非核心数据(例如电商商品属性,用户信息等)时,暂时停止从缓存中查询这些数据,而是直接返回预定义的默认降级信息、空值或是错误提示信息;当业务应用访问的是核心数据(例如电商商品库存)时,仍然允许查询缓存,如果缓存缺失, 也可以继续通过数据库读取。
3) 提前演练。 在项目上线前, 演练缓存层宕掉后, 应用以及后端的负载情况以及可能出现的问题, 在此基础上做一些预案设定。
4)多级缓存,jvm缓存
缓存失效(击穿)
由于大批量缓存在同一时间失效可能导致大量请求同时穿透缓存直达数据库,可能会造成数据库瞬间压力过大甚至挂掉,对于这种情况我们在批量增加缓存时最好将这一批数据的缓存过期时间设置为一个时间段内的不同时间。
String get(String key) {
// 从缓存中获取数据
String cacheValue = cache.get(key);
// 缓存为空
if (StringUtils.isBlank(cacheValue)) {
// 从存储中获取
String storageValue = storage.get(key);
cache.set(key, storageValue);
//设置一个过期时间(300到600之间的一个随机数)
int expireTime = new Random().nextInt(300) + 300;
if (storageValue == null) {
cache.expire(key, expireTime);
}
return storageValue;
} else {
// 缓存非空
return cacheValue;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
热点缓存key重建优化
开发人员使用“缓存+过期时间”的策略既可以加速数据读写, 又保证数据的定期更新, 这种模式基本能够满足绝大部分需求。 但是有两个问题如果同时出现, 可能就会对应用造成致命的危害:
- 当前key是一个热点key(例如一个热门的娱乐新闻),并发量非常大。
- 重建缓存不能在短时间完成, 可能是一个复杂计算, 例如复杂的SQL、 多次IO、 多个依赖等。
在缓存失效的瞬间, 有大量线程来重建缓存, 造成后端负载加大, 甚至可能会让应用崩溃。
要解决这个问题主要就是要避免大量线程同时重建缓存。
我们可以利用互斥锁来解决,此方法只允许一个线程重建缓存, 其他线程等待重建缓存的线程执行完, 重新从缓存获取数据即可。
伪代码:
String get(String key) {
// 从Redis中获取数据
String value = redis.get(key);
// 如果value为空, 则开始重构缓存
if (value == null) {
// 只允许一个线程重建缓存, 使用nx, 并设置过期时间ex
String mutexKey = "mutext:key:" + key;
if (redis.set(mutexKey, "1", "ex 180", "nx")) {
// 从数据源获取数据
value = db.get(key);
// 回写Redis, 并设置过期时间
redis.setex(key, timeout, value);
// 删除key_mutex
redis.delete(mutexKey);
}// 其他线程休息50毫秒后重试
else {
Thread.sleep(50);
get(key);
}
}
return value;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 缓存和数据库一致
双写不一致
读写不一致
1、对于并发几率很小的数据(如个人维度的订单数据、用户数据等),这种几乎不用考虑这个问题,很少会发生缓存不一致,可以给缓存数据加上过期时间,每隔一段时间触发读的主动更新即可。
2、就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期时间依然可以解决大部分业务对于缓存的要求。
3、如果不能容忍缓存数据不一致,可以通过加分布式读写锁保证并发读写或写写的时候按顺序排好队,读读的时候相当于无锁。
4、可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是引入了新的中间件,增加了系统的复杂度。
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Autowired
private Redisson redisson;
public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24;
public static final String EMPTY_CACHE = "{}";
public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX = "lock:product:hot_cache_create:";
public static final String LOCK_PRODUCT_UPDATE_PREFIX = "lock:product:update:";
public static Map<String, Product> productMap = new HashMap<>();
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult));
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = null;
//RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = productUpdateLock.writeLock();
//加分布式写锁解决缓存双写不一致问题
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
} finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId) {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//从缓存里查数据
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//加分布式锁解决热点缓存并发重建问题
RLock hotCreateCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
hotCreateCacheLock.lock();
// 这个优化谨慎使用,防止超时导致的大规模并发重建问题
// hotCreateCacheLock.tryLock(1, TimeUnit.SECONDS);
try {
// DCL 否则获取到锁之后还会查数据库
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
//RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RLock rLock = productUpdateLock.readLock();
//加分布式读锁解决缓存双写不一致问题
rLock.lock();
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product),
genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
//设置空缓存解决缓存穿透问题
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
} finally {
hotCreateCacheLock.unlock();
}
return product;
}
private Integer genProductCacheTimeout() {
//加随机超时机制解决缓存批量失效(击穿)问题
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
private Product getProductFromCache(String productCacheKey) {
Product product = null;
//多级缓存查询,jvm级别缓存可以交给单独的热点缓存系统统一维护,有变动推送到各个web应用系统自行更新
product = productMap.get(productCacheKey);
if (product != null) {
return product;
}
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)) {
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
//缓存读延期
redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
- 01
- 保姆级教程 用DeepSeek+飞书,批量写文案、写文章,太高效了06-06
- 03
- 熬夜做PPT?AI一键生成高逼格幻灯片,效率提升10倍!06-06