Timeout exception while writing to Cassandra using cassandra-driver-core

I am reading data from MySQL and storing it into Cassandra, because Sqoop does not support a direct import into Cassandra. Since MySQL holds a very large number of records (millions), I am using a producer-consumer setup, but I keep getting a read timeout (com.datastax.driver.core.exceptions.DriverException: Timeout during read). There is one producer class that reads data from MySQL and puts it into a queue, one consumer class that reads from that queue and pushes the data into Cassandra, and one manager class that acts as a coordination bridge between the two. Producer class:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.BlockingQueue;
// PrintJobDAO, MySQLClientException, MySQLClientConstants, ExportManagerConstants
// and ExceptionUtils come from the application's own packages.

public class MySQLPrintJobProducer implements Runnable {

    private BlockingQueue<PrintJobDAO> printerJobQueue = null;
    private Connection conn = null;

    public MySQLPrintJobProducer(BlockingQueue<PrintJobDAO> printerJobQueue) throws MySQLClientException {
        this.printerJobQueue = printerJobQueue;
        connect();
    }

    private void connect() throws MySQLClientException {
        try {
            Class.forName(MySQLClientConstants.MYSQL_JDBC_DRIVER);
            conn = DriverManager.getConnection("jdbc:mysql://mysqlserverhose/mysqldb?user=mysqluser&password=mysqlpasswd");
        } catch (ClassNotFoundException e) {
            throw new MySQLClientException(ExceptionUtils.getStackTrace(e));
        } catch (SQLException e) {
            throw new MySQLClientException(ExceptionUtils.getStackTrace(e));
        }
    }

    public void run() {
        ResultSet rs = null;
        Statement stmt = null;
        PreparedStatement pStmt = null;
        try {
            stmt = conn.createStatement();
            // Get the total number of print jobs stored.
            rs = stmt.executeQuery(MySQLClientConstants.PRINT_JOB_COUNT_QUERY);
            int totalPrintJobs = 0;
            if (rs != null) {
                while (rs.next()) {
                    totalPrintJobs = rs.getInt(1);
                }
            }
            // Determine the number of iterations.
            int rowOffset = 1;
            int totalIteration = ((totalPrintJobs / ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE) + 1);
            pStmt = conn.prepareStatement(MySQLClientConstants.PRINT_JOB_FETCH_QUERY);
            int totalRecordsFetched = 0;
            // Iterate to fetch print job records in batches and put them into the queue.
            for (int i = 1; i <= totalIteration; i++) {
                pStmt.setInt(1, rowOffset);
                pStmt.setInt(2, ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE);
                System.out.println("In iteration : " + i + ", Row Offset : " + rowOffset);
                rs = pStmt.executeQuery();
                synchronized (this.printerJobQueue) {
                    if (this.printerJobQueue.remainingCapacity() > 0) {
                        while (rs.next()) {
                            totalRecordsFetched = rs.getRow();
                            printerJobQueue.offer(new PrintJobDAO(rs.getInt(1), rs.getInt(2), rs.getString(3), rs.getDate(4),
                                    rs.getTimestamp(5), rs.getInt(6), rs.getInt(7), rs.getInt(8), rs.getInt(9),
                                    rs.getInt(10), rs.getFloat(11), rs.getFloat(12), rs.getInt(13), rs.getFloat(14), rs.getInt(15),
                                    rs.getDouble(16), rs.getDouble(17), rs.getDouble(18), rs.getDouble(19), rs.getDouble(20),
                                    rs.getFloat(21)));
                            this.printerJobQueue.notifyAll();
                        }
                        System.out.println("In iteration : " + i + ", Records Fetched : " + totalRecordsFetched +
                                ", Queue Size : " + printerJobQueue.size());
                        rowOffset += ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE;
                    } else {
                        System.out.println("Print Job Queue is full, waiting for Consumer thread to clear.");
                        this.printerJobQueue.wait();
                    }
                }
            }
        } catch (SQLException e) {
            System.err.println(ExceptionUtils.getStackTrace(e));
        } catch (InterruptedException e) {
            System.err.println(ExceptionUtils.getStackTrace(e));
        } finally {
            try {
                if (null != rs) {
                    rs.close();
                }
                if (null != stmt) {
                    stmt.close();
                }
                if (null != pStmt) {
                    pStmt.close();
                }
            } catch (SQLException e) {
                System.err.println(ExceptionUtils.getStackTrace(e));
            }
        }
        ExportManager.setProducerCompleted(true);
    }
}
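A side note on the queue hand-off: BlockingQueue already provides blocking semantics via put() and take(), so the synchronized / wait() / notifyAll() choreography around the non-blocking offer() and poll() calls above is not strictly needed and is easy to get wrong. A minimal sketch of the built-in blocking calls (the helper class is illustrative, not part of the original code):

import java.util.concurrent.BlockingQueue;

public final class BlockingHandOffSketch {

    // Producer side: put() blocks while the queue is full,
    // so no explicit wait()/notifyAll() is required.
    static <T> void produce(BlockingQueue<T> queue, T item) throws InterruptedException {
        queue.put(item);
    }

    // Consumer side: take() blocks while the queue is empty.
    static <T> T consume(BlockingQueue<T> queue) throws InterruptedException {
        return queue.take();
    }
}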
Consumer class:
import java.util.concurrent.BlockingQueue;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
// PrintJobDAO, CassandraClientException, CassandraClientConstants,
// ExportManagerConstants and ExceptionUtils come from the application's own packages.

public class CassandraPrintJobConsumer implements Runnable {

    private Cluster cluster = null;
    private Session session = null;
    private BlockingQueue<PrintJobDAO> printerJobQueue = null;

    public CassandraPrintJobConsumer(BlockingQueue<PrintJobDAO> printerJobQueue) throws CassandraClientException {
        this.printerJobQueue = printerJobQueue;
        // Contact points must be plain host names or IPs; the original snippet
        // passed "http://cassandrahost", which the driver rejects.
        cluster = Cluster.builder().withPort(9042).addContactPoint("cassandrahost").build();
    }

    // connect() was called but not shown in the original snippet; presumably it
    // rebuilds the Cluster the same way the constructor does.
    private void connect() {
        cluster = Cluster.builder().withPort(9042).addContactPoint("cassandrahost").build();
    }

    public void run() {
        int printJobConsumed = 0;
        int batchInsertCount = 1;
        if (cluster.isClosed()) {
            connect();
        }
        session = cluster.connect();
        PreparedStatement ps = session.prepare(CassandraClientConstants.INSERT_PRINT_JOB_DATA);
        BatchStatement batch = new BatchStatement();
        synchronized (this.printerJobQueue) {
            while (true) {
                if (!this.printerJobQueue.isEmpty()) {
                    for (int i = 1; i <= ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE; i++) {
                        PrintJobDAO printJob = printerJobQueue.poll();
                        if (printJob == null) { // queue drained before a full batch was collected
                            break;
                        }
                        batch.add(ps.bind(printJob.getJobID(), printJob.getUserID(), printJob.getType(), printJob.getGpDate(), printJob.getDateTimes(),
                                printJob.getAppName(), printJob.getPrintedPages(), printJob.getSavedPages(), printJob.getPrinterID(), printJob.getWorkstationID(),
                                printJob.getPrintedCost(), printJob.getSavedCost(), printJob.getSourcePrinterID(), printJob.getSourcePrinterPrintedCost(),
                                printJob.getJcID(), printJob.getCoverageC(), printJob.getCoverageM(), printJob.getCoverageY(), printJob.getCoverageK(),
                                printJob.getCoverageTotal(), printJob.getPagesAnalyzed()));
                        printJobConsumed++;
                    }
                    // NOTE: the same BatchStatement is reused and never cleared,
                    // so it grows with every pass through this loop.
                    session.execute(batch);
                    System.out.println("After Batch - " + batchInsertCount + ", record insert count : " + printJobConsumed);
                    batchInsertCount++;
                    this.printerJobQueue.notifyAll();
                } else {
                    System.out.println("Print Job Queue is empty, nothing to export.");
                    try {
                        this.printerJobQueue.wait();
                    } catch (InterruptedException e) {
                        System.err.println(ExceptionUtils.getStackTrace(e));
                    }
                }
                if (ExportManager.isProducerCompleted() && this.printerJobQueue.isEmpty()) {
                    break;
                }
            }
        }
    }
}
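As the comment in the consumer flags, one BatchStatement instance is reused without ever being cleared, so each session.execute(batch) replays every statement added so far and the batch grows without bound. A minimal sketch of executing in fixed-size chunks with a fresh batch per chunk (the helper name and the chunking approach are illustrative, not from the original code):

import java.util.List;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Session;

public final class ChunkedBatchSketch {

    // Executes the bound statements in small batches, creating a new
    // BatchStatement per chunk so no statement is ever replayed twice.
    static void executeInChunks(Session session, List<BoundStatement> statements, int chunkSize) {
        BatchStatement batch = new BatchStatement();
        for (BoundStatement bs : statements) {
            batch.add(bs);
            if (batch.size() >= chunkSize) {
                session.execute(batch);
                batch = new BatchStatement(); // start a fresh batch
            }
        }
        if (batch.size() > 0) {
            session.execute(batch); // flush the remainder
        }
    }
}

Keeping batches small also matters on the server side, since very large batches are a common cause of coordinator timeouts.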
Manager class:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ExportManager {

    private static boolean isInitialized = false;
    private static boolean producerCompleted = false;
    private static MySQLPrintJobProducer printJobProducer = null;
    private static CassandraPrintJobConsumer printJobConsumer = null;
    private static BlockingQueue<PrintJobDAO> printJobQueue = null;

    public static boolean isProducerCompleted() {
        return producerCompleted;
    }

    public static void setProducerCompleted(boolean producerCompleted) {
        ExportManager.producerCompleted = producerCompleted;
    }

    private static void init() throws MySQLClientException, CassandraClientException {
        if (!isInitialized) {
            printJobQueue = new LinkedBlockingQueue<PrintJobDAO>(ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE * 2);
            printJobProducer = new MySQLPrintJobProducer(printJobQueue);
            printJobConsumer = new CassandraPrintJobConsumer(printJobQueue);
            isInitialized = true;
        }
    }

    public static void exportPrintJobs() throws ExportException {
        try {
            init();
        } catch (MySQLClientException e) {
            throw new ExportException("Print Job Export failed.", e);
        } catch (CassandraClientException e) {
            throw new ExportException("Print Job Export failed.", e);
        }
        Thread producerThread = new Thread(printJobProducer);
        Thread consumerThread = new Thread(printJobConsumer);
        consumerThread.start();
        producerThread.start();
    }
}
TestNG class:
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestExportManager {

    @Test
    public void testExportPrintJobs() {
        try {
            ExportManager.exportPrintJobs();
            // Joining the current thread with itself blocks indefinitely;
            // this only keeps the test alive while the worker threads run.
            Thread.currentThread().join();
        } catch (ExportException e) {
            Assert.fail("ExportManager.exportPrintJobs() failed.", e);
        } catch (InterruptedException e) {
            Assert.fail("ExportManager.exportPrintJobs() failed.", e);
        }
    }
}
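A side note on the test: Thread.currentThread().join() joins the test's own thread with itself, so it blocks indefinitely instead of returning when the export finishes. A self-contained sketch of a more deterministic hand-off using a CountDownLatch (none of these names are from the original code):

import java.util.concurrent.CountDownLatch;

public class LatchSketch {

    public static void main(String[] args) throws InterruptedException {
        // One count per worker thread (producer + consumer).
        final CountDownLatch done = new CountDownLatch(2);

        Runnable worker = new Runnable() {
            public void run() {
                try {
                    // ... the actual export work would go here ...
                } finally {
                    done.countDown(); // signal completion even on failure
                }
            }
        };

        new Thread(worker).start();
        new Thread(worker).start();

        done.await(); // returns once both workers have finished
    }
}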
I have also made some configuration changes as suggested in this link, but I am still getting the exception after inserting around 18,000 to 20,000 records.
Exception in thread "Thread-2" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.10.80 (com.datastax.driver.core.exceptions.DriverException: Timeout during read))
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:64)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
    at com.datastax.driver.core.SessionManager.execute(SessionManager.java:91)
    at com.incendiary.ga.client.cassandra.CassandraPrintJobConsumer.run(CassandraPrintJobConsumer.java:108)
    at java.lang.Thread.run(Unknown Source)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.10.80 (com.datastax.driver.core.exceptions.DriverException: Timeout during read))
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:100)
    at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:171)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
I am not able to figure out the actual cause of the problem, and I could not find any exception in the Cassandra system logs. I am using Apache Cassandra 2.0.7 and cassandra-driver-core 2.0.1.
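For reference, "Timeout during read" in cassandra-driver-core 2.0.x is the client-side socket read timeout (12 seconds by default); when every contact point fails that way, the driver surfaces it as the NoHostAvailableException shown above. The limit can be raised through SocketOptions when the Cluster is built; a minimal sketch (the 30-second value is an arbitrary assumption, and raising the timeout treats the symptom rather than whatever is making the writes slow):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.SocketOptions;

public final class ClusterTimeoutSketch {

    static Cluster build() {
        // Raise the per-request socket read timeout from the 12 s default.
        SocketOptions socketOptions = new SocketOptions()
                .setReadTimeoutMillis(30000); // assumed value, tune as needed
        return Cluster.builder()
                .addContactPoint("cassandrahost")
                .withPort(9042)
                .withSocketOptions(socketOptions)
                .build();
    }
}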
It seems nobody else has run into this issue. Any pointers that would help me move forward with debugging would be highly appreciated; I am stuck because of this problem. – Nayan
I am seeing this exact same problem. I had to restart the Tomcat instance (the client application runs in Tomcat) to get past it. I wonder whether this could be a stale-connection issue. – AlphaGeek