저는 매우 큰 MySQL 테이블을 가지고 있습니다. (수십억 행이 있습니다.) Cassandra의 ColumnFamily로 변환하고 싶습니다. 나는 헥터를 사용하고있어.MySQL 테이블을 Cassandra의 ColumnFamily로 변환하십시오. Hector가있는 느린 배치 변이
내가 먼저 같은 내 스키마를 만들 :
String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String newKeyspaceName = "KeyspaceName";
String newColumnFamilyName = "CFName";
ThriftCluster cassandraCluster;
CassandraHostConfigurator cassandraHostConfigurator;
cassandraHostConfigurator = new CassandraHostConfigurator(host);
cassandraCluster = new ThriftCluster(clusterName, cassandraHostConfigurator);
BasicColumnFamilyDefinition columnFamilyDefinition = new BasicColumnFamilyDefinition();
columnFamilyDefinition.setKeyspaceName(newKeyspaceName);
columnFamilyDefinition.setName(newColumnFamilyName);
columnFamilyDefinition.setDefaultValidationClass("UTF8Type");
columnFamilyDefinition.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
columnFamilyDefinition.setComparatorType(ComparatorType.UTF8TYPE);
BasicColumnDefinition columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("id"));
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.INTEGERTYPE.getClassName());
columnDefinition.setIndexName("id_index");
columnFamilyDefinition.addColumnDefinition(columnDefinition);
columnDefinition = new BasicColumnDefinition();
columnDefinition.setName(StringSerializer.get().toByteBuffer("status"));
columnDefinition.setIndexType(ColumnIndexType.KEYS);
columnDefinition.setValidationClass(ComparatorType.ASCIITYPE.getClassName());
columnDefinition.setIndexName("status_index");
columnFamilyDefinition.addColumnDefinition(columnDefinition);
.......
ColumnFamilyDefinition cfDef = new ThriftCfDef(columnFamilyDefinition);
KeyspaceDefinition keyspaceDefinition =
HFactory.createKeyspaceDefinition(newKeyspaceName, "org.apache.cassandra.locator.SimpleStrategy", 1, Arrays.asList(cfDef));
cassandraCluster.addKeyspace(keyspaceDefinition);
를이 완료되면, 나는 목록에 저장된 내 데이터를로드 I는 같은 namedParametersJdbcTemplate와 MySQL의 데이터를 가져 오는거야 가입일 :
String clusterName = "Test Cluster";
String host = "cassandra.lanhost.com:9160";
String KeyspaceName = "KeyspaceName";
String ColumnFamilyName = "CFName";
final StringSerializer serializer = StringSerializer.get();
public void insert(List<SqlParameterSource> dataToInsert) throws ExceptionParserInterrupted {
Keyspace workingKeyspace = null;
Cluster cassandraCluster = HFactory.getOrCreateCluster(clusterName, host);
workingKeyspace = HFactory.createKeyspace(KeyspaceName, cassandraCluster);
Mutator<String> mutator = HFactory.createMutator(workingKeyspace, serializer);
ColumnFamilyTemplate<String, String> template = new ThriftColumnFamilyTemplate<String, String>(workingKeyspace, ColumnFamilyName, serializer, serializer);
long t1 = System.currentTimeMillis();
for (SqlParameterSource data : dataToInsert) {
String keyId = "id" + (Integer) data.getValue("id");
mutator.addInsertion(keyId, ColumnFamilyName, HFactory.createColumn("id", (Integer) data.getValue("id"), StringSerializer.get(), IntegerSerializer.get()));
mutator.addInsertion(keyId,ColumnFamilyName, HFactory.createStringColumn("status", data.getValue("status").toString()));
...............
}
mutator.execute();
System.out.println(t1 - System.currentTimeMillis());
대략 100 시간의 라인을 대략 1 시간 안에 삽입합니다. 실제로 느립니다. 내 삽입물을 멀티 스레딩하는 것에 대해 들었지만,이 특별한 경우에는 무엇을해야할지 모르겠다. BatchMutate를 사용해야합니까?
제가 보조 인덱스에 대해 맞다 고 생각합니다. 더 나은 퍼포먼스를 얻기 위해 그들을 제한하려고 노력할 것입니다. – Xavier