2012-05-02 1 views
4

리소스를 풀하는 방법을 파악하는 데 어려움을 겪고 있습니다. 문제가 100 %가 아니라 실험 중이라고 생각하기 시작했습니다. 요점은 내가 뭘하려는 서버에 채널의 수영장을 만들고 스레드가 그들을 사용하고 있는지 확인합니다. 성공적으로 채널 수를 얻는 데 성공했습니다. 즉, 업로드 할 항목 (풀링하지 않고 각 스레드에 새로운 채널을 만드는 것)과 하나의 채널을 만드는 데 성공했습니다 (풀링을하지 않거나 새로운 채널 필요에 따라).newFixedThreadPool이 개체 풀에서 예상대로 작동하지 않습니다.

스레드가 풀과 상호 작용하는 방식이 문제 일 수 있으므로 newCachedThreadPool을 만들려고 했으므로 스레드가 작동하지 않는 한 오래 죽지 않도록해야하지만 사용중인 채널에 오류가 발생하면 폐쇄되었습니다. 내 수영장에 destroyObject 메서드가 있지만 그것을 호출하는 이유를 이해하지 못한다 그래서 내가 그것을 다음 댓글을 경우 작동하지만 단 하나의 채널을 만들고 매우 매우 느린 300 작업/초없이 대 스레드 된 풀 30k/초). 나는 그것의 종결을 의심하고, 이것을 검증 할 어떤 방법이 있는가? 종결한다면 내가 사용할 수있는 대안이 있는가? 여기

이 코드의 (모든 rabbitmq 물건을 무시, 그 너무 나는 결과를 모니터링 할 수 있습니다) :

import org.apache.commons.pool.BasePoolableObjectFactory; 
import org.apache.commons.pool.ObjectPool; 
import org.apache.commons.pool.PoolableObjectFactory; 
import org.apache.commons.pool.impl.GenericObjectPool; 

import java.io.IOException; 
import java.util.NoSuchElementException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingDeque; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.MessageProperties; 

public class PoolExample { 

    private static ExecutorService executor_worker; 

    static { 
     final int numberOfThreads_ThreadPoolExecutor = 20; 
     executor_worker = Executors.newCachedThreadPool(); 
     executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS, 
              new LinkedBlockingDeque<Runnable>()); 
    } 

    private static ObjectPool<Channel> pool; 

    public static void main(String[] args) throws Exception { 
     System.out.println("starting..");   
     ObjectPool<Channel> pool = 
       new GenericObjectPool<Channel>(
       new ConnectionPoolableObjectFactory(), 50); 
     for (int x = 0; x<500000000; x++) { 
      executor_worker.submit(new MyRunnable(x, pool)); 
     } 
     //executor_worker.shutdown(); 
     //pool.close(); 
    } 
} 

class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> { 
    Channel channel; 
    Connection connection; 

    public ConnectionPoolableObjectFactory() throws IOException { 
     System.out.println("hello world"); 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     channel = connection.createChannel(); 
    } 

    @Override 
    public Channel makeObject() throws Exception { 
     //channel = connection.createChannel(); 
     return channel; 
    } 

    @Override 
    public boolean validateObject(Channel channel) { 
     return channel.isOpen(); 
    } 

    @Override 
    public void destroyObject(Channel channel) throws Exception { 
     channel.close(); 
    } 

    @Override 
    public void passivateObject(Channel channel) throws Exception { 
     //System.out.println("sent back to queue"); 
    } 
} 

class MyRunnable implements Runnable{ 
    protected int x = 0; 
    protected ObjectPool<Channel> pool; 

    public MyRunnable(int x, ObjectPool<Channel> pool) { 
     // TODO Auto-generated constructor stub 
     this.x = x; 
     this.pool = pool; 
    } 

    public void run(){ 
     try { 
       Channel channel = pool.borrowObject(); 
       String message = Integer.toString(x); 
       channel.basicPublish("", "task_queue", 
         MessageProperties.PERSISTENT_TEXT_PLAIN, 
         message.getBytes()); 
       pool.returnObject(channel); 
     } catch (NoSuchElementException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IllegalStateException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

추신을 나는 기본적으로 몇 가지 질문을하고 문서를 읽고 이것을 이해하려고 노력했다. 나는 잘못된 방향으로 완전히 갔을 수도있다. 그래서 당신이 보거나 문제가 있다면 내 방식으로 보내라.

플롯은 두껍게 :에서

(필자는 스레드 작업을 제출) 루프를위한 주요 방법의 I 추가 :

Set<Thread> threadSet = Thread.getAllStackTraces().keySet(); 
    System.out.println(threadSet.size()); //number of threads 
    System.out.println(pool.getNumActive()); 

그것은 (I 20를했다하더라도 나에게 25 개 스레드를 보여줍니다)과 수영장에 20 항목. 그러나 rabbitmq UI를 보면 단 하나의 채널과 하나의 연결 만 보입니다. 채널을 만들고 실행 파일에 제출하면 많은 채널이 생성되지만 (결코 닫히지 않습니다). 나는 무슨 일이 일어나고 있는지, 왜 그 결과가 예상과 다른지 이해하지 못합니다.

+0

아마도 'executor_worker ='Executor_worker = Executors.newCachedThreadPool();') 코드에 아무 것도하지 않는 2 명의 실행자를 할당하는 것이 목적일까요? – assylias

+0

@assylias 튜토리얼과 그 예제에서 newCachedThreadPool을 구현 한 방법을 읽었습니다. 나는 그것이 틀렸다는 것을 깨닫지 못했다. executor에게 cachedthreadpool을 사용하는 방법을 알려주는 방법이라고 생각했다. – Lostsoul

+0

표준 캐시 된 쓰레드 풀을 사용하려면, 다음과 같이 작성하면된다 :'executor_worker = Executors.newCachedThreadPool (numberOfThreads_ThreadPoolExecutor);' 다른 라인. – assylias

답변

1

문제는 ConnectionPoolableObjectFactory가 하나의 Channel 객체 만 생성한다는 것입니다. makeObject이 호출 될 때마다 새 채널을 만들어야하는 것으로 보입니다. 이 같은

그래서 어쩌면 구현해야 뭔가 :이 각 공장을 가정

public class ConnectionPoolableObjectFactory 
     extends BasePoolableObjectFactory<Channel> { 

    private final Connection connection; 

    private ConnectionPoolableObjectFactory() { 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     connection = factory.newConnection(); 
    } 

    @Override 
    public Channel makeObject() throws Exception { 
     return connection.createChannel(); 
    } 

    @Override 
    public boolean validateObject(Channel channel) { 
     return channel.isOpen(); 
    } 

    @Override 
    public void destroyObject(Channel channel) throws Exception { 
     channel.close(); 
    } 

    @Override 
    public void passivateObject(Channel channel) throws Exception { 
     //System.out.println("sent back to queue"); 
    } 
} 

는 단일 연결에서 여러 채널을 만듭니다.

+0

흥미 롭습니다. 코드는 스레드 당 연결을 만들고 연결 당 하나의 채널을 만듭니다.그것의 확실히 한 걸음 더 가까이지만, 단 하나의 연결에서만 여러 채널을 유지하려고했습니다. 나는 newConnectionFactory'라는 코드 메소드를 변경하여 변수에 새로운 연결을 반환하려고했지만 null 포인터 예외를 얻습니다. 모든 아이디어를 어떻게 한 번만 연결을 만들 수 있습니까? – Lostsoul

+0

NullPointerException의 출처는 어디입니까? 새 채널이 단일 연결에서 만들어 지도록 내 대답을 수정했습니다. 나는 완전한 프로그램을 셋업 할 시간이 없기 때문에 그것을 실행할 수 없다. – Jeremy

+0

Jeremy 정말 고마워. 그것은 효과가있다! nullpointer는 채널을 빌린 영역에서 발생했습니다 (기본적으로 채널을 얻지 못함을 의미 함). 수정 된 결과, 이제 여러 채널과 하나의 연결이 있습니다. – Lostsoul