2016-09-17 9 views
2

나는 수천 개의 소스로부터 이벤트를 받고있는이 시나리오를 가지고있다. 각 소스는 현재 상태에 대한 정보를 전송합니다. 모든 이벤트를 처리하고 싶지만 각 소스의 최신 이벤트를 먼저 처리하여 현재보기가 최신 상태인지 확인하는 것이 더 중요합니다. 그래서 ConcurrentHashMap을 각 소스의 식별자를 키로 사용하고 LIFO 대기열 (스택)을 값으로 사용하려고 생각했습니다. 그런 다음 Map의 키를 반복하고 각 소스의 스택에서 한 항목 만 팝합니다.다수의 생산자 키로부터 공정한 dequeing을 가진 큐

키를 반복하고 각 키의 큐에서 항목을 가져 오는 동안 제작자가 대기열에 새 이벤트를 게시하여 잠재적으로 동시성 문제가 발생할 수 있다는 점이 걱정됩니다. 제작자는지도에 새 키를 추가 할 수도 있으며 MapentrySet을 반복하여 약하게 일치하는 것으로 보입니다. 새 항목이 후속 반복에서 처리되므로 큰 문제는 아닙니다. 이상적으로는 프로세스 속도를 높이기 위해 entrySet 스트림에서 일부 병렬 처리를 사용할 수도 있습니다.

더 깨끗한 접근 방법이 있는지 궁금합니다. 실제로 LIFO BlockingDequeue을 사용하여 최신 이벤트를 처리 할 수 ​​있었지만이 접근법의 문제점은 한 소스가 다른 이벤트보다 많은 이벤트를 보낼 수 있으므로 다른 이벤트보다 많은 이벤트가 처리 될 위험이 있다는 것입니다.

이러한 종류의 동작을 제공하는 다른 데이터 구조가 있습니까? 본질적으로 내가 찾고있는 것은 각 소스의 이벤트에 우선 순위를 부여하는 동시에 소비자가 처리 할 각 소스에 대한 공정한 기회를 제공하는 방법입니다.

답변

0

LIFO 대기열의 FIFO 대기열에 대해 생각하십니까? 각 소스는 LIFO 대기열에 추가하고 처리를 위해 FIFO 대기열에서 첫 번째 LIFO 대기열을 가져 와서 하나의 이벤트를 처리 한 다음 다시 FIFO 대기열에 넣습니다. LIFO 대기열이 단순히 FIFO 대기열에 추가되기 때문에이 방법으로도 새로운 소스에 아무런 문제가 없어야합니다.

이벤트를 올바른 LIFO 대기열에 추가하려면 소스 당 대기열을 알고있는 추가 HashMap을 유지 관리 할 수 ​​있으며 아직지도에없는 새 소스가 발생하면 LIFO 대기열을 FIFO 대기열.

+0

이것은 흥미 롭습니다. 그러나 문제는 특정 소스에 들어올 때 스택 (LIFO 대기열)에 새 이벤트를 추가 할 수 있어야한다는 것입니다. 즉, 대기열에서 해당 스택을 찾아야합니다. 원본의 LIFO 큐에 소스 키를 매핑하는 추가'ConcurrentHashMap '을 사용하여이 문제를 해결할 수있을 것이라고 생각합니다. – jbx

+0

예, 소스가 이벤트를 넣을 위치를 모르는 경우 HashMap이 수행해야합니다. – Vampire

+0

그래, 그래서 내가 쌓아 올리려는 것은 외부에서 Queue처럼 보이고 그냥 추가하고 제거하는 데이터 구조이지만, 내부적으로는 그에 따라 우선 순위 지정의 논리를하고있다. 귀하의 제안을 해시 맵과 결합 해 보겠습니다. – jbx

0

특히 유스 케이스의 유연성과 속도를 높이기 위해 직접 구조를 구현하는 것이 좋습니다.

순환 대기열을 사용하여 각 LIFO 대기열 (스택)을 저장하고 싶습니다. 순환 대기열은 꼬리에 요소를 추가하고 머리에서 읽지 만 (제거하지는 않음) 대기열입니다. 일단 head = tail이면 다시 시작합니다.

간단한 배열을 사용하여 자체 대기열을 만들 수 있습니다. 어레이에 대기열을 추가하는 등 작업을 중심으로 동기화를 관리하는 것은 그리 어렵지 않습니다. 필요한 경우 확장합니다. 배열에 대기열을 추가하는 것이 자주하는 일이 아니라고 생각합니다.

이것은 관리하기 쉽고 순환 대기열을 확장하여 항목에 액세스하는 빈도를 계산하고 항목에 대한 액세스 빈도를 조절할 수 있습니다 (소비자 스레드를 추가/제거하거나 조금만 기다리게 할 수도 있음). 엔트리에 의해 관리되는 스택에서 소비되기 전에).

다중 스레드를 사용하여 순환 대기열에서 요소를 읽을 때 스택에서 소비하기 전에 "레지스터"연산을 호출하여 스레드를 잠글 수 없습니다. 각 스레드는 ID를 가지며 ID를 "등록"합니다. 주어진 대기열 항목에 저장됩니다. 등록하기 전과 스택에서 팝하기 전에 스레드는 "등록 ID 읽기"작업을 수행하고 반환되는 ID는 자체 ID와 일치해야합니다.이는 주어진 대기열 항목을 "소유"하는 스레드 만 해당 스택에서 팝핑 할 수 있음을 의미합니다. 등록 프로세스의 등록/확인이 실패하면 다른 스레드가 해당 항목을 사용하고 있음을 의미하므로 현재 스레드는 다음 사용 가능한 항목으로 이동합니다.

저는 과거에 이런 종류의 전략을 사용 해왔고 매력처럼 확장되었습니다. 나는 이것이 당신에게 의미가 있기를 바랍니다.