2017-10-19 7 views
3

SMTP 서버는 클라이언트가 명령을 보내기 시작하기위한 신호 인 연결을 설정하면 환영 메시지를 표시해야합니다 (220 서비스 준비 완료). 이것은 tokio-proto의 요청 - 응답 패러다임과 충돌하는 것으로 보인다.클라이언트가 tokio-proto 서버에 연결할 때 환영 메시지/배너를 표시하는 방법은 무엇입니까?

나는 서버가 요청과 클라이언트 응답 (폐기 된 TURN)을 보내는 것과 같이 완전히 뒤집을 수 있다고 상상할 수 있지만, 지금은 연결시 환영 메시지, 일명 배너에만 관심이 있습니다. 그 후 클라이언트 요청 => 서버 응답이 보류됩니다.

나는 이것을 어디에서 구해야하는지 계속 생각하고 있지만, bind_server, bind_transport은 나에게 매우 비밀 스럽다. 운송 수단을 구현해야합니까?

코덱의 방법은 decode입니다. 문제는 decode 메서드가 어떤 종류의 의미가 디코딩 할 수있는 데이터가없는 한 호출되지 않습니다. 내가 거기에 후크하는 몇 가지 연결 초기화 방법이있을 것이라고 기대하지만 아무것도 찾았습니다.

fn decode(&mut self, buf: &mut BytesMut) -> Result { 

    if !self.initialized { 
     println!(
      "new connection from {:?} to {:?}", 
      self.peer_addr, 
      self.local_addr 
     ); 

     self.requests.push(SmtpCommand::Connect { 
      local_addr: self.local_addr, 
      peer_addr: self.peer_addr, 
     }); 

     self.initialized = true; 
    } 
    //... snip 
    match self.requests.is_empty() { 
     true => Ok(None), 
     false => Ok(Some(self.requests.remove(0))), 
    } 
} 

work-in-progress study project is on GitHub와 나는 또한 opened an issue with tokio-proto을했습니다.

답변

0

내 자신의 stateful 전송 데코레이터 (SmtpConnectTransport)를 구현하는 것이 트릭을 만들었습니다. 초기화시 주어진 프레임을 삽입합니다. initframe 형식을 매개 변수로 사용하여 일반적인 솔루션으로 만들 수 있다고 상상해보십시오. 결국 코덱은 파싱 및 직렬화 외에도 평범한 것에서 아무 것도하지 않아도됩니다.

연결 직후 프레임이 오면 서비스에서 원하는 환영 메시지 또는 배너를 생성 할 수 있습니다. 스팸 감지에 사용될 서비스의 혜택을 받기 위해 로컬 및 원격 소켓 주소를 SmtpCommand::Connect에 포함 시켰습니다.

내 직감이 정확했지만, 실용적인 녹슨 금속 펠트처럼 느껴졌다. D 나는 행복하다.이 얼마나 samotop is coming together. 다음은 코드입니다.

use std::io; 
use std::str; 
use bytes::Bytes; 
use model::response::SmtpReply; 
use model::request::SmtpCommand; 
use protocol::codec::SmtpCodec; 
use tokio_proto::streaming::pipeline::{Frame, Transport, ServerProto}; 
use tokio_io::codec::Framed; 
use futures::{Stream, Sink, StartSend, Poll, Async}; 
use protocol::parser::SmtpParser; 
use protocol::writer::SmtpSerializer; 

type Error = io::Error; 
type CmdFrame = Frame<SmtpCommand, Bytes, Error>; 
type RplFrame = Frame<SmtpReply,(), Error>; 

pub struct SmtpProto; 

impl<TIO: NetSocket + 'static> ServerProto<TIO> for SmtpProto { 
    type Error = Error; 
    type Request = SmtpCommand; 
    type RequestBody = Bytes; 
    type Response = SmtpReply; 
    type ResponseBody =(); 
    type Transport = SmtpConnectTransport<Framed<TIO, SmtpCodec<'static>>>; 
    type BindTransport = io::Result<Self::Transport>; 

    fn bind_transport(&self, io: TIO) -> Self::BindTransport { 
     // save local and remote socket address so we can use it as the first frame 
     let initframe = Frame::Message { 
      body: false, 
      message: SmtpCommand::Connect { 
       local_addr: io.local_addr().ok(), 
       peer_addr: io.peer_addr().ok(), 
      }, 
     }; 
     let codec = SmtpCodec::new(
      SmtpParser::session_parser(), 
      SmtpSerializer::answer_serializer(), 
     ); 
     let upstream = io.framed(codec); 
     let transport = SmtpConnectTransport::new(upstream, initframe); 
     Ok(transport) 
    } 
} 

pub struct SmtpConnectTransport<TT> { 
    initframe: Option<CmdFrame>, 
    upstream: TT, 
} 

impl<TT> SmtpConnectTransport<TT> { 
    pub fn new(upstream: TT, initframe: CmdFrame) -> Self { 
     Self { 
      upstream, 
      initframe: Some(initframe), 
     } 
    } 
} 

impl<TT> Stream for SmtpConnectTransport<TT> 
where 
    TT: 'static + Stream<Error = Error, Item = CmdFrame>, 
{ 
    type Error = Error; 
    type Item = CmdFrame; 

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { 
     match self.initframe.take() { 
      Some(frame) => { 
       println!("transport initializing"); 
       Ok(Async::Ready(Some(frame))) 
      } 
      None => self.upstream.poll(), 
     } 
    } 
} 

impl<TT> Sink for SmtpConnectTransport<TT> 
where 
    TT: 'static + Sink<SinkError = Error, SinkItem = RplFrame>, 
{ 
    type SinkError = Error; 
    type SinkItem = RplFrame; 

    fn start_send(&mut self, request: Self::SinkItem) -> StartSend<Self::SinkItem, io::Error> { 
     self.upstream.start_send(request) 
    } 

    fn poll_complete(&mut self) -> Poll<(), io::Error> { 
     self.upstream.poll_complete() 
    } 

    fn close(&mut self) -> Poll<(), io::Error> { 
     self.upstream.close() 
    } 
} 

impl<TT> Transport for SmtpConnectTransport<TT> 
where 
    TT: 'static, 
    TT: Stream<Error = Error, Item = CmdFrame>, 
    TT: Sink<SinkError = Error, SinkItem = RplFrame>, 
{ 
} 


pub trait NetSocket: AsyncRead + AsyncWrite { 
    fn peer_addr(&self) -> Result<SocketAddr>; 
    fn local_addr(&self) -> Result<SocketAddr>; 
} 

impl NetSocket for TcpStream { 
    fn peer_addr(&self) -> Result<SocketAddr> { 
     self.peer_addr() 
    } 
    fn local_addr(&self) -> Result<SocketAddr> { 
     self.local_addr() 
    } 
}