1515from requests .adapters import HTTPAdapter
1616from urllib3 .util .ssl_ import create_urllib3_context
1717
18- from .ApiImpl import ApiImpl , ClimateRequestOptions
18+ from .ApiImpl import ApiImpl , ClimateRequestOptions , OTPRequest
1919from .Token import Token
2020from .Vehicle import Vehicle
2121from .const import (
2525 ORDER_STATUS ,
2626 TEMPERATURE_UNITS ,
2727 VEHICLE_LOCK_ACTION ,
28+ OTP_NOTIFY_TYPE ,
2829)
2930from .utils import get_child_value , parse_datetime
30- from .exceptions import AuthenticationError
3131
3232
3333_LOGGER = logging .getLogger (__name__ )
@@ -240,92 +240,22 @@ def _complete_login_with_otp(
240240 )
241241 return final_sid
242242
243- def start_login (
244- self ,
245- username : str ,
246- password : str ,
247- token : Token | None = None ,
248- ) -> tuple [Token | None , dict | None ]:
249- """Start login and return either a Token or an OTP context.
250-
251- Parameters
252- ----------
253- username : str
254- User email address
255- password : str
256- User password
257- token : Token | None
258- Existing token with stored rmtoken for reuse
259-
260- Returns
261- -------
262- tuple[Token | None, dict | None]
263- (Token, None) if login succeeded without OTP, otherwise (None, ctx)
264- where ctx contains 'otpKey', 'xid', 'email', 'phone', 'hasEmail', 'hasPhone'.
265- """
266- url = self .API_URL + "prof/authUser"
267- data = {
268- "deviceKey" : self .device_id ,
269- "deviceType" : 2 ,
270- "userCredential" : {"userId" : username , "password" : password },
271- }
272- headers = self .api_headers ()
273- if token and getattr (token , "device_id" , None ):
274- self .device_id = token .device_id
275-
276- if token and token .refresh_token :
277- _LOGGER .debug (f"{ DOMAIN } - Attempting start_login with stored rmtoken" )
278- headers ["rmtoken" ] = token .refresh_token
279- response = self .session .post (url , json = data , headers = headers )
280- _LOGGER .debug (f"{ DOMAIN } - Start Sign In Response { response .text } " )
281- response_json = response .json ()
282- session_id = response .headers .get ("sid" )
283- if session_id :
284- _LOGGER .debug (f"got session id { session_id } " )
285- valid_until = dt .datetime .now (dt .timezone .utc ) + LOGIN_TOKEN_LIFETIME
286- existing_rmtoken = token .refresh_token if token else None
287- return (
288- Token (
289- username = username ,
290- password = password ,
291- access_token = session_id ,
292- refresh_token = existing_rmtoken ,
293- valid_until = valid_until ,
294- device_id = self .device_id ,
295- ),
296- None ,
297- )
298- if "payload" in response_json and "otpKey" in response_json ["payload" ]:
299- payload = response_json ["payload" ]
300- xid = response .headers .get ("xid" , "" )
301- ctx = {
302- "otpKey" : payload ["otpKey" ],
303- "xid" : xid ,
304- "email" : payload .get ("email" ),
305- "phone" : payload .get ("phone" ),
306- "hasEmail" : bool (payload .get ("hasEmail" )),
307- "hasPhone" : bool (payload .get ("hasPhone" )),
308- "rmTokenExpired" : bool (payload .get ("rmTokenExpired" )),
309- }
310- return None , ctx
311- raise Exception (
312- f"{ DOMAIN } - No session id returned in start_login. Response: { response .text } "
313- )
314-
315- def send_otp (self , otp_key : str , notify_type : str , xid : str ) -> dict :
243+ def send_otp (self , otp_request : OTPRequest , notify_type : OTP_NOTIFY_TYPE ) -> dict :
316244 """Public helper to send OTP to the selected destination."""
317- return self ._send_otp (otp_key , notify_type , xid )
245+ return self ._send_otp (otp_request . otp_key , notify_type , otp_request . request_id )
318246
319247 def verify_otp_and_complete_login (
320248 self ,
321249 username : str ,
322250 password : str ,
323- otp_key : str ,
324- xid : str ,
325251 otp_code : str ,
252+ otp_request : OTPRequest ,
253+ pin : str | None ,
326254 ) -> Token :
327255 """Verify OTP and complete the login producing a Token."""
328- sid , rmtoken = self ._verify_otp (otp_key , otp_code , xid )
256+ sid , rmtoken = self ._verify_otp (
257+ otp_request .otp_key , otp_code , otp_request .request_id
258+ )
329259 final_sid = self ._complete_login_with_otp (username , password , sid , rmtoken )
330260 _LOGGER .debug (f"got final session id { final_sid } " )
331261 _LOGGER .info (f"{ DOMAIN } - Storing rmtoken for future logins" )
@@ -337,14 +267,14 @@ def verify_otp_and_complete_login(
337267 refresh_token = rmtoken ,
338268 valid_until = valid_until ,
339269 device_id = self .device_id ,
270+ pin = pin ,
340271 )
341272
342273 def login (
343274 self ,
344275 username : str ,
345276 password : str ,
346277 token : Token = None ,
347- otp_handler : ty .Callable [[dict ], dict ] | None = None ,
348278 pin : str | None = None ,
349279 ) -> Token :
350280 """Login into cloud endpoints and return Token
@@ -359,7 +289,7 @@ def login(
359289 Existing token with stored rmtoken for reuse
360290 otp_handler : Callable[[dict], dict], optional
361291 Non-interactive OTP handler. Called twice:
362- - stage='choose_destination' -> return {'notify_type': 'EMAIL'|'PHONE '}
292+ - stage='choose_destination' -> return {'notify_type': 'EMAIL'|'SMS '}
363293 - stage='input_code' -> return {'otp_code': '<code>'}
364294 pin : str, optional
365295
@@ -377,19 +307,17 @@ def login(
377307 }
378308 if token and getattr (token , "device_id" , None ):
379309 self .device_id = token .device_id
380- if otp_handler is not None :
381- self ._otp_handler = otp_handler
382310 headers = self .api_headers ()
383311 if token and token .refresh_token :
384312 data ["deviceKey" ] = self .device_id
385- _LOGGER .debug (f"{ DOMAIN } - Attempting login with stored rmtoken " )
313+ _LOGGER .debug (f"{ DOMAIN } - Attempting login with stored Refresh Token " )
386314 headers ["rmtoken" ] = token .refresh_token
387315 response = self .session .post (url , json = data , headers = headers )
388316 _LOGGER .debug (f"{ DOMAIN } - Sign In Response { response .text } " )
389317 response_json = response .json ()
390318 session_id = response .headers .get ("sid" )
391319 if session_id :
392- _LOGGER .debug (f"got session id { session_id } " )
320+ _LOGGER .debug (f"Got session id { session_id } " )
393321 valid_until = dt .datetime .now (dt .timezone .utc ) + LOGIN_TOKEN_LIFETIME
394322 existing_rmtoken = token .refresh_token if token else None
395323 return Token (
@@ -404,81 +332,22 @@ def login(
404332 payload = response_json ["payload" ]
405333 if payload .get ("rmTokenExpired" ):
406334 _LOGGER .info (f"{ DOMAIN } - Stored rmtoken has expired, need new OTP" )
407- otp_key = payload ["otpKey" ]
408- xid = response .headers .get ("xid" , "" )
409- _LOGGER .info (f"{ DOMAIN } - OTP required for login" )
410- _LOGGER .info (f"{ DOMAIN } - Email: { payload .get ('email' , 'N/A' )} " )
411- _LOGGER .info (f"{ DOMAIN } - Phone: { payload .get ('phone' , 'N/A' )} " )
412- notify_type = "EMAIL"
413- handler = otp_handler or getattr (self , "_otp_handler" , None )
414- if handler :
415- try :
416- ctx_choice = {
417- "stage" : "choose_destination" ,
418- "hasEmail" : bool (payload .get ("hasEmail" )),
419- "hasPhone" : bool (payload .get ("hasPhone" )),
420- "email" : payload .get ("email" ),
421- "phone" : payload .get ("phone" ),
422- }
423- res = handler (ctx_choice ) or {}
424- nt = str (res .get ("notify_type" , notify_type )).upper ()
425- if nt in ("EMAIL" , "PHONE" ):
426- notify_type = nt
427- except Exception :
428- _LOGGER .debug (
429- f"{ DOMAIN } - otp_handler choose_destination failed; using default"
430- )
431- else :
432- if payload .get ("hasEmail" ) and payload .get ("hasPhone" ):
433- print ("\n OTP Authentication Required" )
434- print (f"Email: { payload .get ('email' , 'N/A' )} " )
435- print (f"Phone: { payload .get ('phone' , 'N/A' )} " )
436- choice = (
437- input ("Send OTP to (E)mail or (P)hone? [E/P]: " ).strip ().upper ()
438- )
439- if choice == "P" :
440- notify_type = "PHONE"
441- elif payload .get ("hasPhone" ):
442- notify_type = "PHONE"
443- self ._send_otp (otp_key , notify_type , xid )
444- if not handler :
445- print (f"\n OTP sent to { notify_type .lower ()} " )
446- otp_code = None
447- if handler :
448- try :
449- ctx_code = {
450- "stage" : "input_code" ,
451- "notify_type" : notify_type ,
452- "otpKey" : otp_key ,
453- "xid" : xid ,
454- }
455- res2 = handler (ctx_code ) or {}
456- otp_code = str (res2 .get ("otp_code" , "" )).strip ()
457- except Exception :
458- _LOGGER .debug (f"{ DOMAIN } - otp_handler input_code failed" )
459- if not otp_code :
460- if handler is None :
461- otp_code = input ("Enter OTP code: " ).strip ()
462- else :
463- raise AuthenticationError (f"{ DOMAIN } - OTP code required" )
464- sid , rmtoken = self ._verify_otp (otp_key , otp_code , xid )
465- final_sid = self ._complete_login_with_otp (username , password , sid , rmtoken )
466- _LOGGER .debug (f"got final session id { final_sid } " )
467- _LOGGER .info (f"{ DOMAIN } - Storing rmtoken for future logins" )
468- valid_until = dt .datetime .now (dt .timezone .utc ) + LOGIN_TOKEN_LIFETIME
469- return Token (
470- username = username ,
471- password = password ,
472- access_token = final_sid ,
473- refresh_token = rmtoken ,
474- valid_until = valid_until ,
475- device_id = self .device_id ,
476- pin = pin ,
335+ return OTPRequest (
336+ otp_key = payload ["otpKey" ],
337+ request_id = response .headers .get ("xid" , "" ),
338+ email = payload .get ("email" ),
339+ sms = payload .get ("phone" ),
340+ has_email = bool (payload .get ("hasEmail" )),
341+ has_sms = bool (payload .get ("hasPhone" )),
477342 )
478343 raise Exception (
479344 f"{ DOMAIN } - No session id returned in login. Response: { response .text } headers { response .headers } cookies { response .cookies } "
480345 )
481346
347+ def refresh_access_token (self , token : Token ) -> Token | OTPRequest :
348+ """Refresh the token using the refresh token"""
349+ return self .login (token .username , token .password , token )
350+
482351 def get_vehicles (self , token : Token ) -> list [Vehicle ]:
483352 """Return all Vehicle instances for a given Token"""
484353 url = self .API_URL + "ownr/gvl"
0 commit comments