|
| 1 | +import logging |
| 2 | + |
| 3 | +from django.utils import timezone |
| 4 | +from drf_spectacular.extensions import OpenApiAuthenticationExtension |
| 5 | +from rest_framework.authentication import BaseAuthentication |
| 6 | +from rest_framework.exceptions import AuthenticationFailed |
| 7 | + |
| 8 | +from common.models import PersonalAccessToken |
| 9 | + |
| 10 | +logger = logging.getLogger(__name__) |
| 11 | + |
| 12 | +PAT_PREFIX = "bcrm_pat_" |
| 13 | + |
| 14 | + |
| 15 | +def _extract_raw(request): |
| 16 | + """Pull a bcrm_pat_ token from Authorization: Bearer or Token header.""" |
| 17 | + auth = request.headers.get("Authorization", "") |
| 18 | + if auth.startswith("Bearer "): |
| 19 | + candidate = auth[len("Bearer "):].strip() |
| 20 | + if candidate.startswith(PAT_PREFIX): |
| 21 | + return candidate |
| 22 | + token = request.headers.get("Token", "") |
| 23 | + if token.startswith(PAT_PREFIX): |
| 24 | + return token |
| 25 | + return None |
| 26 | + |
| 27 | + |
| 28 | +def resolve_valid_pat(raw): |
| 29 | + """Look up and validate a raw PAT. |
| 30 | +
|
| 31 | + Returns the PersonalAccessToken (with profile/user/org pre-fetched) or |
| 32 | + raises AuthenticationFailed. Shared by the GetProfileAndOrg middleware |
| 33 | + (which must set org context before RequireOrgContext runs) and the DRF |
| 34 | + PATAuthentication class, so the lookup/validation logic lives in one place. |
| 35 | + """ |
| 36 | + try: |
| 37 | + pat = PersonalAccessToken.objects.select_related( |
| 38 | + "profile", "profile__user", "org" |
| 39 | + ).get(token_hash=PersonalAccessToken.hash_token(raw)) |
| 40 | + except PersonalAccessToken.DoesNotExist as exc: |
| 41 | + logger.warning("Invalid PAT attempted") |
| 42 | + raise AuthenticationFailed("Invalid token") from exc |
| 43 | + if not pat.is_valid(): |
| 44 | + raise AuthenticationFailed("Token revoked or expired") |
| 45 | + if not pat.profile.is_active or not pat.org.is_active: |
| 46 | + raise AuthenticationFailed("Token owner or org is inactive") |
| 47 | + return pat |
| 48 | + |
| 49 | + |
| 50 | +class PATAuthentication(BaseAuthentication): |
| 51 | + """Authenticate an agent AS the token's owning Profile (inherits role+org).""" |
| 52 | + |
| 53 | + def authenticate(self, request): |
| 54 | + raw = _extract_raw(request) |
| 55 | + if not raw: |
| 56 | + return None # Not a PAT — let JWT / org-key auth handle it. |
| 57 | + |
| 58 | + # The GetProfileAndOrg middleware resolves the PAT first (so org |
| 59 | + # context is set before RequireOrgContext runs) and stashes it on the |
| 60 | + # request. Reuse it to avoid a second DB lookup and a double |
| 61 | + # last_used_at write. Fall back to resolving here for any code path |
| 62 | + # that bypasses the middleware (e.g. RequestFactory unit tests). |
| 63 | + pat = getattr(request, "_pat", None) |
| 64 | + if pat is None: |
| 65 | + pat = resolve_valid_pat(raw) |
| 66 | + |
| 67 | + profile = pat.profile |
| 68 | + |
| 69 | + request.profile = profile |
| 70 | + request.org = pat.org |
| 71 | + request.META["org"] = str(pat.org.id) |
| 72 | + request.META["mcp_token_id"] = str(pat.id) |
| 73 | + |
| 74 | + now = timezone.now() |
| 75 | + if pat.last_used_at is None or (now - pat.last_used_at).total_seconds() > 60: |
| 76 | + PersonalAccessToken.objects.filter(pk=pat.pk).update(last_used_at=now) |
| 77 | + pat.last_used_at = now |
| 78 | + |
| 79 | + return (profile.user, pat) |
| 80 | + |
| 81 | + |
| 82 | +class PATAuthenticationScheme(OpenApiAuthenticationExtension): |
| 83 | + target_class = "common.pat_auth.PATAuthentication" |
| 84 | + name = "PersonalAccessToken" |
| 85 | + |
| 86 | + def get_security_definition(self, auto_schema): |
| 87 | + return { |
| 88 | + "type": "http", |
| 89 | + "scheme": "bearer", |
| 90 | + "description": "Personal access token (bcrm_pat_…) for agent/MCP access", |
| 91 | + } |
0 commit comments