现在有这样一个函数 processBatch ,负责读取数据,执行一些操作后再更新它们,相关的数据库操作都在事务内执行。伪代码如下:
function processBatch(): tx = db.beginTransaction() // 1. 批量读取:取出最多 N 条“待处理”数据 items = tx.query("SELECT * FROM tasks WHERE status = 'PENDING' LIMIT N") for itm in items: // 2. 业务处理 doBusinessLogic(item) // 3. 更新状态 tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id) tx.commit()
// 线程 A spawn threadA: processBatch() // 线程 B (几乎同时执行) spawn threadB: processBatch()
但由于 processBatch 在多个地方都会被调用,因此存在并发问题。线程 A 和线程 B 执行时可能查询到同一批数据,导致这批数据被处理两次。解决这个问题有两个方案:
我的问题是:
感谢各位赐教
感谢各位的热情回复,但我想问的不是“如何在应用层实现锁来解决这个问题”,而是“为什么”的问题。大多数回复都给出了在应用层解决的方案,所以我再重新描述下我的问题:
![]() | 1 wxyz 170 天前 感觉是外部事务的颗粒度太大了, 一次查询多条数据,但没有立即更新该批次数据的状态,肯定会导致查询到重复的数据的; 建议增加一层内存或缓存级别的互斥锁,锁任务的 id 以及一个任务一个事务,这样可以保证一个任务每次只有一个线程处理。 |
![]() | 2 wyntalgeer 170 天前 噗……不会并发执行的多线程? |
![]() | 3 luckyrayyy 170 天前 ![]() 互联网公司的习惯一般并发控制都放在业务逻辑上,不太依赖数据库,就是用方案 A ,当然你数据库还是需要设置合理的隔离级别。在你的业务场景里,遇到并发,没抢到锁的线程是等待,还是直接返回报错不处理了,如果是前者的话,可以把所有的处理逻辑放到一个有序队列里,依次执行。 |
4 kanepan19 170 天前 |
5 kanepan19 170 天前 接上面的 加一个状态,处理中 |
&bsp; 6 lvlongxiang199 170 天前 |
7 kanepan19 170 天前 boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id) 多线程 只有一个成功。 是否需要抛异常,看业务决定 |
8 Georgedoe 170 天前 3. 哥们不会还没用过 deepseek 和 chatgpt 吧 |
![]() | 9 siweipancc 170 天前 via iPhone 不要在数据库玩这个…… |
10 hwdq0012 170 天前 1.读写分离,写放队列执行 2.读到的数据可能会重复,处理掉,用 etcd ,zookeeper, rides 之类的 3.没写过后端,轻喷 |
![]() | 11 rekulas 170 天前 加锁不够优雅,不如消息队列,不过数据少用不上队列,但你可以用生产者消费者的思维来改写,保证只有一个生产者就行了 |
12 vikaptain 170 天前 悲观锁、乐观锁、队列 |
13 unused 170 天前 除了上面提到的,还可以考虑用某种查询条件对数据分区,让不同线程查询到不同的数据 |
14 yinmin 170 天前 方案 A 和方案 B 都有问题。 如果 doBusinessLogic 是 IO 密集型,推荐使用 ThreadPoolExecutor 。操作步骤如下: 1. 设置 ThreadPoolExecutor 并发 workers ,例如 workers=5 也就是有 5 个并发同时处理; 2. 函数 processBatch:将 pending 的任务 ID submit 到 ThreadPoolExecutor 队列里; 3. ThreadPoolExecutor workers 处理:先 update tasks set status='RUNNING' where id=? and status='PENDING',如果返回更新记录数=0 ,就直接 return 不处理; (这是一个防冲突的技巧),再 doBusinessLogic(id),然后 update tasks set status='DONE' where id=? 好处是:突发高并发时,任务是加入到队列的,不会挤爆服务器;可以设置并发 workers 同时处理。 |
15 Ayanokouji 170 天前 这不是并发的问题,是设计的问题。 简单点,给 processBatch 加一个 start_time 和 end_time 参数,保证查到不一样的数据。 |
16 mooyo 170 天前 两阶段乐观锁吧, 第一阶段先拿一部分,从 Waiting 改成 Running ,拿到了再去执行。 |
![]() | 17 sagaxu 170 天前 SELECT 取出 id 列表,遍历的时候按照随机顺序(如果业务逻辑允许),用 SELECT FOR UPDATE 锁住每一行,检测状态,把 PENDING 更新为 PROCESSING ,处理完成后再更新为 DONE 。这里要有一个机制,把停留在 PROCESSING 超时的任务重新放回 PENDING 或者标记为 DONE 。 如果类似任务比较多的话,可以引入一个任务调度系统,别自己搞了,要填的坑和细节非常多。 |
18 encounter2017 170 天前 这完全没必要用锁,是设计的问题,流程调整下就好了。 我理解你这块是离线的业务对吧。 首先 select id from tasks where status = 'PENDING' 拿出全量需要处理的数据,做成一个离线文件或者放内存里(看你自己的数据规模决定) 接着实现一个缓冲,简单点可以在内存里面构造一个比如长度为 16 的队列,存放下一批需要处理的数据 然后是设置并发度,比如说 4 ,这一块你用线程/纤程/进程 实现都可以,依次从队列里面取任务,队列空了在获取下一批数据到队列里面。 这一块自己实现细节还挺多的,比如任务失败了如何重跑,需不需要做背压之类的。 我之前做过类似的,用框架实现,对应的代码就很简洁,伪代码类似这样 ``` ids.toStream.buffer(16).mapPar(4)(row => processData(row)) ``` |
19 EMMMMMMMMM 170 天前 你就不能给任务分一下片吗? 线程 1:WHERE status = 'PENDING' and id % 2 = 0 线程 2:WHERE status = 'PENDING' and id % 2 = 1 说实话, 在互联网干了七八年了,多线程的代码屈指可数,都是多进程 |
![]() | 20 Romic 170 天前 1. 使用分布式锁 性能一般 2. 使用数据库的乐观锁 加 version 开发成本太高,需要多维护一个 version 字段 3. 推荐方案,将数据分批打散,比如 1000 条数据 2 个现成并行执行,那么 1-500 是线程 A 执行。501-1000 线程 B 执行。经典方案。 话说这种问题直接丢给 ai ,很多方案。以前的 deepseek R1 模型的方案完整 准确率高。现在好像不行啦。 gg 思密达。 |
![]() | 21 netnr 170 天前 这类似发短信的系统,很多地方调用发短信接口,都先写入发送记录表,状态为待发送,然后起一个任务循环执行发送并改状态; 如果是在一个进程的前提下,可以用线程安全的先进先出队列,把 processBatch 添加到队列,另起一个线程来消费队列 |
![]() | 22 netnr 170 天前 |
![]() | 23 ThreeK 170 天前 1 、要是不能控制调用方并发就推介方案 B ,方案 A 容易等不到锁。 2 、2.1 doBusinessLogic 如果 IO 多计算少可以考虑并发执行。2.2 processBatch 可以写成幂等的,最终一致就行,多执行几遍还能申请加资源。 3 、你这不算高并发,高并发一般不存在同一数据并发处理。高并发并发大但调用都带着唯一 id ,直接分布式锁解决同一 id 并发问题,你这种同一数据多处调用应该是锁等待/唤醒问题。 我认为你们这业务槽点太多 1) 事务太大不能拆就只能等报错了。 2 )改 status 像事件驱动(消息通知),又不知你们写的什么。 3 ) limit N 像是要批处理又好多处调用,调用的地方还不加条件,光用 limit N 来确定数据属于一个事务。。。。 |
![]() | 24 geebos PRO 这种场景一般用生产-消费者模型,一个线程查,多个线程处理 |
25 thevita 170 天前 ![]() 没说清楚啊,与你的事务会会发生冲突的都有啥啊,仅仅同一个逻辑的不同任务吗?有没有 读-写冲突?有没有其他不同粒度、不同逻辑的写-写冲突,doBusinessLogic 里面有不有 外部一致性要求? 超大事务呗,某些系统很常见,并不是所有业务都是互联网,上面的不要看到这种就报警 锁放外部(方案 A )正如你所说,只解决了 processBatch 的并发问题,但是不能避免其他事物的更新,依然可能导致 write-skew ,除非你保证只要该这个表,都拿锁,那和表锁其实也没太大差别,就看你们的数据库实现得整么样了 锁表(方案 B )通过合理的加锁,能避免 write-skew, 但是冲突域会变大,影响系统吞吐,甚至某些 db 可能会阻塞读,但是话又说回来,如果你的场景类似,半夜批量计算,冲突可能低那种,耶完全可以接受 其他方案: 其实具体看你能接受 哪部分 可以被适当取舍,比如上面只讨论了锁的情况,取舍的就是与其他事物的冲突 如果你能接受适当若化 这个超大事务的原子性的话还可以: processBatch 内加锁,这个锁止解决 不同 processBatch 任务间的冲突(更好的办法可能是引入一个协调者来保证 不同 processBatch 尽量不冲突),然后更新使用乐观锁+重试,让 这个 batch 实现最终一致,也不失为一种办法(当然,这里没讨论你的 doBusinessLogic 有不有外部一致性的情况) |
26 prosgtsr 170 天前 via iPhone 如果是我的话,我会改成一个线程查,然后多线程领取任务再处理。 要问怎么学,我也不知道,我也是草台班子 |
27 listenerri 169 天前 via Android 问题在哪里发生的,就尽可能在那里解决 |
28 NoDataNoBB 169 天前 select for update |
![]() | 29 shangfabao 169 天前 上边写的是对的,先 update,毕竟看你的逻辑,是否 update 是没有看执行逻辑的返回结果的 |
![]() | 30 kai1412 169 天前 多线程分页取 分配好每页的数量 线程取完各自根据主键 id 更新也不会有冲突 |
![]() | 32 heiya 169 天前 实现上来说 A 和 B 都行,不过锁粒度都很大。根据你对 processBatch()发生并发可能性的预测,还有一些别的方案:1.并发频繁发生,可以用#28 的 select for update 2.并发偶尔发生,在表里边加一列版本号,每次更新时对比这个值是否和取出时的值一致,不一致就是有并发。这样的锁粒度控制在行上。不过假如 10 条数据 2 条有并发问题,这种情况又得额外处理。 |
![]() | 33 chiaoyuja 169 天前 ![]() 应用层加锁 加锁可以做到显式控制并发,你能清楚知道「谁在处理」; 不依赖数据库细节,业务逻辑统一,不受数据库种类、配置影响; 代码层面的锁(如 Redis 分布式锁)支持跨进程、跨服务的同步; 开发和调试更直观,出问题也容易定位。 数据库加锁 数据库事务的隔离级别越高,性能越差(如 SERIALIZABLE 会阻塞读取); 很难用 SQL 实现「原子地读取并更新任务状态」这种逻辑,语法不友好; 不同数据库实现不同,如 PostgreSQL 支持 SELECT ... FOR UPDATE SKIP LOCKED ,但 MySQL 实现不同,兼容性差; 数据库锁只在事务内有效,不能用于跨服务同步控制; 如果多个服务部署在不同机器上,仅靠 DB 锁,还是可能出现争抢/重入的问题。 |
![]() | 34 testliyu 168 天前 我们之前是直接加分布式锁 |