news 2026/4/3 3:24:03

java实现Windows 命名管道:千万别在读取时“死等”

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
java实现Windows 命名管道:千万别在读取时“死等”

1.代码背景

博主想实现一个跨平台通信机制来连接LSP服务端,想通过unix domian socket来实现这个。在java实现客户端连接时,出现了问题,JDK 原生搞不定 Windows 命名管道,Java 的网络库只认 Socket 家族,不认“文件”家族。所以想直接访问这个通道,就必须用 Kernel32 。

2. 问题分析

  • 描述 :在 Windows 上,如果用一个线程 ReadFile 阻塞等数据,主线程想 WriteFile 发消息时会直接卡死。博主在测试过程中,发现能收到服务端的通知,但是在发送消息后,服务端无法收到。经过判断,是在发送时卡住,然后研究出具体原因。由此记录一下过程
  • 原因 :同步模式下,读和写共用同一个句柄锁。读不完,锁不放,写不进。
  • 避坑指南 :在 Windows 环境下, 必须先“轮询”再“读取” 。
    • 使用 PeekNamedPipe 检查是否有数据。
    • 没数据就 Thread.sleep(20) ,让出句柄锁。
  • 选择同步的理由
    • 代码简洁度 :重叠 I/O 在 Java/JNA 里的实现非常臃肿,容易引入内存泄漏或指针错误。
    • 兼容性 :LSP 服务端通常一次只允许一个连接,双句柄方案可能会导致服务端报错“管道忙”。

3.代码实现

packageorg.example.lspClient;importcom.sun.jna.platform.win32.Kernel32;importcom.sun.jna.platform.win32.WinBase;importcom.sun.jna.platform.win32.WinNT;importcom.sun.jna.ptr.IntByReference;importorg.tinylog.Logger;importjava.io.*;importjava.net.StandardProtocolFamily;importjava.net.UnixDomainSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.SocketChannel;importjava.nio.charset.StandardCharsets;importjava.nio.file.Path;importjava.util.Arrays;importjava.util.function.Consumer;/** * 跨平台 UDS 传输层,具有 LSP 协议感知能力 */publicclassUdsTransportimplementsCloseable{privatefinalbooleanisWindows;privateWinNT.HANDLE hPipe;privateSocketChannelchannel;privatefinalByteArrayOutputStreambuffer=newByteArrayOutputStream();privateConsumer<String>messageListener;privatevolatilebooleanrunning=true;publicUdsTransport(StringsocketPath)throwsIOException{Stringos=System.getProperty("os.name").toLowerCase();this.isWindows=os.contains("win");if(isWindows){connectWindowsPipe(socketPath);}else{connectUnixSocket(socketPath);}startReadingThread();}publicvoidsetOnMessage(Consumer<String>listener){this.messageListener=listener;}privatevoidconnectWindowsPipe(StringpipeName)throwsIOException{this.hPipe=Kernel32.INSTANCE.CreateFile(pipeName,WinNT.GENERIC_READ|WinNT.GENERIC_WRITE,0,null,WinNT.OPEN_EXISTING,0,null);if(WinBase.INVALID_HANDLE_VALUE.equals(hPipe)){interror=Kernel32.INSTANCE.GetLastError();thrownewIOException("Failed to connect to Named Pipe: "+pipeName+" (Error: "+error+")");}}privatevoidconnectUnixSocket(StringsocketPath)throwsIOException{this.channel=SocketChannel.open(StandardProtocolFamily.UNIX);this.channel.connect(UnixDomainSocketAddress.of(Path.of(socketPath)));}publicvoidsendMessage(StringjsonContent)throwsIOException{byte[]body=jsonContent.getBytes(StandardCharsets.UTF_8);Stringheader="Content-Length: "+body.length+"\r\n\r\n";writeRaw(header.getBytes(StandardCharsets.US_ASCII));writeRaw(body);}privatevoidwriteRaw(byte[]bytes)throwsIOException{if(isWindows){IntByReferencewritten=newIntByReference();booleansuccess=Kernel32.INSTANCE.WriteFile(hPipe,bytes,bytes.length,written,null);if(!success){interr=Kernel32.INSTANCE.GetLastError();thrownewIOException("WriteFile failed with error: "+err);}if(written.getValue()<bytes.length){Logger.warn("Partial write detected: "+written.getValue()+"/"+bytes.length);return;}}else{ByteBuffernioBuffer=ByteBuffer.wrap(bytes);while(nioBuffer.hasRemaining()){intwritten=channel.write(nioBuffer);if(written==0){thrownewIOException("Channel write stalled");}}}Logger.info("Wrote bytes to: "+Arrays.toString(bytes));}privatevoidstartReadingThread(){Threadthread=newThread(()->{byte[]readBuffer=newbyte[8192];try{while(running){intn=readToBuffer(readBuffer);if(n>0){synchronized(buffer){buffer.write(readBuffer,0,n);}processBuffer();}else{// 给WriteFile 竞争句柄锁的机会Thread.sleep(20);}}}catch(Exceptione){if(running)System.err.println("Transport read error: "+e.getMessage());}},"UdsTransport-Reader");thread.setDaemon(true);thread.start();}privateintreadToBuffer(byte[]target)throwsIOException{if(isWindows){IntByReferenceavail=newIntByReference();// 轮询是否有数据booleanpeekOk=Kernel32.INSTANCE.PeekNamedPipe(hPipe,null,0,null,avail,null);if(!peekOk){interr=Kernel32.INSTANCE.GetLastError();if(err==109)thrownewEOFException("Pipe closed");return0;}if(avail.getValue()==0){return0;// 当前没数据,返回 0 让出 CPU 和句柄锁}IntByReferenceread=newIntByReference();// 此时 ReadFile 不会阻塞,因为它已经知道有数据了booleansuccess=Kernel32.INSTANCE.ReadFile(hPipe,target,target.length,read,null);if(!success){interr=Kernel32.INSTANCE.GetLastError();if(err==109)thrownewEOFException("Pipe closed");if(err==234)returntarget.length;// ERROR_MORE_DATAthrownewIOException("ReadFile failed: "+err);}returnread.getValue();}else{ByteBuffernioBuf=ByteBuffer.wrap(target);intn=channel.read(nioBuf);if(n==-1)thrownewEOFException("Socket closed");returnn;}}privatevoidprocessBuffer(){synchronized(buffer){while(true){byte[]data=buffer.toByteArray();if(data.length==0)break;StringtempStr=newString(data,StandardCharsets.US_ASCII);intheaderEnd=tempStr.indexOf("\r\n\r\n");if(headerEnd==-1)break;intcontentLength=-1;String[]lines=tempStr.substring(0,headerEnd).split("\r\n");for(Stringline:lines){if(line.toLowerCase().startsWith("content-length:")){contentLength=Integer.parseInt(line.substring(15).trim());}}if(contentLength==-1)break;inttotalSize=headerEnd+4+contentLength;if(data.length<totalSize)break;byte[]bodyBytes=newbyte[contentLength];System.arraycopy(data,headerEnd+4,bodyBytes,0,contentLength);Stringmessage=newString(bodyBytes,StandardCharsets.UTF_8);// 异步回调if(messageListener!=null){messageListener.accept(message);}byte[]remaining=newbyte[data.length-totalSize];System.arraycopy(data,totalSize,remaining,0,remaining.length);buffer.reset();try{buffer.write(remaining);}catch(IOExceptionignored){}}}}@Overridepublicvoidclose()throwsIOException{running=false;if(isWindows){if(hPipe!=null)Kernel32.INSTANCE.CloseHandle(hPipe);}else{if(channel!=null)channel.close();}}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/13 9:09:12

【Global ID概念】——vtkGenerateGlobalIds应用详解

VTK中Global ID概念与vtkGenerateGlobalIds应用详解 在VTK&#xff08;Visualization Toolkit&#xff09;的数据处理流程中&#xff0c;Global ID&#xff08;全局标识符&#xff09; 是一个核心概念&#xff0c;尤其在并行计算、大型数据集管理和多源数据融合场景中发挥着关…

作者头像 李华
网站建设 2026/4/1 1:48:39

邀您并肩,点燃全国开源星火 | AtomGit 城市坐标计划 2.0 招募开启

在过去的半年里&#xff0c;我们与社区伙伴在沈阳、杭州、深圳、昆明、重庆、西安、成都、广州都留下了坚实的足迹&#xff0c;也收获了最宝贵的财富&#xff1a;开发者们的真实反馈与社区伙伴的深度信任。 深圳站 成都站 杭州站 广州站 西安站 沈阳站 重庆站 昆明站 我们深信&…

作者头像 李华
网站建设 2026/3/31 21:59:26

外部群成员管控:批量移除与“拟人化”禁言的技术实现

QiWe开放平台 个人名片 API驱动企微自动化&#xff0c;让开发更高效 核心能力&#xff1a;为开发者提供标准化接口、快速集成工具&#xff0c;助力产品高效拓展功能场景 官方站点&#xff1a;https://www.qiweapi.com 团队定位&#xff1a;专注企微API生态的技术服务团队 对接…

作者头像 李华
网站建设 2026/4/2 1:57:19

模型开源的核心目的与潜在隐患(附 AI 领域 / 具身智能专属视角)

模型开源是 AI 产业技术扩散与生态构建的核心方式&#xff0c;其目的围绕技术迭代、产业落地、生态卡位展开&#xff0c;而隐患则集中在安全风险、商业利益、技术滥用等维度&#xff0c;结合具身智能这类融合物理交互的特殊模型&#xff0c;还会衍生出物理安全、硬件适配等专属…

作者头像 李华
网站建设 2026/3/31 14:43:24

如何使用 7 种解决方案将照片从Mac传输到三星

由于 macOS 和Android之间的兼容性问题&#xff0c;将照片从Mac传输到三星设备可能颇具挑战性。幸运的是&#xff0c;有几种有效的方法可以将照片从Mac传输到三星设备。无论您使用的是三星智能手机还是平板电脑&#xff0c;总有一种解决方案适合您的需求。在本指南中&#xff0…

作者头像 李华