2017-09-15 18 views
0

6 천 개의 URL을 크롤링하는 응용 프로그램이 있습니다.이 작업을 최소화하기 위해 모든 크롤링 할 URL의 ConcurrentLinkedQueue를 사용하는 RecursiveTask를 만들었습니다. 그것은 50까지 분할하고, 만약 그 큐가 비어 있다면 직접 크롤링하지만, 그렇지 않다면, 먼저 자신의 새로운 인스턴스를 생성하고 그것을 포크 (fork) 한 다음, 50의 서브 세트를 크롤링 한 후에 그 포크 된 타스크에 참여하게됩니다.ForkJoinFramework는 2 명의 직원 만 사용합니다

이제는 각 스레드가 자신의 모든 50 가지 작업을 동시에 수행 할 때까지 내 문제가 발생합니다. 그러나 2 명이 일을 멈추고 합류하기를 기다리고 다른 두 사람 만이 일하고 새로운 포크를 만들고 크롤링 페이지를 만듭니다.

이를 시각화하려면 스레드에서 mouch URL을 얼마나 크롤링하고 JavaFX GUI에 표시할지 지정합니다.

ForkJoinFramewok은 허용 된 네 개의 스레드 중 두 개만 사용하므로 잘못되었습니다. 그것을 바꾸려면 어떻게해야합니까? 여기

는 작업의 내 계산 방법 :

LOG.debug(
     Thread.currentThread().getId() + " Starting new Task with " 
      + urlsToCrawl.size() + " left." 
    ); 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < urlsToCrawl.size() && i < config.getMaximumUrlsPerTask(); i++) 
    { 
     urlsToCrawlSubset.offer(urlsToCrawl.poll()); 
    } 
    LOG.debug(
     Thread.currentThread().getId() + " Crated a Subset with " 
     + urlsToCrawlSubset.size() + "." 
    ); 
    LOG.debug(
     Thread.currentThread().getId() 
     + " Now the Urls to crawl only left " + urlsToCrawl.size() + "." 
    ); 

    if (urlsToCrawl.isEmpty()) 
    { 
     LOG.debug(Thread.currentThread().getId() + " Crawling the subset."); 
     crawlPage(urlsToCrawlSubset); 
    } 
    else 
    { 
     LOG.debug(
      Thread.currentThread().getId() 
       + " Creating a new Task and crawling the subset." 
     ); 
     final AbstractUrlTask<T, D> otherTask = createNewOwnInstance(); 
     otherTask.fork(); 
     crawlPage(urlsToCrawlSubset); 
     taskResults.addAll(otherTask.join()); 
    } 
    return taskResults; 

을 그리고 여기에 내 그림의 스냅 샷입니다 : enter image description here

추신 80 개의 스레드를 허용하는 경우 모든 URL이 50 개 크롤링 될 때까지 두 개만 사용됩니다. https://github.com/mediathekview/MServer/tree/feature/cleanup

+1

있습니까? – algrid

+1

내가 github 코드의 산을 통과 할 수있는 방법은 없습니다. 도움이 필요하면 sscc 예제를 작성하십시오. http://sscce.org/ 여기에 언급 된 것처럼 join()은 스레드의 최대 50 %를 처리합니다. http://coopsoft.com/ar/Calamity2Article.html#join – edharned

+0

작업을 제출하는 코드를 표시 할 수 있습니까? 수영장에? –

답변

0

내가 그것을 고정 : 당신이 관심이 있다면

는 그리고, 여기에 완전한 소스 코드입니다. 내 잘못은 내가 그때 작은 protions을 작업하고 반으로 나눠 대신 기다렸다가 다른 반쪽 등 나머지와 함께 내 자신을 다시 호출

다른 말로하면 내가 직접 splitted하고 올바른 일을하기 전에 모두가 쪼개 질 때까지 쪼개서 일을 시작하는 것입니다. 여기

은 지금의 모습 내 코드입니다 : 당신은 otherTask.join()가 호출 맞아 있는지

@Override 
protected Set<T> compute() 
{ 
    if (urlsToCrawl.size() <= config.getMaximumUrlsPerTask()) 
    { 
     crawlPage(urlsToCrawl); 
    } 
    else 
    { 
     final AbstractUrlTask<T, D> rightTask = createNewOwnInstance(createSubSet(urlsToCrawl)); 
     final AbstractUrlTask<T, D> leftTask = createNewOwnInstance(urlsToCrawl); 
     leftTask.fork(); 
     taskResults.addAll(rightTask.compute()); 
     taskResults.addAll(leftTask.join()); 
    } 
    return taskResults; 
} 

private ConcurrentLinkedQueue<D> createSubSet(final ConcurrentLinkedQueue<D> aBaseQueue) 
{ 
    final int halfSize = aBaseQueue.size()/2; 
    final ConcurrentLinkedQueue<D> urlsToCrawlSubset = new ConcurrentLinkedQueue<>(); 
    for (int i = 0; i < halfSize; i++) 
    { 
     urlsToCrawlSubset.offer(aBaseQueue.poll()); 
    } 
    return urlsToCrawlSubset; 
}