2016-07-28 2 views
0

Flink DataSet API를 사용하여 Oracle 데이터베이스를 쿼리하고 있습니다. 이를 위해 Flink JDBCInputFormat을 java.sql.Resultset을 반환하도록 사용자 정의했습니다. Flink 연산자를 사용하여 결과 집합에 대한 추가 작업을 수행해야합니다.Apache Flink JDBC InputFormat throwing java.net.SocketException : 소켓이 닫혔습니다.

public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable { 

@Override 
public void open(InputSplit inputSplit) throws IOException { 
       Class.forName(drivername); 
        dbConn = DriverManager.getConnection(dbURL, username, password); 
       statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); 
       resultSet = statement.executeQuery(); 
} 

@Override 
public void close() throws IOException { 
      if(statement != null) { 
        statement.close(); 
       } 
       if(resultSet != null) 
        resultSet.close(); 
       if(dbConn != null) { 
        dbConn.close(); 
       } 
} 

@Override 
public boolean reachedEnd() throws IOException { 
     isLastRecord = resultSet.isLast(); 
    return isLastRecord; 
} 

@Override 
public ResultSet nextRecord(ResultSet row) throws IOException{ 
     if(!isLastRecord){    
      resultSet.next(); 
     } 
     return resultSet; 
} 

}

이 페치 행 쿼리 갖는 한계 미만에서 작동 : 는 XYZ에서 A, B, C를 선택

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); 
    environment.setParallelism(1); 
    @SuppressWarnings("unchecked") 
    DataSource<ResultSet> source 
      = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() 
        .setUsername("username") 
        .setPassword("password") 
        .setDrivername("driver_name") 
        .setDBUrl("jdbcUrl") 
        .setQuery("query") 
        .finish(),  
        new GenericTypeInfo<ResultSet>(ResultSet.class) 
      ); 
    source.print(); 

    environment.execute(); 

} 

다음은 정의 JDBCInputFormat 인 어디 rownum < = 10; 하지만 약 1 백만의 데이터를 가진 모든 행을 인출하려고 때, 나는 행의 임의의 숫자 가져 오는 후 아래 예외를 받고 입니다 :

java.sql.SQLRecoverableException: Io exception: Socket closed 
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101) 
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263) 
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521) 
at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024) 
at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314) 
at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228) 
at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839) 
at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823) 
at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349) 
at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98) 
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
at java.lang.Thread.run(Thread.java:745) 

에 의한를 : java.net.SocketException의 : 소켓이 닫혔습니다. java.net.SocketOutputStream.socketWrite0 (기본 메소드)

그래서이 문제를 어떻게 해결할 수 있습니까?

답변

1

일반 기록처럼 ResultSet을 배송 할 수 있다고 생각하지 않습니다. 이것은 내부적으로 데이터베이스 서버에 대한 연결을 유지하는 상태 저장 객체입니다. Flink 연산자간에 전송되는 레코드로 ResultSet을 사용하면 네트워크를 통해 다른 시스템으로 전달되고 직렬화되고 직렬화되고 다른 JVM 프로세스의 다른 스레드로 전달 될 수 있습니다. 그건 작동하지 않습니다.

연결에 따라 ResultSet은 동일한 스레드에서 같은 시스템에 머물러있을 수 있습니다. 이는 사용자에게 도움이 될 수 있습니다. 연산자 내에서 데이터베이스를 쿼리하려면이 함수를 RichMapPartitionFunction으로 구현할 수 있습니다. 그렇지 않으면 데이터 소스에서 ResultSet을 읽고 결과 행을 전달합니다.