2013-03-02 2 views
0

unittests를 사용하여 TCP 클라이언트 통신의 정확성을 검사하므로 제어 할 수있는 TCP 서버가 필요하고 클라이언트에 보낼 수 있습니다. 파이썬의 TCP 서버 제어

server = Server("localhost", 5555) 
server.start() 
client.connect() 
self.assertTrue("login_message" in server.received_data) 
server.send_to_client(reject_messages) 
self.assertTrue("login_again" in server.received_data) 
time.sleep(10) 
self.assertTrue("login_again_and_again" in server.newest_received_data) 
server.stop() 
self.assertTrue("login failed" in client.logs) 

내가 전체 흐름 제어, 서버를 구현하기 위해 무엇을 사용할 수 필요합니다 같은
간단한 테스트를 찾아야한다? 나는 사람이 필요한 경우 알고 있지만, 어쨌든하지 않는

답변

1

..
는 지금은 SocketServer 스레드 사용하려고하지만 데이터,도를 제어하는 ​​둘 다 접근을하지 않습니다 내가 gevent 서버를 사용

from gevent import sleep, socket 
from gevent.server import StreamServer 

class Connection: 
    def __init__(self, host, port): 
     self.server = StreamServer((host, port), self.handle) 
     self.data = [] 
     self.socks = [] 
     self.pointer = 0 

    def handle(self, sock, address): 
     self.socks.append(sock) 
     while True: 
      line = sock.recv(1024) 
      if line: 
       self.data += [line] 
      else: 
       break 
     sock.close() 
     self.socks.remove(sock) 

    def send(self, msg): 
     if self.socks: 
      sock2send = self.socks[-1] 
      try: 
       sock2send.send(msg) 
      except IOError, e: 
       print "Can't send message '%s'! Exception:" % msg, e 
     else: 
      print "No sockets to send the message to" 

    def start(self): 
     self.server.start() 

    def serve_forever(self): 
     self.server.serve_forever() 

    def close(self): 
     self.server.stop() 
     for sock in self.socks: 
      sock.close() 

    def new_data(self): 
     newest = self.data[self.pointer:] 
     self.pointer = len(self.data) 
     return newest 

그리고는 유닛 테스트는 다음과 같습니다

def testTCPClient(self):  
    j = lambda x: "".join(x) 

    server = Connection("", 5555)    
    server.start() 
    client.run() 
    sleep(3) 
    data = j(server.new_data()) 
    self.assertTrue("login" in data) 
    sleep(2) 
    server.send("login approve") 
    sleep(2) 
    data = j(server.new_data()) 
    self.assertTrue("after_login" in data) 
    server.send("logout") 
    sleep(2) 
    data = j(server.new_data()) 
    self.assertTrue("received_logout" in data) 
    server.close() 
    self.assertTrue("disconnected" in client.logs)