2014-05-14 3 views
0

나는 MySQL로부터 데이터를 읽어서 Cassandra에 저장한다. 왜냐하면 Sqoop은 Cassandra 로의 직접적인 import를 지원하지 않기 때문이다. 나는 프로듀서 - 소비자 프레임 워크를 사용하여 MySQL에서 많은 수의 레코드 (수백만)로 인해 같은 결과를 얻고 있습니다. 하지만 ReadTimeOut Exception (com.datastax.driver.core.exceptions.DriverException : 읽기 중 타임 아웃)이 나타납니다. 한 Producer 클래스를 MySQL에서 데이터를 읽고 한 큐에 넣습니다. 해당 대기열에서 데이터를 읽고 Cassndra로 푸시하는 소비자 클래스가 하나 있습니다. 이 두 클래스 사이의 조정 브리지 역할을하는 관리자 클래스가 하나 있습니다. 프로듀서 클래스 : -cassandra-driver-core를 사용하여 Cassandra에서 타임 아웃 예외를 읽음

public class MySQLPrintJobProducer implements Runnable { 
    private BlockingQueue<PrintJobDAO> printerJobQueue = null; 
    private Connection conn = null; 
    public MySQLPrintJobProducer(BlockingQueue<PrintJobDAO> printerJobQueue) throws MySQLClientException { 
     this.printerJobQueue = printerJobQueue;  
     connect(); 
    } 

    private void connect() throws MySQLClientException { 
     try { 
      Class.forName(MySQLClientConstants.MYSQL_JDBC_DRIVER); 
      conn = DriverManager.getConnection("jdbc:mysql://mysqlserverhose/mysqldb?user=mysqluser&password=mysqlpasswd"); 
     } catch (ClassNotFoundException e) { 
      throw new MySQLClientException(ExceptionUtils.getStackTrace(e)); 
     } catch (SQLException e) { 
      throw new MySQLClientException(ExceptionUtils.getStackTrace(e)); 
     } 
    } 

    public void run() { 
     ResultSet rs = null; 
     Statement stmt = null; 
     PreparedStatement pStmt = null; 
     try { 
      stmt = conn.createStatement(); 
      // Get total number of print jobs stored. 
      rs = stmt.executeQuery(MySQLClientConstants.PRINT_JOB_COUNT_QUERY); 
      int totalPrintJobs = 0; 
      if(rs != null) { 
       while(rs.next()) { 
        totalPrintJobs = rs.getInt(1); 
       } 
      } 
      // Determine the number of iterations. 
      int rowOffset = 1; 
      int totalIteration = ((totalPrintJobs/ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE) + 1); 
      pStmt = conn.prepareStatement(MySQLClientConstants.PRINT_JOB_FETCH_QUERY); 
      int totalRecordsFetched = 0; 
      // Iterate over to fetch Print Job Records in bathces and put it into the queue. 
      for(int i = 1; i <= totalIteration; i++) {        
       pStmt.setInt(1, rowOffset); 
       pStmt.setInt(2, ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE); 
       System.out.println("In iteration : " + i + ", Row Offset : " + rowOffset); 
       rs = pStmt.executeQuery(); 
       synchronized (this.printerJobQueue) { 
        if(this.printerJobQueue.remainingCapacity() > 0) { 
         while(rs.next()) { 
          totalRecordsFetched = rs.getRow(); 
          printerJobQueue.offer(new PrintJobDAO(rs.getInt(1), rs.getInt(2), rs.getString(3), rs.getDate(4), 
           rs.getTimestamp(5), rs.getInt(6), rs.getInt(7), rs.getInt(8), rs.getInt(9), 
           rs.getInt(10), rs.getFloat(11), rs.getFloat(12), rs.getInt(13), rs.getFloat(14), rs.getInt(15), 
           rs.getDouble(16), rs.getDouble(17), rs.getDouble(18), rs.getDouble(19), rs.getDouble(20), 
           rs.getFloat(21))); 
          this.printerJobQueue.notifyAll(); 
         } 
         System.out.println("In iteration : " + i + ", Records Fetched : " + totalRecordsFetched + 
           ", Queue Size : " + printerJobQueue.size()); 
         rowOffset += ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE; 
        } else { 
         System.out.println("Print Job Queue is full, waiting for Consumer thread to clear."); 
         this.printerJobQueue.wait(); 
        } 
       } 
      }   
     } catch (SQLException e) { 
      System.err.println(ExceptionUtils.getStackTrace(e)); 
     } catch (InterruptedException e) { 
      System.err.println(ExceptionUtils.getStackTrace(e)); 
     } finally { 
      try { 
       if(null != rs) { 
        rs.close(); 
       } 
       if(null != stmt) { 
        stmt.close(); 
       } 
       if(null != pStmt) { 
        pStmt.close(); 
       } 
      } catch (SQLException e) { 
       System.err.println(ExceptionUtils.getStackTrace(e)); 
      } 
     } 
     ExportManager.setProducerCompleted(true); 
    } 
} 

소비자 등급 : -

public class CassandraPrintJobConsumer implements Runnable { 
    private Cluster cluster = null; 
    private Session session = null; 
    private BlockingQueue<PrintJobDAO> printerJobQueue = null; 

    public CassandraPrintJobConsumer(BlockingQueue<PrintJobDAO> printerJobQueue) throws CassandraClientException { 
     this.printerJobQueue = printerJobQueue;  
     cluster = Cluster.builder().withPort(9042).addContactPoint("http://cassandrahost").build(); 
    } 

    public void run() { 
     int printJobConsumed = 0; 
     int batchInsertCount = 1; 
     if(cluster.isClosed()) { 
      connect(); 
     } 
     session = cluster.connect(); 
     PreparedStatement ps = session.prepare(CassandraClientConstants.INSERT_PRINT_JOB_DATA); 
     BatchStatement batch = new BatchStatement(); 
     synchronized (this.printerJobQueue) { 
      while(true) { 
       if(!this.printerJobQueue.isEmpty()) { 
        for(int i = 1; i <= ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE; i++) { 
         PrintJobDAO printJob = printerJobQueue.poll(); 
         batch.add(ps.bind(printJob.getJobID(), printJob.getUserID(), printJob.getType(), printJob.getGpDate(), printJob.getDateTimes(), 
           printJob.getAppName(), printJob.getPrintedPages(), printJob.getSavedPages(), printJob.getPrinterID(), printJob.getWorkstationID(), 
           printJob.getPrintedCost(), printJob.getSavedCost(), printJob.getSourcePrinterID(), printJob.getSourcePrinterPrintedCost(), 
           printJob.getJcID(), printJob.getCoverageC(), printJob.getCoverageM(), printJob.getCoverageY(), printJob.getCoverageK(), 
           printJob.getCoverageTotal(), printJob.getPagesAnalyzed())); 
         printJobConsumed++;     
        } 
        session.execute(batch); 
        System.out.println("After Batch - " + batchInsertCount + ", record insert count : " + printJobConsumed); 
        batchInsertCount++; 
        this.printerJobQueue.notifyAll(); 
       } else { 
        System.out.println("Print Job Queue is empty, nothing to export."); 
        try { 
         this.printerJobQueue.wait(); 
        } catch (InterruptedException e) { 
         System.err.println(ExceptionUtils.getStackTrace(e)); 
        } 
       } 
       if(ExportManager.isProducerCompleted() && this.printerJobQueue.isEmpty()) { 
        break; 
       } 
      } 
     } 
    } 
} 

관리자 등급 : -

public class ExportManager { 
    private static boolean isInitalized = false; 
    private static boolean producerCompleted = false; 
    private static MySQLPrintJobProducer printJobProducer = null; 
    private static CassandraPrintJobConsumer printJobConsumer = null; 
    private static BlockingQueue<PrintJobDAO> printJobQueue = null; 

    public static boolean isProducerCompleted() { 
     return producerCompleted; 
    } 

    public static void setProducerCompleted(boolean producerCompleted) { 
     ExportManager.producerCompleted = producerCompleted; 
    } 

    private static void init() throws MySQLClientException, CassandraClientException { 
     if(!isInitalized) { 
      printJobQueue = new LinkedBlockingQueue<PrintJobDAO>(ExportManagerConstants.DATA_TRANSFER_BATCH_SIZE * 2); 
      printJobProducer = new MySQLPrintJobProducer(printJobQueue); 
      printJobConsumer = new CassandraPrintJobConsumer(printJobQueue); 
      isInitalized = true; 
     } 
    } 

    public static void exportPrintJobs() throws ExportException { 
     try { 
      init(); 
     } catch (MySQLClientException e) { 
      throw new ExportException("Print Job Export failed.", e); 
     } catch (CassandraClientException e) { 
      throw new ExportException("Print Job Export failed.", e); 
     } 
     Thread producerThread = new Thread(printJobProducer); 
     Thread consumerThread = new Thread(printJobConsumer); 
     consumerThread.start(); 
     producerThread.start(); 
    } 
} 

TestNG의 클래스 -

public class TestExportManager { 

    @Test 
    public void testExportPrintJobs() { 
     try { 
     ExportManager.exportPrintJobs(); 
     Thread.currentThread().join(); 
    } catch (ExportException e) { 
     Assert.fail("ExportManager.exportPrintJobs() failed.", e); 
    } catch (InterruptedException e) { 
     Assert.fail("ExportManager.exportPrintJobs() failed.", e); 
    } 
    } 
} 

가 나는 또한 만든 일부 구성 변경 link에 따라 아직도 나는 18000 ~ 20000 개의 레코드를 삽입 한 후에 예외를 겪고있다.

Exception in thread "Thread-2" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.10.80 
(com.datastax.driver.core.exceptions.DriverException: Timeout during read)) 
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:64) 
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256) 
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172) 
    at com.datastax.driver.core.SessionManager.execute(SessionManager.java:91) 
    at com.incendiary.ga.client.cassandra.CassandraPrintJobConsumer.run(CassandraPrintJobConsumer.java:108) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.10.80 (com.datastax.drive 
r.core.exceptions.DriverException: Timeout during read)) 
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:100) 
    at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:171) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    ... 1 more 

실제 문제의 원인을 파악할 수 없습니다. 카산드라 시스템 로그에서 예외를 찾을 수 없었습니다. Apache Cassandra 2.0.7과 cassandra-driver-core 2.0.1을 사용하고 있습니다.

+0

아무도이 문제에 직면 해 있지 않은 것으로 보입니다. 여전히 디버깅에서 앞으로 나아갈 수있는 모든 포인터는 높이 평가 될 것입니다. 나는이 문제로 인해 막혔다. – Nayan

+0

나는이 똑같은 문제를보고있다. 문제를 해결하기 위해 tomcat 인스턴스 (클라이언트 응용 프로그램은 Tomcat에서 실행 됨)를 다시 시작해야했습니다. 이것이 부적절한 연결 문제 일 수 있는지 궁금합니다. – AlphaGeek

답변

0

드라이버 쪽에서 읽기 시간을 늘릴 수 있습니다. 이 withSocket 메서드를 사용하면 SocketOption 클래스를 사용하여 시간을 읽을 수 있습니다. 기본적으로 읽기 시간 제한은 10 밀리 초입니다.