news 2026/4/3 2:47:01

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(一)

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务率先运行(一)

假设现在有一种场景,在一个任务接收器中,源源不断且不知道任务发送者何时会将新任务发送过来,每个任务都具备不同的任务优先级,任务无时无刻的进入任务缓冲池,目的是把任务缓冲池中优先级最高的那个任务挑选出来最先运行。

import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.runBlocking import java.util.UUID fun main() { val myThreadPool = newFixedThreadPoolContext(4, "my-thread") val bufferCapacity = 5 val totalTaskSize = 15 val channel = Channel<TaskInfo>() val taskList = mutableListOf<TaskInfo>() runBlocking { //接收任务 async { channel.receiveAsFlow() .buffer(bufferCapacity) .onEach { it -> //生产者 println("onEach $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") taskList.add(it) }.flowOn(myThreadPool) .collect { it -> //消费者 println("collect $it at time=${System.currentTimeMillis()} ${Thread.currentThread().name}") val newOrderList = taskList.sortedBy { it.priority } newOrderList.forEach { print("${it.priority} ") } val lastTaskInfo = newOrderList.lastOrNull() println("\n最大优先级任务:$lastTaskInfo") taskList.remove(lastTaskInfo) loader(lastTaskInfo!!) } } //源源不断的密集发送加载任务。 async { repeat(totalTaskSize) { it -> enqueue(channel, it) } } } } private suspend fun enqueue(channel: Channel<TaskInfo>, id: Int) { val taskInfo = TaskInfo(id, (Math.random() * 9999).toInt()) println("enqueue $taskInfo") channel.send(taskInfo) } //假设这里是真正的耗时任务执行体 private suspend fun loader(info: TaskInfo) { println("load start $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") delay(500) println("load end $info @time=${System.currentTimeMillis()} ${Thread.currentThread().name}") } private class TaskInfo { var id = 0 var priority = 0 private val taskId = UUID.randomUUID() constructor(id: Int, priority: Int) { this.id = id this.priority = priority } override fun equals(other: Any?): Boolean { return taskId == (other as TaskInfo).taskId } override fun toString(): String { return "TaskInfo(id=$id, priority=$priority)" } }

输出:

enqueue TaskInfo(id=0, priority=7947)
enqueue TaskInfo(id=1, priority=1045)
enqueue TaskInfo(id=2, priority=4478)
onEach TaskInfo(id=0, priority=7947) at time=1765979341859 my-thread-2
onEach TaskInfo(id=1, priority=1045) at time=1765979341859 my-thread-2
onEach TaskInfo(id=2, priority=4478) at time=1765979341859 my-thread-2
enqueue TaskInfo(id=3, priority=5964)
enqueue TaskInfo(id=4, priority=2658)
onEach TaskInfo(id=3, priority=5964) at time=1765979341859 my-thread-4
onEach TaskInfo(id=4, priority=2658) at time=1765979341859 my-thread-4
enqueue TaskInfo(id=5, priority=3495)
onEach TaskInfo(id=5, priority=3495) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=6, priority=1461)
onEach TaskInfo(id=6, priority=1461) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=7, priority=4860)
onEach TaskInfo(id=7, priority=4860) at time=1765979341860 my-thread-3
enqueue TaskInfo(id=8, priority=7226)
onEach TaskInfo(id=8, priority=7226) at time=1765979341860 my-thread-4
enqueue TaskInfo(id=9, priority=1939)
enqueue TaskInfo(id=10, priority=133)
onEach TaskInfo(id=9, priority=1939) at time=1765979341861 my-thread-3
onEach TaskInfo(id=10, priority=133) at time=1765979341861 my-thread-3
enqueue TaskInfo(id=11, priority=1818)
enqueue TaskInfo(id=12, priority=7695)
onEach TaskInfo(id=11, priority=1818) at time=1765979341861 my-thread-2
onEach TaskInfo(id=12, priority=7695) at time=1765979341861 my-thread-2
enqueue TaskInfo(id=13, priority=4365)
onEach TaskInfo(id=13, priority=4365) at time=1765979341862 my-thread-4
enqueue TaskInfo(id=14, priority=4889)
onEach TaskInfo(id=14, priority=4889) at time=1765979341862 my-thread-2
collect TaskInfo(id=0, priority=7947) at time=1765979341862 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695 7947
最大优先级任务:TaskInfo(id=0, priority=7947)
load start TaskInfo(id=0, priority=7947) @time=1765979341887 main
load end TaskInfo(id=0, priority=7947) @time=1765979342391 main
collect TaskInfo(id=1, priority=1045) at time=1765979342392 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226 7695
最大优先级任务:TaskInfo(id=12, priority=7695)
load start TaskInfo(id=12, priority=7695) @time=1765979342392 main
load end TaskInfo(id=12, priority=7695) @time=1765979342901 main
collect TaskInfo(id=2, priority=4478) at time=1765979342901 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964 7226
最大优先级任务:TaskInfo(id=8, priority=7226)
load start TaskInfo(id=8, priority=7226) @time=1765979342902 main
load end TaskInfo(id=8, priority=7226) @time=1765979343412 main
collect TaskInfo(id=3, priority=5964) at time=1765979343412 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889 5964
最大优先级任务:TaskInfo(id=3, priority=5964)
load start TaskInfo(id=3, priority=5964) @time=1765979343412 main
load end TaskInfo(id=3, priority=5964) @time=1765979343922 main
collect TaskInfo(id=4, priority=2658) at time=1765979343922 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860 4889
最大优先级任务:TaskInfo(id=14, priority=4889)
load start TaskInfo(id=14, priority=4889) @time=1765979343923 main
load end TaskInfo(id=14, priority=4889) @time=1765979344433 main
collect TaskInfo(id=5, priority=3495) at time=1765979344433 main
133 1045 1461 1818 1939 2658 3495 4365 4478 4860
最大优先级任务:TaskInfo(id=7, priority=4860)
load start TaskInfo(id=7, priority=4860) @time=1765979344434 main
load end TaskInfo(id=7, priority=4860) @time=1765979344943 main
collect TaskInfo(id=6, priority=1461) at time=1765979344943 main
133 1045 1461 1818 1939 2658 3495 4365 4478
最大优先级任务:TaskInfo(id=2, priority=4478)
load start TaskInfo(id=2, priority=4478) @time=1765979344943 main
load end TaskInfo(id=2, priority=4478) @time=1765979345452 main
collect TaskInfo(id=7, priority=4860) at time=1765979345452 main
133 1045 1461 1818 1939 2658 3495 4365
最大优先级任务:TaskInfo(id=13, priority=4365)
load start TaskInfo(id=13, priority=4365) @time=1765979345452 main
load end TaskInfo(id=13, priority=4365) @time=1765979345960 main
collect TaskInfo(id=8, priority=7226) at time=1765979345960 main
133 1045 1461 1818 1939 2658 3495
最大优先级任务:TaskInfo(id=5, priority=3495)
load start TaskInfo(id=5, priority=3495) @time=1765979345960 main
load end TaskInfo(id=5, priority=3495) @time=1765979346467 main
collect TaskInfo(id=9, priority=1939) at time=1765979346467 main
133 1045 1461 1818 1939 2658
最大优先级任务:TaskInfo(id=4, priority=2658)
load start TaskInfo(id=4, priority=2658) @time=1765979346467 main
load end TaskInfo(id=4, priority=2658) @time=1765979346973 main
collect TaskInfo(id=10, priority=133) at time=1765979346973 main
133 1045 1461 1818 1939
最大优先级任务:TaskInfo(id=9, priority=1939)
load start TaskInfo(id=9, priority=1939) @time=1765979346974 main
load end TaskInfo(id=9, priority=1939) @time=1765979347482 main
collect TaskInfo(id=11, priority=1818) at time=1765979347482 main
133 1045 1461 1818
最大优先级任务:TaskInfo(id=11, priority=1818)
load start TaskInfo(id=11, priority=1818) @time=1765979347483 main
load end TaskInfo(id=11, priority=1818) @time=1765979347986 main
collect TaskInfo(id=12, priority=7695) at time=1765979347986 main
133 1045 1461
最大优先级任务:TaskInfo(id=6, priority=1461)
load start TaskInfo(id=6, priority=1461) @time=1765979347987 main
load end TaskInfo(id=6, priority=1461) @time=1765979348498 main
collect TaskInfo(id=13, priority=4365) at time=1765979348498 main
133 1045
最大优先级任务:TaskInfo(id=1, priority=1045)
load start TaskInfo(id=1, priority=1045) @time=1765979348498 main
load end TaskInfo(id=1, priority=1045) @time=1765979349006 main
collect TaskInfo(id=14, priority=4889) at time=1765979349006 main
133
最大优先级任务:TaskInfo(id=10, priority=133)
load start TaskInfo(id=10, priority=133) @time=1765979349007 main
load end TaskInfo(id=10, priority=133) @time=1765979349513 main

相关:

https://blog.csdn.net/zhangphil/article/details/154843029

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/1 20:32:24

终极指南:如何使用免费在线工具快速制作专业EPUB电子书

还在为制作电子书而烦恼吗&#xff1f;&#x1f629; 传统的电子书制作软件往往需要下载安装、学习成本高&#xff0c;而且功能复杂让人望而却步。现在&#xff0c;EPubBuilder为你提供了一个完美的解决方案——这款免费的在线EPUB编辑器让任何人都能轻松创建专业的电子书&…

作者头像 李华
网站建设 2026/4/1 21:00:58

ZorinOS火爆100万下载量的背后,普通人迁移到Linux发行版的障碍在哪?

近期ZorinOS爆火&#xff0c;一个月下载量高达100万次。 从这个数字看&#xff0c;可以感觉到多数用户已经苦Win11久矣。全球4亿左右的电脑无法升级使用Windows11&#xff0c;而留在已经停止服务支持的Windows10会让用户感到不安。 100万次的下载量&#xff0c;其中高达78%的下…

作者头像 李华
网站建设 2026/3/31 23:43:42

Memobase项目安装与配置指南:构建AI长期记忆系统

Memobase项目安装与配置指南&#xff1a;构建AI长期记忆系统 【免费下载链接】memobase Profile-Based Long-Term Memory for AI Applications 项目地址: https://gitcode.com/gh_mirrors/me/memobase 项目概述 Memobase是一个创新的基于用户资料的长期记忆系统&#x…

作者头像 李华
网站建设 2026/3/10 12:23:08

揭秘上下文切换:操作系统如何让单个CPU同时运行上百个程序

揭秘上下文切换&#xff1a;操作系统如何让单个CPU同时运行上百个程序 【免费下载链接】putting-the-you-in-cpu A technical explainer by kognise of how your computer runs programs, from start to finish. 项目地址: https://gitcode.com/gh_mirrors/pu/putting-the-yo…

作者头像 李华
网站建设 2026/3/24 13:17:52

3分钟掌握CAD效率翻倍:源泉设计插件终极使用指南

想要告别传统CAD绘图的繁琐操作&#xff0c;实现设计效率的质的飞跃吗&#xff1f;源泉设计CAD插件正是你需要的效率提升工具。这款免费的专业CAD插件不仅强化了AutoCAD的核心功能&#xff0c;更通过智能化操作让建筑设计工作变得前所未有的简单高效。 【免费下载链接】源泉设计…

作者头像 李华
网站建设 2026/3/1 8:13:09

18、文本文件基础操作指南

文本文件基础操作指南 在Linux系统中,对文本文件进行操作是一项非常常见且重要的任务。本文将详细介绍一些基础的文本文件操作命令和技巧,包括制表符与空格的转换、临时文件的创建、锁文件的使用、命名管道、进程替换、文件的打开与关闭,以及 head 、 tail 、 wc 和 …

作者头像 李华