2017-10-23 7 views
1

SmtpService가 응답 헤더는 즉시 보내고, 본문은 처리가 완료되었을 때 보내도록 하고 싶습니다. (제목: Rust에서 tokio_proto 및 tokio_service를 사용하여 메시지를 스트리밍하는 방법)

C: DATA 
S: 354 Start mail input 
C: ... data ... 
C: ... more ... 
C: . 
S: 250 Ok 

이 동작은 위의 SMTP 교환을 따라야 합니다. 이것이 애초에 가능한지, 아니면 대신 두 개의 메시지를 생성해야 하는지 궁금합니다. 지금까지 playground에 작성한 코드는 다음과 같습니다:

#[macro_use] 
extern crate log; 
extern crate bytes; 
extern crate tokio_proto; 
extern crate tokio_service; 
extern crate futures; 

use std::io; 
use bytes::Bytes; 
use tokio_service::Service; 
use tokio_proto::streaming::{Message, Body}; 
use futures::{future, Future, Stream}; 
use futures::sync::oneshot; 
//use model::request::SmtpCommand; 
//use model::response::SmtpReply; 

/// SMTP commands carried in the header of a request message.
///
/// Declared `pub` because the type appears in the associated `Request` type
/// of the `Service` implementation for the public `SmtpService`; a private
/// type there would be a private type in a public interface.
#[derive(Eq, PartialEq, Debug)]
pub enum SmtpCommand {
    /// The `DATA` command, whose request body carries the streamed mail data.
    Data,
}
/// SMTP replies used both as the response header and as response body items.
///
/// Declared `pub` because the type appears in the associated `Response` type
/// of the `Service` implementation for the public `SmtpService`; a private
/// type there would be a private type in a public interface.
#[derive(Eq, PartialEq, Debug)]
pub enum SmtpReply {
    /// Positive completion; presumably the `250 Ok` line of the exchange.
    OkInfo,
    /// Intermediate reply to `DATA`; presumably the `354 Start mail input` line.
    StartMailInputChallenge,
    /// Reported when reading the mail data fails.
    TransactionFailure,
    /// Reported for every command the service does not handle.
    CommandNotImplementedFailure,
}

/// Service type for which `Service` is implemented below; a unit struct
/// that carries no state.
pub struct SmtpService; 

// Question code: this implementation does NOT compile. It tries to answer
// `DATA` with an immediate `StartMailInputChallenge` header and a body that
// is produced only after the request body has been read.
impl Service for SmtpService { 
    // For non-streaming protocols, service errors are always io::Error 
    type Error = io::Error; 
    // These types must match the corresponding protocol types: 
    // a command/reply header plus an optional streamed `Body`.
    type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>; 
    type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>; 

    // The future for computing the response; box it for simplicity. 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    // Produce a future for computing a response from a request. 
    fn call(&self, command: Self::Request) -> Self::Future { 

     info!("Received {:?}", command); 

     match command { 
      // Request that carries a streamed body.
      Message::WithBody(cmd, cmd_body) => { 
       match cmd { 
        SmtpCommand::Data => { 
         // Intended reply sequence:
         // start => SmtpReply::StartMailInputChallenge 
         // ok => SmtpReply::OkInfo 
         // err => SmtpReply::TransactionFailure 

         let (tx, rx) = oneshot::channel(); 

         // NOTE: `fut` is a chain of stream adaptors over `cmd_body`
         // (Inspect -> Map -> MapErr -> Map), not a
         // `Body<SmtpReply, io::Error>`.
         let fut = cmd_body 
           .inspect(|chunk| info!("data: {:?}", chunk)) 
           .map(|_| tx.send(SmtpReply::OkInfo)) 
           .map_err(|_| tx.send(SmtpReply::TransactionFailure)) 
           .map(|_| Body::from(rx)); 

         // ??? How to wire the fut future into the response message? 

         // Because `fut` is not a `Body`, `msg` does not have the type
         // `Self::Response`.
         let msg = Message::WithBody(SmtpReply::StartMailInputChallenge, fut); 

         // The compiler reports the E0271 type mismatch on this cast.
         Box::new(future::ok(msg)) as Self::Future 
        } 
        // Any other command with a body is answered as not implemented.
        _ => Box::new(future::ok(Message::WithoutBody(
         SmtpReply::CommandNotImplementedFailure, 
        ))), 
       } 
      } 
      // Every body-less command is answered as not implemented.
      Message::WithoutBody(cmd) => { 
       Box::new(future::ok(Message::WithoutBody(match cmd { 
        _ => SmtpReply::CommandNotImplementedFailure, 
       }))) 
      } 
     } 
    } 
} 

/// Entry point: only prints a greeting; the service is not started here.
fn main() {
    const GREETING: &str = "Hello, world!";
    println!("{}", GREETING);
}

즉, 메시지를 두 개로 나눈다면 하나는 DATA 명령용, 두 번째는 실제 바이트 스트림용이 되어야 할까요?

발생하는 오류는 메시지 구조가 일치하지 않는다는 것입니다. 본문(body)/future가 분명히 잘못된 위치에 있습니다:

error[E0271]: type mismatch resolving `<futures::FutureResult<tokio_proto::streaming::Message<SmtpReply, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>, std::io::Error> as futures::Future>::Item == tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>` 
    --> src/main.rs:66:25 
    | 
66 |       Box::new(future::ok(msg)) as Self::Future 
    |       ^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found struct `tokio_proto::streaming::Body` 
    | 
    = note: expected type `tokio_proto::streaming::Message<_, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>` 
       found type `tokio_proto::streaming::Message<_, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>` 
    = note: required for the cast to the object type `futures::Future<Error=std::io::Error, Item=tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>>` 

답변

1
call이 반환한 future는 Response를 반환하는 시점에 종료됩니다. 따라서 그 future 안에서 이후의 동작을 더 "구동(drive)"할 수는 없습니다.

즉, (스트리밍되는) 본문을 생성하는 새 태스크를 spawn해야 합니다. 이를 위해서는 tokio_core의 Handle이 필요합니다.

또한 Body는 oneshot이 아닌 mpsc 채널로부터 만들어야 합니다. 그러면 본문 청크(chunk)를 여러 개 보낼 수 있습니다.

Playground

#[macro_use] 
extern crate log; 
extern crate bytes; 
extern crate tokio_core; 
extern crate tokio_proto; 
extern crate tokio_service; 
extern crate futures; 

use std::io; 
use bytes::Bytes; 
use tokio_service::Service; 
use tokio_proto::streaming::{Message, Body}; 
use futures::{future, Future, Stream, Sink}; 
use futures::sync::mpsc; 
//use model::request::SmtpCommand; 
//use model::response::SmtpReply; 

/// SMTP commands carried in the header of a request message.
#[derive(Debug, PartialEq, Eq)]
pub enum SmtpCommand {
    /// The `DATA` command, whose request body carries the streamed mail data.
    Data,
}
/// SMTP replies used both as the response header and as response body items.
#[derive(Debug, PartialEq, Eq)]
pub enum SmtpReply {
    /// Sent in the response body once the mail data was read successfully.
    OkInfo,
    /// Sent immediately as the response header to a `DATA` command.
    StartMailInputChallenge,
    /// Sent in the response body when reading the mail data failed.
    TransactionFailure,
    /// Sent for every command the service does not handle.
    CommandNotImplementedFailure,
}

/// Service type for which `Service` is implemented below; it holds the
/// reactor handle that `call` uses to spawn the task producing the
/// response body.
pub struct SmtpService { 
    // Handle on which `call` spawns the body-producing future
    // (`self.handle.spawn(..)`).
    handle: tokio_core::reactor::Handle, 
} 

impl Service for SmtpService { 
    // For non-streaming protocols, service errors are always io::Error 
    type Error = io::Error; 
    // These types must match the corresponding protocol types: 
    type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>; 
    type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>; 

    // The future for computing the response; box it for simplicity. 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    // Produce a future for computing a response from a request. 
    fn call(&self, command: Self::Request) -> Self::Future { 

     info!("Received {:?}", command); 

     match command { 
      Message::WithBody(cmd, cmd_body) => { 
       match cmd { 
        SmtpCommand::Data => { 
         // start => SmtpReply::StartMailInputChallenge 
         // ok => SmtpReply::OkInfo 
         // err => SmtpReply::TransactionFailure 

         let (tx, rx) = mpsc::channel::<io::Result<SmtpReply>>(1); 

         let fut = cmd_body 
          // read cmd stream; for_each results in a Future, 
          // which completes when the stream is finished 
          .for_each(|chunk| { 
           info!("data: {:?}", chunk); 
           Ok(()) 
          }) 
          // now send the result body 
          .then(move |r| match r { 
           Ok(_) => tx.send(Ok(SmtpReply::OkInfo)), 
           Err(_) => tx.send(Ok(SmtpReply::TransactionFailure)), 
          }) 
          // could send further body messages: 
          // .and_then(|tx| tx.send(...)) 
          // ignore any send errors; spawn needs a future with 
          // Item=() and Error=(). 
          .then(|_| Ok(())) 
         ; 

         self.handle.spawn(fut); 

         let body : Body<SmtpReply, Self::Error> = Body::from(rx); 
         let msg : Self::Response = Message::WithBody(SmtpReply::StartMailInputChallenge, body); 

         Box::new(future::ok(msg)) as Self::Future 
        } 
        _ => Box::new(future::ok(Message::WithoutBody(
         SmtpReply::CommandNotImplementedFailure, 
        ))), 
       } 
      } 
      Message::WithoutBody(cmd) => { 
       Box::new(future::ok(Message::WithoutBody(match cmd { 
        _ => SmtpReply::CommandNotImplementedFailure, 
       }))) 
      } 
     } 
    } 
} 

/// Entry point: only prints a greeting; no `SmtpService` is constructed here.
fn main() {
    let greeting = "Hello, world!";
    println!("{}", greeting);
}