Source code for pyopenvidu.openviduconnection

"""OpenViduConnection class."""
from typing import List, Optional
from requests_toolbelt.sessions import BaseUrlSession
from dataclasses import dataclass
from .exceptions import OpenViduConnectionDoesNotExistsError, OpenViduSessionDoesNotExistsError
from datetime import datetime
from .openvidupublisher import OpenViduPublisher
from .openvidusubscriber import OpenViduSubscriber


# Notice: Frozen should be changed to True in later versions of Python3 where a nice method for custom initializer is implemented
[docs]@dataclass(init=False, frozen=False) class OpenViduConnection(object): """ This is a base class for connection objects. """ id: str type: str session_id: str created_at: datetime active_at: Optional[datetime] platform: str server_data: Optional[str] publishers: List[OpenViduPublisher] subscribers: List[OpenViduSubscriber] is_valid: bool def _update_from_data(self, data: dict): # set property self.id = data['id'] self.type = data['type'] # Either IPCAM or WEBRTC self.session_id = data['sessionId'] self.created_at = datetime.utcfromtimestamp(data['createdAt'] / 1000.0) if data['activeAt']: self.active_at = datetime.utcfromtimestamp(data['activeAt'] / 1000.0) else: self.active_at = None self.platform = data['platform'] self.server_data = data.get('serverData', None) # set publishers publishers = [] if data['publishers']: # For some reason... this can be none for publisher_data in data['publishers']: publishers.append(OpenViduPublisher(self._session, self.session_id, publisher_data)) # set subscribers subscribers = [] if data['subscribers']: # For some reason... this can be none for subscriber_data in data['subscribers']: subscribers.append(OpenViduSubscriber(self._session, self.session_id, subscriber_data)) self.publishers = publishers self.subscribers = subscribers self.is_valid = True # Specific properties will be set in the inherited functions def __init__(self, session: BaseUrlSession, data: dict): """ Direct instantiation of this class is not supported! Use `OpenViduSession.connections` to get an instance of this class. """ self._session = session self._update_from_data(data) self._last_fetch_result = data
[docs] def fetch(self) -> bool: """ Updates every property of the connection object. :return: true if the Connection object status has changed with respect to the server, false if not. This applies to any property or sub-property of the object. """ if not self.is_valid: raise OpenViduConnectionDoesNotExistsError() r = self._session.get(f"sessions/{self.session_id}/connection/{self.id}") if r.status_code == 404: self.is_valid = False raise OpenViduConnectionDoesNotExistsError() elif r.status_code == 400: self.is_valid = False raise OpenViduSessionDoesNotExistsError() r.raise_for_status() new_data = r.json() is_changed = new_data != self._last_fetch_result if is_changed: self._update_from_data(new_data) self._last_fetch_result = new_data return is_changed
[docs] def force_disconnect(self): """ Forces the disconnection from the session. Remember to call fetch() after this call to fetch the current properties of the Session from OpenVidu Server! https://docs.openvidu.io/en/2.16.0/reference-docs/REST-API/#delete-openviduapisessionsltsession_idgtconnectionltconnection_idgt """ if not self.is_valid: raise OpenViduConnectionDoesNotExistsError() r = self._session.delete(f"sessions/{self.session_id}/connection/{self.id}") if r.status_code == 404: self.is_valid = False raise OpenViduConnectionDoesNotExistsError() if r.status_code == 400: self.is_valid = False raise OpenViduSessionDoesNotExistsError() r.raise_for_status() self.is_valid = False
[docs] def signal(self, type_: str = None, data: str = None): """ Sends a signal to this connection. https://docs.openvidu.io/en/2.16.0/reference-docs/REST-API/#post-openviduapisignal :param type_: Type of the signal. In the body example of the table above, only users subscribed to Session.on('signal:MY_TYPE') will trigger that signal. Users subscribed to Session.on('signal') will trigger signals of any type. :param data: Actual data of the signal. """ if not self.is_valid: raise OpenViduConnectionDoesNotExistsError() parameters = { "session": self.session_id, "to": [self.id], "type": type_, "data": data } parameters = {k: v for k, v in parameters.items() if v is not None} # send request r = self._session.post('signal', json=parameters) if r.status_code == 404: self.is_valid = False raise OpenViduSessionDoesNotExistsError() elif r.status_code == 400: raise ValueError() elif r.status_code == 406: self.is_valid = False raise OpenViduConnectionDoesNotExistsError() r.raise_for_status()
[docs] def force_unpublish_all_streams(self): """ Forces the user to unpublish all of their Stream. OpenVidu Browser will trigger the proper events on the client-side (streamDestroyed) with reason set to "forceUnpublishByServer". After this call, the instance of the object, should be considered invalid. Remember to call fetch() after this call to fetch the actual properties of the Session from OpenVidu Server! https://docs.openvidu.io/en/2.16.0/reference-docs/REST-API/#delete-openviduapisessionsltsession_idgtstreamltstream_idgt """ if not self.is_valid: raise OpenViduConnectionDoesNotExistsError() for publisher in self.publishers: publisher.force_unpublish()
@property def publisher_count(self) -> int: return len(self.publishers) @property def subscriber_count(self) -> int: return len(self.subscribers)
# Notice: Frozen should be changed to True in later versions of Python3 where a nice method for custom initializer is implemented
[docs]@dataclass(init=False, frozen=False) class OpenViduWEBRTCConnection(OpenViduConnection): """ This object represents an OpenVidu WEBRTC type of Connection. This is a connection between an user and a session. """ token: Optional[str] client_data: Optional[str] role: str kurento_options: dict def _update_from_data(self, data: dict): super()._update_from_data(data) # Set the common properties self.token = data.get('token', None) self.client_data = data.get('clientData', None) self.role = data['role'] self.kurento_options = data['kurentoOptions']
# Notice: Frozen should be changed to True in later versions of Python3 where a nice method for custom initializer is implemented
[docs]@dataclass(init=False, frozen=False) class OpenViduIPCAMConnection(OpenViduConnection): """ This object represents an OpenVidu IPCAM type of Connection. This is a connection between an IPCAM and a session. """ rtsp_uri: str adaptive_bitrate: bool only_play_with_subscribers: bool network_cache: int def _update_from_data(self, data: dict): super()._update_from_data(data) # Set the common properties self.rtsp_uri = data['rtspUri'] self.adaptive_bitrate = data['adaptativeBitrate'] self.only_play_with_subscribers = data['onlyPlayWithSubscribers'] self.network_cache = data['networkCache']