Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Once this is done you can now make the following calls against the vehicle manag

login(self)

#OTP Details are Alpha - not used yet.
#OTP Details are Alpha - not used yet.
#Sent OTP
send_otp(self, method)

Expand Down
13 changes: 11 additions & 2 deletions hyundai_kia_connect_api/ApiImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
GEO_LOCATION_PROVIDERS,
OPENSTREETMAP,
GOOGLE,
OTP_NOTIFY_TYPE,
)

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,6 +57,7 @@ class WindowRequestOptions:
@dataclass
class OTPRequest:
request_id: str | None
otp_key: str | None
has_email: bool | None
has_sms: bool | None
email: str | None
Expand Down Expand Up @@ -100,11 +102,18 @@ def login(
"""Login into cloud endpoints and return Token or OTP Details if OTP is triggered"""
pass

def send_otp(self, otp_request: OTPRequest, otp_destination: str, otp_via: str) -> None:
def send_otp(self, otp_request: OTPRequest, notify_type: OTP_NOTIFY_TYPE) -> None:
"""Sends OTP to the user via selected destination and via"""
pass

def verify_otp(self, otp_request: OTPRequest, otp_code: str) -> Token:
def verify_otp_and_complete_login(
self,
username: str,
password: str,
otp_code: str,
otp_request: OTPRequest,
pin: str | None = None,
) -> Token:
"""Confirms OTP code sent to the user"""
pass

Expand Down
166 changes: 17 additions & 149 deletions hyundai_kia_connect_api/KiaUvoApiUSA.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context

from .ApiImpl import ApiImpl, ClimateRequestOptions
from .ApiImpl import ApiImpl, ClimateRequestOptions, OTPRequest
from .Token import Token
from .Vehicle import Vehicle
from .const import (
Expand All @@ -25,9 +25,9 @@
ORDER_STATUS,
TEMPERATURE_UNITS,
VEHICLE_LOCK_ACTION,
OTP_NOTIFY_TYPE,
)
from .utils import get_child_value, parse_datetime
from .exceptions import AuthenticationError


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -240,92 +240,22 @@ def _complete_login_with_otp(
)
return final_sid

def start_login(
self,
username: str,
password: str,
token: Token | None = None,
) -> tuple[Token | None, dict | None]:
"""Start login and return either a Token or an OTP context.

Parameters
----------
username : str
User email address
password : str
User password
token : Token | None
Existing token with stored rmtoken for reuse

Returns
-------
tuple[Token | None, dict | None]
(Token, None) if login succeeded without OTP, otherwise (None, ctx)
where ctx contains 'otpKey', 'xid', 'email', 'phone', 'hasEmail', 'hasPhone'.
"""
url = self.API_URL + "prof/authUser"
data = {
"deviceKey": self.device_id,
"deviceType": 2,
"userCredential": {"userId": username, "password": password},
}
headers = self.api_headers()
if token and getattr(token, "device_id", None):
self.device_id = token.device_id

if token and token.refresh_token:
_LOGGER.debug(f"{DOMAIN} - Attempting start_login with stored rmtoken")
headers["rmtoken"] = token.refresh_token
response = self.session.post(url, json=data, headers=headers)
_LOGGER.debug(f"{DOMAIN} - Start Sign In Response {response.text}")
response_json = response.json()
session_id = response.headers.get("sid")
if session_id:
_LOGGER.debug(f"got session id {session_id}")
valid_until = dt.datetime.now(dt.timezone.utc) + LOGIN_TOKEN_LIFETIME
existing_rmtoken = token.refresh_token if token else None
return (
Token(
username=username,
password=password,
access_token=session_id,
refresh_token=existing_rmtoken,
valid_until=valid_until,
device_id=self.device_id,
),
None,
)
if "payload" in response_json and "otpKey" in response_json["payload"]:
payload = response_json["payload"]
xid = response.headers.get("xid", "")
ctx = {
"otpKey": payload["otpKey"],
"xid": xid,
"email": payload.get("email"),
"phone": payload.get("phone"),
"hasEmail": bool(payload.get("hasEmail")),
"hasPhone": bool(payload.get("hasPhone")),
"rmTokenExpired": bool(payload.get("rmTokenExpired")),
}
return None, ctx
raise Exception(
f"{DOMAIN} - No session id returned in start_login. Response: {response.text}"
)

def send_otp(self, otp_key: str, notify_type: str, xid: str) -> dict:
def send_otp(self, otp_request: OTPRequest, notify_type: OTP_NOTIFY_TYPE) -> dict:
"""Public helper to send OTP to the selected destination."""
return self._send_otp(otp_key, notify_type, xid)
return self._send_otp(otp_request.otp_key, notify_type, otp_request.request_id)

def verify_otp_and_complete_login(
self,
username: str,
password: str,
otp_key: str,
xid: str,
otp_code: str,
otp_request: OTPRequest,
pin: str | None,
) -> Token:
"""Verify OTP and complete the login producing a Token."""
sid, rmtoken = self._verify_otp(otp_key, otp_code, xid)
sid, rmtoken = self._verify_otp(
otp_request.otp_key, otp_code, otp_request.request_id
)
final_sid = self._complete_login_with_otp(username, password, sid, rmtoken)
_LOGGER.debug(f"got final session id {final_sid}")
_LOGGER.info(f"{DOMAIN} - Storing rmtoken for future logins")
Expand All @@ -337,6 +267,7 @@ def verify_otp_and_complete_login(
refresh_token=rmtoken,
valid_until=valid_until,
device_id=self.device_id,
pin=pin,
)

def login(
Expand Down Expand Up @@ -404,76 +335,13 @@ def login(
payload = response_json["payload"]
if payload.get("rmTokenExpired"):
_LOGGER.info(f"{DOMAIN} - Stored rmtoken has expired, need new OTP")
otp_key = payload["otpKey"]
xid = response.headers.get("xid", "")
_LOGGER.info(f"{DOMAIN} - OTP required for login")
_LOGGER.info(f"{DOMAIN} - Email: {payload.get('email', 'N/A')}")
_LOGGER.info(f"{DOMAIN} - Phone: {payload.get('phone', 'N/A')}")
notify_type = "EMAIL"
handler = otp_handler or getattr(self, "_otp_handler", None)
if handler:
try:
ctx_choice = {
"stage": "choose_destination",
"hasEmail": bool(payload.get("hasEmail")),
"hasPhone": bool(payload.get("hasPhone")),
"email": payload.get("email"),
"phone": payload.get("phone"),
}
res = handler(ctx_choice) or {}
nt = str(res.get("notify_type", notify_type)).upper()
if nt in ("EMAIL", "PHONE"):
notify_type = nt
except Exception:
_LOGGER.debug(
f"{DOMAIN} - otp_handler choose_destination failed; using default"
)
else:
if payload.get("hasEmail") and payload.get("hasPhone"):
print("\nOTP Authentication Required")
print(f"Email: {payload.get('email', 'N/A')}")
print(f"Phone: {payload.get('phone', 'N/A')}")
choice = (
input("Send OTP to (E)mail or (P)hone? [E/P]: ").strip().upper()
)
if choice == "P":
notify_type = "PHONE"
elif payload.get("hasPhone"):
notify_type = "PHONE"
self._send_otp(otp_key, notify_type, xid)
if not handler:
print(f"\nOTP sent to {notify_type.lower()}")
otp_code = None
if handler:
try:
ctx_code = {
"stage": "input_code",
"notify_type": notify_type,
"otpKey": otp_key,
"xid": xid,
}
res2 = handler(ctx_code) or {}
otp_code = str(res2.get("otp_code", "")).strip()
except Exception:
_LOGGER.debug(f"{DOMAIN} - otp_handler input_code failed")
if not otp_code:
if handler is None:
otp_code = input("Enter OTP code: ").strip()
else:
raise AuthenticationError(f"{DOMAIN} - OTP code required")
sid, rmtoken = self._verify_otp(otp_key, otp_code, xid)
final_sid = self._complete_login_with_otp(username, password, sid, rmtoken)
_LOGGER.debug(f"got final session id {final_sid}")
_LOGGER.info(f"{DOMAIN} - Storing rmtoken for future logins")
valid_until = dt.datetime.now(dt.timezone.utc) + LOGIN_TOKEN_LIFETIME
return Token(
username=username,
password=password,
access_token=final_sid,
refresh_token=rmtoken,
valid_until=valid_until,
device_id=self.device_id,
pin=pin,
return OTPRequest(
otp_key=payload["otpKey"],
request_id=response.headers.get("xid", ""),
email=payload.get("email"),
phone=payload.get("phone"),
has_email=bool(payload.get("hasEmail")),
has_phone=bool(payload.get("hasPhone")),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
raise Exception(
f"{DOMAIN} - No session id returned in login. Response: {response.text} headers {response.headers} cookies {response.cookies}"
Expand Down
19 changes: 13 additions & 6 deletions hyundai_kia_connect_api/VehicleManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
REGIONS,
VALET_MODE_ACTION,
VEHICLE_LOCK_ACTION,
OTP_NOTIFY_TYPE,
)
from .exceptions import APIError, AuthenticationOTPRequired
from .HyundaiBlueLinkApiBR import HyundaiBlueLinkApiBR
Expand Down Expand Up @@ -106,11 +107,17 @@ def login(self) -> bool | OTPRequest:
self.otp_request = result
return result

def send_otp(self, otp_destination: str, otp_via: str) -> None:
self.api.send_otp(self.otp_request, otp_destination, otp_via)

def verify_otp(self, otp_code: str) -> None:
self.token = self.api.verify_otp(self.otp_request, otp_code)
def send_otp(self, notify_type: OTP_NOTIFY_TYPE) -> None:
self.api.send_otp(self.otp_request, notify_type)

def verify_otp_and_complete_login(self, otp_code: str) -> None:
self.token = self.api.verify_otp_and_complete_login(
username=self.username,
password=self.password,
otp_code=otp_code,
otp_request=self.otp_request,
pin=self.pin,
)
self.initialize_vehicles()

def initialize_vehicles(self):
Expand Down Expand Up @@ -207,7 +214,7 @@ def check_and_refresh_token(self) -> bool:
if isinstance(result, Token):
self.token: Token = result
self.initialize_vehicles()
if isinstance(result, OTPOptions):
if isinstance(result, OTPRequest):
raise AuthenticationOTPRequired("OTP required to refresh token")
self.vehicles = self.api.refresh_vehicles(self.token, self.vehicles)
return True
Expand Down
5 changes: 5 additions & 0 deletions hyundai_kia_connect_api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ class WINDOW_STATE(IntEnum):
class VALET_MODE_ACTION(Enum):
ACTIVATE = "activate"
DEACTIVATE = "deactivate"


class OTP_NOTIFY_TYPE(Enum):
EMAIL = "EMAIL"
PHONE = "PHONE"
Copy link
Copy Markdown
Contributor

@nussdk nussdk Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be SMS.

DEBUG:hyundai_kia_connect_api.KiaUvoApiUSA:hyundai_kia_connect_api - Send OTP Response {"status":{"statusCode":1,"errorType":3,"errorCode":1164,"errorMessage":"notifyType must be EMAIL or SMS"}}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. The old code this is removing had phone and email. I will adjust.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have adjusted. Please review and let me know if looks correct. For when it presents options I assume that still states Phone not SMS?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raw api response
{"status":{"statusCode":0,"errorType":0,"errorCode":0,"errorMessage":"Success with response body"},"payload":{"otpKey":"XXX","hasEmail":true,"hasPhone":true,"email":"*****","phone":"() **","emailVerifyStatus":true,"phoneVerifyStatus":true}}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes UI will be on that side but what options the client presents must come from here since not everyone sets up all types.

Loading