Files
cannaiq/findadispo/backend/auth.py
Kelly a0f8d3911c feat: Add Findagram and FindADispo consumer frontends
- Add findagram.co React frontend with product search, brands, categories
- Add findadispo.com React frontend with dispensary locator
- Wire findagram to backend /api/az/* endpoints
- Update category/brand links to route to /products with filters
- Add k8s manifests for both frontends
- Add multi-domain user support migrations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-05 16:10:15 -07:00

140 lines
3.9 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from config import get_settings
from database import get_db
from models import User
# Application settings, fetched once when this module is imported. The code
# below reads `secret_key`, `algorithm` and `access_token_expire_minutes`.
settings = get_settings()
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token dependency consumed by `get_current_user`; `tokenUrl` points
# at the "auth/login" route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
# Pydantic models
class UserCreate(BaseModel):
    # Input consumed by `create_user` to build a new `User` row.
    # `password` arrives in plain text and is hashed by `create_user`
    # before the row is constructed.

    # --- required ---
    email: EmailStr
    password: str
    full_name: str

    # --- optional, default to None when omitted ---
    phone: Optional[str] = None
    default_location: Optional[str] = None
class UserUpdate(BaseModel):
    # Profile / notification-preference update payload. Every field is
    # optional and defaults to None.
    # NOTE(review): presumably None means "leave this value unchanged";
    # confirm against the handler that applies the update.

    # --- profile ---
    full_name: Optional[str] = None
    phone: Optional[str] = None
    default_location: Optional[str] = None

    # --- notification switches ---
    notify_price_alerts: Optional[bool] = None
    notify_new_dispensaries: Optional[bool] = None
    notify_weekly_digest: Optional[bool] = None
    notify_promotions: Optional[bool] = None
class UserResponse(BaseModel):
    # Outward-facing view of a user. No password or hash field is declared
    # here, so neither is part of this schema.

    # --- identity ---
    id: int
    email: str

    # --- profile (nullable, no default declared) ---
    full_name: Optional[str]
    phone: Optional[str]
    default_location: Optional[str]

    # --- account state ---
    is_active: bool
    is_verified: bool

    # --- notification switches ---
    notify_price_alerts: bool
    notify_new_dispensaries: bool
    notify_weekly_digest: bool
    notify_promotions: bool

    class Config:
        # Allow the model to be populated from an object's attributes
        # (e.g. an ORM instance) instead of only from a mapping.
        from_attributes = True
class Token(BaseModel):
    # Token payload: the encoded token string plus its type label.
    # NOTE(review): token_type is presumably "bearer", matching the
    # OAuth2PasswordBearer scheme above; confirm at the login route.
    access_token: str
    token_type: str
class TokenData(BaseModel):
    # Holder for the identity taken from a decoded token;
    # `get_current_user` fills `email` from the token's "sub" claim.
    email: Optional[str] = None
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a candidate password against a stored hash.

    Args:
        plain_password: The password as typed by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash *password* with the module's ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    digest = pwd_context.hash(password)
    return digest
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Build a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. A new dict is built from it, so
            the caller's mapping is not modified.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.access_token_expire_minutes`` minutes. Any explicit
            value, including a zero delta, is honoured as given.

    Returns:
        The token returned by ``jwt.encode`` using ``settings.secret_key``
        and ``settings.algorithm``.
    """
    # Imported locally because the module header only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Compare against None rather than relying on truthiness: a zero
    # timedelta is falsy and would otherwise be replaced by the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.access_token_expire_minutes)

    # Timezone-aware "now" instead of the deprecated naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Look up a user by e-mail address.

    Args:
        db: Session used to run the query.
        email: Value compared for equality against ``User.email``.

    Returns:
        The first matching ``User``, or ``None`` when the query yields no row.
    """
    matching_users = db.query(User).filter(User.email == email)
    return matching_users.first()
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
    """Return the user matching *email* if *password* is correct.

    Args:
        db: Session used to look the user up.
        email: E-mail address identifying the account.
        password: Plain-text password to check against the stored hash.

    Returns:
        The ``User`` when the e-mail exists and the password verifies;
        ``None`` otherwise. Both failure causes return the same value.
    """
    user = get_user_by_email(db, email)
    if not user:
        # Spend roughly the time a real hash check would take, so response
        # latency does not reveal whether the e-mail is registered.
        pwd_context.dummy_verify()
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
def create_user(db: Session, user_data: UserCreate) -> User:
    """Persist a new ``User`` built from *user_data*.

    Args:
        db: Session the new row is added to and committed on.
        user_data: Validated registration fields; ``password`` is hashed
            here and only the hash is passed to ``User``.

    Returns:
        The committed ``User``, refreshed from the database.

    Raises:
        Exception: Whatever ``db.commit()`` raises is re-raised unchanged
            after the session has been rolled back.
    """
    db_user = User(
        email=user_data.email,
        hashed_password=get_password_hash(user_data.password),
        full_name=user_data.full_name,
        phone=user_data.phone,
        default_location=user_data.default_location,
    )
    db.add(db_user)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session unusable until it is rolled
        # back; reset it, then let the caller see the original error.
        db.rollback()
        raise
    db.refresh(db_user)
    return db_user
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the bearer token to the ``User`` it identifies.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` whose e-mail equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token fails to decode, has no ``sub`` claim, or names an
            e-mail with no matching user.
    """
    # One shared error for every failure path below.
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except JWTError:
        raise credentials_exception

    subject = claims.get("sub")
    if subject is None:
        raise credentials_exception
    token_data = TokenData(email=subject)

    found = get_user_by_email(db, email=token_data.email)
    if found is None:
        raise credentials_exception
    return found
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """Pass the authenticated user through only if the account is active.

    Args:
        current_user: User supplied by the ``get_current_user`` dependency.

    Returns:
        *current_user* unchanged when ``is_active`` is truthy.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` otherwise.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")