2017-09-18 4 views
0

관련 : 나는 AKKA의 HTTP 응용 프로그램이찾을 수 java.util.concurrent.Future 필수 scala.concurrent.Future

How to integrate akka streams kafka (reactive-kafka) into akka http application?

, 나는 ': scala.concurrent.Future wrapper for java.util.concurrent.Future

이 내 다른 질문에서 온 다음과 같이, 내 경로에서의 onComplete 함수에서 카프카에 메시지/ProducerRecord를 전송하고 싶습니다 :

val producer : KafkaProducer = new KafkaProducer(producerSettings) 

val routes : Route = 
    post { 
    entity(as[User]) { user => 
     val createUser = userService.create(user) 
     onSuccess(createUser) { 
     case Invalid(y: NonEmptyList[Err]) => 
      complete(BadRequest -> "invalid user") 
     case Valid(u: User) => { 
      val producerRecord = 
      new ProducerRecord[Array[Byte], String]("topic1","some message") 

      onComplete(producer.send(producerRecord)) { _ => 
      complete(ToResponseMarshallable((StatusCodes.Created, u))) 
      } 
     } 
     } 
    } 
    } 

을하지만, 의 onComplete (제조자 producerRecord를 보내)는 다음 타입 불일치 에러 발생된다

[오류] 발견 미래 [org.apache.kafka.clients.producer.RecordMetadata] (java.util.concurrent의) 오류 ] 필수 : ​​미래 [org.apache.kafka.clients.producer.RecordMetadata]

어쩌면만큼 제작자를 사용하여,이 주위에 어떤 방법이 onCompleteRecordMetadata _ {=>] [오차 (scala.concurrent에서) 싱크 ( http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) 대신 producer.send 함수가 필요합니까?

답변

2

Cake's Scala based Kafka client을 활용하면 Java 선물을 실행하고 스칼라 선물을 돌려주는 작업을 수행 할 수 있습니다. org.apache.kafka.clients.producer.KafkaProducer 대신 cakesolutions.kafka.KafkaProducer을 만들면 코드의 나머지 부분은 사실 그대로 유지되어야합니다.

또는 높은 수준의 Akka HTTP DSL을 계속 사용하면서 Reactive Kafka을 활용하여이를 분류 할 수 있습니다. 당신은 카프카 싱크,이 방법으로 생산 기록을 실행하여 작업을 수행 할 수 있습니다

val producerSink = Producer.plainSink(producerSettings) 

... 
     // inside the route 
     val producerRecord = 
      new ProducerRecord[Array[Byte], String]("topic1", "some message") 

     onComplete(Source.single(producerRecord).runWith(producerSink)) { _ => 
      complete(ToResponseMarshallable((StatusCodes.Created, u))) 
     } 
1

가 특정 질문에 대답하기 위해, scala-java8-compat 라이브러리는 java8와 스칼라 선물 사이의 컨버터를 제공한다.

특히, 당신은 아마 당신에게 최고의 결과를 얻을 것이다 (위 스테파노에 의해 제안) 스칼라 친화적 인 API 자체가 클라이언트 라이브러리를 사용하여, 그러나 java.util.concurrent.Futurescala.concurrent.Future

을 변환 FutureConverters.toScala(producer.send(producerRecord))를 사용할 수 있습니다.