2014-12-20 17 views
2

채팅 응용 프로그램이 있다고 가정 해 봅시다.특정 시간에 메시지를 정확하게 처리하십시오.

클라이언트가 일부 액터에 명령을 내린 채팅 메시지를 보냅니다. 이제는 그가 작성한 내용을 즉시 처리하고이 채팅에서 다른 사용자가 사용할 수있게하려고하므로이 명령을 처리합니다. 동시에 나는이 메시지를 채팅 기록 데이터베이스에 저장해야하지만 (지금 배우가 아니라) 스스로에게 말하고 싶다. 2 분마다 데이터베이스에 저장해야합니다. 그리고 충돌이 발생했다면 어쨌든 데이터베이스에 지속될 수 있어야합니다.

  1. 사용자 우리는 모든 사람에게이 메시지를 방송
  2. 채팅 룸 배우가이 메시지
  3. 으로 명령을받은 메시지를 보내고 어떤 종류의이 메시지를 추가

    나는 흐름이처럼 가정 대기열을 채팅 내역 데이터베이스에 보관합니다.

  4. 2 분 시간 제한이 경과하면 일부 지속 명령이 실행됩니다. 아직 도착하지 않은 모든 수신 채팅 메시지를 수집합니다.
  5. 모든 메시지와 함께 트랜잭션을 실행 한 다음 대기열에서 메시지를 제거합니다.
  6. 3 이후에 충돌이 발생하여 메시지가 지속되지 않으면 다시 유지해야합니다. 그들이 지속 되었다면 결코 그들을 다시 유지하려고 노력해서는 안됩니다.

Akka에서 어떻게 만드나요? 어떤 기능을 사용해야합니까/어떤 패턴을 사용해야합니까?

+2

http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2에서 확인하십시오. 실제로 사용 사례 # 4-5입니다. – abatyuk

답변

5

하나의 (조정자)는 채팅 명령에 대한 알림을 클라이언트에 보냅니다. 또 다른 (throttler) - 데이터를 2 분마다 데이터베이스로 푸시합니다. @abatyuk 말했듯이 또한 FSM-based implementation을 사용할 수 있습니다

class Coordinator extends Actor { 
    def receive = { 
    case command: Record => 
      broadcast(command) 
      throttler ! command 
    } 
} 


class Throttler extends Actor { 

    import system.dispatcher 

    val queue = mutable.List[Record] //it may be a cache instead 

    def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html 


    def receive = { 
     case Start => schedule 
     case command: Record => 
      queue ++= command 
     case Tick => 
      schedule 
      try { 
      //---open transaction here--- 
      for (r <- queue) push(r) 
      //---close transaction here--- 
      queue.clear //will not be cleared in case of exception 
      } catch {...} 
    } 
} 

: 귀하의 큐는 throttler의 단지 내부 상태가 될 것입니다.

메일 박스의로드를 줄여야하는 경우 Akka Work Pulling과 같은 배압 /로드 밸런싱 패턴을 시도해 볼 수 있습니다.

서버 노드 중 일부가 실패 할 경우 대기열 상태를 복구하기 위해 노드 자체를 보호하려면 Akka 클러스터를 사용하여 (수동으로) 대기열의 상태를 복제 할 수 있습니다. 이 경우 코디네이터는 Cluster Singleton이어야하며 무작위 배우 (버스를 사용할 수도 있음)에게 틱을 보내고 관리자로서 성공과 실패를 유지해야합니다. 수퍼바이저 상태가 손실 될 수 있으므로 노드를 통해 복제해야하므로 2 분 간격으로 상태를 병합해야하므로 과 같은 병합으로 인해 SortedSet을 대기열에 사용하는 것이 좋습니다.

Riak과 같은 저장소는 이미 clusterization problem을 해결하는 간단한 방법을 제공하므로 대기열로 사용할 수 있습니다 (조정자와 조절기는 모두 "상태 비 저장"싱글 톤입니다). Riak의 경우 데이터 병합이 문제가되지 않으므로 Available + Partitioning (CAP 정리)을 구성 할 수 있습니다. 대화 기록은 CRDT(conflict-free) 데이터 유형입니다.

또 다른 해결책은 Throttler로 WriteBehind 모드 (2 분마다 실행되도록 구성)가있는 캐시입니다.

이벤트 소싱을 통해 액터의 상태를 보호 할 수도 있지만 복원 후 모든 작업을 다시 실행해야 할 때 더욱 유용합니다 (필요하지 않은 경우 데이터베이스로 모든 항목이 다시 제출됩니다). 스냅 샷을 사용할 수 있습니다 (캐시를 직접 사용하는 것과 거의 같습니다).하지만 가용성에 신경 쓰면 로컬 FS 대신 캐시에 저장하는 것이 좋습니다 (SnapshotStore를 구현하여). 저장소 크기를 줄이려면 이전에 저장 한 스냅 샷을 삭제해야 할 수도 있습니다. 또한 상태를 잃어 버리지 않도록 모든 레코드를 동 기적으로 저장해야합니다.

P. 보낸 사람에게 (자바 스크립트로) 메시지를 확인하는 것을 잊지 마시고, 큐를 캐시로 사용해도 마지막 메시지 (사서함)를 잃어 버릴 수 있습니다.

P.S/2 데이터베이스가 느린 데다 사용할 수 없으므로 거의 항상 배우 상태의 지속성에 대한 나쁜 해결책입니다. MongoDB와 같이 강력하고 일관된 NoSQL 솔루션을 권장하지 않습니다. 궁극적 인 일관성이 최선의 선택입니다.

+0

기존 Akka 지속성을 활용할 수있는 방법이 있습니까? (이벤트 등) 큐 상태를 보호하려면? – bobby

+0

Akka Persistence (saveSnapshot)를 사용할 수 있지만 문제는 DB와 동일합니다. 속도가 꽤 느릴 수 있습니다. 또한 상태를 잃어 버리지 않도록 모든 레코드를 동 기적으로 저장해야하며 아마도 DB 또는 캐시 (로컬 파일이 신뢰할 수없는 경우 일 수도 있음)로 보내야하므로 실제로 큰 차이는 없습니다. 잠재적 인 가용성 문제는 해결되지 않을 것입니다 (여전히 클러스터가 필요합니다). – dk14

+0

deleteMessages를 사용하여 모든 작업을 다시 수행 할 수 없습니까? 그럼 saveSnapshot이 필요 없어. – bobby