Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions plugin.video.orange.fr/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# 2.x

## [2.3.6](https://github.com/f-lawe/plugin.video.orange.fr/releases/tag/v2.3.6) - 2025-04-25

### Fixed
- Auth when using username/password (following new Otange TV website changes)
- Prevent Kodi error when stream info is incomplete

### Changed
- Rework auth process in order to reduce API calls when reusing session data

## [2.3.5](https://github.com/f-lawe/plugin.video.orange.fr/releases/tag/v2.3.5) - 2025-04-18

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion plugin.video.orange.fr/addon.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<addon id="plugin.video.orange.fr" name="Orange TV France" version="2.3.5" provider-name="Flawe">
<addon id="plugin.video.orange.fr" name="Orange TV France" version="2.3.6" provider-name="Flawe,Remzouille">
<requires>
<import addon="xbmc.python" version="3.0.1"/>
<import addon="script.module.requests" version="2.31.0"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import re
from abc import ABC
from datetime import date, datetime, timedelta
from datetime import date, datetime, timedelta, timezone
from time import strptime
from typing import List
from urllib.parse import urlencode
Expand All @@ -13,10 +13,10 @@
from requests import Session
from requests.exceptions import JSONDecodeError, RequestException

from lib.exceptions import AuthenticationRequired, StreamDataDecodeError, StreamNotIncluded
from lib.exceptions import AuthenticationRequired, StreamDataDecodeError, StreamNotIncluded, StreamRequestException
from lib.providers.abstract_provider import AbstractProvider
from lib.utils.kodi import build_addon_url, get_addon_setting, get_drm, get_global_setting, log, set_addon_setting
from lib.utils.request import request, request_json
from lib.utils.request import get_random_ua, request, request_json

_PROGRAMS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/live/v3/applications/STB4PC/programs?period={period}&epgIds=all&mco={mco}"
_CATCHUP_CHANNELS_ENDPOINT = "https://rp-ott-mediation-tv.woopic.com/api-gw/catchup/v4/applications/PC/channels"
Expand All @@ -28,7 +28,7 @@
_CATCHUP_STREAM_ENDPOINT = "https://mediation-tv.orange.fr/all/api-gw/catchup/v4/auth/accountToken/applications/PC/videos/{stream_id}/stream?terminalModel=WEB_PC&terminalId="

_STREAM_LOGO_URL = "https://proxymedia.woopic.com/api/v1/images/2090{path}"
_LIVE_HOMEPAGE_URL = "https://tv.orange.fr/"
_HOMEPAGE_URL = "https://tv.orange.fr/"
_LOGIN_URL = "https://login.orange.fr"

_LICENSE_ENDPOINT = "https://mediation-tv.orange.fr/all/api-gw/license/v1/auth/accountToken"
Expand Down Expand Up @@ -69,10 +69,6 @@ def get_streams(self) -> list:
for channel in channels
]

def _get_channel_live_id(self, channel: dict) -> str:
"""Get live id for given channel."""
return channel["technicalChannels"]["live"][0]["liveTargetURLRelativePath"]

def get_epg(self) -> dict:
"""Load EPG data from Orange and convert it to JSON-EPG format."""
past_days_to_display = get_global_setting("epg.pastdaystodisplay", int)
Expand Down Expand Up @@ -204,25 +200,94 @@ def _get_catchup_videos(self, channel_id: str, category_id: str, article_id: str

def _get_stream_info(self, stream_endpoint: str, stream_id: str) -> dict:
"""Load stream info from Orange."""
tv_token, tv_token_expires, wassup = self._retrieve_auth_data()
stream_endpoint_url = stream_endpoint.format(stream_id=stream_id)
now = datetime.now(timezone.utc)
session_data = get_addon_setting("provider.session_data", dict)

session = Session()
session.headers = {
"Accept": "application/xhtml+xml,application/xml",
"Content-Type": "application/json",
"User-Agent": get_random_ua(),
}

if self._is_session_data_valid(session_data, now):
try:
stream_info = self._request_stream_info(stream_endpoint_url, session_data)
except StreamRequestException:
if get_addon_setting("provider.use_credentials", bool):
self._login(session)

session_data = self._refresh_session_data(session, now)
stream_info = self._request_stream_info(stream_endpoint_url, session_data)
return stream_info

if get_addon_setting("provider.use_credentials", bool):
self._login(session)

session_data = self._refresh_session_data(session, now)
stream_info = self._request_stream_info(stream_endpoint_url, session_data)

return stream_info

def _is_session_data_valid(self, session_data: dict, at: datetime = None) -> bool:
"""Check if session data is valid."""
if not session_data.get("tv_token") or not session_data.get("wassup"):
return False

if at is None:
at = datetime.now(timezone.utc)

if not session_data.get("tv_token_expires") or at.timestamp() > session_data.get("tv_token_expires"):
return False

try:
stream_endpoint_url = stream_endpoint.format(stream_id=stream_id)
headers = {"tv_token": f"Bearer {tv_token}", "Cookie": f"wassup={wassup}"}
decoded_wassup = bytes.fromhex(session_data.get("wassup")).decode()
xwvd = re.search("\|X_WASSUP_VALID_DATE=(.*?)\|", decoded_wassup).group(1)
wassup_expires_at = datetime(*(strptime(xwvd, "%Y%m%d%H%M%S")[0:6])).replace(tzinfo=timezone.utc)
return wassup_expires_at > at
except (TypeError, AttributeError):
return False

def _refresh_session_data(self, session: Session, now: datetime) -> dict:
"""Fetch session data from home page."""
try:
response = request("GET", _HOMEPAGE_URL, session=session)
session_data = {
"tv_token": json.loads(re.search('"token":(".*?")', response.text).group(1)),
"tv_token_expires": now.timestamp() + 30 * 60,
"wassup": session.cookies.get("wassup"),
}
except RequestException as e:
raise AuthenticationRequired("Cannot initiate new session (request failed)") from e
except AttributeError as e:
raise AuthenticationRequired("Cannot initiate new session (tv token not found)") from e
except JSONDecodeError as e:
raise StreamDataDecodeError("Cannot initiate new session (tv token not loaded") from e

set_addon_setting("provider.session_data", dict(session_data))
return session_data

def _request_stream_info(self, stream_endpoint_url: str, session_data: dict) -> dict:
"""Load stream data from Orange."""
try:
headers = {
"tv_token": f"Bearer {session_data.get('tv_token')}",
"Cookie": f"wassup={session_data.get('wassup')}",
}
res = request("GET", stream_endpoint_url, headers=headers)
stream = res.json()
log("Initiate new session", xbmc.LOGINFO)
except RequestException as e:
if e.response.status_code == 403:
raise StreamNotIncluded() from e
else:
raise AuthenticationRequired("Cannot initiate new session") from e
raise AuthenticationRequired("Cannot load stream data") from e
except JSONDecodeError as e:
raise StreamDataDecodeError() from e

return self._compute_stream_info(stream, tv_token, wassup)
return self._format_stream_info(stream, session_data)

def _compute_stream_info(self, stream: dict, tv_token: str, wassup: str) -> dict:
def _format_stream_info(self, stream: dict, session_data: dict) -> dict:
"""Compute stream info."""
protectionData = stream.get("protectionData") or stream.get("protectionDatas")
path = stream.get("streamURL") or stream.get("url")
Expand All @@ -244,9 +309,9 @@ def _compute_stream_info(self, stream: dict, tv_token: str, wassup: str) -> dict
"licence_server_url": license_server_url,
"headers": urlencode(
{
"tv_token": f"Bearer {tv_token}",
"tv_token": f"Bearer {session_data.get('tv_token')}",
"Content-Type": "",
"Cookie": f"wassup={wassup}",
"Cookie": f"wassup={session_data.get('wassup')}",
}
),
"post_data": "R{SSM}",
Expand All @@ -259,89 +324,35 @@ def _compute_stream_info(self, stream: dict, tv_token: str, wassup: str) -> dict
log(stream_info, xbmc.LOGDEBUG)
return stream_info

def _retrieve_auth_data(self, login: str = None, password: str = None) -> (str, str, str):
"""Retreive auth data from Orange (tv token and wassup cookie)."""
provider_session_data = get_addon_setting("provider.session_data", dict)
tv_token, tv_token_expires, wassup = (
provider_session_data.get(k) for k in ("tv_token", "tv_token_expires", "wassup")
)

if not tv_token_expires or datetime.utcnow().timestamp() > tv_token_expires:
session = Session()

if not self._is_wassup_expired(wassup):
log("Cookie reuse", xbmc.LOGINFO)
session.headers["Cookie"] = f"wassup={wassup}"

try:
response = request("GET", _LIVE_HOMEPAGE_URL, session=session)
tv_token = json.loads(re.search('"token":(".*?")', response.text).group(1))
except AttributeError:
log("Login required", xbmc.LOGINFO)
self._login(session)
response = request("GET", _LIVE_HOMEPAGE_URL, session=session)
tv_token = json.loads(re.search('"token":(".*?")', response.text).group(1))

tv_token_expires = datetime.utcnow().timestamp() + 30 * 60

if "wassup" in session.cookies:
wassup = session.cookies.get("wassup")

provider_session_data = {
"tv_token": tv_token,
"tv_token_expires": tv_token_expires,
"wassup": wassup,
}
set_addon_setting("provider.session_data", provider_session_data)

log(f"tv_token: {tv_token}, tv_token_expires: {tv_token_expires}, wassup: {wassup}", xbmc.LOGDEBUG)
return tv_token, tv_token_expires, wassup

def _is_wassup_expired(self, wassup: str) -> bool:
try:
wassup = bytes.fromhex(wassup).decode()
xwvd = re.search("\|X_WASSUP_VALID_DATE=(.*?)\|", wassup).group(1)
wassup_expires = datetime(*(strptime(xwvd, "%Y%m%d%H%M%S")[0:6])).timestamp()
return datetime.utcnow().timestamp() > wassup_expires
except (TypeError, AttributeError):
return True

def _login(self, session):
def _login(self, session: Session):
"""Login to Orange."""
login, password = get_addon_setting("provider.username"), get_addon_setting("provider.password")
session.headers = {
"Accept": "application/xhtml+xml,application/xml",
"Accept-Encoding": "gzip, deflate, br",
"Content-Type": "application/json",
}

try:
request("GET", _LOGIN_URL, headers=session.headers, session=session)
data = json.dumps({})
request("POST", f"{_LOGIN_URL}/api/access", data=data, session=session)
except RequestException:
log("Error while authenticating (init)", xbmc.LOGWARNING)
return

try:
request(
"POST",
f"{_LOGIN_URL}/api/login",
data=json.dumps({"login": login, "params": {}}),
session=session,
)
data = json.dumps({"login": get_addon_setting("provider.username"), "loginOrigin": "input"})
request("POST", f"{_LOGIN_URL}/api/login", data=data, session=session)
except RequestException:
log("Error while authenticating (login)", xbmc.LOGWARNING)
return

try:
request(
"POST",
f"{_LOGIN_URL}/api/password",
data=json.dumps({"password": password, "remember": True}),
s=session,
)
data = json.dumps({"password": get_addon_setting("provider.password"), "remember": True})
request("POST", f"{_LOGIN_URL}/api/password", data=data, session=session)
except RequestException:
log("Error while authenticating (password)", xbmc.LOGWARNING)

if "wassup" not in session.cookies:
log("Error while authenticating (wassup not found)", xbmc.LOGWARNING)

def _get_channel_live_id(self, channel: dict) -> str:
"""Get live id for given channel."""
return channel["technicalChannels"]["live"][0]["liveTargetURLRelativePath"]

def _extract_logo(self, logos: list, definition_type: str = "mobileAppliDark") -> str:
for logo in logos:
if logo["definitionType"] == definition_type:
Expand Down
2 changes: 1 addition & 1 deletion plugin.video.orange.fr/resources/lib/utils/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def create_play_item(stream_info: dict = None, inputstream_addon: str = "") -> L
play_item.setMimeType(stream_info.get("mime_type"))

play_item.setProperty("inputstream", inputstream_addon)
play_item.setProperty("inputstream.adaptive.manifest_type", stream_info["protocol"])
play_item.setProperty("inputstream.adaptive.manifest_type", stream_info.get("protocol"))
play_item.setProperty("inputstream.adaptive.play_timeshift_buffer", "true")

drm_config = stream_info.get("drm_config", {})
Expand Down
17 changes: 8 additions & 9 deletions plugin.video.orange.fr/resources/lib/utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,26 @@
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Safari/605.1.1", # noqa: E501
]

_RANDOM_USER_AGENT = _USER_AGENTS[randint(0, len(_USER_AGENTS) - 1)]


def get_random_ua() -> str:
"""Get a randomised user agent."""
return _USER_AGENTS[randint(0, len(_USER_AGENTS) - 1)]
return _RANDOM_USER_AGENT


def request(method: str, url: str, headers: Mapping[str, str] = None, data=None, session: Session = None) -> Response:
"""Send HTTP request using requests."""
if headers is None:
headers = {}

headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
default_headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "*",
"Sec-Fetch-Mode": "cors",
"User-Agent": get_random_ua(),
**headers,
}

session = session if session is not None else Session()
headers = {**(session.headers if session is not None else default_headers), **(headers or {})}
session = session or Session()

log(f"Fetching {url}", xbmc.LOGDEBUG)
res = session.request(method, url, headers=headers, data=data)
Expand Down