2017-02-16 3 views
3

서버에 일련의 요청을 사용하고 싶지만 서버의 초당 제한 요청 비율은 10입니다. 루프에서 요청을 만들려고하면 모든 요청이 동시에 발생하기 때문에 속도 제한에 도달합니다.RxJS를 사용하여 무손실로 요청을 제한하는 방법 5

for(let i = 0; i < 20; i++) { 
    sendRequest(); 
} 

ReactiveX에는 관찰 가능한 스트림을 수정하기위한 많은 도구가 있지만 속도 제한을 구현할 도구를 찾지 못하는 것 같습니다. 나는 표준 지연을 추가하려고 시도했지만, 요청은 여전히 ​​이전에했던 것보다 100ms 늦게 같은 시간에 실행됩니다.

const queueRequest$ = new Rx.Subject<number>(); 

queueRequest$ 
    .delay(100) 
    .subscribe(queueData => { 
    console.log(queueData); 
    }); 

const queueRequest = (id) => queueRequest$.next(id); 

function fire20Requests() { 
    for (let i=0; i<20; i++) { 
    queueRequest(i); 
    } 
} 

fire20Requests(); 
setTimeout(fire20Requests, 1000); 
setTimeout(fire20Requests, 5000); 

debounceTimethrottleTime 운영자는 나뿐만 아니라 무엇을 찾고 있어요 유사하지만, 그 대신 손실의 손실이다. 이전 요청을 폐기하는 대신 모든 요청을 보존하고 싶습니다.

... 
queueRequest$ 
    .debounceTime(100) 
    .subscribe(queueData => { 
    sendRequest(); 
    }); 
... 

ReactiveX 및 Observables를 사용하여 속도 제한을 초과하지 않고 서버에 요청하는 방법은 무엇입니까?

답변

2

OP의 self answer (및 linked blog)의 구현은 항상 이상적이지 않은 지연을 부과합니다.

속도 제한 서비스가 초당 10 개의 요청을 허용하는 경우 다음 요청이 990 밀리 초 동안 이루어지지 않는 한 10 밀리 초 단위로 10 개의 요청을 처리 할 수 ​​있어야합니다.

아래의 구현은 제한이 적용되도록 지연을 적용하고 지연은 제한을 초과하는 요청에만 적용됩니다.

bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize) 

이것은 우리가 우리가 방출됩니다 의미 bufferTime(1000, null, 10)를 사용할 수 있다는 것을 의미 :

function rateLimit(source, count, period) { 
 

 
    return source 
 
    .scan((records, value) => { 
 

 
     const now = Date.now(); 
 
     const since = now - period; 
 

 
     // Keep a record of all values received within the last period. 
 

 
     records = records.filter((record) => record.until > since); 
 
     if (records.length >= count) { 
 

 
     // until is the time until which the value should be delayed. 
 

 
     const firstRecord = records[0]; 
 
     const lastRecord = records[records.length - 1]; 
 
     const until = firstRecord.until + (period * Math.floor(records.length/count)); 
 

 
     // concatMap is used below to guarantee the values are emitted 
 
     // in the same order in which they are received, so the delays 
 
     // are cumulative. That means the actual delay is the difference 
 
     // between the until times. 
 

 
     records.push({ 
 
      delay: (lastRecord.until < now) ? 
 
      (until - now) : 
 
      (until - lastRecord.until), 
 
      until, 
 
      value 
 
     }); 
 
     } else { 
 
     records.push({ 
 
      delay: 0, 
 
      until: now, 
 
      value 
 
     }); 
 
     } 
 
     return records; 
 

 
    }, []) 
 
    .concatMap((records) => { 
 

 
     const lastRecord = records[records.length - 1]; 
 
     const observable = Rx.Observable.of(lastRecord.value); 
 
     return lastRecord.delay ? observable.delay(lastRecord.delay) : observable; 
 
    }); 
 
} 
 

 
const start = Date.now(); 
 
rateLimit(
 
    Rx.Observable.range(1, 30), 
 
    10, 
 
    1000 
 
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

+0

참고로이 답변을 업데이트했습니다. 'until'계산은 최적이 아니 었습니다. 그것은 '지금'이 아니라 첫 번째 기록을 기반으로해야합니다. – cartant

1

This blog post는 RxJS 이벤트를 폐기에 좋은 것을 설명하는 훌륭한 일을하지, 그들은 대답에 온 방법,하지만 궁극적으로 당신이 찾고있는 코드는 다음과 같습니다

queueRequest$ 
    .concatMap(queueData => Rx.Observable.of(queueData).delay(100)) 
    .subscribe(() => { 
    sendRequest(); 
    }); 

concatMap가 추가

는을 연결 관측 가능한 스트림의 뒤쪽에 새롭게 관찰 가능하게 생성됨. 또한 delay을 사용하면 이벤트를 100ms 뒤로 푸시하여 초당 10 회의 요청을 허용합니다. You can view the full JSBin here, which logs to the console instead of firing requests.

1

는 사실, bufferTime() 연산자와 세 개의 인수를 사용하여이 작업을 수행하는 쉬운 방법이 max 1 이후의 버퍼는 또는입니다. null은 현재 버퍼가 방출 된 직후에 새로운 버퍼를 열기를 원함을 의미합니다. 당신은 다른 초기 지연 실험 할 수 https://jsbin.com/mijepam/19/edit?js,console

:

function mockRequest(val) { 
    return Observable 
    .of(val) 
    .delay(100) 
    .map(val => 'R' + val); 
} 

Observable 
    .range(0, 55) 
    .concatMap(val => Observable.of(val) 
    .delay(25) // async source of values 
    // .delay(175) 
) 

    .bufferTime(1000, null, 10) // collect all items for 1s 

    .concatMap(buffer => Observable 
    .from(buffer) // make requests 
    .delay(1000) // delay this batch by 1s (rate-limit) 
    .mergeMap(value => mockRequest(value)) // collect results regardless their initial order 
    .toArray() 
) 
    // .timestamp() 
    .subscribe(val => console.log(val)); 

라이브 데모를 참조하십시오.

[ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ] 
[ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ] 
[ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ] 
[ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ] 
[ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ] 
[ 'R50', 'R51', 'R52', 'R53', 'R54' ] 

을하지만 우리는 1 초 지연에 의해 제한하고 있기 때문에 .delay(175)으로 우리는 이하 10 개 항목의 배치를 방출합니다 : 만 25ms으로 요청이 (10)에 의해 일괄 적으로 전송됩니다.

[ 'R0', 'R1', 'R2', 'R3', 'R4' ] 
[ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ] 
[ 'R11', 'R12', 'R13', 'R14', 'R15' ] 
[ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ] 
[ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ] 
[ 'R28', 'R29', 'R30', 'R31', 'R32' ] 
[ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ] 
[ 'R39', 'R40', 'R41', 'R42', 'R43' ] 
[ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ] 
[ 'R50', 'R51', 'R52', 'R53', 'R54' ] 

그러나 필요한 것은 하나의 차이가 있습니다. 이 솔루션은 .bufferTime(1000, ...)delay(1000) 때문에 2 초 지연 후에 값을 처음에 시작합니다. 다른 모든 배출은 1 초 후에 발생합니다.

당신은 결국 사용할 수 있습니다

.bufferTime(1000, null, 10) 
.mergeAll() 
.bufferCount(10) 

이 10 개 항목을 항상 수집하고 후에 만이 요청을 수행하는거야. 이것은 아마도 더 효율적일 것입니다.

2

라이브러리를 작성하여 간격 당 최대 요청 수를 설정하면 구독을 지연하여 관찰 가능 횟수를 제한합니다. 테스트를 거쳐 예제가 제공됩니다. https://github.com/ohjames/rxjs-ratelimiter