news 2026/4/3 4:40:26

Jetlinks 物联网平台社区版 源码学习分析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Jetlinks 物联网平台社区版 源码学习分析

设备接入

设备接入流程图

device-flow.fd9a8a41

网络 > 协议 > 网关

网络组件 (org.jetlinks.community.network.Network)

真正与设备连接交互的网络层, 用于管理各种网络服务(MQTT,TCP等),动态配置, 启停. 只负责接收/发送报文,不负责任何处理逻辑。

社区版, 网络组件的实现有四类:

org.jetlinks.community.network.tcp.server.TcpServer // 作为服务器接受设备端连接

org.jetlinks.community.network.tcp.client.TcpClient // 主动tcp连接设备端

org.jetlinks.community.network.mqtt.client.MqttClient //使用客户端连接第三方的MQTT服务器

org.jetlinks.community.network.mqtt.server.MqttServer //使用的本机MQTT服务, 接受设备端连接

网络组件, 支持提供关键的两个接口

org.jetlinks.community.network.Network

public interface Network {

/**

* ID唯一标识

*

* @return ID

*/

String getId();

/**

* @return 网络类型

* @see DefaultNetworkType

*/

NetworkType getType();

/**

* 关闭网络组件

*/

void shutdown();

/**

* @return 是否存活

*/

boolean isAlive();

/**

* 当{@link Network#isAlive()}为false是,是否自动重新加载.

* @return 是否重新加载

* @see NetworkProvider#reload(Network, Object)

*/

boolean isAutoReload();

}

org.jetlinks.community.network.NetworkProvider

public interface NetworkProvider<P> {

/**

* @return 类型

* @see DefaultNetworkType

*/

@Nonnull

NetworkType getType();

/**

* 使用配置创建一个网络组件

* @param properties 配置信息

* @return 网络组件

*/

@Nonnull

Network createNetwork(@Nonnull P properties);

/**

* 重新加载网络组件

* @param network 网络组件

* @param properties 配置信息

*/

void reload(@Nonnull Network network, @Nonnull P properties);

/**

* @return 配置定义元数据

*/

@Nullable

ConfigMetadata getConfigMetadata();

/**

* 根据可序列化的配置信息创建网络组件配置

* @param properties 原始配置信息

* @return 网络配置信息

*/

@Nonnull

Mono<P> createConfig(@Nonnull NetworkProperties properties);

...

每一个网络组件(org.jetlinks.community.network.Network) 对应有一个组件提供器对应 (org.jetlinks.community.network.NetworkProvider)

最终网络组件统一由 org.jetlinks.community.network.NetworkManager 管理;

默认实现是org.jetlinks.community.network.DefaultNetworkManager(用Spring BeanPostProcessor hook 加载的)

调用其org.jetlinks.community.network.DefaultNetworkManager#register方法, 传递 NetworkProvider 可以注册一个网络组件

实例组件数据是存在数据库的 network_config 表

协议相关 (org.jetlinks.core.ProtocolSupport)

用于自定义消息解析规则,用于认证、将设备发送给平台报文解析为平台统一的报文,以及处理平台下发给设备的指令。

协议(org.jetlinks.core.ProtocolSupport)主要由: 认证器(Authenticator), 消息编解码器(DeviceMessageCodec),消息发送拦截器(DeviceMessageSenderInterceptor) 以及配置元数据(ConfigMetadata)组成.

org.jetlinks.core.defaults.Authenticator // Authenticator

org.jetlinks.core.codec.defaults.DeviceMessageCodec //DeviceMessageCodec

org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor //DeviceMessageSenderInterceptor

org.jetlinks.core.ProtocolSupport + org.jetlinks.core.metadata.DeviceMetadataCodec //ConfigMetadata

其默认自带的JetLinks V1.0 协议,在org.jetlinks.supports.official.JetLinksProtocolSupportProvider 提供

每一个协议(org.jetlinks.core.ProtocolSupport) 对应有一个组件提供器对应 (org.jetlinks.core.spi.ProtocolSupportProvider)

自定义协议, 即实现 org.jetlinks.core.spi.ProtocolSupportProvider 这个接口;

统一由org.jetlinks.supports.protocol.management.ProtocolSupportManager 管理;

默认实现是org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager(用Spring BeanPostProcessor hook自动加载 2.0是org.jetlinks.community.protocol.configuration.ProtocolAutoConfiguration配置类加载的)

调用其org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager#store方法传递 ProtocolSupportDefinition 进行注册;

注意: 注册后只存起来, 顾名思义, 还跟集群的其他节点有关, 而且跟协议的实现类型(jar,js)有关, 加载也不一样; 参考 org.jetlinks.supports.protocol.management.ProtocolSupportDefinition的属性

public class ProtocolSupportDefinition implements Serializable {

private static final long serialVersionUID = -1;

private String id;//ID ; 数据库存的就是本地对象的数据

private String name;

private String description;

private String provider;//jar script 协议实现类型, 目前只有 jar, js 脚本

private byte state;//协议状态

private Map<String,Object> configuration;//配置元数据, jar 的话会存ProtocolSupportProvider类全限定名, jar包路径等

}

实例组件数据是存在数据库 dev_protocol 表

参考下: [自定义协议开发]

网关组件 (org.jetlinks.community.gateway.DeviceGateway)

设备上报数据的处理逻辑入口, 网关代表接入方式需要选择网络组件, 它关联协议, 配置协议

负责平台侧统一的设备接入, 使用网络组件处理对应的请求以及报文, 使用配置的协议解析为平台统一的设备消息(DeviceMessage),然后推送到事件总线。

org.jetlinks.community.gateway.supports.DeviceGateway 网关接口抽象;

设备网关支持提供商,用于提供对各种设备网关的支持.在启动设备网关时,会根据对应的提供商以及配置来创建设备网关. 实现统一管理网关配置,动态创建设备网关.

社区版, 网关的实现有三个

在 mqtt-component 项目有两个

org.jetlinks.community.network.mqtt.gateway.device.MqttClientDeviceGateway

使用MQTT客户端, 连接第三方MQTT服务器; 例如 emqx

org.jetlinks.community.network.mqtt.gateway.device.MqttServerDeviceGateway

本机作为MQTT服务端, 内置启用MQTT服务器, 接受设备接入;

在 tcp-component 项目有一个

org.jetlinks.community.network.tcp.device.TcpServerDeviceGateway

本机作为TCP服务器, 监听端口, 接受设备接入;

类似 桥接网络组件和协议组件(作为一个中介者)

统一由org.jetlinks.community.gateway.DeviceGatewayManager管理;

默认实现是 org.jetlinks.community.gateway.supports.DefaultDeviceGatewayManager(用Spring BeanPostProcessor hook自动加载; 2.0是org.jetlinks.community.gateway.GatewayConfiguration配置类加载的)

调用其org.jetlinks.community.gateway.DeviceGatewayManager#start传递一个网关实例ID 启动网关; 其实是改变了一下网关实例的状态, 在有新消息时根据自身状态决定是否分发消息, 在此之前已经调了协议解析, 所有暂停/停止网关不会影响协议跟设备的交互;

参考: org.jetlinks.community.network.mqtt.gateway.device.MqttClientDeviceGateway 这个网关实现

实例组件是存在数据库 device_gateway 表

关键的对象概览

DeviceRegistry 设备注册中心

org.jetlinks.core.device.DeviceRegistry

设备注册中心, 用于统一管理设备以及产品的基本信息,缓存,进行设备指令下发等操作.

例如: 获取设备以及设备的配置缓存信息

registry

.getDevice(deviceId)

.flatMap(device->device.getSelfConfig("my-config"))

.flatMap(conf-> doSomeThing(...))

相当于总的设备管理器

根据设备ID. 通过 org.jetlinks.core.device.DeviceRegistry#getDevice 可以获取设备操作对象(DeviceOperator)

以 org.jetlinks.supports.cluster.ClusterDeviceRegistry#getDevice 为例:

public Mono<DeviceOperator> getDevice(String deviceId) {

if (StringUtils.isEmpty(deviceId)) {

return Mono.empty();

} else {

//先从缓存获取

Mono<DeviceOperator> deviceOperator = (Mono)this.operatorCache.getIfPresent(deviceId);

if (null != deviceOperator) {

return deviceOperator;

} else {

//创建 DeviceOperator

DeviceOperator deviceOperator = this.createOperator(deviceId);

return deviceOperator.getSelfConfig(DeviceConfigKey.productId).doOnNext((r) -> {

//放到缓存

this.operatorCache.put(deviceId, Mono.just(deviceOperator).filterWhen((device) -> {

return device.getSelfConfig(DeviceConfigKey.productId).hasElement();

}));

}).map((ignore) -> {

return deviceOperator;

});

}

}

}

private DefaultDeviceOperator createOperator(String deviceId) {

DefaultDeviceOperator device = new DefaultDeviceOperator(deviceId, this.supports, this.manager, this.handler, this, this.interceptor, this.stateChecker);

if (this.rpcChain != null) {

device.setRpcChain(this.rpcChain);

}

return device;

}

DeviceOperator 设备操作接口

org.jetlinks.core.device.DeviceOperator

设备操作接口,通过DeviceRegister.getDevice(deviceId)获取,用于对设备进行相关操作,如获取配置,发送消息等.

//通过getConfig 可以获取配置的协议元数据

DeviceOperator#getConfig

它是如何创建的?

org.jetlinks.core.defaults.DefaultDeviceOperator为例

从注册中心拿的时候, 如果不存在, 则创建DeviceOperator

@Override

public Mono<DeviceOperator> getDevice(String deviceId) {

if (StringUtils.isEmpty(deviceId)) {

return Mono.empty();

}

{

Mono<DeviceOperator> deviceOperator = operatorCache.getIfPresent(deviceId);

if (null != deviceOperator) {

return deviceOperator;

}

}

//创建 DeviceOperator

DeviceOperator deviceOperator = createOperator(deviceId);

return deviceOperator

//有productId说明是存在的设备

.getSelfConfig(DeviceConfigKey.productId)

.doOnNext(r -> operatorCache.put(deviceId, Mono

.just(deviceOperator)

.filterWhen(device -> device.getSelfConfig(DeviceConfigKey.productId).hasElement())

//设备被注销了?则移除之

.switchIfEmpty(Mono.fromRunnable(() -> operatorCache.invalidate(deviceId)))

))

.map(ignore -> deviceOperator);

}

DeviceProductOperator 产品操作接口

DeviceProductOperator: 产品操作接口,通过DeviceProductOperator.getProduct(productId)获取.

DeviceGateway 设备接入网关接口

DeviceGateway : 设备接入网关接口,利用网络组件来接入设备消息.

DeviceMessageBusinessHandler

DeviceMessageBusinessHandler: 处理设备状态数据库同步,设备自动注册等逻辑等类.

LocalDeviceInstanceService

LocalDeviceInstanceService: 设备实例管理服务类.

DeviceSessionManager

DeviceSessionManager: 设备会话管理器,可获取当前服务的会话信息.

管理会话, 设备的上线下线;

参考: org.jetlinks.community.standalone.configuration.DefaultDeviceSessionManager

在 init

Flux.interval(Duration.ofSeconds(10), Duration.ofSeconds(30), Schedulers.newSingle("device-session-checker"))

.flatMap(i -> this

.checkSession()//周期性 调用 checkSession() 检查状态

.onErrorContinue((err, val) -> log.error(err.getMessage(), err)))

.subscribe();

DeviceDataStoragePolicy 设备数据存储策略(行式/列式/不存储)

DeviceDataStoragePolicy: 设备存储策略接口,实现此接口来进行自定义设备数据存储策略.

DeviceGatewayHelper

DeviceGatewayHelper: 统一处理设备消息,创建Session等操作的逻辑.

DecodedClientMessageHandler

DecodedClientMessageHandler: 解码后的平台消息处理器,如果是自定义实现网关或者在协议包里手动回复消息等处理, 则可以使用此接口直接将设备消息交给平台.(如果调用了DeviceGatewayHelper则不需要此操作).

EventBus 事件总线

EventBus: 事件总线,通过事件总线去订阅设备数据来实现解耦.(也可以用过@Subscribe()注解订阅).

DeviceMessageConnector

DeviceMessageConnector: 负责将设备消息转发到事件总线.

DeviceMessageSender 消息发送器

org.jetlinks.core.device.DeviceMessageSender

消息发送器,用于发送消息给设备.

DeviceMessage 消息对象

所有设备消息(即设备上报转换后, 平台可识别的消息) 派生自这个 org.jetlinks.core.message.DeviceMessage 接口

EncodedMessage 消息对象

设备端原始的消息, (下发/上报给的原始消息)

源码大体流程研究

先了解消息的组成

消息主要由 deviceId, messageId, headers, timestamp 组成.

deviceId为设备的唯一标识, messageId为消息的唯一标识,headers为消息头,通常用于对自定义消息处理的行为,如是否异步消息, 是否分片消息等.

常用的Headers org.jetlinks.core.message.Headers

async 是否异步,boolean类型.

timeout 指定超时时间. 毫秒.

frag_msg_id 分片主消息ID,为下发消息的messageId

frag_num 分片总数

frag_part 当前分片索引

frag_last 是否为最后一个分片,当无法确定分片数量的时候,可以将分片设置到足够大,最后一个分片设置:frag_last=true来完成返回.

keepOnline 与DeviceOnlineMessage配合使用,在TCP短链接,保持设备一直在线状态,连接断开不会设置设备离线.

keepOnlineTimeoutSeconds 指定在线超时时间,在短链接时,如果超过此间隔没有收到消息则认为设备离线.

ignoreStorage 不存储此消息数据,如: 读写属性回复默认也会记录到属性时序数据库中,设置为true后,将不记录.(1.9版本后支持)

ignoreLog 不记录此消息到日志,如: 设置为true,将不记录此消息的日志.

mergeLatest 是否合并最新属性数据,设置此消息头后,将会把最新的消息合并到消息体里( 需要开启最新数据存储)//jetlinks.device.storage.enable-last-data-in-db=true 是否将设备最新到数据存储到数据库

网络组件(Network) 真正与设备连接交互的网络层, 但是它只负责接收/发送报文,不负责任何处理逻辑, 所以最佳的调试地方是网关组件(DeviceGateway)入口处.

消息类型

所有消息类型参考 org.jetlinks.community.device.enums.DeviceLogType

属性相关消息

获取设备属性(ReadPropertyMessage)对应设备回复的消息 ReadPropertyMessageReply.

修改设备属性(WritePropertyMessage)对应设备回复的消息 WritePropertyMessageReply.

设备上报属性(ReportPropertyMessage) 由设备上报.

功能相关消息

调用设备功能到消息(FunctionInvokeMessage)由平台发往设备,对应到返回消息 FunctionInvokeMessageReply.

事件消息

org.jetlinks.core.message.event.EventMessage

EventMessage eventMessage = new EventMessage();

eventMessage.setDeviceId(deviceId);

eventMessage.setMessageId(fromDevice.path(MessageConstant.MESSAGE_KEY_MESSAGE_SIGN).asText() );

eventMessage.event(eventId);

HashMap data = JacksonUtils.jsonToBean(output.toString(), HashMap.class);

eventMessage.setData(data);

其他消息

DeviceOnlineMessage 设备上线消息,通常用于网关代理的子设备的上线操作.

DeviceOfflineMessage 设备离线消息,通常用于网关代理的子设备的下线操作.

ChildDeviceMessage 子设备消息,通常用于网关代理的子设备的消息.

ChildDeviceMessageReply 子设备消息回复,用于平台向网关代理的子设备发送消息后设备回复给平台的结果.

UpdateTagMessage 更新设备标签.

DerivedMetadataMessage 更新设备独立物模型.

设备自注册消息

DeviceRegisterMessage 设备注册消息,通过设置消息头message.addHeader("deviceName","设备名称");和 message.addHeader("productId","产品ID")可实现设备自动注册.

如果配置了状态自管理,在检查子设备状态时,会发送指令ChildDeviceMessage<DeviceStateCheckMessage>, 网关需要返回ChildDeviceMessageReply<DeviceStateCheckMessageReply>.

自定义协议包将消息解析为 DeviceRegisterMessage,

并设置header:productId(必选),deviceName(必选),configuration(可选)。

平台将自动添加设备信息到设备实例中。如果是注册子设备,则解析为 ChildDeviceMessage<DeviceRegisterMessage>即可

消息上报 (MQTT Broker)

设备不是直接接入平台, 而是通过第三方MQTT服务, 如:emqx. 消息编解码与MQTT服务一样,从消息协议中使用 DefaultTransport.MQTT 来获取消息编解码器.

网关处理逻辑

org.jetlinks.community.network.mqtt.gateway.device.MqttClientDeviceGateway

public class MqttClientDeviceGateway extends AbstractDeviceGateway {

public MqttClientDeviceGateway(String id,

MqttClient mqttClient,

DeviceRegistry registry,

ProtocolSupports protocolSupport,

String protocol,

DeviceSessionManager sessionManager,

DecodedClientMessageHandler clientMessageHandler,

List<String> topics,

int qos) {

super(id);

// mqtt的客户端

this.mqttClient = Objects.requireNonNull(mqttClient, "mqttClient");

//DeviceRegistry : 设备注册中心, 用于统一管理设备以及产品的基本信息,缓存,进行设备指令下发等操作(约等于设备统一管理器)

this.registry = Objects.requireNonNull(registry, "registry");

this.protocolSupport = Objects.requireNonNull(protocolSupport, "protocolSupport");

this.protocol = Objects.requireNonNull(protocol, "protocol");

this.topics = Objects.requireNonNull(topics, "topics");

this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler);

this.qos = qos;

}

private void doStart() {

if (disposable != null) {

disposable.dispose();

}

disposable = mqttClient

.subscribe(topics, qos)//关注MQTT主题, 当有新的消息时

.filter((msg) -> isStarted())//需当前网关 处于启动状态

.flatMap(mqttMessage -> {

AtomicReference<Duration> timeoutRef = new AtomicReference<>();

return this

//注意这里是根据自定义协议 ProtocolSupportProvider::create 返回的 ProtocolSupport id 去匹配的

.getProtocol()

//通过 ProtocolSupport 获取其 DeviceMessageCodec

.flatMap(codec -> codec.getMessageCodec(getTransport()))

//使用消息编码器 解码消息

.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(

new UnknownDeviceMqttClientSession(getId() + ":unknown", mqttClient) {//实际给到协议编解码器的Session 总是这个..

@Override

public Mono<Boolean> send(EncodedMessage encodedMessage) {

return super

.send(encodedMessage)

.doOnSuccess(r -> monitor.sentMessage());

}

@Override

public void setKeepAliveTimeout(Duration timeout) {

timeoutRef.set(timeout);

}

}

, mqttMessage, registry)

))

.doOnError((err) -> log.error("解码MQTT客户端消息失败 {}:{}", //发生了错误

mqttMessage.getTopic(),

mqttMessage

.getPayload()

.toString(StandardCharsets.UTF_8),

err))

//消息向上转型

.cast(DeviceMessage.class)

.flatMap(message -> {

//设备网关监控 (主要是监控消息数量等指标的)

monitor.receivedMessage();

return helper//设备网关消息处理,会话管理工具类,用于统一封装对设备消息和会话的处理逻辑

.handleDeviceMessage(message,//最终主要源码见下: org.jetlinks.community.network.utils.DeviceGatewayHelper#handleDeviceMessage

device -> createDeviceSession(device, mqttClient),//<!> 注意这个会话在不存在时回调; 见下: [创建会话的回调]

ignore->{},//会话自定义回调,处理会话时用来自定义会话,比如重置连接信

() -> log.warn("无法从MQTT[{}]消息中获取设备信息:{}", mqttMessage.print(), message)//当设备在平台不存在时

);

})

.then()

//错误处理, 返回 empty

.onErrorResume((err) -> {

log.error("处理MQTT消息失败:{}", mqttMessage, err);

return Mono.empty();

});

}, Integer.MAX_VALUE)

.onErrorContinue((err, ms) -> log.error("处理MQTT客户端消息失败", err))

.subscribe();//Flux的API 触发计算

}

}

创建会话的回调

private MqttClientSession createDeviceSession(DeviceOperator device, MqttClient client) {

return new MqttClientSession(device.getDeviceId(), device, client, monitor);

}

DeviceGatewayHelper 处理

主要处理消息分支 子设备分支, 上线/离线 处理....

//如果设备状态为'离线' 则会先构造一个设备上线的消息 publish, 然后在 publish 设备原消息

org.jetlinks.community.network.utils.DeviceGatewayHelper#handleDeviceMessage

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/26 13:18:27

StrmAssistant终极配置指南:5步快速部署Emby增强插件

StrmAssistant终极配置指南&#xff1a;5步快速部署Emby增强插件 【免费下载链接】StrmAssistant Strm Assistant for Emby 项目地址: https://gitcode.com/gh_mirrors/st/StrmAssistant StrmAssistant是一款专为Emby媒体服务器设计的开源增强插件&#xff0c;通过优化视…

作者头像 李华
网站建设 2026/3/31 22:09:55

DiT:Transformer架构重塑扩散模型的图像生成革命

DiT&#xff1a;Transformer架构重塑扩散模型的图像生成革命 【免费下载链接】DiT Official PyTorch Implementation of "Scalable Diffusion Models with Transformers" 项目地址: https://gitcode.com/GitHub_Trending/di/DiT 在AI图像生成领域&#xff0c;…

作者头像 李华
网站建设 2026/4/2 0:53:49

从零开始:QLC+舞台灯光控制软件完全操作指南

从零开始&#xff1a;QLC舞台灯光控制软件完全操作指南 【免费下载链接】qlcplus Q Light Controller Plus (QLC) is a free and cross-platform software to control DMX or analog lighting systems like moving heads, dimmers, scanners etc. This project is a fork of th…

作者头像 李华
网站建设 2026/4/2 12:15:36

【axios 拦截器】使用 typescript 封装 axios 拦截器

本文将封装一个 类型安全、结构清晰、易于维护 的 Axios 拦截器模板&#xff0c;融合 TypeScript 泛型、自定义配置扩展等最佳实践&#xff0c;适用于 Vue 3 / React 等框架目录结构request/├── index.ts // 封装 axios 拦截器└── types.ts // 拦截器相关类型声明类型声明…

作者头像 李华
网站建设 2026/3/31 12:14:44

BERT:让模型 “读懂上下文” 的双向语言学习法

文章目录〇、预训练的作用核心作用预训练语言模型的典型代表一、模型整体结构1. 输入表示 (Input Representation)Segment EmbeddingsPosition Embeddings2. Transformer Encoder结构多头自注意力机制 (Multi-Head Self-Attention)前馈神经网络 (Feed Forward Network)残差连接…

作者头像 李华
网站建设 2026/4/1 3:22:20

Electron桌面应用开发终极指南:从零构建完整项目

Electron桌面应用开发终极指南&#xff1a;从零构建完整项目 【免费下载链接】electron-api-demos-Zh_CN 这是 electron-api-demos 的中文版本, 更新至 v2.0.2 项目地址: https://gitcode.com/gh_mirrors/el/electron-api-demos-Zh_CN 想要快速掌握Electron桌面应用开发…

作者头像 李华