您现在的位置是:网站首页> 编程资料编程资料

.NET Core实现简单的Redis Client框架_实用技巧_

2023-05-24 473人已围观

简介 .NET Core实现简单的Redis Client框架_实用技巧_

0,关于 Redis RESP

RESP 全称 REdis Serialization Protocol ,即 Redis 序列化协议,用于协定客户端使用 socket 连接 Redis 时,数据的传输规则。

官方协议说明:https://redis.io/topics/protocol

那么 RESP 协议在与 Redis 通讯时的 请求-响应 方式如下:

  • 客户端将命令作为 RESP 大容量字符串数组(即 C# 中使用 byte[] 存储字符串命令)发送到 Redis 服务器。
  • 服务器根据命令实现以 RESP 类型进行回复。

RESP 中的类型并不是指 Redis 的基本数据类型,而是指数据的响应格式:

在 RESP 中,某些数据的类型取决于第一个字节:

  • 对于简单字符串,答复的第一个字节为“ +”
  • 对于错误,回复的第一个字节为“-”
  • 对于整数,答复的第一个字节为“:”
  • 对于批量字符串,答复的第一个字节为“ $”
  • 对于数组,回复的第一个字节为“ *

对于这些,可能初学者不太了解,下面我们来实际操作一下。

我们打开 Redis Desktop Manager ,然后点击控制台,输入:

set a 12 set b 12 set c 12 MGET abc

以上命令每行按一下回车键。MGET 是 Redis 中一次性取出多个键的值的命令。

输出结果如下:

本地:0>SET a 12 "OK" 本地:0>SET b 12 "OK" 本地:0>SET c 12 "OK" 本地:0>MGET a b c 1) "12" 2) "12" 3) "12"

但是这个管理工具以及去掉了 RESP 中的协议标识符,我们来写一个 demo 代码,还原 RESP 的本质。

using System; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp { class Program { static async Task Main(string[] args) { IPAddress IP = IPAddress.Parse("127.0.0.1"); IPEndPoint IPEndPoint = new IPEndPoint(IP, 6379); Socket client = new Socket(IP.AddressFamily, SocketType.Stream, ProtocolType.Tcp); await client.ConnectAsync(IPEndPoint); if (!client.Connected) { Console.WriteLine("连接 Redis 服务器失败!"); Console.Read(); } Console.WriteLine("恭喜恭喜,连接 Redis 服务器成功"); // 后台接收消息 new Thread(() => { while (true) { byte[] data = new byte[100]; int size = client.Receive(data); Console.WriteLine(); Console.WriteLine(Encoding.UTF8.GetString(data)); Console.WriteLine(); } }).Start(); while (true) { Console.Write("$> "); string command = Console.ReadLine(); // 发送的命令必须以 \r\n 结尾 int size = client.Send(Encoding.UTF8.GetBytes(command + "\r\n")); Thread.Sleep(100); } } } }

输入以及输出结果:

$> SET a 123456789 +OK $> SET b 123456789 +OK $> SET c 123456789 +OK $> MGET a b c *3 $9 123456789 $9 123456789 $9 123456789

可见,Redis 响应的消息内容,是以 $、*、+ 等字符开头的,并且使用 \r\n 分隔。

我们写 Redis Client 的方法就是接收 socket 内容,然后从中解析出实际的数据。

每次发送设置命令成功,都会返回 +OK;*3 表示有三个数组;$9 表示接收的数据长度是 9;

大概就是这样了,下面我们来写一个简单的 Redis Client 框架,然后睡觉。

记得使用 netstandard2.1,因为有些 byte[] 、string、ReadOnlySpan 的转换,需要 netstandard2.1 才能更加方便。

1,定义数据类型

根据前面的 demo,我们来定义一个类型,存储那些特殊符号:

 ///  /// RESP Response 类型 ///  public static class RedisValueType { public const byte Errors = (byte)'-'; public const byte SimpleStrings = (byte)'+'; public const byte Integers = (byte)':'; public const byte BulkStrings = (byte)'$'; public const byte Arrays = (byte)'*'; public const byte R = (byte)'\r'; public const byte N = (byte)'\n'; }

2,定义异步消息状态机

创建一个 MessageStrace 类,作用是作为消息响应的异步状态机,并且具有解析数据流的功能。

 ///  /// 自定义消息队列状态机 ///  public abstract class MessageStrace { protected MessageStrace() { TaskCompletionSource = new TaskCompletionSource(); Task = TaskCompletionSource.Task; } protected readonly TaskCompletionSource TaskCompletionSource; ///  /// 标志任务是否完成,并接收 redis 响应的字符串数据流 ///  public Task Task { get; private set; } ///  /// 接收数据流 ///  ///  /// 实际长度 public abstract void Receive(MemoryStream stream, int length); ///  /// 响应已经完成 ///  ///  protected void SetValue(string data) { TaskCompletionSource.SetResult(data); } ///  /// 解析 $ 或 * 符号后的数字,必须传递符后后一位的下标 ///  ///  /// 解析到的位置 ///  protected int BulkStrings(ReadOnlySpan data, ref int index) { int start = index; int end = start; while (true) { if (index + 1 >= data.Length) throw new ArgumentOutOfRangeException("溢出"); // \r\n if (data[index].CompareTo(RedisValueType.R) == 0 && data[index + 1].CompareTo(RedisValueType.N) == 0) { index += 2; // 指向 \n 的下一位 break; } end++; index++; } // 截取 $2 *3 符号后面的数字 return Convert.ToInt32(Encoding.UTF8.GetString(data.Slice(start, end - start).ToArray())); } }

3,定义命令发送模板

由于 Redis 命令非常多,为了更加好的封装,我们定义一个消息发送模板,规定五种类型分别使用五种类型发送 Client。

定义一个统一的模板类:

using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace CZGL.RedisClient { ///  /// 命令发送模板 ///  public abstract class CommandClient where T : CommandClient { protected RedisClient _client; protected CommandClient() { } protected CommandClient(RedisClient client) { _client = client; } ///  /// 复用 ///  ///  ///  internal virtual CommandClient Init(RedisClient client) { _client = client; return this; } ///  /// 请求是否成功 ///  /// 响应的消息 ///  protected bool IsOk(string value) { if (value[0].CompareTo('+') != 0 || value[1].CompareTo('O') != 0 || value[2].CompareTo('K') != 0) return false; return true; } ///  /// 发送命令 ///  /// 发送的命令 /// 数据类型客户端 ///  protected Task SendCommand(string command, out TStrace strace) where TStrace : MessageStrace, new() { strace = new TStrace(); return _client.SendAsync(strace, command); } } }

4,定义 Redis Client

RedisClient 类用于发送 Redis 命令,然后将任务放到队列中;接收 Redis 返回的数据内容,并将数据流写入内存中,调出队列,设置异步任务的返回值。

Send 过程可以并发,但是接收消息内容使用单线程。为了保证消息的顺序性,采用队列来记录 Send - Receive 的顺序。

C# 的 Socket 比较操蛋,想搞并发和高性能 Socket 不是那么容易。

以下代码有三个地方注释了,后面继续编写其它代码会用到。

using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace CZGL.RedisClient { ///  /// Redis 客户端 ///  public class RedisClient { private readonly IPAddress IP; private readonly IPEndPoint IPEndPoint; private readonly Socket client; //private readonly Lazy stringClient; //private readonly Lazy hashClient; //private readonly Lazy listClient; //private readonly Lazy setClient; //private readonly Lazy sortedClient; // 数据流请求队列 private readonly ConcurrentQueue StringTaskQueue = new ConcurrentQueue(); public RedisClient(string ip, int port) { IP = IPAddress.Parse(ip); IPEndPoint = new IPEndPoint(IP, port); //stringClient = new Lazy(() => new StringClient(this)); //hashClient = new Lazy(() => new HashClient(this)); //listClient = new Lazy(() => new ListClient(this)); //setClient = new Lazy(() => new SetClient(this)); //sortedClient = new Lazy(() => new SortedClient(this)); client = new Socket(IP.AddressFamily, SocketType.Stream, ProtocolType.Tcp); } ///  /// 开始连接 Redis ///  public async Task ConnectAsync() { await client.ConnectAsync(IPEndPoint); new Thread(() => { ReceiveQueue(); }) { IsBackground = true }.Start(); return client.Connected; } ///  /// 发送一个命令,将其加入队列 ///  ///  ///  ///  internal Task SendAsync(MessageStrace task, string command) { var buffer = Encoding.UTF8.GetBytes(command + "\r\n"); var result = client.SendAsync(new ArraySegment(buffer, 0, buffer.Length), SocketFlags.None); StringTaskQueue.Enqueue(task); return result; } /* Microsoft 对缓冲区输入不同大小的数据,测试响应时间。 1024 - real 0m0,102s; user 0m0,018s; sys 0m0,009s 2048 - real 0m0,112s; user 0m0,017s; sys 0m0,009s 8192 - real 0m0,163s; user 0m0,017s; sys 0m0,007s 256 - real 0m0,101s; user 0m0,019s; sys 0m0,008s 16 - real 0m0,144s; user 0m0,016s; sys 0m0,010s .NET Socket,默认缓冲区的大小为 8192 字节。 Socket.ReceiveBufferSize: An Int32 that contains the size, in bytes, of the receive buffer. The default is 8192. 但响应中有很多只是 "+OK\r\n" 这样的响应,并且 MemoryStream 刚好默认是 256(当然,可以自己设置大小),缓冲区过大,浪费内存; 超过 256 这个大小,MemoryStream 会继续分配新的 256 大小的内存区域,会消耗性能。 BufferSize 设置为 256 ,是比较合适的做法。 */ private const int BufferSize = 256; ///  /// 单线程串行接收数据流,调出任务队列完成任务 ///  private void ReceiveQueue() { while (true) { MemoryStream stream = new MemoryStream(BufferSize); // 内存缓存区 byte[] data = new byte[BufferSize]; // 分片,每次接收 N 个字节 int size = client.Receive(data); // 等待接收一个消息 int length = 
                
                

-六神源码网