2017-09-26 6 views
0

내 질문은, 나는 1000 레코드의 데이터 집합이있다. 나는이 같은 데이터를 처리하는 3 개의 쓰레드를 원한다. thread1은 레코드 1에서 300까지, thread2는 301에서 600으로 걸린다. 하나의 스레드가 요청을하고 한 번에 50 개의 레코드를 가져 와서 개체를 만들어 큐에 넣을 수 있습니다. 주 스레드가 대기열에서 동시에 데이터를 읽습니다.메인 스레드 소비자 및 기타 스레드 생산자

아래 코드는 내가 겪고있는 문제는 recordRead 변수가 스레드가 레코드 읽기를 시작해야하는 시작 지점을 알려줍니다. 하지만 각 스레드마다 다른 값을 설정할 수 있습니다 예를 들어 thread1에 대해 0이어야하며 recordsToRead는 300이어야하며 thread2의 경우 recordRead는 300이어야하며 300 + 300 = 600이되어야하며 마지막 스레드의 경우 recordsToRead는 600이어야합니다. 끝. pagesize = 50 pagesize, recordRead 및 recordToRead는 모두 주 클래스 및 주 스레드에 속하는 변수입니다.

ExecutorService service = Executors.newFixedThreadPool(nThreads); 
    while(nThreads > 0) { 
     nThreads--; 
     service.execute(new Runnable() { 

      @Override 
      public void run() { 
       // TODO Auto-generated method stub 

       do { 
        int respCode = 0; 
        int RecordsToRead = div; 
        JSONObject jsObj = new JSONObject(); 
        jsObj.put("pagesize", pageSize); 
        jsObj.put("start", recordsRead); 
        jsObj.put("searchinternalid", searchInternalId); 

        try { 
         boolean status = req.invoke(jsObj); 
         respCode = req.getResponseCode(); 

        } catch (Exception e) {   
         req.reset(); 
         e.printStackTrace(); 
         return true; 
        } 
        JSONObject jsResp = req.getResponseJson(); 
        //here jsResp will be added to ArrayBlockingQueue. 

        req.reset(); 
       }while(!isError && !isMaxLimit && recordsRead < RecordsToRead); 

      } 

     }); 
    } 

다음 루프는 대기열을 읽는 주 스레드 코드입니다. 어떻게 모든 스레드에 대해 recordsRead 및 recordToread를 설정할 수 있습니까?

최소한 하나의 스레드가 객체를 대기열에 삽입 할 때까지 주 스레드를 대기시키는 방법.

+0

'Runnable'의 하위 클래스를 클래스 ctor 매개 변수로 시작 위치 (및 그 밖의 것)를 사용하여 만들 수 있습니다. –

답변

0

나는 두 가지 문제점을 정의했다. 첫 번째 문제는 병렬 청크 계산을 수행하고 두 번째는 연속 파이프 체크를 생성하는 것입니다. 첫 번째 문제부터 시작할 수 있습니다. 사전 정의 된 크기로 병렬 계산을 수행하려면 fmpv가 fork-join 프레임 워크를 사용하는 것이 가장 좋습니다. 성능뿐만 아니라 (작업 도용은 실제로 효과적이기도하지만)보다 간단한 코드로 인해 발생합니다. 그러나 당신이 나에게 3 개의 쓰레드로 제한되어 있기 때문에 쓰레드를 직접 사용하는 것이 타당한 것처럼 보인다. 원하는대로 간단하게 다음과 같이 구현할 수 있습니다.

final int chunkSize = 300; 
    //you can also use total amount of job 
    //int totalWork = 1000 and chunk size equals totalWork/threadsNumber 
    final int threadsNumber = 3; 

    Thread[] threads = new Thread[threadsNumber]; 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     final int i = ii; 

     threads[ii] = new Thread(() -> { 
      //count your variable according the volume 
      // for example you can do so 
      int chunkStart = i * chunkSize; 
      int chunkEnd = chunkStart + chunkSize; 
      for(int j = chunkStart; j < chunkEnd; j++) { 
       //object creation with necessary proprs 
       //offer to queue here 
      } 
     }); 

     threads[ii].start(); 
    } 

    //your code here 
    //take here 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     try { 
     //this part is only as example 
     //you do not need it     
     //here if you want you can also w8 for completion of all threads 
      threads[ii].join(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

이제 소비에 관한 두 번째 문제가 있습니다. 이 puprose는 ConcurrentLinkedBlockingQueue (http://www.jgroups.org/javadoc/org/jgroups/util/ConcurrentLinkedBlockingQueue.html)와 같이 사용할 수 있습니다. 생산자 스레드에서 제안을하고 메인에서 테이크 메소드를 사용하십시오.

솔직히 말해서 나는 아직도 당신의 문제의 이유를 얻지 못했습니다. 연속 파이프 라인을 생성 하시겠습니까? 아니면 일회성 계산입니까?

또한이 코스를 수강하는 것이 좋습니다 : https://www.coursera.org/learn/parallel-programming-in-java/home/welcome. 이 정보는 문제 해결에 도움이되며 다양한 솔루션을 제공합니다. 동시 및 분산 컴퓨팅 코스도 있습니다.