2017-02-21 13 views
0

단일 파일에 결과를 기록하기 위해 콜백과 함께 제공되는 수백 개의 동시 http-kit.client/get 요청을 시작합니다.동시 http-kit/get 인스턴스 내에서 i/o 콜백을 사용하는 가장 쉬운 방법

스레드 안전성을 처리하는 좋은 방법은 무엇입니까? chan<!!core.asyc에서 사용 하시겠습니까? 당신의 통찰력에 대한

(defn launch-async [channel url]                                 
    (http/get url {:timeout 5000                                 
       :user-agent "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:10.0) Gecko/20100101 Firefox/10.0"}            
      (fn [{:keys [status headers body error]}]                            
      (if error                                   
       (put! channel (json/generate-string {:url url :headers headers :status status}))                 
       (put! channel (json/generate-string body))))))                          

(defn process-async [channel func]                                
    (when-let [response (<!! channel)]                                
    (func response)))                                   

(defn http-gets-async [func urls]                                
    (let [channel (chan)]                                   
    (doall (map #(launch-async channel %) urls))                             
    (process-async channel func)))  

감사 :

는 여기에 내가 고려할 것입니다 코드입니다.

답변

3

예를 들어 이미 core.async를 사용하고 있으므로 몇 가지 문제와 해결 방법을 지적 할 수 있습니다. 다른 대답은 좀 더 기본적인 접근법을 사용하고 있으며, 나는 더 간단한 접근법만으로도 진심으로 동의합니다. 그러나 채널을 사용하면 벡터를 통한 매핑이 필요하지 않은 데이터를 쉽게 사용할 수 있습니다. 응답이 많으면 시간이 지남에 따라 커질 수 있습니다. 다음 문제를 고려하고 해결할 수있는 방법 :

(1) URL 목록에 1024 개가 넘는 요소가있는 경우 현재 버전이 다운됩니다. 풋 및 테이크의 내부 버퍼는 비동기 적입니다 (즉, put!take!은 차단하지 않지만 항상 즉시 반환합니다). 제한은 1024입니다. 이는 채널의 제한없는 비동기 사용을 방지하기위한 것입니다. 직접 확인하려면 (http-gets-async println (repeat 1025 "http://blah-blah-asdf-fakedomain.com"))으로 전화하십시오.

할 일이 있다면 할 일이있을 때만 채널에 뭔가를 넣는 것입니다. 이것을 역압이라고합니다. go block best practices에있는 우수한 위키에서 페이지를 가져 와서 http-kit 콜백에서 이것을 수행하는 영리한 방법은 put! 콜백 옵션을 사용하여 다음 http get을 시작하는 것입니다.

(defn launch-async 
    [channel [url & urls]] 
    (when url 
    (http/get url {:timeout 5000 
        :user-agent "Mozilla"} 
       (fn [{:keys [status headers body error]}] 
       (let [put-on-chan (if error 
            (json/generate-string {:url url :headers headers :status status}) 
            (json/generate-string body))] 
        (put! channel put-on-chan (fn [_] (launch-async channel urls)))))))) 

(2) 다음으로, 당신은 하나 개의 응답을 처리하는 것 같다 다음 put! 즉시 성공하면 당신은 당신이 채널의 버퍼 넘어 갈 수있는 상황이 결코 있도록이 만 발생합니다.

(defn process-async 
    [channel func] 
    (go-loop [] 
    (when-let [response (<! channel)] 
     (func response) 
     (recur)))) 

(3) 여기 http-gets-async 기능입니다 : 대신, 이동 루프를 사용합니다. 나는 당신이 처음에 요청의 좋은 버스트를 해고 도움이 될 것으로, 여기에 버퍼를 추가로 더 피해를 볼 : 이제

(defn http-gets-async 
    [func urls] 
    (let [channel (chan 1000)] 
    (launch-async channel urls) 
    (process-async channel func))) 

, 당신은 백으로, 무한 개수의 URL을 처리 할 수있는 능력을 가지고 압력. 이를 테스트하려면 카운터를 정의한 다음 처리 기능이이 카운터를 증가시켜 진행 상황을 확인하십시오. 이 같이

(def responses (atom 0)) 
(http-gets-async (fn [_] (swap! responses inc)) 
       (repeat 1000000 "http://localhost:8000")) 

모든 비동기, 함수는 즉시 반환됩니다에 쾅하기 쉬운 로컬 호스트 URL을 사용하여 (요청 수천 개의 등으로 말하자면, 구글, 수백을 발사 권하고 싶지 않다) 그리고 당신은 @responses 성장 볼 수 있습니다.

처리 기능을 process-async에서 실행하는 대신 다른 흥미로운 일은 채널 자체의 변환기로 선택적으로 적용 할 수 있습니다.

(defn process-async 
    [channel] 
    (go-loop [] 
    (when-let [_ (<! channel)] 
     (recur)))) 

(defn http-gets-async 
    [func urls] 
    (let [channel (chan 10000 (map func))] ;; <-- transducer on channel 
    (launch-async channel urls) 
    (process-async channel))) 

은 채널 닫히고 (위, 그것은 열려 있습니다) 있도록 구축을 포함하여이 작업을 수행하는 방법에는 여러 가지가 있습니다. 원한다면 java.util.concurrent 프리미티브가 있습니다. 사용하기 쉽습니다. 가능성은 매우 다양합니다.

+0

큰 URL 시퀀스가 ​​제공되면 'launch-asyc'이 오버플로 될 것으로 예상 했었습니다. 왜 그렇지 않은가? (그것이 공식적인 제안으로 주어 졌기 때문에 나는 그것을 추측한다). 감사합니다 – user3639782

+0

또 다른 문제. 그것은 repl (boot repl)에 코드를 보내는 한 파일에 씁니다. 그러나 같은 코드를'-main' 함수 안에 넣고 스크립트로 실행할 때 아무 일도 일어나지 않습니다. 그런 식으로 행동해야합니까? 감사합니다. – user3639782

+1

@ user3639782 'repeat'함수에 의해 생성 된 시퀀스에서 가져온 각 요소는 "주문형"으로 생성됩니다. 즉, 시퀀스가 ​​게으르고 실제로 무한 할 수 있습니다. 그래서, url의 목록은 글자 그대로 메모리를 거의 사용하지 않습니다. 다른 질문에 대해서는 파일에 쓰는 것이 무슨 뜻인지 잘 모르겠습니다. – Josh

1

이것은 core.async를 사용하지 않을 정도로 간단합니다. 저장하는 아톰에 응답의 벡터를 사용하고 이렇게하면 모든 응답을 볼 때까지 원자의 내용을 읽는 별도의 스레드를 갖게됩니다. 그런 다음 http-kit 콜백에서 swap!을 원자에 직접 응답 할 수 있습니다.

core.async를 사용하려면 http-kit 스레드 풀을 차단하지 않도록 버퍼링 된 채널을 권장합니다.