[Python] 1, Fix to allow single login, 2, update password to force re-login (#16556)

This commit is contained in:
Wang Qi
2026-07-02 15:47:51 +08:00
committed by GitHub
parent cbb24944e8
commit 4130091b69
2 changed files with 26 additions and 24 deletions

View File

@@ -78,10 +78,8 @@ app.config["BODY_TIMEOUT"] = int(os.environ.get("QUART_BODY_TIMEOUT", 600))
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "redis"
app.config["SESSION_REDIS"] = settings.decrypt_database_config(name="redis")
app.config["MAX_CONTENT_LENGTH"] = int(
os.environ.get("MAX_CONTENT_LENGTH", 1024 * 1024 * 1024)
)
app.config['SECRET_KEY'] = settings.get_secret_key()
app.config["MAX_CONTENT_LENGTH"] = int(os.environ.get("MAX_CONTENT_LENGTH", 1024 * 1024 * 1024))
app.config["SECRET_KEY"] = settings.get_secret_key()
app.secret_key = settings.get_secret_key()
commands.register_commands(app)
@@ -148,7 +146,7 @@ def _load_user(auth_types=None):
auth_types = _normalize_auth_types(auth_types)
if getattr(g, "user", None) and (not explicit_auth_types or getattr(g, "auth_type", None) in auth_types):
return g.user
# No Authorization header, try to load user from session cookie if JWT auth is allowed
authorization = request.headers.get("Authorization")
if not authorization:
@@ -159,7 +157,7 @@ def _load_user(auth_types=None):
parts = authorization.split(maxsplit=1)
if len(parts) < 2:
logging.warning("Authorization header has invalid bearer format")
return _load_user_from_session() if AUTH_JWT in auth_types else None
return None
auth_token = parts[1]
else:
auth_token = authorization
@@ -178,10 +176,10 @@ def _load_user(auth_types=None):
g.auth_type = AUTH_BETA
g.user = user[0]
return user[0]
g.auth_error_message = 'Authentication error: API key is invalid! '
g.auth_error_message = "Authentication error: API key is invalid! "
except Exception as e_beta:
logging.warning(f"load_user from beta token got exception {e_beta}")
g.auth_error_message = 'Authentication error: API key is invalid!'
g.auth_error_message = "Authentication error: API key is invalid!"
# Try JWT decoding
if AUTH_JWT in auth_types:
@@ -191,21 +189,21 @@ def _load_user(auth_types=None):
if not access_token or not access_token.strip():
logging.warning("Authentication attempt with empty access token")
return _load_user_from_session()
return None
if len(access_token.strip()) < 32:
logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars")
return _load_user_from_session()
return None
user = UserService.query(access_token=access_token, status=StatusEnum.VALID.value)
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return _load_user_from_session()
return None
g.auth_type = AUTH_JWT
g.user = user[0]
return user[0]
return _load_user_from_session()
return None
except Exception as e_jwt:
logging.warning(f"load_user from jwt got exception {e_jwt}")
@@ -218,7 +216,7 @@ def _load_user(auth_types=None):
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return _load_user_from_session() if AUTH_JWT in auth_types else None
return None
g.auth_type = AUTH_API
g.user = user[0]
return user[0]
@@ -228,7 +226,7 @@ def _load_user(auth_types=None):
except Exception as e_api_token:
logging.warning(f"load_user from api token got exception {e_api_token}")
return _load_user_from_session() if AUTH_JWT in auth_types else None
return None
current_user = LocalProxy(_load_user)

View File

@@ -114,7 +114,7 @@ async def login():
user = UserService.query_user(email, password)
if user and hasattr(user, 'is_active') and user.is_active == "0":
if user and hasattr(user, "is_active") and user.is_active == "0":
logging.warning("Login failed: disabled account for user_id=%s", user.id)
return get_json_result(
data=False,
@@ -260,7 +260,7 @@ async def oauth_callback(channel):
# User exists, try to log in
user = users[0]
user.access_token = get_uuid()
if user and hasattr(user, 'is_active') and user.is_active == "0":
if user and hasattr(user, "is_active") and user.is_active == "0":
return redirect("/?error=user_inactive")
login_user(user)
@@ -331,6 +331,7 @@ async def setting_user():
"""
update_dict = {}
request_data = await get_request_json()
password_changed = False
if request_data.get("password"):
new_password = request_data.get("new_password")
if not check_password_hash(current_user.password, decrypt(request_data["password"])):
@@ -342,6 +343,8 @@ async def setting_user():
if new_password:
update_dict["password"] = generate_password_hash(decrypt(new_password))
update_dict["access_token"] = f"INVALID_{secrets.token_hex(16)}"
password_changed = True
for k in request_data.keys():
if k in [
@@ -367,6 +370,8 @@ async def setting_user():
try:
UserService.update_by_id(current_user.id, update_dict)
if password_changed:
logout_user()
return get_json_result(data=True)
except Exception as e:
logging.exception(e)
@@ -653,7 +658,7 @@ async def forget_get_captcha():
- Generate an image captcha and cache it in Redis under key captcha:{email} with TTL = OTP_TTL_SECONDS.
- Returns the captcha as a PNG image.
"""
email = (request.args.get("email") or "")
email = request.args.get("email") or ""
if not email:
return get_json_result(data=False, code=RetCode.ARGUMENT_ERROR, message="email is required")
@@ -664,9 +669,10 @@ async def forget_get_captcha():
# Generate captcha text
allowed = string.ascii_uppercase + string.digits
captcha_text = "".join(secrets.choice(allowed) for _ in range(OTP_LENGTH))
REDIS_CONN.set(captcha_key(email), captcha_text, 60) # Valid for 60 seconds
REDIS_CONN.set(captcha_key(email), captcha_text, 60) # Valid for 60 seconds
from captcha.image import ImageCaptcha
image = ImageCaptcha(width=300, height=120, font_sizes=[50, 60, 70])
img_bytes = image.generate(captcha_text).read()
response = await make_response(img_bytes)
@@ -816,7 +822,7 @@ async def forget_reset_password():
- auto login
- clear verified flag
"""
req = await get_request_json()
email = req.get("email") or ""
new_pwd = req.get("new_password")
@@ -829,8 +835,8 @@ async def forget_reset_password():
return get_json_result(data=False, code=RetCode.AUTHENTICATION_ERROR, message="email not verified")
new_pwd_base64 = decrypt(new_pwd)
new_pwd_string = base64.b64decode(new_pwd_base64).decode('utf-8')
new_pwd2_string = base64.b64decode(decrypt(new_pwd2)).decode('utf-8')
new_pwd_string = base64.b64decode(new_pwd_base64).decode("utf-8")
new_pwd2_string = base64.b64decode(decrypt(new_pwd2)).decode("utf-8")
if new_pwd_string != new_pwd2_string:
return get_json_result(data=False, code=RetCode.ARGUMENT_ERROR, message="passwords do not match")
@@ -838,7 +844,7 @@ async def forget_reset_password():
users = UserService.query_user_by_email(email=email)
if not users:
return get_json_result(data=False, code=RetCode.DATA_ERROR, message="invalid email")
user = users[0]
try:
UserService.update_user_password(user.id, new_pwd_base64)
@@ -854,5 +860,3 @@ async def forget_reset_password():
msg = "Password reset successful. Logged in."
return await construct_response(data=user.to_safe_dict(for_self=True), auth=user.get_id(), message=msg)