Reactor pattern을 Python으로 구현하려고합니다. 나는 꽤 괜찮은 시작이 multiprocessing
과 select.select
을 사용하고 있다고 생각한다. 그러나, 나는 서버를 스트레스 테스트하려고하는데, 간단한 DoS 클라이언트를 써서 연결로 넘치게했다. 그러나 나는 재미있는 오류가 발생합니다 :동시 소켓 연결을 더 허용하려면 어떻게합니까?
[WinError 10061] No connection could be made because the target machine actively refused it
이것에 대해 흥미로운 것은 내가 서버에 backlog amount에 대한 socket.listen(5)
을하고있어 것입니다. 독자가 select.select
에서 준비를 마친 후에 나는 그 수를 표시합니다. 그리고 나는 단지 1 또는 2를 가졌을뿐입니다.
소수의 스레드 (~ 20)에 대해서는 내가 숨 막히지 않았지만 더 많은 수 (50+)의 경우 연결을 거부하는 경향이 있습니다.
서버 또는 클라이언트 측 (또는 OS/소켓 수준)에서 제 문제가 있습니까? 이 문제를 해결할 수 있습니까? 그렇다면 어떻게?
클라이언트
import threading
import time
import socket
from contextlib import contextmanager
IP = '127.0.0.1'
PORT = 4200
@contextmanager
def open_socket(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((ip, port))
yield sock
finally:
sock.close()
class Flood(threading.Thread):
def __init__(self, id):
super(Flood, self).__init__()
self.id = id
self.failed = False
def run(self):
try:
with open_socket(IP, PORT) as sock:
msg = "Hello this is some data from %d" % self.id
sock.send(msg.encode())
except Exception as e:
print(e)
self.failed = True
def make_threads(count):
return [Flood(_) for _ in range(count)]
threads = make_threads(5000)
start = time.time()
for t in threads:
t.start()
for t in threads:
t.join()
print("Failed: ", sum(1 if x.failed else 0 for x in threads))
print("Done in %f seconds" % (time.time() - start))
서버
import sys
import logging
import socket
import select
import time
import queue
from multiprocessing import Process, Queue, Value
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())
IP = '127.0.0.1'
PORT = 4200
keep_running = True
def dispatcher(q, keeprunning):
try:
while keeprunning:
val = None
try:
val = q.get(True, 5)
if val:
log.debug(val[0].recv(1024).decode())
val[0].shutdown(socket.SHUT_RDWR)
val[0].close()
except queue.Empty:
pass
log.debug("Dispatcher quitting")
except KeyboardInterrupt:
log.debug("^C caught, dispatcher quitting")
def mainloop(sock):
readers, writers, errors = [sock], [], []
timeout = 5
while True:
readers, writers, errors = select.select(readers,
writers,
errors,
timeout)
incoming = yield readers, writers, errors
if incoming and len(incoming) == 3:
readers, writers, errors = incoming
if not readers:
readers.append(sock)
def run_server():
keeprunning = Value('b', True)
q = Queue()
p = Process(target=dispatcher, args=(q, keep_running))
try:
p.start()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((IP, PORT))
sock.listen(50)
sock.setblocking(0)
loop = mainloop(sock)
for readers, writers, errors in loop:
if readers:
client, addr = readers[0].accept()
q.put((client, addr))
log.debug('*'*50)
log.debug('%d Readers', len(readers))
log.debug('%d Writers', len(writers))
log.debug('%d Errors', len(errors))
except KeyboardInterrupt:
log.info("^C caught, shutting down...")
finally:
keeprunning.value = False
sock.close()
p.join()
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: test.py (client|server)")
elif sys.argv[1] == 'client':
run_client()
elif sys.argv[1] == 'server':
run_server()
BTW :'socket.create_connection()'이 존재합니다. 최소한 클라이언트 쪽에서는 IPv4와 IPv6을 처리합니다. – glglgl