.Net 相关
线程 0 调用硬件异步 API, 拿到数据后, 从 devices 根据 id 取到 Device 实例, 更新硬件最新数据到这个实例上.
同时有多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作, 并在异步操作执行完成后, 对硬件数据进行部分更新.
这个要怎么做才能确保线程安全?
// 设备集中存储处 ConcurrentDictionary<int, Device> devices = new(); // 设备类 public class Device { public int Id { get; init; } public bool Enable { get; set; } public string Group { get; init; } = ""; public int[] Locations { get; init; } = Array.Empty<int>(); public int Margin { get; set; } public int RsCount { get; init; } public bool EnableSplit { get; init; } public int DynamicMerge { get; set; } public int Width { get; set; } public int Length { get; set; } public int LeftLength { get; set; } public int LoadEdge { get; set; } public int Dest { get; set; } } // 数据更新线程相关 public Thread0Executor() { public async Task Execute() { var data = await GetDataFromHardwareApi(); Update(data, devices); } } // 数据监控处理线程相关 public MonitorThreadExecutor() { public async Task Execute() { Resolve(devices); await Operate0(); DoSomething(); await Operate1(); DoSomething(); } public async Task Operate0() { try { await CallApi(); Update(devices); } catch() { UpdateIfError(devices); } } }
异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.
这里也不能对整个 Execute 方法用锁. 因为监控线程中的异步操作耗时是不一定的, 可能因为网络问题花个几分钟都有可能.
貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁.
也考虑过弄个类似 ANDROID 里的 UI 线程和子线程的东西, 数据读取和更新都放在 UI 线程里, 异步操作放在子线程里. 但是搞了半天没搞出来.
最后的最后, 实在没办法了, 我在想要不把 Device 的所有属性都加一个 volatile 关键字. 我这里更新数据的时候基本不会看原来数据是多少, 不会出现count++
这种情况, 貌似 volatile 是可行的. 但是实际这个 Device 有几十个属性, 并且有一两千个 Device, 如果每个属性都加一个 volatile 关键字, 那就是 2000*50=100 万个属性带 volatile 了. 这会不会极大地影响程序运行性能?
1 svnware 2024-02-27 18:39:51 +08:00 单写多读不就已经是线程安全的了么。。。 |
![]() | 2 wamson 2024-02-27 18:50:10 +08:00 via iPhone 看标题,寻思,这不就是个读写锁么 |
3 laminux29 2024-02-27 18:50:53 +08:00 你这不是一线程更新,多线程读,而是多线程读写。 这种问题,没把握的话,直接丢给 MSSQL ,如果对数据一致性要求严谨,用序列化级别的事务去操作数据。 如果要求不严谨,直接用 EF 的乐观锁或最终一致性。 |
![]() | 4 wayne1007 2024-02-27 18:51:15 +08:00 double buffer ,写线程 先 load 数据,然后和更新 buffer 的 idx 0->1 或者 1->0 |
![]() | 5 wayne1007 2024-02-27 18:52:34 +08:00 @wayne1007 读线程,直接按当前的 idx 读数据,就 idx 切换的瞬间,可能不一致,用信号量也不一定能完全保证,看你这个场景够不够用,不用锁的话 |
7 namonai 2024-02-27 18:54:37 +08:00 @svnware 不一样的。比如一段数据,写入一半的时候被读取,读到的就是 broken 的数据。哪怕是对单个字节进行读写操作,也可能存在问题,所以至少要使用原子操作进行保护。 |
8 codcrafts 2024-02-27 18:54:38 +08:00 我没太懂,你这种情况下会有线程安全问题吗?我感觉不会 |
10 bthulu OP @laminux29 丢 SQL 里不到万不得已不考虑, 尽量在内存这一层面解决, 实在么办法了再考虑丢 SQL 里去. |
11 guo4224 2024-02-27 18:57:25 +08:00 临界区…… |
12 namonai 2024-02-27 19:04:30 +08:00 ![]() @bthulu 你可以试试 trible buffer ,编号 0 、1 、2 ,读线程实现一个 getIndex(),初始的 valid indx 是 0 ,需要对数据更改的时候,index + 1 ,往 1 上写,写完了以后 valide index 也更新到 1 ,这个时候 0 和 1 的数据都是有效的,过了一小段时间,0 就没人访问了。在这段时间里如果又有需要写入的数据,那就往 2 上写。这样子可以始终保证读到的数据是完整的。可以把写入操作放在一个单独的线程里进行,其他线程如果有修改数据的需要,就通过队列传递数据过去。 |
13 billccn 2024-02-27 19:12:20 +08:00 `devices`这个字典一定程度上就是一个手搓的数据库,你这个里面要考虑的情况很多,比如: 1. 字典需要动态增删吗?不需要的话这个`Concurrent`是徒增开销 2. 字典里面的值(就是每个 Device 实例)会被多个线程同时引用吗?字典的`Concurrent`是不会管里面的值是不是 thread-safe 3. Device 实例需要 referential equivalence 吗?不需要的话建议把这个类变成只读的,每次更新的时候直接替换整个实例最安全 4. Device 与 Device 之间有关系吗?有的话你可能需要考虑如何 atomically 更新这个字典 |
14 bthulu OP @billccn 字典可以保证 Device 实例引用线程安全. 这里主要的问题就是这个 Device 实例上的茫茫多的属性怎么保证线程安全 |
![]() | 15 geelaw 2024-02-27 19:48:17 +08:00 via iPhone >异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁. >貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁. 规则是 lock 里面不可以有 await (可以实现,但是几乎总是错误的,因此语言层面拒绝这样做),在 async 方法中 lock 是完全 OK 的。 ConcurrentDictionary 已经确保每次访问它的成员都是原子的,然而这不代表对它的访问逻辑就已经线程安全,比如一段代码里连续访问它的成员两次,那么在中途其他线程可能已经修改过了这个字典。说这点是预防针,楼主在 #14 提到这是为了确保 Device 存在 devices 里面的引用安全。 要保证每个 Device 实例线程安全,最简单的思路是细粒度,比如操作每个 device 的时候 lock 之。如果操作过程需要异步,那么我想象中楼主说有多个线程查看 devices 并做一些事,意思是如果 A 线程处理了 device1 则 B 线程应该跳过并处理 device2 ,这种情况下因为 device 被占用时无需等待,所以可以用 interlocked operation 实现: 1. 在 device 上加上一个 int 字段 InUse ,表示目前是否在处理它,初始化为 0 。 2. 要访问一个 device ,先用 Interlocked.Exchange 查看 InUse 并设置为 1 ,如果 InUse 之前也是 1 ,则跳过。 3. 否则 InUse 之前是 0 并且被原子设置为 1 ,此时当前方法认为自己接管该 device 并开始异步硬件 API 操作,在 await 结束、处理完 device 后,重新 Exchange 把 InUse 还原为 0 。 |
16 dogfeet 2024-02-27 20:18:47 +08:00 如果更新的时候不看原来的数据,且 [多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作] 这个过程中数据变化了也没关系的话,可以考虑直接将 Device 变为不可变(所有字段都 readonly)。 C# 不是特别熟了,devices 本身读写是线程安全的,里面的 device 只要每次更新的时候是替换一个新的不可变对象,这在 java 中是线程安全的。 几十个字段的拷贝,应该也还好。 |
![]() | 18 zzzyk 2024-02-27 22:53:00 +08:00 无锁队列看行不行。 |
19 CLMan 2024-02-27 23:30:10 +08:00 ![]() 这个问题的核心是你业务逻辑的“线程安全”是如何定义的(只有你自己知道),至于是用锁、读写锁、Semaphore 、无锁、volatile 等,这些纯粹是实现细节,取决于你对并发相关基础知识(操作系统领域)以及特定语言(这里是.NET )相关库和语法的熟悉程度。 由于不了解你的业务逻辑实现细节,我只能提问: - 线程 0 是只写吗,是否依赖 Device 当前的状态? - 监控线程统计所有设备状态时,以及执行异步操作时,是否允许线程 0 进行更新? - 监控线程的异步任务与线程 0 是否存在写入相同的内存区域的情况? - 监控线程的异步任务是否可能执行超过 100 毫秒,如果超过,是否允许多个监控线程的异步任务同时执行?如果允许,它们的写是否冲突? 你至少需要补充以上细节,才能让回答者更好的帮你解决问题。 |
20 iceheart 2024-02-28 07:22:25 +08:00 via Android 多个副本数据策略。 属性数据放两个以上副本,由一个 volatile 索引指定最新副本。 写线程更新副本后再更新索引。 读线程按索引访问副本数据。 |
21 bthulu OP @CLMan 线程 0 只写, 不依赖 Device 当前的状态 监控线程执行异步操作时,允许线程 0 进行更新 监控线程的异步任务跟线程 0 写入的就是相同的内存区域 监控线程的异步任务是轮询执行的, 执行完毕后等 100 毫秒再次执行,且执行时间可能长达几分钟。允许多个监控线程的异步任务同时执行。他们的写存在冲突。 |
![]() | 22 xuanbg 2024-02-28 08:48:44 +08:00 这……单写不是已经线程安全了么?看内容貌似又不是,OP 还是直接说需求吧,这问题都说不清楚,实在让人挠头。 |
![]() | 23 zzl22100048 2024-02-28 08:58:13 +08:00 |
24 layxy 2024-02-28 09:16:09 +08:00 又不是多写,单写多读没啥线程安全问题吧 |
25 1008610001 2024-02-28 09:26:55 +08:00 看描述。。。只有一个线程负责写数据 不存在线程安全的问题啊 |
![]() | 26 lakehylia 2024-02-28 09:29:46 +08:00 简单点,直接用事务线程不行么?其他多个线程都是提交事务给事务线程负责读写,然后事务线程回调结果。 |
27 4kingRAS 2024-02-28 09:56:15 +08:00 读写操作是原子的吗?原子的,一个线程写根本没多线程问题 如果不是原子的,先尝试做到原子,做不到就读写时加锁 |
28 wu00 2024-02-28 10:19:56 +08:00 是不是想太多了? ConcurrentDictionary 本就是线程安全集合,TryAdd(),TryUpdate()都是原子操作。 所以就算你 Thread0 、Monitor1 、Monitor2 三个线程并发 ConcurrentDictionary 进行操作,也不会出现线程安全问题;会出现的是你业务上的“线程安全”问题:到底谁的优先级更高? |
![]() | 29 cloud107202 2024-02-28 11:01:46 +08:00 这里可以考虑做个线程读写分离。没接触过 .Net 我会用 Java 的 type 与 API 描述,自行对应一下: 首先把成员 devices 与相关的操作都封装到一个类型里面,对外暴露一个 public 的阻塞队列成员变量,Java 的话我会用有阻塞语义的 ArrayBlockingQueue. 这个类型在构建的时候(onCreation),启动一个单线程去 poll 这个 Queue. devices 的更新逻辑都由这个单线程完成 外面的异步操作获取到设备信息后,以 ImmutableEvent 的形式把必要的信息封装描述好,放入队列. 形如 ArrayBlockingQueue<DeviceUpdatedEvent> 这样子,里面的单线程 poll 到事件直接更新 Dictionary 即可。 最后剩下这个“多个监控线程每隔 100 毫秒读取一次所有设备状态” ,这里简单起见可以将 devices 也设置成 public ,直接在外面访问 devices 成员(重点是:一定要约定好,在 poll 的线程之外的逻辑,全部只能 read 这个 ConcurrentDictionary )。因为 Dictionary 本身使用了线程安全的 ConcurrentDictionary ,对它的 CRUD 是线程安全的,只需要防止外面监控程序获取到某个尚未更新完成的某个 Device 实例(有点像 DB 的脏读),这里给 Device 每个属性设置 volatile 肯定是不合适的:可以考虑前面提到的,在负责 poll 的单线程,获取到更新事件后,不要就地改变 device 对象本身的属性值,而是以 deepCopy 的方式创建个全新的 Device 实例。然后用 ConcurrentDictionary.put(key, value) 的 API 直接更新整个 Device 对象,规避外部监控线程在 scan 的时候,获取到属性更新不完整的 stale state |
30 jones2000 2024-02-28 11:13:29 +08:00 奇偶读写,2 个内存块( 0 号,1 号),0 号写的时候,1 号读。1 号写的时候,0 号读。 |
31 dode 2024-02-28 14:06:07 +08:00 调整锁的粒度 |
32 liuky 2024-02-28 14:30:22 +08:00 使用阻塞队列 BlockingCollection 试试, |
![]() | 33 qping 2024-02-28 14:31:33 +08:00 我感觉 27 楼说的做到写原子操作就可以了 Device 应该是一个 immutable 得对象,不可变 想要更新只能 clone ,然后 update 到字典中 |
34 sparklee 2024-02-28 14:37:27 +08:00 单个线程更新, 所有需要更新的操作都做成 任务 都放到任务队列 |
35 yansideyu 2024-02-28 14:40:53 +08:00 楼主的问题是所有线程更新数据的时候,需要更新多个属性,怎么避免没有全部更新完的情况下,其他线程读取了数据。拿到了脏数据? |
36 i8086 2024-02-28 14:41:51 +08:00 楼主意思应该是多线程更新集合里 Device 类型属性值的问题? 用 volatile 就好了,目前是最方便。 |
![]() | 37 qping 2024-02-28 14:44:47 +08:00 又仔细看了下,你是多线程写啊,MonitorThread (多个)和 Thread0 都能更新, 那存在一些问题 1. MonitorThread 和 Thread0 是否会写入冲突 如果 MonitorThread 和 Thread0 写入相同得内存,那感觉就是设计有问题 那我假想他们不会冲突 2. 多个 MonitorThread 冲突的问题 多个 MonitorThread 每次都更新全部的 devices ,这个设计也很奇怪 假设已经做到通过锁或其他手段,保证一个 MonitorThread 更新是原子级别的。 MonitorThread A 先启动,MonitorThread B 后启动,因为等待时间长 A 的结果却比 B 后写入,这样没有问题吗? 我觉得,应该可以有多个 MonitorThread 线程,但是每个 Device 只能同一时间被一个 MonitorThread 更新 实现方法上,可以用队列,每次更新 MonitorThread 从队列中取一个 Device ,如果更新完重新还回 |
![]() | 38 yicong135 2024-02-28 15:24:41 +08:00 ReaderWriterLockSlim 读写锁 https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.readerwriterlockslim?view=net-8.0 |
39 shapper 2024-02-28 16:23:22 +08:00 task 本身就是开新线程,减少锁粒度,锁 devices 就可以,把具体 device 分配到 task ,task 只修改自己引用的 device ,不修改 devices ; |
40 dogfeet 2024-02-28 17:32:04 +08:00 @bthulu 看起来就是写不依赖读,或者说写需要的读状态可以是旧数据(只需完整,无需最新)。那么单纯的将 Device 变为不可变就行。ConcurrentDictionary 单纯的读写本身是原子的,查了一下,不可变的线程安全 C# 与 Java 是一致的。 |
41 nevermoreluo 2024-02-28 17:33:40 +08:00 除了 Group 都是 int 或者 bool ,Group 不动的话 保证原子性应该就好了吧.... |
43 xumng123 2024-02-28 19:05:58 +08:00 via iPhone 已经是安全的了 |
44 bthulu OP @qping Thread0 会不依赖原有属性值更新所有 Device 的属性. MonitorThread 会读大部分的 devices, 并更新小部分 devices. |
45 m2276699 2024-02-29 08:49:00 +08:00 这样的业务应该用事件驱动 |
46 johnnyyeen 2024-02-29 17:16:40 +08:00 1 生产者对多消费者,给每个消费者一个队列。 通过原子操作(信号或者锁)的方式保护生产者与消费者的竞争条件(我写数据你取走数据)。 |