java 应用,生产者单线程分页查询数据库并写入 redis 队列中,消费者 30 个线程多线程消费 list 。本来是没有出现过这个报错的,但是更新了生产者的 sql 后,sql 速度变快了?每次正常运行一段时间后,生产者异常就出现,消费者也接着出现异常
报的异常如下
生产者异常: :"sendQueueMessageInfo leftPushAll : 异常 : org.springframework.data.redis.RedisConnectionFailureException: Unexpected end of stream.; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. org.springframework.data.redis.connection.jedis.JedisExceptionConverter.convert(JedisExceptionConverter.java:67) org.springframework.data.redis.connection.jedis.JedisExceptionConverter.convert(JedisExceptionConverter.java:41) org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) org.springframework.data.redis.connection.jedis.JedisConnection.convertJedisAccessException(JedisConnection.java:142) org.springframework.data.redis.connection.jedis.JedisListCommands.convertJedisAccessException(JedisListCommands.java:486) org.springframework.data.redis.connection.jedis.JedisListCommands.lPush(JedisListCommands.java:84) org.springframework.data.redis.connection.DefaultedRedisConnection.lPush(DefaultedRedisConnection.java:444) org.springframework.data.redis.core.DefaultListOperations.lambda$leftPushAll$2(DefaultListOperations.java:124) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:224) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:184) org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:95) org.springframework.data.redis.core.DefaultListOperations.leftPushAll(DefaultListOperations.java:124) com.service.ds.exec.DataSourceActuator.sendQueueMessageInfo(DataSourceActuator.java:392) com.service.ds.exec.DataSourceActuator.execAndPushQueue(DataSourceActuator.java:214) com.controller.rest.etl.EtlTaskRestController$1.run(EtlTaskRestController.java:86) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)\
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:199) redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) redis.clients.jedis.Protocol.process(Protocol.java:153) redis.clients.jedis.Protocol.read(Protocol.java:218) redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:341) redis.clients.jedis.Connection.getIntegerReply(Connection.java:266) redis.clients.jedis.BinaryJedis.lpush(BinaryJedis.java:1119) org.springframework.data.redis.connection.jedis.JedisListCommands.lPush(JedisListCommands.java:82)\n\t... 12 more\n","stackTrace":"","reqId":"","elapsedTime":null}
消费者异常: 1","className":"com.service.schedule.SchedulerTask$1","lineNumber":"63","message":"队列消费 : 异常 : org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:281) org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:464) org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:132) org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:95) org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:82) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:184) org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:95) org.springframework.data.redis.core.DefaultListOperations.leftPush(DefaultListOperations.java:99) com.service.etl.consumer.QueueConsumer.retryQueueMessage(QueueConsumer.java:101) com.service.etl.consumer.QueueConsumer.consume(QueueConsumer.java:180) com.service.schedule.SchedulerTask$1.run(SchedulerTask.java:60) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)java.lang.Thread.run(Thread.java:745)\nCaused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the poolredis.clients.util.Pool.getResource(Pool.java:53)redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)redis.clients.jedis.JedisPool.getResource(JedisPool.java:16) org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:271)\n\t... 14 more\nCaused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Connection resetredis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) redis.clients.jedis.Protocol.process(Protocol.java:153) redis.clients.jedis.Protocol.read(Protocol.java:218) redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:341) redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:240) redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2223) redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:108) org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:889) org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:424) org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:349) redis.clients.util.Pool.getResource(Pool.java:49)\n\t... 17 more\nCaused by: java.net.SocketException: Connection reset java.net.SocketInputStream.read(SocketInputStream.java:209) java.net.SocketInputStream.read(SocketInputStream.java:141) java.net.SocketInputStream.read(SocketInputStream.java:127) redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)\n\t... 28 more\n","stackTrace":"","reqId":"","elapsedTime":null}
redis 配置
public RedisTemplate<String, QueueMessageInfo> redisQueueMessageTemplate(RedisConnectionFactory factory) { RedisTemplate<String, QueueMessageInfo> template = new RedisTemplate<String, QueueMessageInfo>(); template.setConnectionFactory(factory); // 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 redis 的 value 值 Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(om); template.setValueSerializer(serializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } @Bean public RedisConnectionFactory connectionFactory() { JedisPoolConfig poolCOnfig= new JedisPoolConfig(); poolConfig.setMaxTotal(maxActive); poolConfig.setMaxIdle(maxIdle); poolConfig.setMaxWaitMillis(maxWait); poolConfig.setMinIdle(minIdle); poolConfig.setTestOnBorrow(true); poolConfig.setTestOnReturn(false); poolConfigsetTestWhileIdle(true); JedisClientConfiguration jedisClientCOnfiguration= null; if (ssl) { jedisClientCOnfiguration= JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).and() .readTimeout(Duration.ofMillis(timeout)).useSsl().build(); } else { jedisClientCOnfiguration= JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).and() .readTimeout(Duration.ofMillis(timeout)).build(); } RedisStandaloneConfiguration redisStandalOneConfiguration= new RedisStandaloneConfiguration(); redisStandaloneConfiguration.setDatabase(redisDatabase); redisStandaloneConfiguration.setPort(port); redisStandaloneConfiguration.setPassword(password); redisStandaloneConfiguration.setHostName(host); RedisConnectionFactory redisCOnnectionFactory= new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration); logger.info("初始化 redis 链接池 : JedisPoolConfig"); return redisConnectionFactory; } # 池在给定时间可以分配的最大连接数。使用负值无限制。 #spring.redis.pool.max-active=200 spring.redis.pool.max-active=200 # 池中“空闲”连接的最大数量。使用负值表示无限数量的空闲连接。 # spring.redis.pool.max-idle=20 spring.redis.pool.max-idle=20 # 连接分配在池被耗尽时抛出异常之前应该阻塞的最长时间量(以毫秒为单位)。使用负值可以无限期地阻止。 spring.redis.pool.max-wait=9000 # 目标为保持在池中的最小空闲连接数。这个设置只有在正面的情况下才有效果。 #spring.redis.pool.min-idle=20 spring.redis.pool.min-idle=20
![]() | 1 rqxiao OP 额 先用 LettuceConnectionFactory 了 |