MMCT TEAM
Server IP : 111.118.215.189  /  Your IP : 216.73.216.185
Web Server : Apache
System : Linux md-in-83.webhostbox.net 4.19.286-203.ELK.el7.x86_64 #1 SMP Wed Jun 14 04:33:55 CDT 2023 x86_64
User : a1673wkz ( 2475)
PHP Version : 8.2.25
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON
Directory (0755) :  /lib64/python2.7/site-packages/psutil/tests/

[  Home  ][  C0mmand  ][  Upload File  ]

Current File : //lib64/python2.7/site-packages/psutil/tests/test_connections.pyc
�
S��]c@s�dZddlZddlZddlZddlZddlZddlmZddlmZddlmZddlm	Z	ddlm
Z
ddlZddlmZdd	lm
Z
dd
lmZddlmZddlmZdd
lmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddlm!Z!ddlm"Z"ddlm#Z#ddlm$Z$ddlm%Z%ddlm&Z&dd lm'Z'dd!lm(Z(dd"lm)Z)dd#lm*Z*dd$lm+Z+dd%lm,Z,ej-�Z.e/ed&e0��Z1d'e0fd(��YZ2d)e2e)j3fd*��YZ4d+e2e)j3fd,��YZ5d-e2e)j3fd.��YZ6d/e2e)j3fd0��YZ7e)j8e%d1�d2e2e)j3fd3��Y�Z9d4e)j3fd5��YZ:e;d6kr�dd7l<m=Z=e=e>�ndS(8s;Tests for net_connections() and Process.connections() APIs.i����N(tclosing(tAF_INET(tAF_INET6(t
SOCK_DGRAM(tSOCK_STREAM(tFREEBSD(tLINUX(tMACOS(tNETBSD(tOPENBSD(tPOSIX(tSUNOS(tWINDOWS(t
supports_ipv6(tPY3(tAF_UNIX(tbind_socket(tbind_unix_socket(tcheck_net_address(tcreate_sockets(tenum(t
get_free_port(tHAS_CONNECTIONS_UNIX(tpyrun(t
reap_children(tsafe_rmpath(tskip_on_access_denied(tSKIP_SYSCONS(ttcp_socketpair(tTESTFN(tTRAVIS(tunittest(tunix_socket_path(tunix_socketpair(t
wait_for_filetSOCK_SEQPACKETtBasecBs/eZd�Zd�Zdd�Zd�ZRS(cCs<tt�ts8tjdd�}|s8t|��ndS(Ntkindtall(RRRtthisproctconnectionstAssertionError(tselftcons((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytsetUp;s
cCsCtt�t�ts?tjdd�}|s?t|��ndS(NR%R&(RRRRR'R(R)(R*R+((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttearDownBs

R&cCs�ytjd|�}Wn!tjk
r9tr3dS�nXg|D]}|j|krA|d ^qA}|j�|j�|j||�dS(s�Given a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        R%Ni����(tpsutiltnet_connectionstAccessDeniedRtpidtsorttassertEqual(R*R1t	proc_consR%tsys_constc((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcompare_procsys_connectionsKs,

cs��fd�}�fd�}�fd�}�fd�}�fd�}||�||�||�||�||�dS(s*Check validity of a connection namedtuple.cs�t|�dk}�jt|�d	��j|d|j��j|d|j��j|d|j��j|d|j��j|d|j��j|d|j�|r��j|d|j	�ndS(
Niiiiiiii(ii(
tlentassertInR3tfdtfamilyttypetladdrtraddrtstatusR1(tconnthas_pid(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_ntupleasc	s%�j|jtttf�tdk	rLt|jtj�sjt	|��nt|jt
�sjt	|��|jtkr�tj|j|j�}t
j|��Ty|j|jddf�Wn.tjk
r�}|jtjkr��q�nXWdQXn(|jtkr!�j|jtj�ndS(Ni(R9R;RRRRtNonet
isinstancetIntEnumR)tinttsocketR<t
contextlibRtbindR=terrorterrnot
EADDRNOTAVAILR3R?R.t	CONN_NONE(R@tsterr(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_familyms$cs��j|jtttf�tdk	rLt|jtj�sjt	|��nt|jt
�sjt	|��|jtkr��j|jt
j�ndS(N(R9R<RRR#RRCRDRER)RFR3R?R.RM(R@(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
check_type�s$cs�x�|j|jfD]�}|jttfkr��j|t�|sJqn�j|jt�d|jkowdkns�t	|j��t
|j|j�q|jtkr�j|t
�qqWdS(Nii��(R=R>R;RRtassertIsInstancettupletportRFR)RtipRtstr(R@taddr(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_addrs�s.cs��j|jt�gtt�D]$}|jd�r tt|�^q }�j|j|�|jt	t
fkr�|jtkr��j
|jtj�n�j|jtj�dS(NtCONN_(RRR?RVtdirR.t
startswithtgetattrR9R;RRR<RtassertNotEqualRMR3(R@txtvalids(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_status�s$$N((R*R@RBRPRQRXR`((R*sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_connection_ntuple_s






(t__name__t
__module__R,R-R7Ra(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR$9s			tTestBasecBs5eZejed�d��Zd�Zd�ZRS(s
requires rootcCs>t��/x'tjdd�D]}|j|�qWWdQXdS(NR%R&(RR.R/Ra(R*R@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_system�s
cCsDt��5x-tj�jdd�D]}|j|�q#WWdQXdS(NR%R&(RR.tProcessR(Ra(R*R@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_process�s
cCs6|jttjdd�|jttjdd�dS(NR%s???(tassertRaisest
ValueErrorR'R(R.R/(R*((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_invalid_kind�s(RbRcRtskipIfRReRgRj(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyRd�s	tTestUnconnectedSocketscBs�eZdZd�Zd�Zd�Zeje�d�d��Z	d�Z
eje�d�d��Zejed�d	��Z
ejed�d
��ZRS(s;Tests sockets which are open but not connected to anything.cCs�tjdd�}tg|D]}|j|f^q�}trN||j�S|jt|�d�|djdkr�|j||j�j|j��n|dSdS(NR%R&iii����(R'R(tdictR:RtfilenoR3R8(R*tsockR+R6tsmap((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytget_conn_from_sock�s(&c	CsJ|j|�}|j|�|jdkrG|j|j|j��n|j|j|j�|j|j|jtj	tj
��|j�}|r�tr�t
|t�r�|j�}n|jtkr�|d }n|jtkr�tr�n|j|j|�|jtkrFtrFtjdd�}|jtj�|dd�n|S(s�Given a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        i����iR%R&(RqRaR:R3RnR;R<t
getsockoptRGt
SOL_SOCKETtSO_TYPEtgetsocknameRRDtbytestdecodeRRR	R=RR'R(R7tostgetpid(R*RoR@R=R+((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck_socket�s&

cCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns	127.0.0.1RW(RRRRRRzR>R)R3R?R.tCONN_LISTEN(R*RWRoR@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_tcp_v4�s
sIPv6 not supportedcCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns::1RW(RRRRRRzR>R)R3R?R.R{(R*RWRoR@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_tcp_v6�s
cCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns	127.0.0.1RW(RRRRRRzR>R)R3R?R.RM(R*RWRoR@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_udp_v4�s
cCsldt�f}ttttd|���<}|j|�}|jsLt�|j|j	t
j�WdQXdS(Ns::1RW(RRRRRRzR>R)R3R?R.RM(R*RWRoR@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_udp_v6s
s
POSIX onlycCslt��]}tt|dt���<}|j|�}|jsFt�|j|jt	j
�WdQXWdQXdS(NR<(R RRRRzR>R)R3R?R.RM(R*tnameRoR@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
test_unix_tcp
s
cCslt��]}tt|dt���<}|j|�}|jsFt�|j|jt	j
�WdQXWdQXdS(NR<(R RRRRzR>R)R3R?R.RM(R*R�RoR@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
test_unix_udps
(RbRct__doc__RqRzR|RRkR
R}R~RR
R�R�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyRl�s	
	#		tTestConnectedSocketcBsEeZdZejed�d��Zejed�d��ZRS(sJTest socket pairs which are are actually connected to
    each other.
    sunreliable on SUONScCs�dt�f}tjdd�s(t�ttd|�\}}z`tjdd�}|jt|�d�|j|djt	j
�|j|djt	j
�Wd|j�|j�XdS(Ns	127.0.0.1R%ttcp4RWiii(RR'R(R)RRR3R8R?R.tCONN_ESTABLISHEDtclose(R*RWtservertclientR+((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_tcp%s
s
POSIX onlyc
Cs�t���}t|�\}}z�tjdd�}|djoJ|djsTt�|djok|djsut�tr�g|D]}|jdkr�|^q�}n|jt	|�d�t
s�ts�tr#|j|djd�|j|djd�|j||djp|dj�n�t
rux�|dj|dj|dj|djfD]}|j|d�qXWnH|j|djp�|dj|�|j|djp�|dj|�Wd|j�|j�XWdQXdS(NR%tunixiis/var/run/logit(R R!R'R(R=R>R)RR3R8RRRR	R�(R*R�R�R�R+R6RW((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt	test_unix9s*!!+'$(
(	RbRcR�RRkRR�R
R�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR�stTestFilterscBs2eZd�Zede�d��Zd�ZRS(c
s)�fd�}t��|dtttgtttg�|dttgttg�|dtgttg�|dttgtg�|dtgtg�|dtgtg�|dttgtg�|d	tgtg�|d
tgtg�tr|dtgtttg�nWdQXdS(Ncs�x@tjd|�D],}�j|j|��j|j|�qWts�xCtjd|�D],}�j|j|��j|j|�q\WndS(NR%(R'R(R9R;R<RR.R/(R%tfamiliesttypesR@(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pytcheck^sR&tinettinet4ttcpR�ttcp6tudptudp4tudp6R�(RRRRRRR#R(R*R�((R*sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_filters]sB	
	

	


	


tonly_ifcs��fd�}tjd�}tjd�}ddlm}tjjt�}||�jdt	t
�ddd	|�}||�jdt	t
�ddd	|�}||�jdt	t�dd
d	|�}||�jdt	t�dd
d	|�}	t|�}
t
t|��}t|�}t
t|��}
t�r{t|�}t
t|��}t|	�}t
t|��}nd}d}d}d}x5tj�D]'}|j�}�jt|�d�x�|D]�}|j|
jkr|||t
t|dtjd�q�|j|jkrC|||t
t|
dtjd�q�|jt|dd�kr�|||tt|dtjd�q�|jt|dd�kr�|||tt|dtjd�q�q�Wq�WdS(Nc
s�d}�j|��j|j|��j|j|��j|j|��j|j|��j|j|�xN|D]F}	|jd|	�}
|	|kr�|
s�t�qy|
syt|
��qyWt	r��j
|j|g�ndS(
NR&R�R�tinet6R�R�R�R�R�R�R%(
sallsinetsinet4R�stcpstcp4stcp6sudpsudp4sudp6(RaR3R;R<R=R>R?R(R)RR7R1(tprocR@R;R<R=R>R?tkindst	all_kindsR%R+(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
check_conn�s

s
            import socket, time
            s = socket.socket($family, socket.SOCK_STREAM)
            s.bind(('$addr', 0))
            s.listen(5)
            with open('$testfn', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
        s�
            import socket, time
            s = socket.socket($family, socket.SOCK_DGRAM)
            s.bind(('$addr', 0))
            with open('$testfn', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
        i����(tTemplateR;RWs	127.0.0.1ttestfns::1iR&R�R�R�R�R�R�R1R�R�R�((sallsinetsinet4stcpstcp4((sallsinetsinet4sudpsudp4((sallsinetsinet6stcpstcp6((sallsinetsinet6sudpsudp6(ttextwraptdedenttstringR�RxtpathtbasenameRt
substituteRFRRRtevalR"R
RCR'tchildrenR(R3R8R1RR.R{RRMR\(R*R�ttcp_templatetudp_templateR�ttestfilet
tcp4_templatet
udp4_templatet
tcp6_templatet
udp6_templatet	tcp4_proct	tcp4_addrt	udp4_proct	udp4_addrt	tcp6_proct	tcp6_addrt	udp6_proct	udp6_addrtpR+R@((R*sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_combos�s`			



c
Cs�t���tjdd�}|jt|�t�r:dnd�x:|D]2}|j|jtt	f�|j|j
t�qHWtjdd�}|jt|�d�|j|djt�|j|dj
t�t�r6tjdd�}|jt|�d�|j|djt	�|j|dj
t�ntjdd�}|jt|�t�rfdnd�x:|D]2}|j|jtt	f�|j|j
t�qtWtjdd	�}|jt|�d�|j|djt�|j|dj
t�t�rbtjdd
�}|jt|�d�|j|djt	�|j|dj
t�ntjdd�}|jt|�t�r�dnd�x@|D]8}|j|jtt	f�|j|j
ttf�q�Wt�rMtjdd
�}|jt|�d�x=|D]2}|j|jt	�|j|j
ttf�qWnt
r�tr�tjdd�}|jt|�d�x=|D]2}|j|jt�|j|j
ttf�q�WnWdQXdS(NR%R�iiR�iR�R�R�R�R�iR�R�i(RR'R(R3R8R
R9R;RRR<RRRRR(R*R+R@((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt
test_count�s\
%
	%
	%
	
 

(RbRcR�RRR�R�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR�[s	+_s
requires roottTestSystemWideConnectionscBs8eZdZd�Zejeo$ed�d��ZRS(sTests for net_connections().cs��fd�}t���ddlm}x�|j�D]r\}}|dkr[tr[q6n|\}}tj|�}�jt|�tt	|���||||�q6WWdQXdS(Ncsfx_|D]W}�j|j|d|�|jtkrQ�j|j|d|�n�j|�qWdS(Ntmsg(R9R;RR<Ra(R+R�ttypes_R@(R*(sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR�'s

i����(t	conn_tmapR�(
Rtpsutil._commonR�titemsRR.R/R3R8tset(R*R�R�R%tgroupsR�R�R+((R*sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_it&s
"sunreliable on MACOS + TRAVISc
Cs|t��}t|�}WdQXg}d}xot|�D]a}tjjt�t|�}tj	d|�}t
|�}|j|j�|j
t|�q7Wx.t|�D] }tt|�}t|�q�Wgtjdd�D]}	|	j|kr�|	^q�}
xt|D]l}|jtg|
D]}	|	j|kr|	^q�|�tj|�}|jt|jd��|�qWdS(Ni
s                import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'%s', 'w') as f:
                        f.write(str(os.getpid()))
                    time.sleep(60)
                R%R&(RR8trangeRxR�trealpathRRVR�R�RtappendR1t
addCleanupRR"R.R/R3RfR((
R*tsockstexpectedtpidsttimestitfnametsrctsprocR^tsysconsR1R�((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_multi_sockets_procs:s*

1(	RbRcR�R�RRkRRR�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR�"s	tTestMisccBseZd�ZRS(cCs�g}g}x�tt�D]�}|jd�rtt|�}t|�}|j�sat|��|jt|�|j||�|j|�|j|�qqWt	r�tj
tjntr�tj
ndS(NRY(RZR.R[R\RVtisupperR)tassertNotInR�Rt	CONN_IDLEt
CONN_BOUNDRtCONN_DELETE_TCB(R*tintststrsR�tnumtstr_((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyttest_connection_constantsds 

(RbRcR�(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyR�bst__main__(trun(?R�RHRKRxRGR�RRRRRR.RRRRR	R
RRR�R
tpsutil._compatRtpsutil.testsRRRRRRRRRRRRRRRRRR R!R"RfR'R\tobjectR#R$tTestCaseRdRlR�R�RkR�R�Rbtpsutil.tests.runnerR�t__file__(((sC/usr/lib64/python2.7/site-packages/psutil/tests/test_connections.pyt<module>slqb=�?

MMCT - 2023