2012-04-01 5 views
2

간단한 ssh 서버를 사용하여 ssh bruteforce 시도를 캡처하려고하는데 코드가 우르르 고 작동하지만 사용자 이름/비밀번호 조합을 원래의 IP 주소와 일치시킬 수 없습니다.twisted.conch로 ssh 로그인 시도의 IP 추적

이것은 지금까지 얻을 수있는만큼 간단하지만 아직 로그하지 않고 표준 출력으로 인쇄합니다. 빌드 할 때 buildProtocol 정의를 사용하여 새 연결의 IP 주소를 출력했습니다. 그러나 나는 동시에 여러 클라이언트에서 여러 개의 ssh bruteforce 시도를 추적 할 수 있도록 사용자 이름과 암호 자격 증명과 함께 IP 주소를 얻고 싶습니다. IP 주소가 연결될 때만 가져올 수 있기 때문에 여러 연결을 동시에 추적 할 수 없습니다.

연결이나 연결을 참조 할 때 성공한 로그인을 언급하지 않고 있지만 내 코드에서는 불가능하며 의도하지 않았습니다. 나는 당신이 다시 연결해야하기 전에 3 번의 암호 시도를 허용하는 ssh 서버에 대한 단일 연결을 언급하고있다.

from zope.interface import implements 
from twisted.conch.unix import UnixSSHRealm 
from twisted.cred import portal 
from twisted.cred.credentials import IUsernamePassword 
from twisted.cred.checkers import ICredentialsChecker 
from twisted.cred.error import UnauthorizedLogin 
from twisted.conch.ssh import factory, userauth, keys, session 
from twisted.internet import reactor, defer 

with open('id_rsa') as privateBlobFile: 
    privateKey = privateBlobFile.read() 

with open('id_rsa.pub') as publicBlobFile: 
    publicKey = publicBlobFile.read() 

class FailDB: 
    credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker) 

    def requestAvatarId(self, credentials): 
     print"%s:%s" % (credentials.username, credentials.password) 
     return defer.fail(UnauthorizedLogin("invalid password")) 

class UnixSSHdFactory(factory.SSHFactory): 
    publicKeys = { 
     'ssh-rsa': keys.Key.fromString(data=publicKey) 
    } 
    privateKeys = { 
     'ssh-rsa': keys.Key.fromString(data=privateKey) 
    } 
    services = { 
     'ssh-userauth': userauth.SSHUserAuthServer 
    } 

    def buildProtocol(self, addr): 
     print addr 
     return factory.SSHFactory.buildProtocol(self, addr) 

if __name__ == '__main__': 
    portal = portal.Portal(UnixSSHRealm()) 
    portal.registerChecker(FailDB()) 
    UnixSSHdFactory.portal = portal 
    reactor.listenTCP(2022, UnixSSHdFactory()) 
    reactor.run() 

예 출력 :

IPv4Address(TCP, '127.0.0.1', 42141) 
root:password123 
root:123456 
root:letmein 

답변

0

당신은 SSHUserAuthServer에서 파생와 _ebBadAuth 방법을 재정 의하여 실패한 로그인 시도에서 주소를 얻을 수 있습니다. 예 :

당신은 또한 연결이 성공 할 수 있도록 원하는 경우
class AuthServer(userauth.SSHUserAuthServer): 
    def _ebBadAuth(self, reason): 
     addr = self.transport.getPeer().address.host 
     print("addr {} failed to log in as {} using {}" 
       .format(addr, self.user, self.method)) 
     userauth.SSHUserAuthServer._ebBadAuth(self, reason) 

, 당신은 SSHFactory.services 오버라이드 (override)하지 않는 것,하지만 factory.services.update({'ssh-userauth': AuthServer}) 같은 파생 클래스를 업데이트합니다.