U
    #i                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZmZmZmZ d dlmZmZmZ ed	d
dZG dd dZdS )    N)defaultdict)wraps)AnyDefaultDictDictList)Channel)NotConnectedError)HEARTBEAT_PAYLOADPHOENIX_CHANNELChannelEventsMessage)CallbackT_ParamSpecT_Retvalfuncc                    s&   t  tjtjtd fdd}|S )N)argskwargsreturnc                     s   | d j st j | |S )Nr   )	connectedr	   __name__)r   r   r    7/tmp/pip-unpacked-wheel-935k1slo/realtime/connection.pywrapper   s    

z"ensure_connection.<locals>.wrapper)r   r   r   r   r   )r   r   r   r   r   ensure_connection   s    r   c                   @   s   e Zd Zdi dfeeeeef eddddZe	dddd	Z
ddd
dZddddZddddZe	ddddZddddZddddZe	eedddZddddZdS )SocketF   N)urlauto_reconnectparamshb_intervalr   c                 C   s@   || _ tt| _d| _|| _|| _|  d| _|| _tt| _dS )a  
        `Socket` is the abstraction for an actual socket connection that receives and 'reroutes' `Message` according to its `topic` and `event`.
        Socket-Channel has a 1-many relationship.
        Socket-Topic has a 1-many relationship.
        :param url: Websocket URL of the Realtime server. starts with `ws://` or `wss://`
        :param params: Optional parameters for connection.
        :param hb_interval: WS connection is kept alive by sending a heartbeat message. Optional, defaults to 5.
        FN)	r   r   listchannelsr   r    r!   Z
kept_aliver   )selfr   r   r    r!   r   r   r   __init__    s    
zSocket.__init__)r   c                 C   s&   t  }|t |  |   dS )z
        Wrapper for async def _listen() to expose a non-async interface
        In most cases, this should be the last method executed as it starts an infinite listening loop.
        :return: None
        N)asyncioget_event_looprun_until_completeZgather_listen_keep_aliver$   Zloopr   r   r   listen:   s    zSocket.listenc                    s   zr| j  I dH }tf t|}|jtjkr2W q | j	|j
g D ],}|jD ] }|jd|jfkrL||j qLqBW q  tjjk
r   | jrtd |  I dH  | j D ] \}}|D ]}| I dH  qqntd Y qY q X q dS )zN
        An infinite loop that keeps listening.
        :return: None
        N*5Connection with server closed, trying to reconnect..."Connection with the server closed.)ws_connectionrecvr   jsonloadseventr   Zreplyr#   gettopicZ	listenerscallbackpayload
websockets
exceptionsConnectionClosedr   logginginfo_connectitemsZ_join	exception)r$   msgZchannelZclr6   r#   r   r   r   r)   D   s(    

zSocket._listenc                 C   s    t  }||   d| _dS )zR
        Wrapper for async def _connect() to expose a non-async interface
        TN)r&   r'   r(   r>   r   r+   r   r   r   connectb   s    zSocket.connectc                    s<   t | jI d H }|jr0td || _d| _ntdd S )NzConnection was successfulTzConnection Failed)	r9   rB   r   openr<   r=   r0   r   	Exception)r$   r0   r   r   r   r>   j   s    
zSocket._connectc                 C   s    t  }||   d| _dS )zP
        Wrapper for async def _close() to expose a non-async interface
        FN)r&   r'   r(   _closer   r+   r   r   r   closet   s    zSocket.closec                    s   | j  I d H  d S )N)r0   rF   )r$   r   r   r   rE   }   s    zSocket._closec                    s   z@t ttjtdd}| jt|I dH  t	
| jI dH  W q  tjjk
r   | jrttd |  I dH  ntd Y qY q X q dS )zx
        Sending heartbeat to server every 5 seconds
        Ping - pong messages to verify connection is alive
        N)r6   r4   r8   refr.   r/   )dictr   r   Z	heartbeatr
   r0   sendr2   dumpsr&   sleepr!   r9   r:   r;   r   r<   r=   r>   r@   )r$   datar   r   r   r*      s"    
zSocket._keep_alive)r6   r   c                 C   s"   t | || j}| j| | |S )z
        :param topic: Initializes a channel and creates a two-way association with the socket
        :return: Channel
        )r   r    r#   append)r$   r6   chanr   r   r   set_channel   s    zSocket.set_channelc                 C   sD   | j  D ]4\}}|D ]&}td| ddd |jD  d qq
dS )zd
        Prints a list of topics and event the socket is listening to
        :return: None
        zTopic: z | Events: c                 S   s   g | ]\}}|qS r   r   ).0e_r   r   r   
<listcomp>   s     z"Socket.summary.<locals>.<listcomp>]N)r#   r?   print	callbacks)r$   r6   ZchansrN   r   r   r   summary   s    zSocket.summary)r   
__module____qualname__strboolr   r   intr%   r   r,   r)   rB   r>   rF   rE   r*   r   rO   rW   r   r   r   r   r      s,   
	

r   )r&   r2   r<   collectionsr   	functoolsr   typingr   r   r   r   r9   Zrealtime.channelr   Zrealtime.exceptionsr	   Zrealtime.messager
   r   r   r   Zrealtime.typesr   r   r   r   r   r   r   r   r   <module>   s   