"""Authentication routes: signup, login, and self-service account management.

All routes are mounted under the ``/auth`` prefix. Endpoint docstrings are
kept short because FastAPI publishes them as the OpenAPI descriptions.
"""

from datetime import timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

# Local imports stay in their original relative order so that any
# import-time side effects in these modules run in the same sequence.
from database import get_db
from config import get_settings
from auth import (
    UserCreate,
    UserUpdate,
    UserResponse,
    Token,
    authenticate_user,
    create_user,
    create_access_token,
    get_user_by_email,
    get_current_active_user,
    get_password_hash,
    verify_password,
)
from models import User

router = APIRouter(prefix="/auth", tags=["Authentication"])
settings = get_settings()


@router.post(
    "/signup",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
)
def signup(user_data: UserCreate, db: Session = Depends(get_db)):
    """Register a new user.

    Responds with 400 if the email is already registered; otherwise the
    newly created user is returned with status 201.
    """
    # NOTE(review): lookup-then-create is not atomic; two concurrent signups
    # with the same email can both pass this check. Whether the second one
    # then fails depends on create_user / the schema, not visible here.
    if get_user_by_email(db, user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    return create_user(db, user_data)


@router.post("/login", response_model=Token)
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Login and get access token.

    The form's ``username`` field carries the user's email. Responds with
    401 when the credentials are rejected.
    """
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Token lifetime comes from configuration; the email is the token subject.
    access_token = create_access_token(
        data={"sub": user.email},
        expires_delta=timedelta(minutes=settings.access_token_expire_minutes),
    )
    return {"access_token": access_token, "token_type": "bearer"}


@router.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_active_user)):
    """Get current user information."""
    return current_user


@router.put("/me", response_model=UserResponse)
def update_current_user(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Update current user information.

    Only the fields present in the request body are changed.
    """
    # exclude_unset keeps omitted fields out of the update, so they are not
    # overwritten with schema defaults.
    changes = user_update.model_dump(exclude_unset=True)

    # NOTE(review): every field declared on UserUpdate is written straight
    # onto the model. Confirm that schema exposes no sensitive columns
    # (e.g. a raw password or privilege flags) that need special handling.
    for field, value in changes.items():
        setattr(current_user, field, value)

    db.commit()
    db.refresh(current_user)
    return current_user


@router.post("/change-password")
def change_password(
    current_password: str,
    new_password: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Change user password.

    Responds with 400 if the supplied current password does not match.
    """
    # NOTE(review): FastAPI reads plain `str` parameters from the query
    # string, so both passwords travel in the URL and may end up in access
    # logs. Moving them into a request body would change the HTTP contract
    # for existing clients, so it is flagged here rather than changed.
    if not verify_password(current_password, current_user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect current password",
        )

    current_user.hashed_password = get_password_hash(new_password)
    db.commit()
    return {"message": "Password updated successfully"}


@router.delete("/me")
def delete_account(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Delete user account."""
    db.delete(current_user)
    db.commit()
    return {"message": "Account deleted successfully"}