Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 集群客戶端,純手工有乾貨,您細品。
隨着業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支持集群模式。一通度娘翻牆無果后,決定自己強擼一個基於ServiceStack.Redis的Redis集群訪問組件。
話不多說,先上運行效果圖:
Redis-Cluster集群使用 hash slot 算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文檔)。
由於 ServiceStack.Redis已經實現了單個實例的Redis命令,因此我們可以將即將要實現的 Redis 集群客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(尋址)然後將Redis命令轉發給對應的節點執行即可。
ServiceStack.Redis的RedisClient是非線程安全的,ServiceStack.Redis 使用緩存客戶端管理器(PooledRedisClientManager)來提高性能和併發能力,我們的Redis Cluster集群客戶端也應集成PooledRedisClientManager來獲取 RedisClient 實例。
同時,Redis-Cluster集群支持在線動態擴容和slot遷移,我們的Redis集群客戶端也應具備自動智能發現新節點和自動刷新 slot 分佈的能力。
總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:
- 根據 key 計算 hash slot
- 自動讀取群集上所有的節點信息
- 為節點分配緩存客戶端管理器
- 將 hash slot 路由到正確的節點
- 自動發現新節點和自動刷新slot分佈
如下面類圖所示,接下來我們詳細分析具體的代碼實現。
一、CRC16
CRC即循環冗餘校驗碼,是信息系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裏Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於字節查表法的CRC校驗碼生成算法”。
1 /// <summary>
2 /// 根據 key 計算對應的哈希槽
3 /// </summary>
4 public static int GetSlot(string key)
5 {
6 key = CRC16.ExtractHashTag(key);
7 // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384
8 return GetCRC16(key) & (16384 - 1);
9 }
10
11 /// <summary>
12 /// 計算給定字節組的 crc16 檢驗碼
13 /// </summary>
14 public static int GetCRC16(byte[] bytes, int s, int e)
15 {
16 int crc = 0x0000;
17
18 for (int i = s; i < e; i++)
19 {
20 crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]);
21 }
22 return crc & 0xFFFF;
23 }
二、讀取集群節點
從集群中的任意節點使用 CLUSTER NODES 命令可以讀取到集群中所有的節點信息,包括連接狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以串行格式提供所有這些信息,輸出示例:
d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected
2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383
654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected
ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected
754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460
484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922
2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected
每個字段的含義如下:
1. id:節點 ID,一個40個字符的隨機字符串,當一個節點被創建時不會再發生變化(除非CLUSTER RESET HARD被使用)。
2. ip:port:客戶端應該聯繫節點以運行查詢的節點地址。
3. flags:逗號列表分隔的標誌:myself,master,slave,fail?,fail,handshake,noaddr,noflags。標誌在下一節詳細解釋。
4. master:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ – ”字符。
5. ping-sent:以毫秒為單位的當前激活的ping發送的unix時間,如果沒有掛起的ping,則為零。
6. pong-recv:毫秒 unix 時間收到最後一個乒乓球。
7. config-epoch:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會創建一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的哈希槽,則具有較高配置時期的節點將獲勝。
8. link-state:用於節點到節點集群總線的鏈路狀態。我們使用此鏈接與節點進行通信。可以是connected或disconnected。
9. slot:散列槽號或範圍。從參數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的散列槽列表。如果條目僅僅是一個数字,則被解析為這樣。如果它是一個範圍,它是在形式start-end,並且意味着節點負責所有散列時隙從start到end包括起始和結束值。
標誌的含義(字段編號3):
fail?:節點處於PFAIL狀態。對於正在聯繫的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL狀態)。
fail:節點處於FAIL狀態。對於將PFAIL狀態提升為FAIL的多個節點而言,這是無法訪問的。
1 // 讀取集群上的節點信息
2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source)
3 {
4 RedisClient c = null;
5 StringReader reader = null;
6 IList<InternalClusterNode> result = null;
7
8 int index = 0;
9 int rowCount = source.Count();
10
11 foreach (var node in source)
12 {
13 try
14 {
15 // 從當前節點讀取REDIS集群節點信息
16 index += 1;
17 c = new RedisClient(node.Host, node.Port, node.Password);
18 RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes());
19 string info = Encoding.UTF8.GetString(data.Data);
20
21 // 將讀回的字符文本轉成強類型節點實體
22 reader = new StringReader(info);
23 string line = reader.ReadLine();
24 while (line != null)
25 {
26 if (result == null) result = new List<InternalClusterNode>();
27 InternalClusterNode n = InternalClusterNode.Parse(line);
28 n.Password = node.Password;
29 result.Add(n);
30
31 line = reader.ReadLine();
32 }
33
34 // 只要任意一個節點拿到集群信息,直接退出
35 if (result != null && result.Count > 0) break;
36 }
37 catch (Exception ex)
38 {
39 // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取集群信息
40 // 否則拋出異常
41 if (index < rowCount)
42 Thread.Sleep(100);
43 else
44 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
45 }
46 finally
47 {
48 if (reader != null) reader.Dispose();
49 if (c != null) c.Dispose();
50 }
51 }
52
53
54 if (result == null)
55 result = new List<InternalClusterNode>(0);
56 return result;
57 }
58
59 /// <summary>
60 /// 從 cluster nodes 的每一行命令里讀取出集群節點的相關信息
61 /// </summary>
62 /// <param name="line">集群命令</param>
63 /// <returns></returns>
64 public static InternalClusterNode Parse(string line)
65 {
66 if (string.IsNullOrEmpty(line))
67 throw new ArgumentException("line");
68
69 InternalClusterNode node = new InternalClusterNode();
70 node._nodeDescription = line;
71 string[] segs = line.Split(' ');
72
73 node.NodeId = segs[0];
74 node.Host = segs[1].Split(':')[0];
75 node.Port = int.Parse(segs[1].Split(':')[1]);
76 node.MasterNodeId = segs[3] == "-" ? null : segs[3];
77 node.PingSent = long.Parse(segs[4]);
78 node.PongRecv = long.Parse(segs[5]);
79 node.ConfigEpoch = int.Parse(segs[6]);
80 node.LinkState = segs[7];
81
82 string[] flags = segs[2].Split(',');
83 node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER;
84 node.IsSlave = !node.IsMater;
85 int start = 0;
86 if (flags[start] == MYSELF)
87 start = 1;
88 if (flags[start] == SLAVE || flags[start] == MASTER)
89 start += 1;
90 node.NodeFlag = string.Join(",", flags.Skip(start));
91
92 if (segs.Length > 8)
93 {
94 string[] slots = segs[8].Split('-');
95 node.Slot.Start = int.Parse(slots[0]);
96 if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]);
97
98 for (int index = 9; index < segs.Length; index++)
99 {
100 if (node.RestSlots == null)
101 node.RestSlots = new List<HashSlot>();
102
103 slots = segs[index].Split('-');
104
105 int s1 = 0;
106 int s2 = 0;
107 bool b1 = int.TryParse(slots[0], out s1);
108 bool b2 = int.TryParse(slots[1], out s2);
109 if (!b1 || !b2)
110 continue;
111 else
112 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null));
113 }
114 }
115
116 return node;
117 }
View Code
三、為節點分配緩存客戶端管理器
在單實例的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑒這個思路,在Redis Cluster集群中,我們為每一個主節點實例化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共享一個 PooledRedisClientManager 實例。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一映射並緩存起來。
1 // 初始化集群管理
2 void Initialize(IList<InternalClusterNode> clusterNodes = null)
3 {
4 // 從 redis 讀取集群信息
5 IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes;
6
7 // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器
8 IList<InternalClusterNode> masters = null;
9 IDictionary<int, PooledRedisClientManager> managers = null;
10 foreach (var n in nodes)
11 {
12 // 節點無效或者
13 if (!(n.IsMater &&
14 !string.IsNullOrEmpty(n.Host) &&
15 string.IsNullOrEmpty(n.NodeFlag) &&
16 (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue;
17
18 n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId);
19 if (masters == null)
20 masters = new List<InternalClusterNode>();
21 masters.Add(n);
22
23 // 用每一個主節點的哈希槽做鍵,導入REDIS客戶端緩衝池管理器
24 // 然後,方法表指針(又名類型對象指針)上場,佔據 4 個字節。 4 * 16384 / 1024 = 64KB
25 if (managers == null)
26 managers = new Dictionary<int, PooledRedisClientManager>();
27
28 string[] writeHosts = new[] { n.HostString };
29 string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray();
30 var pool = new PooledRedisClientManager(writeHosts, readHosts, _config);
31 managers.Add(n.Slot.Start, pool);
32 if (n.Slot.End != null)
33 {
34 // 這個範圍內的哈希槽都用同一個緩衝池
35 for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++)
36 managers.Add(s, pool);
37 }
38 if (n.RestSlots != null)
39 {
40 foreach (var slot in n.RestSlots)
41 {
42 managers.Add(slot.Start, pool);
43 if (slot.End != null)
44 {
45 // 這個範圍內的哈希槽都用同一個緩衝池
46 for (int s = slot.Start + 1; s <= slot.End.Value; s++)
47 managers.Add(s, pool);
48 }
49 }
50 }
51 }
52
53 _masters = masters;
54 _redisClientManagers = managers;
55 _clusterNodes = nodes != null ? nodes : null;
56
57 if (_masters == null) _masters = new List<InternalClusterNode>(0);
58 if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0);
59 if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0);
60
61 if (_masters.Count > 0)
62 _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList();
63 }
View Code
四、將 hash slot 路由到正確的節點
在訪問一個 key 時,根據第三步緩存起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 實例,調用 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。
1 // 執行指定動作並返回值
2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action);
3
4 // 執行指定動作並返回值
5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
6 {
7 RedisClient c = null;
8 try
9 {
10 c = slot();
11 return action(c);
12 }
13 catch (Exception ex)
14 {
15 // 此處省略 ...
16 }
17 finally
18 {
19 if (c != null)
20 c.Dispose();
21 }
22 }
23
24 // 獲取指定key對應的主設備節點
25 private RedisClient GetRedisClient(string key)
26 {
27 if (string.IsNullOrEmpty(key))
28 throw new ArgumentNullException("key");
29
30 int slot = CRC16.GetSlot(key);
31 if (!_redisClientManagers.ContainsKey(slot))
32 throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key);
33
34 var pool = _redisClientManagers[slot];
35 return (RedisClient)pool.GetClient();
36 }
五、自動發現新節點和自動刷新slot分佈
在實際生產環境中,Redis 集群經常會有添加/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 組件必須具備自動發現節點和刷新在 第三步 緩存起來的 slot 的能力。在這裏我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制刷新集群節點信息並重新緩存 slot 與 節點之間的映射。
1 // 執行指定動作並返回值
2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1)
3 {
4 RedisClient c = null;
5 try
6 {
7 c = slot();
8 return action(c);
9 }
10 catch (Exception ex)
11 {
12 if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
13 else
14 {
15 tryTimes -= 1;
16 // 嘗試重新刷新集群信息
17 bool isRefresh = DiscoveryNodes(_source, _config);
18 if (isRefresh)
19 // 集群節點有更新過,重新執行
20 return this.DoExecute(slot, action, tryTimes);
21 else
22 // 集群節點未更新過,直接拋出異常
23 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex);
24 }
25 }
26 finally
27 {
28 if (c != null)
29 c.Dispose();
30 }
31 }
32
33 // 重新刷新集群信息
34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config)
35 {
36 bool lockTaken = false;
37 try
38 {
39 // noop
40 if (_isDiscoverying) { }
41
42 Monitor.Enter(_objLock, ref lockTaken);
43
44 _source = source;
45 _config = config;
46 _isDiscoverying = true;
47
48 // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步
49 if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL)
50 {
51 bool isRefresh = false;
52 IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source);
53 foreach (var node in newNodes)
54 {
55 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString);
56 isRefresh =
57 n == null || // 新節點
58 n.Password != node.Password || // 密碼變了
59 n.IsMater != node.IsMater || // 主變從或者從變主
60 n.IsSlave != node.IsSlave || // 主變從或者從變主
61 n.NodeFlag != node.NodeFlag || // 節點標記位變了
62 n.LinkState != node.LinkState || // 節點狀態位變了
63 n.Slot.Start != node.Slot.Start || // 哈希槽變了
64 n.Slot.End != node.Slot.End || // 哈希槽變了
65 (n.RestSlots == null && node.RestSlots != null) ||
66 (n.RestSlots != null && node.RestSlots == null);
67 if (!isRefresh && n.RestSlots != null && node.RestSlots != null)
68 {
69 var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList();
70 var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList();
71 for (int index = 0; index < slots1.Count; index++)
72 {
73 isRefresh =
74 slots1[index].Start != slots2[index].Start || // 哈希槽變了
75 slots1[index].End != slots2[index].End; // 哈希槽變了
76 if (isRefresh) break;
77 }
78 }
79
80 if (isRefresh) break;
81 }
82
83 if (isRefresh)
84 {
85 // 重新初始化集群
86 this.Dispose();
87 this.Initialize(newNodes);
88 this._lastDiscoveryTime = DateTime.Now;
89 }
90 }
91
92 // 最後刷新時間在 {MONITORINTERVAL} 內,表示是最新群集信息 newest
93 return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL;
94 }
95 finally
96 {
97 if (lockTaken)
98 {
99 _isDiscoverying = false;
100 Monitor.Exit(_objLock);
101 }
102 }
103 }
View Code
六、配置訪問組件調用入口
最後我們需要為組件提供訪問入口,我們用 RedisCluster 類實現 字符串、列表、哈希、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單實例 Redis 那樣調用 Redis Cluster 集群。調用示例:
var node = new ClusterNode("127.0.0.1", 7001);
var redisCluster = RedisClusterFactory.Configure(node, config);
string key = "B070x14668";
redisCluster.Set(key, key);
string value = redisCluster.Get<string>(key);
redisCluster.Del(key);
1 /// <summary>
2 /// REDIS 集群工廠
3 /// </summary>
4 public class RedisClusterFactory
5 {
6 static RedisClusterFactory _factory = new RedisClusterFactory();
7 static RedisCluster _cluster = null;
8
9 /// <summary>
10 /// Redis 集群
11 /// </summary>
12 public static RedisCluster Cluster
13 {
14 get
15 {
16 if (_cluster == null)
17 throw new Exception("You should call RedisClusterFactory.Configure to config cluster first.");
18 else
19 return _cluster;
20 }
21 }
22
23 /// <summary>
24 /// 初始化 <see cref="RedisClusterFactory"/> 類的新實例
25 /// </summary>
26 private RedisClusterFactory()
27 {
28 }
29
30 /// <summary>
31 /// 配置 REDIS 集群
32 /// <para>若群集中有指定 password 的節點,必須使用 IEnumerable<ClusterNode> 重載列舉出這些節點</para>
33 /// </summary>
34 /// <param name="node">集群節點</param>
35 /// <returns></returns>
36 public static RedisCluster Configure(ClusterNode node)
37 {
38 return RedisClusterFactory.Configure(node, null);
39 }
40
41 /// <summary>
42 /// 配置 REDIS 集群
43 /// <para>若群集中有指定 password 的節點,必須使用 IEnumerable<ClusterNode> 重載列舉出這些節點</para>
44 /// </summary>
45 /// <param name="node">集群節點</param>
46 /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
47 /// <returns></returns>
48 public static RedisCluster Configure(ClusterNode node, RedisClientManagerConfig config)
49 {
50 return RedisClusterFactory.Configure(new List<ClusterNode> { node }, config);
51 }
52
53 /// <summary>
54 /// 配置 REDIS 集群
55 /// </summary>
56 /// <param name="nodes">集群節點</param>
57 /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param>
58 /// <returns></returns>
59 public static RedisCluster Configure(IEnumerable<ClusterNode> nodes, RedisClientManagerConfig config)
60 {
61 if (nodes == null)
62 throw new ArgumentNullException("nodes");
63
64 if (nodes == null || nodes.Count() == 0)
65 throw new ArgumentException("There is no nodes to configure cluster.");
66
67 if (_cluster == null)
68 {
69 lock (_factory)
70 {
71 if (_cluster == null)
72 {
73 RedisCluster c = new RedisCluster(nodes, config);
74 _cluster = c;
75 }
76 }
77 }
78
79 return _cluster;
80 }
81 }
View Code
總結
今天我們詳細介紹了如何從0手寫一個Redis Cluster集群客戶端訪問組件,相信對同樣在尋找類似解決方案的同學們會有一定的啟發,喜歡的同學請點個 star。在沒有相同案例可以參考的情況下筆者通過查閱官方說明文檔和借鑒 Java 的 JedisCluster 的實現思路,雖說磕磕碰碰但最終也初步完成這個組件並投入使用,必須給自己加一個雞腿!!在此我有一個小小的疑問,.NET 的同學們在用 Redis 集群時,你們是用什麼組件耍的,為何網上的相關介紹和現成組件幾乎都沒有?歡迎討論。
GitHub 代碼託管:https://github.com/TANZAME/ServiceStack.Redis.Cluster
技術交流 QQ 群:816425449
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※超省錢租車方案
※別再煩惱如何寫文案,掌握八大原則!
※回頭車貨運收費標準
※教你寫出一流的銷售文案?
※FB行銷專家,教你從零開始的技巧
※聚甘新