Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Once this is done you can now make the following calls against the vehicle manag

login(self)

#OTP Details are Alpha - not used yet.
#OTP Details are Alpha - not used yet.

#Sent OTP
send_otp(self, method)
Expand Down
13 changes: 11 additions & 2 deletions hyundai_kia_connect_api/ApiImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
GEO_LOCATION_PROVIDERS,
OPENSTREETMAP,
GOOGLE,
OTP_NOTIFY_TYPE,
)

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,6 +57,7 @@ class WindowRequestOptions:
@dataclass
class OTPRequest:
request_id: str | None
otp_key: str | None
has_email: bool | None
has_sms: bool | None
email: str | None
Expand Down Expand Up @@ -100,11 +102,18 @@ def login(
"""Login into cloud endpoints and return Token or OTP Details if OTP is triggered"""
pass

def send_otp(self, otp_request: OTPRequest, otp_destination: str, otp_via: str) -> None:
def send_otp(self, otp_request: OTPRequest, notify_type: OTP_NOTIFY_TYPE) -> None:
"""Sends OTP to the user via selected destination and via"""
pass

def verify_otp(self, otp_request: OTPRequest, otp_code: str) -> Token:
def verify_otp_and_complete_login(
self,
username: str,
password: str,
otp_code: str,
otp_request: OTPRequest,
pin: str | None = None,
) -> Token:
"""Confirms OTP code sent to the user"""
pass

Expand Down
168 changes: 18 additions & 150 deletions hyundai_kia_connect_api/KiaUvoApiUSA.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

from .ApiImpl import ApiImpl, ClimateRequestOptions
from .ApiImpl import ApiImpl, ClimateRequestOptions, OTPRequest
from .Token import Token
from .Vehicle import Vehicle
from .const import (
Expand All @@ -25,9 +25,9 @@
ORDER_STATUS,
TEMPERATURE_UNITS,
VEHICLE_LOCK_ACTION,
OTP_NOTIFY_TYPE,
)
from .utils import get_child_value, parse_datetime
from .exceptions import AuthenticationError


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -240,92 +240,22 @@ def _complete_login_with_otp(
)
return final_sid

def start_login(
self,
username: str,
password: str,
token: Token | None = None,
) -> tuple[Token | None, dict | None]:
"""Start login and return either a Token or an OTP context.

Parameters
----------
username : str
User email address
password : str
User password
token : Token | None
Existing token with stored rmtoken for reuse

Returns
-------
tuple[Token | None, dict | None]
(Token, None) if login succeeded without OTP, otherwise (None, ctx)
where ctx contains 'otpKey', 'xid', 'email', 'phone', 'hasEmail', 'hasPhone'.
"""
url = self.API_URL + "prof/authUser"
data = {
"deviceKey": self.device_id,
"deviceType": 2,
"userCredential": {"userId": username, "password": password},
}
headers = self.api_headers()
if token and getattr(token, "device_id", None):
self.device_id = token.device_id

if token and token.refresh_token:
_LOGGER.debug(f"{DOMAIN} - Attempting start_login with stored rmtoken")
headers["rmtoken"] = token.refresh_token
response = self.session.post(url, json=data, headers=headers)
_LOGGER.debug(f"{DOMAIN} - Start Sign In Response {response.text}")
response_json = response.json()
session_id = response.headers.get("sid")
if session_id:
_LOGGER.debug(f"got session id {session_id}")
valid_until = dt.datetime.now(dt.timezone.utc) + LOGIN_TOKEN_LIFETIME
existing_rmtoken = token.refresh_token if token else None
return (
Token(
username=username,
password=password,
access_token=session_id,
refresh_token=existing_rmtoken,
valid_until=valid_until,
device_id=self.device_id,
),
None,
)
if "payload" in response_json and "otpKey" in response_json["payload"]:
payload = response_json["payload"]
xid = response.headers.get("xid", "")
ctx = {
"otpKey": payload["otpKey"],
"xid": xid,
"email": payload.get("email"),
"phone": payload.get("phone"),
"hasEmail": bool(payload.get("hasEmail")),
"hasPhone": bool(payload.get("hasPhone")),
"rmTokenExpired": bool(payload.get("rmTokenExpired")),
}
return None, ctx
raise Exception(
f"{DOMAIN} - No session id returned in start_login. Response: {response.text}"
)

def send_otp(self, otp_key: str, notify_type: str, xid: str) -> dict:
def send_otp(self, otp_request: OTPRequest, notify_type: OTP_NOTIFY_TYPE) -> dict:
"""Public helper to send OTP to the selected destination."""
return self._send_otp(otp_key, notify_type, xid)
return self._send_otp(otp_request.otp_key, notify_type, otp_request.request_id)

def verify_otp_and_complete_login(
self,
username: str,
password: str,
otp_key: str,
xid: str,
otp_code: str,
otp_request: OTPRequest,
pin: str | None,
) -> Token:
"""Verify OTP and complete the login producing a Token."""
sid, rmtoken = self._verify_otp(otp_key, otp_code, xid)
sid, rmtoken = self._verify_otp(
otp_request.otp_key, otp_code, otp_request.request_id
)
final_sid = self._complete_login_with_otp(username, password, sid, rmtoken)
_LOGGER.debug(f"got final session id {final_sid}")
_LOGGER.info(f"{DOMAIN} - Storing rmtoken for future logins")
Expand All @@ -337,6 +267,7 @@ def verify_otp_and_complete_login(
refresh_token=rmtoken,
valid_until=valid_until,
device_id=self.device_id,
pin=pin,
)

def login(
Expand All @@ -359,7 +290,7 @@ def login(
Existing token with stored rmtoken for reuse
otp_handler : Callable[[dict], dict], optional
Non-interactive OTP handler. Called twice:
- stage='choose_destination' -> return {'notify_type': 'EMAIL'|'PHONE'}
- stage='choose_destination' -> return {'notify_type': 'EMAIL'|'SMS'}
- stage='input_code' -> return {'otp_code': '<code>'}
pin : str, optional

Expand Down Expand Up @@ -404,76 +335,13 @@ def login(
payload = response_json["payload"]
if payload.get("rmTokenExpired"):
_LOGGER.info(f"{DOMAIN} - Stored rmtoken has expired, need new OTP")
otp_key = payload["otpKey"]
xid = response.headers.get("xid", "")
_LOGGER.info(f"{DOMAIN} - OTP required for login")
_LOGGER.info(f"{DOMAIN} - Email: {payload.get('email', 'N/A')}")
_LOGGER.info(f"{DOMAIN} - Phone: {payload.get('phone', 'N/A')}")
notify_type = "EMAIL"
handler = otp_handler or getattr(self, "_otp_handler", None)
if handler:
try:
ctx_choice = {
"stage": "choose_destination",
"hasEmail": bool(payload.get("hasEmail")),
"hasPhone": bool(payload.get("hasPhone")),
"email": payload.get("email"),
"phone": payload.get("phone"),
}
res = handler(ctx_choice) or {}
nt = str(res.get("notify_type", notify_type)).upper()
if nt in ("EMAIL", "PHONE"):
notify_type = nt
except Exception:
_LOGGER.debug(
f"{DOMAIN} - otp_handler choose_destination failed; using default"
)
else:
if payload.get("hasEmail") and payload.get("hasPhone"):
print("\nOTP Authentication Required")
print(f"Email: {payload.get('email', 'N/A')}")
print(f"Phone: {payload.get('phone', 'N/A')}")
choice = (
input("Send OTP to (E)mail or (P)hone? [E/P]: ").strip().upper()
)
if choice == "P":
notify_type = "PHONE"
elif payload.get("hasPhone"):
notify_type = "PHONE"
self._send_otp(otp_key, notify_type, xid)
if not handler:
print(f"\nOTP sent to {notify_type.lower()}")
otp_code = None
if handler:
try:
ctx_code = {
"stage": "input_code",
"notify_type": notify_type,
"otpKey": otp_key,
"xid": xid,
}
res2 = handler(ctx_code) or {}
otp_code = str(res2.get("otp_code", "")).strip()
except Exception:
_LOGGER.debug(f"{DOMAIN} - otp_handler input_code failed")
if not otp_code:
if handler is None:
otp_code = input("Enter OTP code: ").strip()
else:
raise AuthenticationError(f"{DOMAIN} - OTP code required")
sid, rmtoken = self._verify_otp(otp_key, otp_code, xid)
final_sid = self._complete_login_with_otp(username, password, sid, rmtoken)
_LOGGER.debug(f"got final session id {final_sid}")
_LOGGER.info(f"{DOMAIN} - Storing rmtoken for future logins")
valid_until = dt.datetime.now(dt.timezone.utc) + LOGIN_TOKEN_LIFETIME
return Token(
username=username,
password=password,
access_token=final_sid,
refresh_token=rmtoken,
valid_until=valid_until,
device_id=self.device_id,
pin=pin,
return OTPRequest(
otp_key=payload["otpKey"],
request_id=response.headers.get("xid", ""),
email=payload.get("email"),
phone=payload.get("phone"),
has_email=bool(payload.get("hasEmail")),
has_phone=bool(payload.get("hasPhone")),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
raise Exception(
f"{DOMAIN} - No session id returned in login. Response: {response.text} headers {response.headers} cookies {response.cookies}"
Expand Down
19 changes: 13 additions & 6 deletions hyundai_kia_connect_api/VehicleManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
REGIONS,
VALET_MODE_ACTION,
VEHICLE_LOCK_ACTION,
OTP_NOTIFY_TYPE,
)
from .exceptions import APIError, AuthenticationOTPRequired
from .HyundaiBlueLinkApiBR import HyundaiBlueLinkApiBR
Expand Down Expand Up @@ -106,11 +107,17 @@ def login(self) -> bool | OTPRequest:
self.otp_request = result
return result

def send_otp(self, otp_destination: str, otp_via: str) -> None:
self.api.send_otp(self.otp_request, otp_destination, otp_via)

def verify_otp(self, otp_code: str) -> None:
self.token = self.api.verify_otp(self.otp_request, otp_code)
def send_otp(self, notify_type: OTP_NOTIFY_TYPE) -> None:
self.api.send_otp(self.otp_request, notify_type)

def verify_otp_and_complete_login(self, otp_code: str) -> None:
self.token = self.api.verify_otp_and_complete_login(
username=self.username,
password=self.password,
otp_code=otp_code,
otp_request=self.otp_request,
pin=self.pin,
)
self.initialize_vehicles()

def initialize_vehicles(self):
Expand Down Expand Up @@ -207,7 +214,7 @@ def check_and_refresh_token(self) -> bool:
if isinstance(result, Token):
self.token: Token = result
self.initialize_vehicles()
if isinstance(result, OTPOptions):
if isinstance(result, OTPRequest):
raise AuthenticationOTPRequired("OTP required to refresh token")
self.vehicles = self.api.refresh_vehicles(self.token, self.vehicles)
return True
Expand Down
5 changes: 5 additions & 0 deletions hyundai_kia_connect_api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ class WINDOW_STATE(IntEnum):
class VALET_MODE_ACTION(Enum):
ACTIVATE = "activate"
DEACTIVATE = "deactivate"


class OTP_NOTIFY_TYPE(Enum):
EMAIL = "EMAIL"
SMS = "SMS"
Loading