-
Notifications
You must be signed in to change notification settings - Fork 565
/
Copy pathtest_regr1.py
119 lines (93 loc) · 3.22 KB
/
test_regr1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import asyncio
import queue
import multiprocessing
import signal
import threading
import unittest
import uvloop
from uvloop import _testbase as tb
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
transport.write(b'z')
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
self.transport.close()
def connection_lost(self, exc):
self.loop.stop()
class FailedTestError(BaseException):
pass
def run_server(quin, qout):
server_loop = None
def server_thread():
nonlocal server_loop
loop = server_loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
coro = loop.create_server(EchoServerProtocol, '127.0.0.1', 0)
server = loop.run_until_complete(coro)
addr = server.sockets[0].getsockname()
qout.put(addr)
loop.run_forever()
server.close()
loop.run_until_complete(server.wait_closed())
try:
loop.close()
except Exception as exc:
print(exc)
qout.put('stopped')
thread = threading.Thread(target=server_thread, daemon=True)
thread.start()
quin.get()
server_loop.call_soon_threadsafe(server_loop.stop)
thread.join(1)
class TestIssue39Regr(tb.UVTestCase):
"""See https://github.com/MagicStack/uvloop/issues/39 for details.
Original code to reproduce the bug is by Jim Fulton.
"""
def on_alarm(self, sig, fr):
if self.running:
raise FailedTestError
def run_test(self):
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
target=run_server,
args=(qin, qout),
daemon=True).start()
else:
qin = multiprocessing.Queue()
qout = multiprocessing.Queue()
multiprocessing.Process(
target=run_server,
args=(qin, qout),
daemon=True).start()
addr = qout.get()
loop = self.new_loop()
asyncio.set_event_loop(loop)
loop.create_task(
loop.create_connection(
lambda: EchoClientProtocol(loop),
host=addr[0], port=addr[1]))
loop.run_forever()
loop.close()
qin.put('stop')
qout.get()
@unittest.skipIf(
multiprocessing.get_start_method(False) == 'spawn',
'no need to test on macOS where spawn is used instead of fork')
def test_issue39_regression(self):
signal.signal(signal.SIGALRM, self.on_alarm)
signal.alarm(5)
try:
self.running = True
self.run_test()
except FailedTestError:
self.fail('deadlocked in libuv')
finally:
self.running = False
signal.signal(signal.SIGALRM, signal.SIG_IGN)