队列
介绍
YiCONNECT 平台使用队列来保证消息处理、处理偶尔的峰值并保持系统在极端负载下正常运行。您可以查看该架构以了解有关队列的更多信息。YiCONNECT 支持著名的消息代理/队列提供商(Kafka、RabbitMQ、AWS SQS、Azure 服务总线、Google Pub/Sub)。在以后的版本中,我们将添加新的实现。该平台的3.4版本引入了配置UI,以简化设置和管理流程并改善用户体验。简而言之,规则引擎在启动时订阅队列并轮询新消息。始终有一个main主题(队列)用作新消息的默认入口点。可以使用检查点节点将消息放入另一主题中。后者自动确认目标主题中的相应消息。
队列配置
只有系统管理员用户可以配置队列。配置后,新的更改将立即应用。队列有两种配置文件:通用队列配置和 独立租户的队列配置 。
常用队列配置
开箱即用,所有消息(如遥测、连接或生命周期事件等)都被推送到Main或默认选择的其他主题。当独立处理被禁用(默认)时,YiCONNECT 将所有租户的消息放置到一个公共主题中。优点:这种方法更具成本效益;无需管理额外的虚拟机或容器。缺点:所有租户之间共享单个规则引擎服务。
要创建新队列,请执行以下步骤:
- 以系统管理员身份登录;
- 导航至“设置”页面中的 “队列” 选项卡;
- 单击“加号”按钮创建新队列。
- 输入队列名称。选择策略类型并配置重试处理设置和轮询设置。单击添加。
您已创建自定义队列。
独立租户的队列配置
队列设置
队列的定义由以下参数和模块组成:
- 名称 ——用于统计和记录;
- 提交设置——定义向规则引擎提交消息的逻辑和顺序;
- 重试处理设置- 定义消息确认的逻辑;
- 轮询设置- 用于批量和立即处理的队列设置。
提交设置
规则引擎服务不断轮询特定主题的消息,一旦消费者返回消息列表,它就会创建 TbMsgPackProcessingContext 对象。队列提交策略控制来自 TbMsgPackProcessingContext 的消息如何传递到规则链。有 5 种可用策略:
- 按发起者顺序 - 消息针对特定实体(消息发起者)一一提交。例如,在设备 A 的前一条消息得到确认之前,不会提交设备 A 的新消息。
- 按租户顺序 - 消息在租户(消息发起者的所有者)内按顺序提交。例如,在租户 A 的前一条消息得到确认之前,租户 A 的新消息不会离开队列。
- 顺序 - 消息被一个接一个地提交。在确认前一条消息之前,不会提交新消息。这使得处理速度相当慢。
- 突发 - 所有消息都按照到达的顺序提交到规则链。
- 批次- 使用 分组参数“批次大小” 将消息分组为批次。在上一批次得到确认之前,不会提交新批次。
重试处理设置
处理策略控制如何重新处理失败或超时的消息。有 5 种可用策略:
- 重试失败和超时 - 重试处理包中的所有失败和超时消息。
- 跳过所有失败 - 简单地忽略所有失败。会导致失败的消息“丢失”。例如,如果数据库关闭,消息将不会被持久化,但仍会被标记为“已确认”并从队列中删除。创建此策略主要是为了与以前的版本和开发/演示环境向后兼容。已经提交到规则链处理的超时消息不会被取消。这意味着尽管超时,规则引擎仍将尝试处理它们。
- 跳过所有失败和超时 - 简单地忽略所有失败和超时。会导致失败和超时的消息“丢失”。例如,如果数据库关闭,消息将不会被持久化,但仍会被标记为“已确认”并从队列中删除。已提交规则链处理的超时消息将被取消。规则节点不会开始处理已取消的消息。然而,在消息被取消之前开始处理消息的规则节点不会被中断。
- 全部重试 - 重试处理包中的所有消息。假设处理包包含 100 条消息。如果 100 条消息中有 1 条失败,策略仍将重新处理(重新提交到规则引擎)100 条消息。每次策略将消息重新提交到规则引擎时,这些消息都是原始消息的二进制副本。在重新提交之前,之前提交的所有消息都会被取消。规则节点不会开始处理已取消的消息。然而,在消息被取消之前开始处理消息的规则节点不会被中断。
- 重试失败 - 重试处理包中的所有失败消息。假设处理包包含 100 条消息。如果 100 条消息中有 1 条失败,策略将仅重新处理(重新提交到规则引擎)1 条消息。超时消息将不会被重新处理。每次策略将消息重新提交到规则引擎时,这些消息都是原始消息的二进制副本。在重新提交之前,之前提交的所有消息都会被取消。规则节点不会开始处理已取消的消息。然而,在消息被取消之前开始处理消息的规则节点不会被中断。
- 重试超时 - 重试处理包中的所有超时消息。假设处理包包含 100 条消息。如果 100 条消息中有 1 条超时,策略将仅重新处理(重新提交到规则引擎)1 条消息。失败的消息将不会被重新处理。每次策略将消息重新提交到规则引擎时,这些消息都是原始消息的二进制副本。在重新提交之前,之前提交的所有消息都会被取消。规则节点不会开始处理已取消的消息。然而,在消息被取消之前开始处理消息的规则节点不会被中断。
所有重试处理策略都支持重要的配置参数:
- Number of retries - 迭代次数,0为无限制;
- 跳过重试的失败消息百分比 - 如果失败或超时少于消息的 X 百分比,则跳过重试;
- 重试时间 - 重试之前在消费者线程中等待的时间(以秒为单位);
- 额外重试时间 - 第二次和后续重试尝试的等待时间(以秒为单位)。
轮询设置
批量处理:
- 轮询间隔 - 如果没有新消息到达,轮询消息之间的持续时间(以毫秒为单位)。
- 分区 - 与此队列关联的分区数。用于缩放可以并行处理的消息数量。
立即处理:
- 为每个消费者发送消息轮询 - 队列由分区组成。如果未选中该复选框,则所有分区都有一个使用者。如果选中,每个分区将有独立的消费者。
- 处理时间间隔(以毫秒为单位),用于处理消费者返回的特定消息包。
自定义属性
您可以为队列(主题)创建指定自定义属性。它们特定于队列提供程序,例如retention.ms:604800000;retention.bytes:1048576000
Kafka 或MaximumMessageSize:262144;MessageRetentionPeriod:604800
AWS SQS 等。
请注意,这些属性仅在首次创建队列时应用。
默认队列
配置了三个默认队列:Main、HighPriority 和 SequentialByOriginator。它们根据提交和处理策略而有所不同。基本上,规则引擎处理来自main主题的消息,并可以选择使用“检查点”规则节点将它们放入其他主题。默认情况下,主要主题只是忽略失败的消息。这样做是为了向后兼容以前的版本。但是,您可以自行承担重新配置的风险。请注意,如果由于规则节点脚本中的某些故障而未处理一条消息,则可能会阻止处理下一条消息。我们设计了特定的仪表板来监控规则引擎处理和故障。
高优先级主题可用于传送警报或其他关键处理步骤。HighPriority topic 中的消息在失败时会不断重新处理,直到消息处理成功。如果 SMTP 服务器或外部系统发生故障,这会非常有用。规则引擎将重试发送消息,直到消息被处理为止。
如果您想确保消息按正确的顺序处理,SequentialByOriginator主题就很重要。来自同一实体的消息将按照它们到达队列的顺序进行处理。在同一实体 ID 的前一条消息得到确认之前,规则引擎不会向规则链提交新消息。