2017-10-16 12 views
2

클러스터에서 Akka 스케줄러의 단일 인스턴스를 실행하려고합니다. 현재 내 스케쥴러는 내 로컬에서 정상적으로 작동하지만 예상대로 작동하지 않습니다. 스케줄러는 DB에서 주문을 선택하여 카프카 (Kafka) 주제로 푸시합니다.
akka 2.5.6 (Java)을 사용하고 있습니다. 나는 official Doc을 통해 갔지만 많은 도움을주지 못했습니다. 도움이 될 것입니다.클러스터에서 Akka Scheduler를 한 번 실행하는 방법은 무엇입니까?

public class OrderReprocessActor extends UntypedActor { 


    LoggingAdapter log = Logging.getLogger(getContext().system(), this); 
    OrderProcessorJdbcConnection orderProcessorJdbcConnection; 
    private final String SELECT_QUERY_TO_GET_FAILED_ORDER="SELECT * FROM ORDER_HISTORY WHERE ORDER_STATUS = ?"; 
    CommonPropsUtil commonPropsUtil; 

    final Cluster cluster = Cluster.get(getContext().system());   

    public static Props getProps() { 
     return Props.create(OrderReprocessActor.class); 
    } 

    @Inject 
    public OrderReprocessActor(OrderProcessorJdbcConnection orderProcessorJdbcConnection , CommonPropsUtil commonPropsUtil){ 
     this.orderProcessorJdbcConnection = orderProcessorJdbcConnection; 
     this.commonPropsUtil = commonPropsUtil; 
    } 

    @Override 
    public void onReceive(Object message) throws Throwable { 

     String failedStatus = (String) message; 
     List<OrderHistory> failedOrderList = getOrders(failedStatus); 
     pushOrderToKafka(failedOrderList); 
     String intervalSeconds = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.ORDER_REPROCESSOR_SCHEDULER_INTERVAL); 
     if(StringUtils.isNotEmpty(intervalSeconds)) 
     { 
      int interval = Integer.parseInt(intervalSeconds); 
      getContext().system().scheduler().scheduleOnce(Duration.create(interval, TimeUnit.SECONDS), 
        () -> { 
         getSelf().tell(failedStatus, ActorRef.noSender()); 
        }, getContext().system().dispatcher()); 
     } 
    } 

    /** 
    * This method takes the failedOrderList and pushes to Kafka Topic 
    * 
    */ 
    private void pushOrderToKafka(List<OrderHistory> failedOrders) { 

     log.info("Entering pushOrderToKafka()"); 
     String kafkaOrderTopic = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_SUBMIT_ORDER_TOPIC); 
     Properties props = getKafkaProperties(); 
     Producer<String, Order> producer = new KafkaProducer<>(props); 
     for (OrderHistory orderHistory : failedOrders) { 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       Order order = objectMapper.readValue(orderHistory.getOrderData().toString(),Order.class); 
       log.info("******************Order ID..."+orderHistory.getOrderId()); 
       producer.send(new ProducerRecord<String, Order>(kafkaOrderTopic, orderHistory.getOrderId(), order)).get(); 
      } catch (IOException e) { 
       log.error("IOException caught , message="+e.getMessage()); 
      } catch (InterruptedException e) { 
       log.error("InterruptedException caught , message="+e.getMessage()); 
      } catch (ExecutionException e) { 
       log.error("ExecutionException caught , message="+e.getMessage()); 
      } 
     } 
     producer.close(); 
     log.info("Exiting pushOrderToKafka()"); 
    } 

    /** 
    * This method return kafka connection properties 
    * @return 
    */ 
    private Properties getKafkaProperties() { 
     String kafkaBootStrapServers = commonPropsUtil.getCommonPropsValueForKey(CommonConstants.KAFKA_BOOTSTRAP_SERVERS); 
     Properties props = new Properties(); 
     props.put(CommonConstants.BOOTSTRAP_SERVERS, kafkaBootStrapServers); 
     props.put(CommonConstants.KEY_SERIALIZER, VZWCommonConstants.STRING_SERIALIZER); 
     props.put(CommonConstants.VALUE_SERIALIZER, VZWCommonConstants.ORDER_SERIALIZER); 
     return props; 
    } 

    /** 
    *This method get all the failed Order from DB 
    * @return List<OrderReprocessActor.OrderHistory> 
    * @throws SQLException 
    */ 
    private List<OrderReprocessActor.OrderHistory> getOrders(String failStatus) throws SQLException { 
     log.info("Entering getAllFailedOrdersFromDB()"); 
     Connection connection = orderProcessorJdbcConnection.getConnection(); 

     try { 
      PreparedStatement pstmt = connection.prepareStatement(SELECT_QUERY_TO_GET_FAILED_ORDER); 
      pstmt.setString(1,failStatus); 
      ResultSet rersultSet = pstmt.executeQuery(); 
      return getOrdersFromResultSet(rersultSet); 
     }catch (SQLException e){ 
      log.error("SQLException caught while fetching failed Order from DB"); 
      log.error(e.getMessage()); 
     }finally { 
       orderProcessorJdbcConnection.releaseConnection(connection); 
     } 
     log.info("Exiting getAllFailedOrdersFromDB()"); 
     return null; 
    } 

    /** 
    * Retrives order from sql result set 
    * @param rersultSet 
    * @return 
    * @throws SQLException 
    */ 
    private List<OrderHistory> getOrdersFromResultSet(ResultSet rersultSet) throws SQLException { 
     List<OrderReprocessActor.OrderHistory> failedOrderList = new ArrayList<>(); 
     while(rersultSet.next()){ 
      String orderId = rersultSet.getString("order_id"); 
      String orderData = rersultSet.getString("order_data"); 
      OrderHistory orderHistory = new OrderHistory(); 
      orderHistory.setOrderId(orderId); 
      orderHistory.setOrderData(orderData); 
      failedOrderList.add(orderHistory); 
     } 
     return failedOrderList; 
    } 

    public static class OrderHistory{ 

     private String orderId; 
     private String orderData; 
     public String getOrderId() { 
      return orderId; 
     } 
     public void setOrderId(String orderId) { 
      this.orderId = orderId; 
     } 
     public String getOrderData() { 
      return orderData; 
     } 
     public void setOrderData(String orderData) { 
      this.orderData = orderData; 
     } 
    } 


} 

답변

0

귀하의 OrderReprocessActorcluster singleton으로 지정하십시오. 문서에서 :

클러스터 싱글 톤 패턴은 akka.cluster.singleton.ClusterSingletonManager에 의해 구현됩니다. 모든 클러스터 노드 중 하나의 싱글 톤 액터 인스턴스 또는 특정 역할로 태그가 지정된 노드 그룹을 관리합니다. ClusterSingletonManager는 클러스터에서 모든 노드 또는 지정된 역할을 가진 모든 노드에서 시작될 것으로 예상되는 액터입니다. 실제 싱글 톤 액터는 제공된 Props에서 자식 액터를 생성하여 가장 오래된 노드의 ClusterSingletonManager에 의해 시작됩니다. ClusterSingletonManager는 최대 하나의 싱글 톤 인스턴스가 어느 시점에서 실행되고 있는지 확인합니다.

+0

안녕하세요. 문제의 링크를 살펴 보았습니다. 그러나 나는 내가 가진 것과 통합하는 방법을 이해할 수 없다. 이론적으로 나는 그 방법이 무엇인지 이해하지만 코드가 현명한 것은 분명하지 않습니다. –

-1

는 클러스터 싱글은과 같이 생성됩니다

final ClusterSingletonManagerSettings settings = 
    ClusterSingletonManagerSettings.create(system); 

system.actorOf(
    ClusterSingletonManager.props(
    Props.create(Consumer.class,() -> new Consumer(queue, testActor)), 
    TestSingletonMessages.end(), 
    settings), 
    "consumer"); 

Akka는 배우 만 클러스터의 리더 노드에 생성되어 있는지 확인합니다.

는 싱글 배우를 사용하려면 해당 경로를 통해에 대한 프록시 요청 :

ClusterSingletonProxySettings proxySettings = 
    ClusterSingletonProxySettings.create(system); 

ActorRef proxy = 
    system.actorOf(ClusterSingletonProxy.props("/user/consumer", proxySettings), 
    "consumerProxy"); 

이러한 예는 the docs에서 적응하고 있습니다.

+0

Consumer.class가 포함 된 패키지와 대기열 유형을 알려주시겠습니까? 내 코드에 대기열이 없습니다. –

+0

코드 복사 만 복사하면 평판이 좋은 사용자가 기대하는 바가 아닙니다. –

+0

당신은 이미 메시지를 스케줄하는 방법을 알고 있고, 클러스터 싱글 톤에 메시지를 작성하고 보내는 방법에 대한 추가 세부 사항을 통해 모두 함께 문자열 화하는 방법을 명확히해야합니다. 질문에 정확하게 작동하지 않는 방법을 지정하지 않았습니다. 어쩌면 그것은 당신에게 더 나은 대답을 줄 수 있습니다. – Synesso