今天开始RabbitMQ教程的第二讲,废话不多说,直接进入话题。 (使用.NET 客户端 进行事例演示)
在第一个教程中,我们编写了一个从命名队列中发送和接收消息的程序。在本教程中,我们将创建一个工作队列,这个队列将用于在多个工人之间分配耗时的任务。 工作队列【又名:任务队列】背后主要的思想是避免立刻执行耗时的工作任务,并且一直要等到它结束为止。相反,我们规划任务并晚些执行。我们封装一个任务作为消息发送到一个命名的消息队列中,后台运行的工作线程将获取任务并且最终执行该任务。当你运行很多的任务的时候他们会 共享工作线程和队列。 这个概念在Web应用程序中是尤其有用的,异步执行可以在短时间内处理一个复杂Http请求。1、准备工作 在本系列教程的前一个教程中,我们发送了一个包含“Hello World!”的消息,现在我们发送一个代表复杂任务的字符串。我们不会创建一个真实的任务,比如对图像文件进行处理或PDF文件的渲染,因此让我们假装我们很忙-通过采用Thread.Sleep()功能来实现复杂和繁忙。我们将根据字符串中的点的数量作为它的复杂性,每一个点将占一秒钟的“工作”。例如,一个假的任务描述Hello…,有三个点,我们就需要三秒。 我们将稍微修改一下我们以前的例子中Send 程序的代码,允许从命令行发送任意消息。这个程序将把任务发送到我们的消息队列中,所以我们叫它NewTask: 像教程一,我们需要生成两个项目。dotnet new console --name NewTask mv NewTask/Program.cs NewTask/NewTask.cs dotnet new console --name Worker mv Worker/Program.cs Worker/Worker.cs cd NewTask dotnet add package RabbitMQ.Client dotnet restore cd ../Worker dotnet add package RabbitMQ.Client dotnet restore var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
private static string GetMessage(string[] args){ return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");}
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000);
# shell 1 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C # shell 2 cd Worker dotnet run # => [*] Waiting for messages. To exit press CTRL+C
# shell 3 cd NewTask dotnet run "First message." dotnet run "Second message.." dotnet run "Third message..." dotnet run "Fourth message...." dotnet run "Fifth message....."
让我们看看交付了什么东西在Workers:
# shell 1 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' # shell 2 # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用这个代码,我们可以肯定的是,即使你使用Ctrl + C关掉一个正在处理消息的Worker,也不会丢失任何东西。【Worker】被杀死后,未被确认的消息很快就会被退回。
4、忘记确认 忘记调用BasicAck这是一个常见的错误。虽然这是一个简单的错误,但后果是严重的。消息会被退回时,你的客户退出(这可能看起来像是随机的)但是RabbitMQ将会使用更多的内存保存这些任何延迟确认消息。 为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段值:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 如果是在Window环境下,删除掉sudo字符就可以:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged5、持久性的消息 我们已经学会了如何确保即使【消费者】死亡,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然会丢失的。 当RabbitMQ退出或死机会清空队列和消息,除非你告诉它即使宕机也不能丢失任何东西。要确保消息不会丢失,有两件事情我们是必需要做的:我们需要将队列和消息都标记为持久的。 首先,我们需要确保我们RabbitMQ从来都不会损失我们的的队列。为了做到这一点,我们需要声明我们的队列为持久化的:channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
虽然这个命令本身是正确的,它不会起作用在我们目前的设置中。这是因为我们已经定义了一个叫hello的队列,它不是持久化的。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,在任何程序代码中,都试图返回一个错误。但有一个快速的解决方法-让我们声明一个名称不同的队列,例如task_queue:
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
这行代码QueueDeclare表示队列的声明,创建并打开队列,这个段代码需要应用到【生产者】和【消费者】中。
在这一点上,我们相信,task_queue队列不会丢失任何东西即使RabbitMQ重启了。现在我们要通过设置IbasicProperties.SetPersistent属性值为true来标记我们的消息持久化的。var properties = channel.CreateBasicProperties(); properties.Persistent = true;
关于消息持久性的注意
将消息标记为持久性并不能完全保证消息不会丢失。虽然该设置告诉RabbitMQ时时刻刻把保存消息到磁盘上,但是这个时间间隔还是有的,当RabbitMQ已经接受信息但并没有保存它,此时还有可能丢失。另外,RabbitMQ不会为每个消息调用fsync(2)--它可能只是保存到缓存并没有真正写入到磁盘。虽然他的持久性保证不强,但它我们简单的任务队列已经足够用了。如果您需要更强的保证,那么您可以使用Publisher Comfirms。6、公平调度 你可能已经注意到,调度仍然没有像我们期望的那样的工作。例如,在两个Workers的情况下,当所有的奇数消息是沉重的,甚至消息是轻的,一个Worker忙个不停,而另一个Worker几乎没事可做。哎,RabbitMQ对上述情况一无所知,仍将消息均匀发送。 发生这种情况是因为当有消息进入队列的时候RabbitMQ才仅仅调度了消息。它根本不看【消费者】未确认消息的数量,它只是盲目的把第N个消息发送给第N个【消费者】。 为了避免上述情况的发生,我们可以使用prefetchcount = 1的设置来调用BasicQos方法。这个方法告诉RabbitMQ在同一时间不要发送多余一个消息的数据给某个【Worker】。或者,换句话说,当某个消息处理完毕,并且已经收到了消息确认之后,才可以继续发送消息给那个【Worker】。相反,它将把消息分配给给下一个不忙的【Worker】。channel.BasicQos(0, 1, false);
注意队列大小
如果所有的工人都很忙,你的队列可以填满。你要留意这一点,也许会增加更多的【Worker】,或者有其他的策略。7、把所有的代码放在一起NewTask.cs类最终的代码是:1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class NewTask 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" };10 using(var connection = factory.CreateConnection())11 using(var channel = connection.CreateModel())12 {13 channel.QueueDeclare(queue: "task_queue",14 durable: true,15 exclusive: false,16 autoDelete: false,17 arguments: null);18 19 var message = GetMessage(args);20 var body = Encoding.UTF8.GetBytes(message);21 22 var properties = channel.CreateBasicProperties();23 properties.Persistent = true;24 25 channel.BasicPublish(exchange: "",26 routingKey: "task_queue",27 basicProperties: properties,28 body: body);29 Console.WriteLine(" [x] Sent {0}", message);30 }31 32 Console.WriteLine(" Press [enter] to exit.");33 Console.ReadLine();34 }35 36 private static string GetMessage(string[] args)37 {38 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");39 }40 }
Worker.cs完整源码如下:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 using System.Threading; 6 7 class Worker 8 { 9 public static void Main()10 {11 var factory = new ConnectionFactory() { HostName = "localhost" };12 using(var connection = factory.CreateConnection())13 using(var channel = connection.CreateModel())14 {15 channel.QueueDeclare(queue: "task_queue",16 durable: true,17 exclusive: false,18 autoDelete: false,19 arguments: null);20 21 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);22 23 Console.WriteLine(" [*] Waiting for messages.");24 25 var consumer = new EventingBasicConsumer(channel);26 consumer.Received += (model, ea) =>27 {28 var body = ea.Body;29 var message = Encoding.UTF8.GetString(body);30 Console.WriteLine(" [x] Received {0}", message);31 32 int dots = message.Split('.').Length - 1;33 Thread.Sleep(dots * 1000);34 35 Console.WriteLine(" [x] Done");36 37 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);38 };39 channel.BasicConsume(queue: "task_queue",40 noAck: false,41 consumer: consumer);42 43 Console.WriteLine(" Press [enter] to exit.");44 Console.ReadLine();45 }46 }47 }
使用消息确认和BasicQos方法可以建立一个工作队列。持久化的选项可以让我们的任务队列保持存活即使RabbitMQ重启。
好了,写完了,翻译的不好,大家见谅。
原文地址如下:
欢迎大家来探讨。