파티션을 사용하여 다중 스레드에서 JAVAEE7 일괄 처리를 실행하려고합니다.
일괄 처리는 간단합니다. 일련의 난수를 읽고 3 개의 스레드를 사용하여 합계를 씁니다.여러 스레드에서 JAVAEE7 일괄 처리를 실행할 때 오류가 발생했습니다.
내 작업 XML
<job id="partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
version="1.0">
<step id="process" next="cleanup">
<chunk item-count="3">
<reader ref="partitionProcessIR">
<properties>
<property name="start" value="#{partitionPlan['start']}" />
<property name="end" value="#{partitionPlan['end']}" />
</properties>
</reader>
<processor ref="partitionProcessIP" />
<writer ref="partitionProcessIW" />
</chunk>
<partition>
<mapper ref="partitionMapperImpl" />
</partition>
</step>
<step id="cleanup">
<batchlet ref="partitionCleanupBatchlet"></batchlet>
</step>
</job>
내 PartitionMapperImpl :
@Override
public PartitionPlan mapPartitions() throws Exception {
// TODO Auto-generated method stub
return new PartitionPlanImpl() {
@Override
public int getPartitions() {
return 3;
}
@Override
public int getThreads() {
return 3;
}
@Override
public Properties[] getPartitionProperties() {
int totalRecords = getTotalRecords();
int partItems = totalRecords/getPartitions();
int remainItems = totalRecords % getPartitions();
Properties[] props = new Properties[getPartitions()];
for (int i = 0; i < getPartitions(); i++) {
props[i] = new Properties();
props[i].setProperty("start", String.valueOf(i * partItems));
// if this is the last partition, add remaining items
if (i == getPartitions() - 1) {
props[i].setProperty("end", String.valueOf((i + 1) * partItems + remainItems));
} else {
props[i].setProperty("end", String.valueOf((i + 1) * partItems));
}
}
return props;
}
};
}
private int getTotalRecords() {
return 50;
}
내 리더 :
@Override
public void open(Serializable checkpoint) throws Exception {
int start = new Integer(startProperty);
int end = new Integer(endProperty);
List<Integer> listNumber = new ArrayList<>();
for (int i = start; i < end; i++) {
int rand = (int) (Math.random() * 10);
listNumber.add(rand);
}
iterator = listNumber.iterator();
}
@Override
public Integer readItem() throws Exception {
if (iterator.hasNext()) {
return iterator.next();
}
// end read
return null;
}
내 프로세서
@Override
public Integer processItem(Object arg0) throws Exception {
Integer rand = (Integer) arg0;
return rand;
}
,451,515,
내 작가
@Override
public void writeItems(List<Object> arg0) throws Exception {
int sum = 0;
for (Object object : arg0) {
Integer rand = (Integer) object;
sum += rand;
}
System.out.println(Thread.currentThread().getId() + " | SUM OF CHUNK: " + sum);
}
내가이 배치를 실행
, 다음과 같은 오류가 발생했습니다. 나는 이것이 derby 데이터베이스에 동시에 serveral checkpoint를 저장하는 것과 관련이 있다고 생각한다.2017-03-02t15 : 22 : 45.955 + 0700 | 정보 : 275 | 천국의 합 : 13 2017-03-02T15 : 22 : 45.958 + 0700 | 정보 : 316 | 중첩 : 17 2017-03-02T15 : 23 : 05.971 + 0700 | 읽기 - 프로세스 - 쓰기 루프에서의 오류 com.ibm.jbatch.container.exception.BatchContainerServiceException : [프로세스에 대한 검사 점 데이터를 유지할 수 없습니다 ] 에서 com.ibm.jbatch.container.persistence.CheckpointManager.checkpoint com.ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeChunk (ChunkStepControllerImpl.java:644)에서 (CheckpointManager.java:133) 에서 COM com.i에서 .ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeCoreStep com.ibm.jbatch.container.impl.BaseStepControllerImpl.execute (BaseStepControllerImpl.java:144)에서 (ChunkStepControllerImpl.java:764) bm.jbatch.container.impl.ExecutionTransitioner.doExecutionLoop com.ibm.jbatch에서 com.ibm.jbatch.container.impl.JobThreadRootControllerImpl.originateExecutionOnThread (JobThreadRootControllerImpl.java:110) 에서 (ExecutionTransitioner.java:112). container.util.BatchWorkUnit.run (BatchWorkUnit.java:80) ( ) java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) at java.util.concurrent.FutureTask.run (FutureTask.java : JA 에서 java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142에서 org.glassfish.enterprise.concurrent.internal.ManagedFutureTask.run (ManagedFutureTask.java:141) )에서 266) org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl $ ManagedThread.run에서 va.util.concurrent.ThreadPoolExecutor $ Worker.run java.lang.Thread.run (Thread.java:745)에서 (ThreadPoolExecutor.java:617) (ManagedThreadFactoryImpl.java:250) 발생 원인 : com.ibm.jbatch.container.exception.PersistenceException : java.sql.SQLTransactionRollbackException : ??????????????????? (XID : {77885156, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id = ?부여 된 XID : {77885155, X} 잠금 : ROW, CHECKPOINTDATA, (110,28) 대기중인 XID : {77885155, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id =? XID : {77885156, X} ???????? XID : 77885156? 물고기에서 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.updateCheckpointData (JBatchJDBCPersistenceManager.java:388) 에서 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData (JBatchJDBCPersistenceManager.java:503) 에서. payara.jbatch.persistence.rdbms.LazyBootPersistenceManager.updateCheckpointData com.ibm.jbatch.container.persistence.CheckpointManager.checkpoint (CheckpointManager.java:128)에서 (LazyBootPersistenceManager.java:230) 는 ...에 의한 이상 13 : java.sql.SQLTransactionRollbackException : ????????????????????????????????????????????????????????? : 잠금 : ROW, CHECKPOINTDATA, (110,27) 대기중인 XID : {77885156, S}, APP, 선택 ID, obj from CHECKPOINTDATA 여기서 id =? 부여 된 XID : {77885155, X} 잠금 : ROW, CHECKPOINTDATA, (110,28) 대기중인 XID : {77885155, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id =? XID : {77885156, X} ???????? XID : 77885156? org.apache.derby.impl에서 org.apache.derby.impl.jdbc.Util.generateCsSQLException에서 org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException (알 소스) (알 소스)에 . org.apache.derby.impl.jdbc.EmbedConnection.handleException에서 org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException (알 소스)에 jdbc.TransactionResourceImpl.wrapInSQLException (알 소스) (알 소스) 에서 org.apache.derby.impl.jdbc.ConnectionChild.handleException (알 수없는 소스) at org.apache.derby.impl.jdbc.EmbedResultSet.closeOnTransactionError (알 수 없음 출처) 에서 org.apache.derby.impl.jdbc.EmbedResultSet.movePosition (알 수없는 출처) 에서 org.apache.derby.impl.jdbc.EmbedResultSet.next (알 수없는 출처) 에서 com.sun.gjc.spi .base.ResultSetWrapper.next fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData (JBatchJDBCPersistenceManager.java:498)에서 (ResultSetWrapper.java:103) 는 ...에 의한 이상 16은 java.sql.SQLException을 : ??????????????????????????? : 잠금 : ROW, CHECKPOINTDATA, (110,27) 대기중인 XID : {77885156, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id =? 부여 된 XID : {77885155, X} 잠금 : ROW, CHECKPOINTDATA, (110,28) 대기중인 XID : {77885155, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id =? XID : {77885156, X} ???????? XID : 77885156? ERROR 40001 : org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA (알 소스)에서 org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException (알 소스)에서 ... (27)이 더에 의해 발생 : ??????????????????????????? : 잠금 : ROW, CHECKPOINTDATA, (110,27) 대기중인 XID : {77885156, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id =? 부여 된 XID : {77885155, X} 잠금 : ROW, CHECKPOINTDATA, (110,28) 대기중인 XID : {77885155, S}, APP, select id, obj from CHECKPOINTDATA 여기서 id =? XID : {77885156, X} ???????? XID : 77885156? 에서 org.apache.derby.iapi.error.StandardException.newException (알 수 없음 출처) at org.apache.derby.impl.services.locks.Deadlock. org.apache.derby.impl.services.locks.ConcurrentLockSet.zeroDurationLockObject에서 org.apache.derby.impl.services.locks.ConcurrentLockSet.lockObject (알 소스) (알 소스)에 buildException (알 소스) org.apache에서 org.apache.derby.impl.services.locks.ConcurrentPool.zeroDurationlockObject에서 org.apache.derby.impl.services.locks.AbstractPool.zeroDurationlockObject (알 소스) (알 소스)에서. derby.impl.store.raw.xact.RowLocking2nohold.lockRecordForRead (알 수 없음 출처) at org.apache.derby.impl.store.access.conglomerate.OpenConglomerate.lockPositionForRead (알 수 없음 org.apache.derby.impl.store.access.heap.HeapScan.fetchNextGroup에서 org.apache.derby.impl.store.access.conglomerate.GenericScanController.fetchRows (알 소스) 10 소스) (알 원본) org.apache에서 org.apache.derby.impl.sql.execute.BulkTableScanResultSet.getNextRowCore에서 org.apache.derby.impl.sql.execute.BulkTableScanResultSet.reloadArray (알 소스) (알 소스)에서 .derby.impl.sql.execute.BasicNoPutResultSetImpl.getNextRow (알 소스) ... 20 개
당신은 어떤 아이디어를 어떻게해야합니까 이 문제를 해결하려면?
또는 3 개 이상의 스레드에서 실행할 수있는 샘플은 정말 유용합니다.
미리 감사드립니다.