news 2026/4/2 23:39:58

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

(基于 8.11 源码,可直接拷贝到org.example.es包下跑通)


0. 目标

给出一个“开箱即用”的 Maven 模块,一次性把下面三件事全部做完:

  1. 暴露自定义 REST 端点(RestHandler)。
  2. 注册 TransportAction,让协调节点→数据节点走内部 RPC(ActionPlugin)。
  3. 在集群状态里持久化自己的配置(ClusterPluginPersistentTasksExecutor)。

代码全部单文件即可编译,无额外依赖(除org.elasticsearch.plugin:elasticsearch8.11.0)。


1. 模块骨架
es-write-plugin ├── pom.xml └── src └── main ├── java │ └── org │ └── example │ └── es │ ├── WritePlugin.java │ ├── RestWriteAction.java │ ├── WriteTransportAction.java │ ├── WriteClusterService.java │ └── WritePersistentTaskExecutor.java └── resources └── META-INF └── plugin-descriptor.properties

pom.xml 关键片段

<properties><elasticsearch.version>8.11.0</elasticsearch.version></properties><dependencies><dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version><scope>provided</scope></dependency></dependencies>

plugin-descriptor.properties

description=Demo write plugin with REST + Transport + Cluster state version=1.0.0 name=write-plugin classname=org.example.es.WritePlugin java.version=17 elasticsearch.version=8.11.0

2. 统一入口:WritePlugin.java
publicclassWritePluginextendsPluginimplementsActionPlugin,ClusterPlugin{@OverridepublicList<RestHandler>getRestHandlers(Settingssettings,RestControllerrestController,ClusterSettingsclusterSettings,IndexScopedSettingsindexScopedSettings,SettingsFiltersettingsFilter,IndexNameExpressionResolverindexNameExpressionResolver,Supplier<DiscoveryNodes>nodesInCluster){returnList.of(newRestWriteAction());}@OverridepublicList<ActionHandler<?extendsActionRequest,?extendsActionResponse>>getActions(){returnList.of(newActionHandler<>(WriteAction.INSTANCE,WriteTransportAction.class));}@OverridepublicList<PersistentTasksExecutor<?>>getPersistentTasksExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient,PersistentTasksServicepersistentTasksService){returnList.of(newWritePersistentTaskExecutor(clusterService,threadPool,client));}}

3. REST 层:RestWriteAction.java
publicclassRestWriteActionextendsBaseRestHandler{@OverridepublicStringgetName(){return"write_plugin_action";}@OverridepublicList<Route>routes(){returnList.of(newRoute(RestRequest.Method.POST,"/_write/{index}"),newRoute(RestRequest.Method.PUT,"/_write/{index}"));}@OverrideprotectedRestChannelConsumerprepareRequest(RestRequestrequest,NodeClientclient){Stringindex=request.param("index");Stringbody=request.content().utf8ToString();WriteRequestwriteRequest=newWriteRequest(index,body);returnchannel->client.execute(WriteAction.INSTANCE,writeRequest,newRestToXContentListener<>(channel));}}

4. 内部 RPC:WriteAction / WriteRequest / WriteResponse
publicclassWriteActionextendsActionType<WriteResponse>{publicstaticfinalWriteActionINSTANCE=newWriteAction();publicstaticfinalStringNAME="cluster:admin/write/plugin";privateWriteAction(){super(NAME);}}publicclassWriteRequestextendsActionRequest{privatefinalStringindex;privatefinalStringpayload;publicWriteRequest(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteRequest(StreamInputin)throwsIOException{super(in);this.index=in.readString();this.payload=in.readString();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{super.writeTo(out);out.writeString(index);out.writeString(payload);}publicStringgetIndex(){returnindex;}publicStringgetPayload(){returnpayload;}}publicclassWriteResponseextendsActionResponse{privatefinalbooleanacked;publicWriteResponse(booleanacked){this.acked=acked;}publicWriteResponse(StreamInputin)throwsIOException{this.acked=in.readBoolean();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeBoolean(acked);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("acked",acked).endObject();}}

5. Transport 层:WriteTransportAction.java
publicclassWriteTransportActionextendsTransportMasterNodeAction<WriteRequest,WriteResponse>{@InjectpublicWriteTransportAction(TransportServicetransportService,ClusterServiceclusterService,ThreadPoolthreadPool,ActionFiltersactionFilters,IndexNameExpressionResolverindexNameExpressionResolver){super(WriteAction.NAME,transportService,clusterService,threadPool,actionFilters,WriteRequest::new,indexNameExpressionResolver);}@OverrideprotectedvoidmasterOperation(Tasktask,WriteRequestrequest,ClusterStatestate,ActionListener<WriteResponse>listener){// 1. 持久化任务到 cluster statePersistentTasksServicepersistentTasksService=newPersistentTasksService(clusterService,transportService,null);persistentTasksService.sendStartRequest(UUIDs.base64UUID(),"write_task",newWriteTaskParams(request.getIndex(),request.getPayload()),ActionListener.wrap(r->listener.onResponse(newWriteResponse(true)),listener::onFailure));}@OverrideprotectedClusterBlockExceptioncheckBlock(WriteRequestrequest,ClusterStatestate){returnstate.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);}}

6. 集群状态持久化:WriteClusterService + WritePersistentTaskExecutor
publicclassWriteTaskParamsimplementsPersistentTaskParams{privatefinalStringindex;privatefinalStringpayload;publicWriteTaskParams(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteTaskParams(StreamInputin)throwsIOException{this.index=in.readString();this.payload=in.readString();}@OverridepublicStringgetWriteableName(){return"write_task";}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeString(index);out.writeString(payload);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("index",index).field("payload",payload).endObject();}}publicclassWritePersistentTaskExecutorextendsPersistentTasksExecutor<WriteTaskParams>{privatefinalClientclient;privatefinalThreadPoolthreadPool;publicWritePersistentTaskExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient){super("write_task",ThreadPool.Names.GENERIC);this.client=client;this.threadPool=threadPool;}@OverrideprotectedvoidnodeOperation(PersistentTask<WriteTaskParams>task,WriteTaskParamsparams,PersistentTaskStatestate){// 真正写数据:这里演示异步索引文档IndexRequestindexRequest=newIndexRequest(params.index).source("payload",params.payload,"timestamp",System.currentTimeMillis()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);client.index(indexRequest,ActionListener.wrap(r->logger.info("Write task {} done, docId={}",task.getId(),r.getId()),e->logger.warn("Write task "+task.getId()+" failed",e)));}@OverrideprotectedAssignmentgetAssignment(WriteTaskParamsparams,ClusterStateclusterState){// 简单策略:随便挑一个 data 节点DiscoveryNodesnodes=clusterState.nodes();List<DiscoveryNode>dataNodes=nodes.getDataNodes().values().stream().toList();returndataNodes.isEmpty()?Assignment.NO_VALID_NODE_ASSIGNMENT:newAssignment(dataNodes.get(Randomness.get().nextInt(dataNodes.size())).getId(),"ok");}}

7. 安装 & 验证
mvn clean package# 得到 target/write-plugin-1.0.0.zipbin/elasticsearch-plugininstallfile:///full/path/write-plugin-1.0.0.zip# 重启节点
# 1. 调 RESTcurl-XPOST localhost:9200/_write/my_index -d'{"msg":"hello plugin"}'-H"Content-Type: application/json"# 返回 {"acked":true}# 2. 看任务curl-XGET localhost:9200/_cluster/pending_tasks# 3. 看结果curllocalhost:9200/my_index/_search?q=*:*

8. 可继续扩展的 5 个方向
  1. NamedXContentRegistryWriteTaskParams注册成 JSON,支持_cluster/state直接可读。
  2. WritePersistentTaskExecutor里捕获IndexNotFoundException,自动创建索引并写入模板。
  3. WriteTaskParams做成AckedRequest,实现POST /_write/{index}?wait_for_active_shards=2语义。
  4. 通过Plugin.createComponents注入自定义线程池,让大批量写任务走独立队列。
  5. PersistentTaskState存储重试次数,结合BackoffPolicy实现断点续写。

至此,一套“REST → Transport → ClusterState → PersistentTask → 数据节点执行”的完整写插件模板就闭环了。直接复制即可编译,二次开发只需替换WriteTaskParamsnodeOperation里的业务逻辑。```
推荐阅读:
PyCharm 2018–2024使用指南

更多技术文章见公众号: 大城市小农民

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/18 17:56:02

springboot-java社区果蔬电商加盟平台vue

目录社区果蔬电商加盟平台技术架构核心功能模块技术亮点应用价值开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;社区果蔬电商加盟平台技术架构 该平台基于SpringBoot后端与Vue前端构建&#xff0c;采用前后端分离架构&#xf…

作者头像 李华
网站建设 2026/4/2 19:43:26

Datadog财报超预期,详解其可观测性与安全技术架构

Datadog的股价在公布远超预期的季度业绩后&#xff0c;今日收盘上涨23%。 该公司销售可观测性与网络安全工具&#xff0c;在截至9月30日的三个月中&#xff0c;实现了2.07亿美元的调整后运营收入&#xff0c;相当于每股调整后收益55美分。而根据FactSet调查的分析师平均预测为4…

作者头像 李华
网站建设 2026/3/31 3:16:02

深度测评8个AI论文网站,助你轻松搞定本科生毕业论文!

深度测评8个AI论文网站&#xff0c;助你轻松搞定本科生毕业论文&#xff01; AI 工具如何成为论文写作的得力助手 在当今学术写作日益依赖技术辅助的时代&#xff0c;AI 工具已经成为许多本科生应对毕业论文的重要帮手。无论是内容生成、大纲搭建&#xff0c;还是语言润色和降重…

作者头像 李华
网站建设 2026/3/31 14:18:49

【设计模式】迭代器模式(Iterator)详解

文章目录1. 引言&#xff1a;为什么我们每天都在用迭代器&#xff1f;2. 什么是迭代器模式GoF 定义3. 迭代器模式的核心思想4. 迭代器模式的结构5. 示例&#xff1a;自定义集合 迭代器5.1 迭代器接口5.2 聚合接口5.3 具体聚合类5.4 客户端使用6. 迭代器模式的优点7. 迭代器模式…

作者头像 李华