2017-12-18 7 views
1

멀티 스레딩을 이해하기 위해 놀고 있었으므로 서버가 클라이언트에게 명령을 보내는 클라이언트/서버 응용 프로그램을 작성했습니다. 'a'는 서버에 응답을 보냅니다.파이썬 다중 스레드 서버는 한 번에 하나의 클라이언트 메시지를 처리 ​​할 수 ​​있습니다.

서버 코드에서 소켓 2 개와 스레드 1 개를 만들었습니다. 첫 번째 소켓은 연결된 모든 (가입 된) 클라이언트에게 명령을 전송 (게시)합니다. 스레드에서 두 번째 소켓은 클라이언트로부터의 응답을 기다리지 만 스레드가 일부 차단 작업 (예 : 클라이언트에서 보낸 정보를 데이터베이스에 저장)을 수행하기 때문에 소켓 (req-rep 소켓)은 동시에 여러 메시지를 수신 할 수 있습니다.

server.py

import zmq 
import logging 
import threading 
import time 

logging.basicConfig(level=logging.DEBUG) 


class Server(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.pub_port = 7777 
     self.rep_port = 7778 

     self.pub_socket = None 
     self.rep_socket = None 
     self.interface = "*" 

    def bind_ports(self): 
     logging.debug("[bind_ports] binding the ports....") 
     self.pub_socket = self.context.socket(zmq.PUB) 
     pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port) 
     self.pub_socket.bind(pub_bind_str) 

     self.rep_socket = self.context.socket(zmq.REP) 
     rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port) 
     self.rep_socket.bind(rep_bind_str) 

    def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      logging.info("[received_data] data <{}>".format(flow)) 
      self.rep_socket.send(b"\x00") 
      self.blocking_op(cl_data) 

    def blocking_op(self, data): 
     time.sleep(1) # simulating some blocking operations e.g. storing info in a database 

    def push_instruction(self, cmd): 
     logging.debug("[push_inst] Sending the instruction <%s> to the clients...", 
     # logging.debug("[push_inst] Sending the instruction <%s> to the agents ...", 
     cmd) 
     instruction = {"cmd": cmd} 
     self.pub_socket.send_json(instruction) 

    def create_thread(self): 
     thread = threading.Thread(target=self.received_info) 
     thread.daemon = True 
     thread.start() 
     logging.debug("[create_thread] Thread created <{}>".format(
                 thread.is_alive())) 

    def start_main_loop(self): 
     logging.debug("[start_main_loop] Loop started....") 
     self.bind_ports() 
     self.create_thread() 

     while True: 
      cmd = input("Enter your command: ") 
      self.push_instruction(cmd) 

if __name__ == "__main__": 
    Server().start_main_loop() 

client.py

import zmq 
import logging 
import random 
import time 

logging.basicConfig(level=logging.DEBUG) 

class Client(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.sub_socket = None 
     self.req_socket = None 

     self.pub_port = 7777 
     self.req_port = 7778 
     self.server_ip = 'localhost' 

     self.client_id = "" 

    def connect_to_server(self): 
     logging.debug("[conn_to_serv] Connecting to the server ....") 
     self.sub_socket = self.context.socket(zmq.SUB) 
     self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") 
     conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port) 
     self.sub_socket.connect(conn_str) 

     self.req_socket = self.context.socket(zmq.REQ) 
     req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port) 
     self.req_socket.connect(req_conn_str) 

    def get_instruction(self): 
     inst = self.sub_socket.recv_json() 
     logging.debug("[get_inst] Server sent inst") 
     cmd = inst["cmd"] 
     return cmd 
    def send_flow(self, x, y): 
     flow = { 
      "client_id": self.client_id, 
      "x": x, 
      "y": y 
     } 
     self.req_socket.send_json(flow) 

    def start_main_loop(self): 
     logging.debug("starting the main loop ....") 
     self.client_id = input("What is your id: ") 
     self.connect_to_server() 

     while True: 
      inst = self.get_instruction() 
      logging.info("[Main_loop] inst<{}>".format(inst)) 
      if inst == "a": 
       # time.sleep(random.uniform(.6, 1.5)) 
       self.send_flow("xxx", "yyy") 
       self.req_socket.recv() 
       logging.debug("[main_loop] server received the flow") 

if __name__ == "__main__": 
    Client().start_main_loop() 

누군가가 여러 클라이언트의 메시지를 제공 할 수 있도록 나 서버를 향상시킬 수 있다면 나는 그것을 감사하겠습니다 같은 시간.

+0

응답 처리가 차단되거나 오랜 시간이 걸릴 경우 좋은 방법은'receive_info()'에서 응답을 읽고 실제 처리를 수행하는 스레드를 시작하는 것입니다. 이 스레드의 실행은 걸리는만큼 오래 걸리지 만 메인 루프를 차단하지는 않습니다. – Hannu

답변

1

코드 및 테스트를 실행할 수 없지만 문제가 receive_info() 인 경우 실제 응답을 처리하기 위해 스레드를 실행하여 무시할 수 있습니다. 이런 식으로 뭔가가 (오타를 포함 할 수 있습니다, 당신의 코드를 테스트하지 할 수 없습니다 -. 예를 들어 flow이 무엇인지 모른다)

def handle_response(self, data): 
    logging.info("[received_data] data <{}>".format(flow)) 
    self.rep_socket.send(b"\x00") 
    self.blocking_op(data) 

def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      _t = threading.Thread(target=self.handle_response, args=(cl_data,)) 
      _t.start() 

이 그대로 당신의 received_info() 루프를 가지고 있지만, 대신 처리를하고 거기에 응답을 처리하기 위해 새 스레드가 시작됩니다. 완료하는 데 걸리는 시간이 소요되고 실이 끊어 지지만 received_info()은 즉시 새로운 응답을 기다릴 준비가됩니다.

+0

Hannu 대단히 고마워요, 효과가있었습니다. 그런데 args = (cl_data)에서 cl_data 이후에 혼수 상태가되는 이유는 무엇입니까? 추가 질문 : 1000 클라이언트를 처리하고 싶다면 쓰레드를 사용하거나 gevent (또는 asyncio)를 사용하는 것이 더 낫다고 생각합니까? – Corey

+0

쉼표는 하나의 인수 만 전달하고'args'는 튜플이어야합니다. 여러 개의 인수를 전달한 경우 끝에 쉼표없이 args = (a, b, c)를 지정할 수 있지만 한 항목에서 튜플을 만드는 가장 간단한 방법입니다. – Hannu

+0

저는 asyncio에 대한 전문가가 아니므로 성능에 대해 언급 할 수 없습니다. 파이썬은 GIL 때문에 가장 효율적인 병렬 처리 언어가 아닙니다. 스레드를 시도하고 문제가 있으면 조사하십시오. 스레드로 절대적으로 괜찮을 수도 있습니다. – Hannu