跳转至

自定义规则节点

概述

在本教程中,您将学习如何创建自定义规则节点并将它们添加到您的 YiCONNECT 服务器实例。我们将回顾三种不同类型的规则节点:过滤、丰富和转换。

步骤 1. 下载并构建示例项目

克隆存储库并导航到存储库文件夹:

git clone -b release-3.6 https://github.com/yiqisoft/rule-node-examples
cd rule-node-examples

默认情况下,示例项目配置为使用 YiCONNECT API。

nano pom.xml

最后,构建项目:

mvn clean install

预期输出:

...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.431 s
[INFO] Finished at: 2020-08-18T11:01:40+03:00
[INFO] ------------------------------------------------------------------------

步骤2.将项目导入IDE

确保Lombok插件已安装到您最喜欢的 IDE 中。将项目作为 Maven 项目导入到您最喜欢的 IDE。

步骤 3. 创建规则节点

为了创建新的规则节点,您应该实现 TbNode接口并使用RuleNode注释对其进行注释 。

作为示例,您可以查看一个非常简单的规则节点,该规则节点根据消息负载中密钥的存在来过滤传入消息。该规则节点是您在上一步中下载的项目的一部分。

@RuleNode(
        type = ComponentType.FILTER,
        name = "check key",
        relationTypes = {"True", "False"},
        configClazz = TbKeyFilterNodeConfiguration.class,
        nodeDescription = "Checks the existence of the selected key in the message payload.",
        nodeDetails = "If the selected key exists - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
        uiResources = {"static/rulenode/custom-nodes-config.js"},
        configDirective = "tbFilterNodeCheckKeyConfig")
public class TbKeyFilterNode implements TbNode {

    private static final ObjectMapper mapper = new ObjectMapper();

    private TbKeyFilterNodeConfiguration config;
    private String key;


    @Override
    public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
        this.config = TbNodeUtils.convert(configuration, TbKeyFilterNodeConfiguration.class);
        key = config.getKey();
    }

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        try {
            ctx.tellNext(msg, mapper.readTree(msg.getData()).has(key) ? "True" : "False");
        } catch (IOException e) {
            ctx.tellFailure(msg, e);
        }
    }

    @Override
    public void destroy() {
    }
}

上面列出的源代码中有几件事需要注意:

@RuleNode 注释

@RuleNode 注释定义节点类型、名称、描述、UI 形式和出站[关系]。

让我们看一下可用的参数:

  • type是可用的规则节点类型之一。该参数影响规则链编辑器的哪个部分将包含您的规则节点;
  • name - 将用于规则链编辑器和调试消息的规则节点的任何合理名称;
  • nodeDescription - 节点的简短描述。在规则链编辑器中可见;
  • nodeDetails - 带有 html 标签支持的节点的完整描述。在规则链编辑器中可见;
  • configClazz - 描述配置 json 的类的完整类名。
  • lationTypes - 具有预定义关系类型的字符串数组;该值应与TbContext.tellNext方法中使用的值相对应;
  • customRelations - 布尔值,指示您将在TbContext.tellNext方法中使用任何自定义关系;
  • configDirective - 基于 Angular 的 UI 指令的名称,允许用户编辑规则节点的配置。这是可选的并且可以为空。在这种情况下,用户将看到原始 JSON 编辑器;
  • uiResources - 包含配置指令的 Angular UI 文件的路径。这是可选的并且可以为空。在这种情况下,用户将看到原始 JSON 编辑器;
  • icon - 角度材质包中的图标名称;
  • iconUrl - 图标的完整 URL,该图标将用于在位于规则链编辑器的节点列表中显示规则节点;
  • docUrl - 链接到当前规则节点的文档页面,该页面将在规则链编辑器中可用。

规则节点生命周期

当创建新的规则节点时,规则引擎会调用 “init” 方法。如果有人将规则节点添加到规则链或系统停止,则可能会发生这种情况。此方法主要用于解析 JSON 对象的配置或获取TbContext的本地副本。“TbNodeUtils.convert”正在将原始配置解析为指定类的java对象。

当规则节点被销毁时,规则引擎会调用 “destroy” 方法。如果有人从规则链中删除规则节点或系统停止,则可能会发生这种情况。

当用户决定更改现有规则节点的配置时,规则引擎将依次调用 “destroy”“init” 方法。

处理传入消息

规则节点实现必须使用以下方法之一来通知规则引擎消息已成功处理:

/**
 * Indicates that message was successfully processed by the rule node.
 * Sends message to all Rule Nodes in the Rule Chain
 * that are connected to the current Rule Node using "Success" relationType.
 *
 * @param msg
 */
void tellSuccess(TbMsg msg);

/**
 * Sends message to all Rule Nodes in the Rule Chain
 * that are connected to the current Rule Node using specified relationType.
 *
 * @param msg
 * @param relationType
 */
void tellNext(TbMsg msg, String relationType);

/**
 * Sends message to all Rule Nodes in the Rule Chain
 * that are connected to the current Rule Node using one of specified relationTypes.
 *
 * @param msg
 * @param relationTypes
 */
void tellNext(TbMsg msg, Set<String> relationTypes);

如果消息处理失败,规则节点实现必须调用“tellFailure”方法:

/**
 * Notifies Rule Engine about failure to process current message.
 *
 * @param msg - message
 * @param th  - exception
 */
void tellFailure(TbMsg msg, Throwable th);

如果规则节点实现不会调用上面列出的任何方法,则规则引擎将等待可配置的超时并阻止其他消息的处理,并最终将当前消息标记为失败。

使用 YiCONNECT 服务

TbContext包含许多有用服务的“getter” 请不要忘记在您喜欢的 IDE 中按“下载源”以简化这些服务界面的浏览;下面列出了可用服务 getter 的简短列表:

// Allows to work with entity attributes: get and save them;
AttributesService getAttributesService();

// Allows CRUD (Create, Read, Updat, Delete) operations over the customer entities;
CustomerService getCustomerService();

// Allows CRUD operations over users;
UserService getUserService();

// Allows CRUD operations over assets;
AssetService getAssetService();

// Allows CRUD operations over devices;
DeviceService getDeviceService();

// Allows CRUD operations over entity views;
EntityViewService getEntityViewService();

// Allows to programmatically create and manage dashboards;
DashboardService getDashboardService();

// Allows to create and clear alarms;
RuleEngineAlarmService getAlarmService();

// Allows to programmatically create and manage rule chains;
RuleChainService getRuleChainService();

// Allows to send RPC commands to devices;
RuleEngineRpcService getRpcService();

// Allows to store telemetry to the database and push notifications to the dashbaords via WebSockets;
RuleEngineTelemetryService getTelemetryService();

// Allows to find telemetry and save it to the database without notifications to the dashboards;
TimeseriesService getTimeseriesService();

// Allows to programmatically query and manage entity relations;
RelationService getRelationService();
// Allows to programmatically create and manage integrations;
IntegrationService getIntegrationService();

// Allows to programmatically create and manage entity groups;
EntityGroupService getEntityGroupService();

// Allows to programmatically create reports;
ReportService getReportService();

// Allows to programmatically manage blob entities;
BlobEntityService getBlobEntityService();

// Allows to programmatically manage group permissions;
GroupPermissionService getGroupPermissionService();

// Allows to programmatically manage roles;
RoleService getRoleService();

// Get entity owner (TenantId or CustomerId)
EntityId getOwner(TenantId tenantId, EntityId entityId);

// Clear entity owners cache
void clearOwners(EntityId entityId);

// Get all sub-customers of the current entity
Set<EntityId> getChildOwners(TenantId tenantId, EntityId parentOwnerId);

// Allows to change entity owner. Expects TenantId or CustomerId as targetOwnerId
void changeDashboardOwner(TenantId tenantId, EntityId targetOwnerId, Dashboard dashboard) throws YiCONNECTException;

void changeUserOwner(TenantId tenantId, EntityId targetOwnerId, User user) throws YiCONNECTException;

void changeCustomerOwner(TenantId tenantId, EntityId targetOwnerId, Customer customer) throws YiCONNECTException;

void changeEntityViewOwner(TenantId tenantId, EntityId targetOwnerId, EntityView entityView) throws YiCONNECTException;

void changeAssetOwner(TenantId tenantId, EntityId targetOwnerId, Asset asset) throws YiCONNECTException;

void changeDeviceOwner(TenantId tenantId, EntityId targetOwnerId, Device device) throws YiCONNECTException;

void changeEntityOwner(TenantId tenantId, EntityId targetOwnerId, EntityId entityId, EntityType entityType) throws YiCONNECTException;

// Allows to push custom downlink message to the integration
void pushToIntegration(IntegrationId integrationId, TbMsg tbMsg, FutureCallback<Void> callback);

从规则节点创建新消息

可能需要创建从当前消息派生的消息并将其推送到规则引擎。例如,让我们编写一个自定义规则节点,将当前客户的消息复制到所有客户设备:

@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    EntityId msgOriginator = msg.getOriginator();
    // Checking that the message originator is a Customer;
    if (EntityType.CUSTOMER.equals(msgOriginator.getEntityType())) {
        CustomerId customerId = new CustomerId(msgOriginator.getId());
        boolean hasNext = true;
        // Creating the page link to iterate through the devices;
        PageLink pageLink = new PageLink(1024);
        while (hasNext) {
            // Using the Device Service to get devices from the database;
            PageData<Device> devices = ctx.getDeviceService().findDevicesByTenantIdAndCustomerId(ctx.getTenantId(), customerId, pageLink);
            hasNext = devices.hasNext();
            pageLink = pageLink.nextPageLink();
            for (Device device : devices.getData()) {
                // Creating new message with different originator
                TbMsg newMsg = TbMsg.newMsg(msg.getQueueName(), msg.getType(), device.getId(), msg.getMetaData(), msg.getData());
                // Pushing new message to the queue instead of tellNext to make sure that the message will be persisted;
                ctx.enqueueForTellNext(newMsg, "Success");
            }
        }
        // Don't forget to acknowledge original message or use ctx.tellSuccess(msg);
        ctx.ack(msg);
    } else {
        ctx.tellFailure(msg, new IllegalArgumentException("Msg originator is not Customer!"));
    }
}

您可能会注意到,我们使用了TbContext.enqueueForTellNext方法将新消息推送到规则引擎。消息将根据关系类型推送到相关的规则节点。另一种选择是将消息放在处理的开头,基本上是放在根规则链中。

void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);

此外,您可以使用稍微不同的方法,该方法还允许您接收新消息已成功推送到队列的确认:

void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);

多线程

规则引擎是参与者模型的实现,它为规则节点邮箱中的每条新消息顺序调用 TbNode.onMsg方法。因此,如果您在同一线程中处理消息,则您的实现是线程安全的。

然而,出于性能原因,大多数 API 调用都在独立的线程中执行。例如,让我们回顾一下应该如何保存传入消息中的遥测数据:

@Override
public void onMsg(TbContext ctx, TbMsg msg) {
    try {
        // Parsing the incoming message;
        ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
        // Converting temperature from °F to °C
        double temperatureF = json.get("temperature").asDouble();
        double temperatureC = (temperatureF - 32) * 5 / 9;
        // Creating the telemetry data point
        TsKvEntry tsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", temperatureC));
        // Using async API call to save telemetry with the callback
        ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), Collections.singletonList(tsKvEntry), new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable Void aVoid) {
                // Telemetry is saved, now we can acknowledge the message; 
                ctx.tellSuccess(msg);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // Telemetry is not saved, we need rule engine to reprocess the message;
                ctx.tellFailure(msg, throwable);
            }
        });
    } catch (JsonProcessingException e) {
        ctx.tellFailure(msg, e);
    }
}

您可能会注意到,我们在回调线程中而不是在主线程中通过 TbContext.tellSuccess “确认”或“转发”消息。

聚类模式

为每个规则引擎微服务启动规则节点的单个实例。例如,如果您有三个规则引擎实例,则每个实例都会启动一个 RuleNode 实例。规则引擎消息根据消息的发起者 ID(设备或资产 ID)进行分区。因此,来自一台设备的消息将始终发送到特定规则引擎微服务上的同一规则节点实例。唯一的特殊情况是添加或删除规则节点时。在这种情况下,就会发生“重新分区”事件。

作为规则节点开发人员,您可以重写默认方法TbNode.onPartitionChangeMsg 以对集群拓扑的更改做出反应。这对于根据消息的发起者(设备/资产)ID 决定缓存信息的有状态节点非常有用。为了确定当前实体 id 属于当前分配的分区列表,可以使用TbContext.isLocalEntity。请参阅下面的完整示例:

package org.thingsboard.rule.engine.node.filter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;


@Slf4j
@RuleNode(
        type = ComponentType.FILTER,
        name = "Cache example",
        relationTypes = {"True", "False"},
        configClazz = EmptyNodeConfiguration.class,
        nodeDescription = "Checks that the incoming value exceeds certain threshold",
        nodeDetails = "If temperature is too high - send Message via <b>True</b> chain, otherwise <b>False</b> chain is used.",
        uiResources = {"static/rulenode/rulenode-core-config.js"},
        configDirective = "tbNodeEmptyConfig")
public class TbCacheExampleNode implements TbNode {

    private static final ObjectMapper mapper = new ObjectMapper();
    private ConcurrentMap<EntityId, Double> cache;

    @Override
    public void init(TbContext tbContext, TbNodeConfiguration configuration) throws TbNodeException {
        this.cache = new ConcurrentHashMap<>();
    }

    @Override
    public void onMsg(TbContext ctx, TbMsg msg) {
        try {
            // Parsing the incoming message;
            ObjectNode json = (ObjectNode) mapper.readTree(msg.getData());
            double temperature = json.get("temperature").asDouble();
            // Fetching temperatureThreshold attribute from cache or from the database
            Double temperatureThreshold = getCacheValue(ctx, msg.getOriginator(), "temperatureThreshold", 42);
            // Compare and do something with the result of comparison;
            ctx.tellNext(msg, temperature > temperatureThreshold ? "True" : "False");
        } catch (JsonProcessingException e) {
            ctx.tellFailure(msg, e);
        }
    }

    @Override
    public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
        // Cleanup the cache for all entities that are no longer assigned to current server partitions
        cache.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
    }

    private double getCacheValue(TbContext ctx, EntityId entityId, String attributeKey, double defaultValue) {
        // Get value from cache or from the database.
        return cache.computeIfAbsent(entityId, id -> {
            try {
                Optional<AttributeKvEntry> attr = ctx.getAttributesService().find(ctx.getTenantId(), entityId, DataConstants.SERVER_SCOPE, attributeKey).get();
                if (attr.isPresent()) {
                    return attr.get().getDoubleValue().orElse(defaultValue);
                } else {
                    return defaultValue;
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }


    @Override
    public void destroy() {
        // In case you have changed the configuration, it is good idea to clear the entire cache.
        cache.clear();
    }
}

步骤 4. 将自定义规则节点导入到您的 YiCONNECT 实例

完成规则节点的编码后,再次执行构建命令:

mvn clean install

然后,将 jar 文件作为依赖库添加到您的 YiCONNECT 项目中。构建的结果位于此处:

target/rule-engine-1.0.0-custom-nodes.jar

现在您已准备好将包含规则节点的 jar 文件添加到 YiCONNECT 实例中:

  • 如果您的 YiCONNECT 作为服务安装,请使用步骤 4.1。
  • 如果您的 YiCONNECT 是从源代码构建并从 IDE 本地启动的,请使用步骤 4.2

步骤 4.1 将 JAR 文件添加到作为服务安装的 YiCONNECT

  • 首先,您需要执行以下命令将 jar 文件复制到 YiCONNECT 扩展:
sudo cp rule-engine-1.0.0-custom-nodes.jar /usr/share/thingsboard/extensions/
  • 接下来,执行以下命令将所有者更改为 YiCONNECT:
sudo chown thingsboard:thingsboard /usr/share/thingsboard/extensions/*

重新启动 YiCONNECT 服务:

sudo service thingsboard restart

YiCONNECT 重新启动后,您需要清除浏览器缓存并刷新网页以重新加载规则节点的 UI

步骤 4.2 将 JAR 文件添加到使用 IDE 启动的本地 YiCONNECT

重新启动 YiCONNECT 服务器端容器。

YiCONNECT 重新启动后,您需要清除浏览器缓存并刷新网页以重新加载规则节点的 UI

步骤 5. 将自定义包名称添加到 thingsboard.yml

注意, 如果您已将包名称从org.thingsboard.rule.engine更改为您公司的包名称,例如 com.example.rule.engine ,您还需要在插件部分的thingsboard.yml文件中添加您的包名称:

# Plugins configuration parameters
plugins:
  # Comma separated package list used during classpath scanning for plugins
  scan_packages: "${PLUGINS_SCAN_PACKAGES:org.thingsboard.server.extensions,org.thingsboard.rule.engine,com.example.rule.engine}"

步骤 6. 对规则节点进行故障排除

验证自定义规则节点的最简单方法是创建生成器规则节点并将其连接到自定义规则节点。这将生成可配置的传入消息流。完成此操作后,您应该为自定义规则节点启用调试以验证节点输出并检查它们是否有错误。

步骤 7. 规则节点 UI 定制(可选)

要在热重新部署模式下运行规则节点 UI 容器:

  • 首先,您需要将proxy.conf.js文件中的常量ruleNodeUiforwardPort8080更改为 5000 ,该文件应位于此处:
nano ${TB_WORK_DIR}/ui-ngx/proxy.conf.js
  • 其次,您需要以热重新部署模式运行 UI 容器。

  • 最后一步是从本地目录TB_RULE_NODE_UI_WORK_DIR执行以下命令: