Source code for ccbenefits.routers.user_cards

from datetime import date

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import or_
from sqlalchemy.orm import Session, joinedload

from ..database import get_db
from ..dependencies import get_current_user
from ..metrics import cards_added_counter
from ..models import (
    BenefitTemplate,
    BenefitUsage,
    CardTemplate,
    RedemptionType,
    User,
    UserBenefitSetting,
    UserCard,
)
from ..schemas import (
    BenefitSettingUpdate,
    BenefitStatusOut,
    BenefitUsageCreate,
    BenefitUsageOut,
    CardCloseRequest,
    PeriodSegment,
    UserCardCreate,
    UserCardDetailOut,
    UserCardOut,
    UserCardSummaryOut,
    UserCardUpdate,
)
from ..utils import compute_annual_max, get_all_periods_in_year, get_current_period

# Routes registered on this router are served under the "/api/user-cards"
# prefix and carry the "user-cards" tag.
router = APIRouter(prefix="/api/user-cards", tags=["user-cards"])


@router.post("/", response_model=UserCardOut, status_code=201)
def create_user_card(
    data: UserCardCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a card for the authenticated user from an existing card template.

    Responds with 404 when ``data.card_template_id`` matches no
    ``CardTemplate``. On success the new row is committed, the
    ``cards_added_counter`` metric is incremented by one, and the card is
    returned as a ``UserCardOut``.
    """
    template = (
        db.query(CardTemplate)
        .filter(CardTemplate.id == data.card_template_id)
        .first()
    )
    if not template:
        raise HTTPException(status_code=404, detail="Card template not found")

    new_card = UserCard(
        user_id=current_user.id,
        card_template_id=data.card_template_id,
        nickname=data.nickname,
        member_since_date=data.member_since_date,
    )
    db.add(new_card)
    db.commit()
    db.refresh(new_card)

    cards_added_counter.add(1)

    # Attach the template fetched above; the output builder reads
    # ``card_template`` for the card name, issuer and annual fee.
    new_card.card_template = template
    return _to_user_card_out(new_card)
@router.patch("/{user_card_id}", response_model=UserCardOut)
def update_user_card(
    user_card_id: int,
    data: UserCardUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update the renewal date of one of the current user's cards.

    A non-null ``renewal_date`` overwrites the stored value. A null value
    clears the stored date only when the field was explicitly present in the
    request payload; an omitted field leaves the card untouched.

    Responds with 404 when the card does not exist or belongs to another user.
    """
    card_query = (
        db.query(UserCard)
        .options(joinedload(UserCard.card_template))
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
    )
    card = card_query.first()
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")

    new_renewal = data.renewal_date
    # Either a real date was supplied, or the client explicitly sent null.
    if new_renewal is not None or "renewal_date" in data.model_fields_set:
        card.renewal_date = new_renewal

    db.commit()
    db.refresh(card)
    return _to_user_card_out(card)
def _to_user_card_out(uc: UserCard) -> UserCardOut:
    """Convert a ``UserCard`` (with ``card_template`` available) to ``UserCardOut``."""
    template = uc.card_template
    fields = {
        "id": uc.id,
        "card_template_id": uc.card_template_id,
        "card_name": template.name,
        "issuer": template.issuer,
        "annual_fee": template.annual_fee,
        "nickname": uc.nickname,
        "member_since_date": uc.member_since_date,
        "renewal_date": uc.renewal_date,
        "closed_date": uc.closed_date,
        "is_active": uc.is_active,
        "created_at": uc.created_at,
    }
    return UserCardOut(**fields)


def _get_available_years(uc: UserCard) -> list[int]:
    """Return the inclusive list of years the card spans.

    The span starts at the ``member_since_date`` year (falling back to the
    ``created_at`` year) and ends at the ``closed_date`` year (falling back
    to the current year).
    """
    today = date.today()

    if uc.member_since_date:
        first_year = uc.member_since_date.year
    else:
        first_year = uc.created_at.year

    if uc.closed_date:
        last_year = uc.closed_date.year
    else:
        last_year = today.year

    return list(range(first_year, last_year + 1))


def _query_user_cards(db: Session, user_id: int, year: int):
    """Load a user's cards relevant to ``year``, with related rows eager-loaded.

    Cards closed before the year began and cards opened after the year ended
    are filtered out; cards with no close date or no membership date pass the
    respective check.
    """
    year_start = date(year, 1, 1)
    year_end = date(year, 12, 31)

    # Show cards that were active at any point during the requested year
    not_closed_before_year = or_(
        UserCard.closed_date.is_(None),
        UserCard.closed_date >= year_start,
    )
    # Exclude cards opened after the requested year
    opened_by_year_end = or_(
        UserCard.member_since_date.is_(None),
        UserCard.member_since_date <= year_end,
    )

    query = db.query(UserCard).filter(
        UserCard.user_id == user_id,
        not_closed_before_year,
        opened_by_year_end,
    )
    query = query.options(
        joinedload(UserCard.card_template).joinedload(CardTemplate.benefits),
        joinedload(UserCard.usages),
        joinedload(UserCard.benefit_settings),
    )
    return query.all()
@router.get("/", response_model=list[UserCardSummaryOut])
def list_user_cards(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
    year: int | None = Query(default=None, ge=2000, le=2100, description="Year to compute summary for"),
):
    """List summaries for the current user's cards relevant to ``year``.

    ``year`` defaults to the current calendar year. Each summary is annotated
    with the years available for that card.
    """
    if not year:
        year = date.today().year

    summaries = []
    for card in _query_user_cards(db, current_user.id, year):
        card_summary = _compute_summary(card, year)
        card_summary.available_years = _get_available_years(card)
        summaries.append(card_summary)
    return summaries
@router.get("/details", response_model=list[UserCardDetailOut])
def list_user_card_details(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
    year: int | None = Query(default=None, ge=2000, le=2100, description="Year to compute details for"),
):
    """List detailed views of the current user's cards relevant to ``year``.

    ``year`` defaults to the current calendar year. The response is ordered
    by ascending ``utilization_pct``.
    """
    if not year:
        year = date.today().year

    details = []
    for card in _query_user_cards(db, current_user.id, year):
        card_summary = _compute_summary(card, year)
        status_rows = _compute_benefits_status(card, year)
        template = card.card_template
        detail = UserCardDetailOut(
            id=card.id,
            card_template_id=card.card_template_id,
            card_name=template.name,
            issuer=template.issuer,
            annual_fee=template.annual_fee,
            nickname=card.nickname,
            member_since_date=card.member_since_date,
            renewal_date=card.renewal_date,
            closed_date=card.closed_date,
            is_active=card.is_active,
            available_years=_get_available_years(card),
            ytd_actual_used=card_summary.ytd_actual_used,
            utilization_pct=card_summary.utilization_pct,
            benefits_status=status_rows,
        )
        details.append(detail)

    # Least-utilized cards come first.
    return sorted(details, key=lambda item: item.utilization_pct)
@router.get("/{user_card_id}", response_model=UserCardDetailOut)
def get_user_card_detail(
    user_card_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
    year: int | None = Query(default=None, ge=2000, le=2100, description="Year to compute details for"),
):
    """Return the detailed view of a single card owned by the current user.

    ``year`` defaults to the current calendar year. Responds with 404 when
    the card does not exist or belongs to another user.
    """
    if not year:
        year = date.today().year

    eager_loads = (
        joinedload(UserCard.card_template).joinedload(CardTemplate.benefits),
        joinedload(UserCard.usages),
        joinedload(UserCard.benefit_settings),
    )
    card = (
        db.query(UserCard)
        .options(*eager_loads)
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")

    card_summary = _compute_summary(card, year)
    status_rows = _compute_benefits_status(card, year)
    template = card.card_template
    return UserCardDetailOut(
        id=card.id,
        card_template_id=card.card_template_id,
        card_name=template.name,
        issuer=template.issuer,
        annual_fee=template.annual_fee,
        nickname=card.nickname,
        member_since_date=card.member_since_date,
        renewal_date=card.renewal_date,
        closed_date=card.closed_date,
        is_active=card.is_active,
        available_years=_get_available_years(card),
        ytd_actual_used=card_summary.ytd_actual_used,
        utilization_pct=card_summary.utilization_pct,
        benefits_status=status_rows,
    )
@router.delete("/{user_card_id}", status_code=204)
def delete_user_card(
    user_card_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete one of the current user's cards.

    Responds with 404 when the card does not exist or belongs to another
    user; otherwise the row is deleted and committed, and nothing is returned.
    """
    owned_card = (
        db.query(UserCard)
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not owned_card:
        raise HTTPException(status_code=404, detail="User card not found")

    db.delete(owned_card)
    db.commit()
@router.put("/{user_card_id}/close", response_model=UserCardOut)
def close_user_card(
    user_card_id: int,
    data: CardCloseRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Mark one of the current user's cards as closed on ``data.closed_date``.

    Responds with 404 when the card is not found for this user, and with 400
    when the card is already closed or the close date precedes the card's
    ``member_since_date``.
    """
    card = (
        db.query(UserCard)
        .options(joinedload(UserCard.card_template))
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")
    if card.closed_date is not None:
        raise HTTPException(status_code=400, detail="Card is already closed")

    opened_on = card.member_since_date
    if opened_on and data.closed_date < opened_on:
        raise HTTPException(status_code=400, detail="Close date cannot be before membership date")

    card.closed_date = data.closed_date
    card.is_active = False  # backward compat
    db.commit()
    db.refresh(card)
    return _to_user_card_out(card)
@router.put("/{user_card_id}/reopen", response_model=UserCardOut)
def reopen_user_card(
    user_card_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Reopen a previously closed card owned by the current user.

    Responds with 404 when the card is not found for this user and with 400
    when the card has no close date. On success the close date is cleared and
    ``is_active`` is set back to ``True``.
    """
    card = (
        db.query(UserCard)
        .options(joinedload(UserCard.card_template))
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")
    if card.closed_date is None:
        raise HTTPException(status_code=400, detail="Card is not closed")

    card.closed_date = None
    card.is_active = True  # backward compat
    db.commit()
    db.refresh(card)
    return _to_user_card_out(card)
@router.post("/{user_card_id}/usage", response_model=BenefitUsageOut, status_code=201)
def log_usage(
    user_card_id: int,
    data: BenefitUsageCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Record usage of a benefit for the period containing the target date.

    The target date is ``data.target_date`` or today. Responses:

    * 404 - the card is not found for this user, or the benefit template
      does not exist.
    * 400 - the target date falls before ``member_since_date`` or after
      ``closed_date``, the benefit belongs to a different card template, or
      the amount exceeds the benefit's ``max_value``.
    * 409 - usage is already recorded for this benefit in the same period.

    For binary benefits any positive amount is coerced to ``max_value`` and
    anything else to ``0.0``.
    """
    card = (
        db.query(UserCard)
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")

    # Date-range validation (replaces blanket is_active gate)
    target = data.target_date if data.target_date else date.today()
    if card.member_since_date and target < card.member_since_date:
        raise HTTPException(status_code=400, detail="Cannot log usage before card membership date")
    if card.closed_date and target > card.closed_date:
        raise HTTPException(status_code=400, detail="Cannot log usage after card close date")

    benefit = (
        db.query(BenefitTemplate)
        .filter(BenefitTemplate.id == data.benefit_template_id)
        .first()
    )
    if not benefit:
        raise HTTPException(status_code=404, detail="Benefit template not found")
    if benefit.card_template_id != card.card_template_id:
        raise HTTPException(status_code=400, detail="Benefit does not belong to this card")

    # Coerce binary benefits
    amount = data.amount_used
    if benefit.redemption_type == RedemptionType.binary:
        if amount > 0:
            amount = benefit.max_value
        else:
            amount = 0.0

    if amount > benefit.max_value:
        raise HTTPException(
            status_code=400,
            detail=f"Amount {amount} exceeds max value {benefit.max_value}",
        )

    period_start, period_end = get_current_period(benefit.period_type, target)

    # Check for existing usage in this period
    duplicate = (
        db.query(BenefitUsage)
        .filter(
            BenefitUsage.user_card_id == user_card_id,
            BenefitUsage.benefit_template_id == data.benefit_template_id,
            BenefitUsage.period_start_date == period_start,
        )
        .first()
    )
    if duplicate:
        raise HTTPException(
            status_code=409,
            detail="Usage already logged for this benefit in this period. Use PUT to update.",
        )

    record = BenefitUsage(
        user_card_id=user_card_id,
        benefit_template_id=data.benefit_template_id,
        period_start_date=period_start,
        period_end_date=period_end,
        amount_used=amount,
        notes=data.notes,
    )
    db.add(record)
    db.commit()
    db.refresh(record)

    return BenefitUsageOut(
        id=record.id,
        benefit_template_id=record.benefit_template_id,
        benefit_name=benefit.name,
        period_start_date=record.period_start_date,
        period_end_date=record.period_end_date,
        amount_used=record.amount_used,
        notes=record.notes,
        created_at=record.created_at,
    )
@router.put(
    "/{user_card_id}/benefits/{benefit_template_id}/setting",
    response_model=dict,
)
def upsert_benefit_setting(
    user_card_id: int,
    benefit_template_id: int,
    data: BenefitSettingUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create or update the perceived max value of a benefit on a user's card.

    Responds with 404 when the card is not found for this user or the benefit
    template does not exist, and with 400 when the benefit belongs to a
    different card template. Returns the stored ``perceived_max_value``.
    """
    card = (
        db.query(UserCard)
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")

    benefit = (
        db.query(BenefitTemplate)
        .filter(BenefitTemplate.id == benefit_template_id)
        .first()
    )
    if not benefit:
        raise HTTPException(status_code=404, detail="Benefit template not found")
    if benefit.card_template_id != card.card_template_id:
        raise HTTPException(status_code=400, detail="Benefit does not belong to this card")

    existing_setting = (
        db.query(UserBenefitSetting)
        .filter(
            UserBenefitSetting.user_card_id == user_card_id,
            UserBenefitSetting.benefit_template_id == benefit_template_id,
        )
        .first()
    )
    if not existing_setting:
        # No row yet for this (card, benefit) pair: insert one.
        db.add(
            UserBenefitSetting(
                user_card_id=user_card_id,
                benefit_template_id=benefit_template_id,
                perceived_max_value=data.perceived_max_value,
            )
        )
    else:
        existing_setting.perceived_max_value = data.perceived_max_value

    db.commit()
    return {"perceived_max_value": data.perceived_max_value}
@router.get("/{user_card_id}/summary", response_model=UserCardSummaryOut)
def get_card_summary(
    user_card_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
    year: int | None = Query(default=None, ge=2000, le=2100, description="Year to compute summary for"),
):
    """Return the summary for a single card owned by the current user.

    ``year`` defaults to the current calendar year. Responds with 404 when
    the card does not exist or belongs to another user.
    """
    if not year:
        year = date.today().year

    eager_loads = (
        joinedload(UserCard.card_template).joinedload(CardTemplate.benefits),
        joinedload(UserCard.usages),
        joinedload(UserCard.benefit_settings),
    )
    card = (
        db.query(UserCard)
        .options(*eager_loads)
        .filter(UserCard.id == user_card_id, UserCard.user_id == current_user.id)
        .first()
    )
    if not card:
        raise HTTPException(status_code=404, detail="User card not found")

    card_summary = _compute_summary(card, year)
    card_summary.available_years = _get_available_years(card)
    return card_summary
def _compute_benefits_status(uc: UserCard, year: int) -> list[BenefitStatusOut]:
    """Build the per-benefit status list for a card in the given year.

    Args:
        uc: Card whose ``card_template.benefits``, ``usages`` and
            ``benefit_settings`` are read.
        year: Calendar year to report on.

    Returns:
        One ``BenefitStatusOut`` per benefit on the card's template. Each
        entry describes the reference period (the period containing today for
        the current year, otherwise the one containing December 31 of
        ``year``) and carries a ``PeriodSegment`` for every period in the year.
    """
    settings_map = {s.benefit_template_id: s.perceived_max_value for s in uc.benefit_settings}
    today = date.today()
    is_current_year = year == today.year
    # For past/future years, use last day of year as reference for "current period"
    reference_date = today if is_current_year else date(year, 12, 31)

    # Index usages by (benefit_id, period_start); a later row with the same
    # key replaces an earlier one.
    usage_map: dict[tuple[int, date], "BenefitUsage"] = {
        (u.benefit_template_id, u.period_start_date): u for u in uc.usages
    }

    result = []
    for benefit in uc.card_template.benefits:
        period_start, period_end = get_current_period(benefit.period_type, reference_date)
        # A countdown is only reported when viewing the current year.
        days_remaining = max(0, (period_end - today).days) if is_current_year else 0

        usage = usage_map.get((benefit.id, period_start))
        amount_used = usage.amount_used if usage else 0.0

        perceived_max = settings_map.get(benefit.id, benefit.max_value)
        if benefit.max_value > 0:
            # Scale actual usage by the perceived-to-face-value ratio.
            utilized_perceived = amount_used * (perceived_max / benefit.max_value)
        else:
            utilized_perceived = 0.0

        # Build period segments for the year
        segments = []
        for p_start, p_end, label in get_all_periods_in_year(benefit.period_type, year):
            p_usage = usage_map.get((benefit.id, p_start))
            segments.append(
                PeriodSegment(
                    label=label,
                    period_start_date=p_start,
                    period_end_date=p_end,
                    amount_used=p_usage.amount_used if p_usage else 0.0,
                    usage_id=p_usage.id if p_usage else None,
                    is_used=p_usage is not None and p_usage.amount_used > 0,
                    is_current=p_start == period_start,
                    is_future=p_start > today,
                )
            )

        result.append(
            BenefitStatusOut(
                benefit_template_id=benefit.id,
                usage_id=usage.id if usage else None,
                name=benefit.name,
                description=benefit.description,
                max_value=benefit.max_value,
                period_type=benefit.period_type,
                redemption_type=benefit.redemption_type,
                category=benefit.category,
                period_start_date=period_start,
                period_end_date=period_end,
                days_remaining=days_remaining,
                amount_used=amount_used,
                remaining=max(0, benefit.max_value - amount_used),
                perceived_max_value=perceived_max,
                utilized_perceived_value=round(utilized_perceived, 2),
                is_used=usage is not None and usage.amount_used > 0,
                periods=segments,
            )
        )
    return result


def _compute_summary(uc: UserCard, year: int) -> UserCardSummaryOut:
    """Aggregate a card's benefit usage for ``year`` into a summary.

    Args:
        uc: Card whose ``card_template``, ``usages`` and ``benefit_settings``
            are read.
        year: Calendar year to summarise.

    Returns:
        A ``UserCardSummaryOut`` with annual maxima, year-to-date actual and
        perceived usage, net values after the annual fee, the utilization
        percentage against the prorated maximum, and benefit counts.
        ``available_years`` is not set here.
    """
    benefits = uc.card_template.benefits
    settings_map = {s.benefit_template_id: s.perceived_max_value for s in uc.benefit_settings}
    today = date.today()
    is_current_year = year == today.year
    is_past_year = year < today.year

    total_annual_max = sum(compute_annual_max(b.max_value, b.period_type) for b in benefits)
    total_perceived_annual = sum(
        compute_annual_max(settings_map.get(b.id, b.max_value), b.period_type) for b in benefits
    )

    # YTD: sum all usage in the requested year
    ytd_actual = 0.0
    ytd_perceived = 0.0
    benefits_used_ids = set()
    benefits_by_id = {b.id: b for b in benefits}
    # The current period start depends only on the period type (``today`` is
    # fixed for this call), so it is resolved once per type rather than once
    # per usage row.
    current_start_by_type = {}

    for usage in uc.usages:
        if usage.period_start_date.year != year:
            continue

        ytd_actual += usage.amount_used
        benefit = benefits_by_id.get(usage.benefit_template_id)
        if benefit and benefit.max_value > 0:
            perceived_max = settings_map.get(benefit.id, benefit.max_value)
            ytd_perceived += usage.amount_used * (perceived_max / benefit.max_value)

        if is_current_year:
            # Check if benefit is used in current period. Usage rows whose
            # benefit is not on the template are treated as "annual".
            period_type = benefit.period_type if benefit else "annual"
            if period_type not in current_start_by_type:
                start, _ = get_current_period(period_type, today)
                current_start_by_type[period_type] = start
            if (
                usage.period_start_date == current_start_by_type[period_type]
                and usage.amount_used > 0
            ):
                benefits_used_ids.add(usage.benefit_template_id)
        elif usage.amount_used > 0:
            # For past/future years, count any benefit with usage as "used"
            benefits_used_ids.add(usage.benefit_template_id)

    # Prorated max: only count periods that have started
    prorated_max = 0.0
    for benefit in benefits:
        annual_max = compute_annual_max(benefit.max_value, benefit.period_type)
        if is_past_year:
            # Past year: all periods have elapsed
            prorated_max += annual_max
        elif is_current_year:
            # Current year: count periods that have started
            if benefit.period_type == "monthly":
                periods_elapsed = today.month
                prorated_max += benefit.max_value * periods_elapsed
            elif benefit.period_type == "quarterly":
                quarters_elapsed = ((today.month - 1) // 3) + 1
                prorated_max += benefit.max_value * quarters_elapsed
            elif benefit.period_type == "semiannual":
                halves_elapsed = 1 if today.month <= 6 else 2
                prorated_max += benefit.max_value * halves_elapsed
            else:  # annual
                prorated_max += annual_max
        # Future year: prorated_max stays 0

    # Guard against division by zero when nothing has accrued yet.
    utilization_pct = (ytd_actual / prorated_max * 100) if prorated_max > 0 else 0.0

    annual_fee = uc.card_template.annual_fee
    return UserCardSummaryOut(
        id=uc.id,
        card_name=uc.card_template.name,
        issuer=uc.card_template.issuer,
        nickname=uc.nickname,
        annual_fee=annual_fee,
        total_max_annual_value=round(total_annual_max, 2),
        total_perceived_annual_value=round(total_perceived_annual, 2),
        ytd_actual_used=round(ytd_actual, 2),
        ytd_perceived_value=round(ytd_perceived, 2),
        net_actual=round(ytd_actual - annual_fee, 2),
        net_perceived=round(ytd_perceived - annual_fee, 2),
        utilization_pct=round(utilization_pct, 2),
        benefit_count=len(benefits),
        benefits_used_count=len(benefits_used_ids),
    )