@classmethod
def choice_redis_db(cls, db):
    """Return a wrapper around the cached Redis connection for ``db``.

    Connections are cached per ``db`` in ``cls._instance``, so every request
    for the same ``db`` reuses one connection object; only the first request
    calls ``get_redis_connection``.

    Args:
        db: Key identifying the Redis database; passed unchanged to
            ``get_redis_connection``.

    Returns:
        A new ``cls`` instance constructed with the cached connection.
    """
    connection = cls._instance.get(db)
    if connection is None:
        # Cache only after a successful connect so a failed attempt does not
        # leave a ``None`` placeholder behind in ``cls._instance``.
        connection = get_redis_connection(db)
        cls._instance[db] = connection
    return cls(connection)
def check_key_value(self, key, value):
    """Return whether ``value`` equals the value stored in Redis under ``key``.

    Args:
        key: Redis key to look up.
        value: Expected value, compared as ``str`` against the stored one.

    Returns:
        ``True`` if the key exists and its stored value equals ``value``;
        ``False`` if the key is missing, the values differ, or the lookup
        raised (the error is logged, not propagated).
    """
    try:
        # A single GET replaces EXISTS + GET: one round trip, and no window in
        # which the key can expire between the two commands.
        stored = self._redis.get(key)
        if stored is None:
            return False
        # Clients may return bytes or already-decoded str; normalise to str.
        if isinstance(stored, bytes):
            stored = stored.decode()
        return stored == value
    except Exception as e:
        consumer_logger.error(e)
        return False
    finally:
        self._redis.close()
def save_key_value(self, key, code, time):
    """Store ``code`` under ``key`` in Redis with an expiry of ``time``.

    Args:
        key: Redis key to write.
        code: Value to store (e.g. a verification code).
        time: Time-to-live for the key, forwarded as the ``ex`` argument of
            ``set`` (seconds, per redis-py's ``ex`` parameter).

    Returns:
        None. Errors raised by Redis are logged and not propagated.
    """
    try:
        # Set the value and its TTL in one command so a failure cannot leave
        # the key stored without an expiry.
        self._redis.set(key, code, ex=time)
    except Exception as e:
        consumer_logger.error(e)
    finally:
        self._redis.close()