- Add findagram.co React frontend with product search, brands, categories - Add findadispo.com React frontend with dispensary locator - Wire findagram to backend /api/az/* endpoints - Update category/brand links to route to /products with filters - Add k8s manifests for both frontends - Add multi-domain user support migrations 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
109 lines
3.3 KiB
Python
109 lines
3.3 KiB
Python
from datetime import timedelta
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db
|
|
from config import get_settings
|
|
from auth import (
|
|
UserCreate, UserUpdate, UserResponse, Token,
|
|
authenticate_user, create_user, create_access_token,
|
|
get_user_by_email, get_current_active_user, get_password_hash
|
|
)
|
|
from models import User
|
|
|
|
# Router that groups the endpoints in this module under the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["Authentication"])
|
|
# Fetched once at import time; login reads access_token_expire_minutes from it.
settings = get_settings()
|
|
|
|
|
|
@router.post("/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def signup(user_data: UserCreate, db: Session = Depends(get_db)):
    """Create an account for the submitted email address.

    Responds with 400 "Email already registered" when ``get_user_by_email``
    finds an existing user for ``user_data.email``; otherwise returns the
    result of ``create_user``.
    """
    # Refuse duplicate registrations before attempting to create anything.
    if get_user_by_email(db, user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    return create_user(db, user_data)
|
|
|
|
|
|
@router.post("/login", response_model=Token)
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Exchange form credentials for a bearer access token.

    The form's ``username`` and ``password`` are passed to
    ``authenticate_user``. A falsy result produces a 401 response carrying a
    ``WWW-Authenticate: Bearer`` header. Otherwise a token is issued whose
    ``sub`` claim is the user's email and whose lifetime comes from
    ``settings.access_token_expire_minutes``.
    """
    account = authenticate_user(db, form_data.username, form_data.password)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    lifetime = timedelta(minutes=settings.access_token_expire_minutes)
    token = create_access_token(
        data={"sub": account.email},
        expires_delta=lifetime,
    )
    return {
        "access_token": token,
        "token_type": "bearer",
    }
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
def get_current_user_info(
    current_user: User = Depends(get_current_active_user),
):
    """Return the user resolved by the ``get_current_active_user`` dependency."""
    return current_user
|
|
|
|
|
|
@router.put("/me", response_model=UserResponse)
def update_current_user(
    user_update: UserUpdate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Apply the fields supplied in the request to the signed-in user.

    Only fields explicitly set on ``user_update`` are copied onto
    ``current_user`` (``exclude_unset=True``). The change is then committed
    and the instance refreshed from the session before being returned.
    """
    # Fields the client left out are skipped, so they keep their stored values.
    for attr_name, new_value in user_update.model_dump(exclude_unset=True).items():
        setattr(current_user, attr_name, new_value)

    db.commit()
    db.refresh(current_user)
    return current_user
|
|
|
|
|
|
@router.post("/change-password")
def change_password(
    current_password: str,
    new_password: str,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Replace the signed-in user's password after checking the current one.

    Responds with 400 "Incorrect current password" when ``verify_password``
    rejects ``current_password`` against the stored hash. Otherwise the hash
    of ``new_password`` is stored, the session is committed, and a
    confirmation message is returned.

    NOTE(review): both passwords are declared as plain ``str`` parameters, so
    FastAPI presumably reads them from the query string, where they can end
    up in access logs. Confirm, and consider moving them to the request body
    in a coordinated client/server change.
    """
    # Imported here because the module-level ``from auth import`` omits it.
    from auth import verify_password

    password_ok = verify_password(current_password, current_user.hashed_password)
    if not password_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect current password",
        )

    current_user.hashed_password = get_password_hash(new_password)
    db.commit()
    return {"message": "Password updated successfully"}
|
|
|
|
|
|
@router.delete("/me")
def delete_account(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """Delete the signed-in user's record and commit the session.

    Returns a confirmation message once the commit has gone through.
    """
    db.delete(current_user)
    db.commit()

    confirmation = {"message": "Account deleted successfully"}
    return confirmation
|