리소스를 풀하는 방법을 파악하는 데 어려움을 겪고 있습니다. 문제가 100 %가 아니라 실험 중이라고 생각하기 시작했습니다. 요점은 내가 뭘하려는 서버에 채널의 수영장을 만들고 스레드가 그들을 사용하고 있는지 확인합니다. 성공적으로 채널 수를 얻는 데 성공했습니다. 즉, 업로드 할 항목 (풀링하지 않고 각 스레드에 새로운 채널을 만드는 것)과 하나의 채널을 만드는 데 성공했습니다 (풀링을하지 않거나 새로운 채널 필요에 따라).newFixedThreadPool이 개체 풀에서 예상대로 작동하지 않습니다.
스레드가 풀과 상호 작용하는 방식이 문제 일 수 있으므로 newCachedThreadPool
을 만들려고 했으므로 스레드가 작동하지 않는 한 오래 죽지 않도록해야하지만 사용중인 채널에 오류가 발생하면 폐쇄되었습니다. 내 수영장에 destroyObject
메서드가 있지만 그것을 호출하는 이유를 이해하지 못한다 그래서 내가 그것을 다음 댓글을 경우 작동하지만 단 하나의 채널을 만들고 매우 매우 느린 300 작업/초없이 대 스레드 된 풀 30k/초). 나는 그것의 종결을 의심하고, 이것을 검증 할 어떤 방법이 있는가? 종결한다면 내가 사용할 수있는 대안이 있는가? 여기
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class PoolExample {
private static ExecutorService executor_worker;
static {
final int numberOfThreads_ThreadPoolExecutor = 20;
executor_worker = Executors.newCachedThreadPool();
executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
}
private static ObjectPool<Channel> pool;
public static void main(String[] args) throws Exception {
System.out.println("starting..");
ObjectPool<Channel> pool =
new GenericObjectPool<Channel>(
new ConnectionPoolableObjectFactory(), 50);
for (int x = 0; x<500000000; x++) {
executor_worker.submit(new MyRunnable(x, pool));
}
//executor_worker.shutdown();
//pool.close();
}
}
class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
Channel channel;
Connection connection;
public ConnectionPoolableObjectFactory() throws IOException {
System.out.println("hello world");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
channel = connection.createChannel();
}
@Override
public Channel makeObject() throws Exception {
//channel = connection.createChannel();
return channel;
}
@Override
public boolean validateObject(Channel channel) {
return channel.isOpen();
}
@Override
public void destroyObject(Channel channel) throws Exception {
channel.close();
}
@Override
public void passivateObject(Channel channel) throws Exception {
//System.out.println("sent back to queue");
}
}
class MyRunnable implements Runnable{
protected int x = 0;
protected ObjectPool<Channel> pool;
public MyRunnable(int x, ObjectPool<Channel> pool) {
// TODO Auto-generated constructor stub
this.x = x;
this.pool = pool;
}
public void run(){
try {
Channel channel = pool.borrowObject();
String message = Integer.toString(x);
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
pool.returnObject(channel);
} catch (NoSuchElementException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
추신을 나는 기본적으로 몇 가지 질문을하고 문서를 읽고 이것을 이해하려고 노력했다. 나는 잘못된 방향으로 완전히 갔을 수도있다. 그래서 당신이 보거나 문제가 있다면 내 방식으로 보내라.
플롯은 두껍게 :에서
(필자는 스레드 작업을 제출) 루프를위한 주요 방법의 I 추가 :
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
System.out.println(threadSet.size()); //number of threads
System.out.println(pool.getNumActive());
그것은 (I 20를했다하더라도 나에게 25 개 스레드를 보여줍니다)과 수영장에 20 항목. 그러나 rabbitmq UI를 보면 단 하나의 채널과 하나의 연결 만 보입니다. 채널을 만들고 실행 파일에 제출하면 많은 채널이 생성되지만 (결코 닫히지 않습니다). 나는 무슨 일이 일어나고 있는지, 왜 그 결과가 예상과 다른지 이해하지 못합니다.
아마도 'executor_worker ='Executor_worker = Executors.newCachedThreadPool();') 코드에 아무 것도하지 않는 2 명의 실행자를 할당하는 것이 목적일까요? – assylias
@assylias 튜토리얼과 그 예제에서 newCachedThreadPool을 구현 한 방법을 읽었습니다. 나는 그것이 틀렸다는 것을 깨닫지 못했다. executor에게 cachedthreadpool을 사용하는 방법을 알려주는 방법이라고 생각했다. – Lostsoul
표준 캐시 된 쓰레드 풀을 사용하려면, 다음과 같이 작성하면된다 :'executor_worker = Executors.newCachedThreadPool (numberOfThreads_ThreadPoolExecutor);' 다른 라인. – assylias