내 자신의 stateful 전송 데코레이터 (SmtpConnectTransport
)를 구현하는 것이 트릭을 만들었습니다. 초기화시 주어진 프레임을 삽입합니다. initframe 형식을 매개 변수로 사용하여 일반적인 솔루션으로 만들 수 있다고 상상해보십시오. 결국 코덱은 파싱 및 직렬화 외에도 평범한 것에서 아무 것도하지 않아도됩니다.
연결 직후 프레임이 오면 서비스에서 원하는 환영 메시지 또는 배너를 생성 할 수 있습니다. 스팸 감지에 사용될 서비스의 혜택을 받기 위해 로컬 및 원격 소켓 주소를 SmtpCommand::Connect
에 포함 시켰습니다.
내 직감이 정확했지만, 실용적인 녹슨 금속 펠트처럼 느껴졌다. D 나는 행복하다.이 얼마나 samotop is coming together. 다음은 코드입니다.
use std::io;
use std::str;
use bytes::Bytes;
use model::response::SmtpReply;
use model::request::SmtpCommand;
use protocol::codec::SmtpCodec;
use tokio_proto::streaming::pipeline::{Frame, Transport, ServerProto};
use tokio_io::codec::Framed;
use futures::{Stream, Sink, StartSend, Poll, Async};
use protocol::parser::SmtpParser;
use protocol::writer::SmtpSerializer;
type Error = io::Error;
type CmdFrame = Frame<SmtpCommand, Bytes, Error>;
type RplFrame = Frame<SmtpReply,(), Error>;
pub struct SmtpProto;
impl<TIO: NetSocket + 'static> ServerProto<TIO> for SmtpProto {
type Error = Error;
type Request = SmtpCommand;
type RequestBody = Bytes;
type Response = SmtpReply;
type ResponseBody =();
type Transport = SmtpConnectTransport<Framed<TIO, SmtpCodec<'static>>>;
type BindTransport = io::Result<Self::Transport>;
fn bind_transport(&self, io: TIO) -> Self::BindTransport {
// save local and remote socket address so we can use it as the first frame
let initframe = Frame::Message {
body: false,
message: SmtpCommand::Connect {
local_addr: io.local_addr().ok(),
peer_addr: io.peer_addr().ok(),
},
};
let codec = SmtpCodec::new(
SmtpParser::session_parser(),
SmtpSerializer::answer_serializer(),
);
let upstream = io.framed(codec);
let transport = SmtpConnectTransport::new(upstream, initframe);
Ok(transport)
}
}
pub struct SmtpConnectTransport<TT> {
initframe: Option<CmdFrame>,
upstream: TT,
}
impl<TT> SmtpConnectTransport<TT> {
pub fn new(upstream: TT, initframe: CmdFrame) -> Self {
Self {
upstream,
initframe: Some(initframe),
}
}
}
impl<TT> Stream for SmtpConnectTransport<TT>
where
TT: 'static + Stream<Error = Error, Item = CmdFrame>,
{
type Error = Error;
type Item = CmdFrame;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.initframe.take() {
Some(frame) => {
println!("transport initializing");
Ok(Async::Ready(Some(frame)))
}
None => self.upstream.poll(),
}
}
}
impl<TT> Sink for SmtpConnectTransport<TT>
where
TT: 'static + Sink<SinkError = Error, SinkItem = RplFrame>,
{
type SinkError = Error;
type SinkItem = RplFrame;
fn start_send(&mut self, request: Self::SinkItem) -> StartSend<Self::SinkItem, io::Error> {
self.upstream.start_send(request)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
self.upstream.poll_complete()
}
fn close(&mut self) -> Poll<(), io::Error> {
self.upstream.close()
}
}
impl<TT> Transport for SmtpConnectTransport<TT>
where
TT: 'static,
TT: Stream<Error = Error, Item = CmdFrame>,
TT: Sink<SinkError = Error, SinkItem = RplFrame>,
{
}
pub trait NetSocket: AsyncRead + AsyncWrite {
fn peer_addr(&self) -> Result<SocketAddr>;
fn local_addr(&self) -> Result<SocketAddr>;
}
impl NetSocket for TcpStream {
fn peer_addr(&self) -> Result<SocketAddr> {
self.peer_addr()
}
fn local_addr(&self) -> Result<SocketAddr> {
self.local_addr()
}
}