Flink DataSet API를 사용하여 Oracle 데이터베이스를 쿼리하고 있습니다. 이를 위해 Flink JDBCInputFormat을 java.sql.Resultset을 반환하도록 사용자 정의했습니다. Flink 연산자를 사용하여 결과 집합에 대한 추가 작업을 수행해야합니다.Apache Flink JDBC InputFormat throwing java.net.SocketException : 소켓이 닫혔습니다.
public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable {
@Override
public void open(InputSplit inputSplit) throws IOException {
Class.forName(drivername);
dbConn = DriverManager.getConnection(dbURL, username, password);
statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
resultSet = statement.executeQuery();
}
@Override
public void close() throws IOException {
if(statement != null) {
statement.close();
}
if(resultSet != null)
resultSet.close();
if(dbConn != null) {
dbConn.close();
}
}
@Override
public boolean reachedEnd() throws IOException {
isLastRecord = resultSet.isLast();
return isLastRecord;
}
@Override
public ResultSet nextRecord(ResultSet row) throws IOException{
if(!isLastRecord){
resultSet.next();
}
return resultSet;
}
}
이 페치 행 쿼리 갖는 한계 미만에서 작동 : 는 XYZ에서 A, B, C를 선택
public static void main(String[] args) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
@SuppressWarnings("unchecked")
DataSource<ResultSet> source
= environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setUsername("username")
.setPassword("password")
.setDrivername("driver_name")
.setDBUrl("jdbcUrl")
.setQuery("query")
.finish(),
new GenericTypeInfo<ResultSet>(ResultSet.class)
);
source.print();
environment.execute();
}
다음은 정의 JDBCInputFormat 인 어디 rownum < = 10; 하지만 약 1 백만의 데이터를 가진 모든 행을 인출하려고 때, 나는 행의 임의의 숫자 가져 오는 후 아래 예외를 받고 입니다 :
java.sql.SQLRecoverableException: Io exception: Socket closed
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101)
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521)
at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024)
at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314)
at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228)
at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839)
at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823)
at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349)
at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
에 의한를 : java.net.SocketException의 : 소켓이 닫혔습니다. java.net.SocketOutputStream.socketWrite0 (기본 메소드)
그래서이 문제를 어떻게 해결할 수 있습니까?