Skip to the content.

消息 TTL(Time-To-Live)和过期

RabbitMQ 可以给队列和消息两个都设置 TTL 时间。通过使用队列参数或策略就能做到这个(推荐用策略配置)。

TTL 消息可以应用在单队列,一组队列或是应用在基于消息的逐个应用。TTL 设置可以通过策略操作强制设置。

队列中的每个消息的 TTL

TTL 是在队列中,通过使用策略将 message-ttl 复制代表给消息添加 TTL,不用策略也可以在队列申明的时候添加相同的参数来指定 TTL。

在队列中的消息的存在的时间如果设置的 TTL 要长的话,就会过期。请注意,路由到多个队列的消息可能在不同时间死亡,或者在其所在的每个队列中根本没有死亡。在一个队列中存在一个死信消息对队列中的其它消息没有影响。

服务器保证了死信队列不会通过 basic.delivery 推送消息或者是通过 basi.get-ok 这种响应(用于一次性获取操作)。另外,服务器还会将超过 TTL 的消息快速移除。

TTL 参数或策略的值必须是非负整数(0 <= n),用毫秒来描述间隔。假设 TTL 的值是 1000,那么在这个消息推送给消费者之前存活的时间就是 1 秒。参数类型格式是 AMQP 0-9-1 中的 short-short-int, short-int, long-int, 或者 long-long-int

队列使用策略定义 TTL

在策略中指定 TTL,只需要添加 message-ttl

rabbitmqctl rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
rabbitmqctl (Windows) rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues

上面的配置是说给所有队列设置 TTL 时间为 1 分钟

使用 x-argument 在队列申明期间消息设置 TTL

下面是 Java 客户端例子,创建一个队列让消息存留 60 秒

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

.NET 客户端

var args = new Dictionary<string, object>();
args.Add("x-message-ttl", 60000);
model.QueueDeclare("myqueue", false, false, false, args);

给已经有的消息的队列应用 TTL,但是有一些注意事项(后面会提到)。

如果它重入队列了(例如由于使用了 AMQP 方法并传递了参数 requeue,或者是由于 channel 关闭),就会提供消息的原始过期时间。

将 TTL 设置为 0 将导致消息在到达队列时就过期,除非它们能够立即交付给消费者。因此它们提供了另一个推送标签 immediate,RabbitMQ 服务端是不支持这个参数。不像那个标签,没有发出 basic.return,并且如果设置了一个死信交换机,那么消息都将成为死信消息。

发布者中每个消息的 TTL

当发送一个 basic.publish 时,TTL 可以通过设置在 AMQP 0-9-1 的类 basicexpiration 字段,给每个消息指定。

expiration 字段值描述的时 TTL 时间间隔。受 x-message-ttl 约束的一样。由于 expiration 必须是字符串,broker 将一个字符串表示数字。

当给每个队列和每个消息指定 TTL 时,会选择其中最低的值。

下面是 JAVA 客户端的例子,存留时间为 60 秒:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                                   .expiration("60000")
                                   .build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

C# :

byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Expiration = "36000000"

model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);

警告

具有每个消息 TTL 的队列(当它们已经有消息时)将在发生特定事件时丢弃消息。只有当过期的消息到达队列头部时,它们才会真正被丢弃(或被死信)。消费者不会收到已过期的消息。请记住,在消息过期和使用者交付之间可能存在竞态条件,例如在消息写进 Socket 之后,在过期之前消费者又接收到了这个消息。

在设置每条消息的TTL时,过期的消息会排在未过期的消息之后,知道这个消息被消费或过期。因此这些过期的消息所占用的资源不会被释放,并且它们还会再队列中计数统计(比如再队列中的消息数量)。

当我们追溯给每个消息应用 TTL 时,它推荐这些消费能确保快速的丢弃。

考虑到现有队列上的每个消息 TTL 设置的这种行为,当需要删除消息以释放资源时,应该使用队列 TTL(或队列清除,或队列删除)。

队列 TTL

TTL 可以设置在队列上,并不仅时队列上的内容。队列只有它们没有使用一段时间之后就会过期(并不是被消费)。这个特征能同自动删除队列属性一起使用。

能通过给 queue.declare 上设置 x-arguments 来给队列设置过期时间,或者通过设置策略的 expires。它控制了一个队列在自动删除之前,能使用多长时间。未使用代表还没有消费者,最近队列还没有重新申明(重新申明新的租约),并且在过期这段时间内还没有调用 basic.get。例如这还能用在对于 RPC 风格的回复队列,这里有很多队列创建永不耗尽的队列。

如果在过期间隔内队列没有被使用,那么服务器就会保证队列将会被删除。对于过期后如何迅速删除队列,没有给出任何保证。持久化队列的租约(lease)在服务器重启的时候启动。

x=arguments 参数的值或者 expires 策略描述了以毫秒未单位的过期时间间隔。它必须时一个正整数(不像 TTL 它可以是 0)。因此 1000 表示的这个队列在 1 秒内没有使用则将会被删除。

队列使用策略定义队列 TTL

以下策略使所有队列在上次使用后 30 分钟后过期:

rabbitmqctl rabbitmqctl set_policy expiry “.*” ‘{“expires”:1800000}’ –apply-to queues
rabbitmqctl(windows) rabbitmqctl.bat set_policy expiry “.*” “{““expires”“:1800000}” –apply-to queues

申明队列期间使用 x-arguments 定义队列 TTL

下面这个例子是 Java 创建一个队列,并设置了未使用时间位 30 分钟后过期

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);