Pull request #684: RED-6270
Merge in RED/persistence-service from RED-6270 to master * commit '4ad76962ec8786f518c62f57f273c4c3a37755ec': RED-6270: Extracted common qualifiers into a constant RED-6270: Changed batch execution to use the batch size provided by the configuration RED-6270: Implemented the adding of dictionary entries via batch updates to improve writing speed RED-6270: Removed default connection value because it had no effect RED-6270: Extracted common code into own methods and corrected jdbc-string generation (not all params were concatenated correctly)
This commit is contained in:
commit
23d49172d5
@ -1,11 +1,17 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
import org.springframework.cloud.openfeign.support.PageJacksonModule;
|
||||
import org.springframework.cloud.openfeign.support.SortJacksonModule;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
@ -21,6 +27,9 @@ import com.iqser.red.service.persistence.management.v1.processor.settings.FileMa
|
||||
@EnableFeignClients(basePackageClasses = {PDFTronClient.class, StatusReportClient.class, SearchClient.class, RedactionClient.class})
|
||||
public class PersistenceServiceProcessorConfiguration {
|
||||
|
||||
public static final String TENANT_DATA_SOURCE_QUALIFIER = "multiTenantDataSource";
|
||||
|
||||
|
||||
@Bean
|
||||
public PageJacksonModule pageJacksonModule() {
|
||||
|
||||
@ -51,4 +60,13 @@ public class PersistenceServiceProcessorConfiguration {
|
||||
return retryTemplate;
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
@DependsOn(TENANT_DATA_SOURCE_QUALIFIER)
|
||||
@Primary
|
||||
public JdbcTemplate tenantJdbcTemplate(@Qualifier(TENANT_DATA_SOURCE_QUALIFIER) DataSource dataSource) {
|
||||
|
||||
return new JdbcTemplate(dataSource);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,5 +1,10 @@
|
||||
package com.iqser.red.service.persistence.management.v1.processor.utils.jdbc;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.multitenancy.entity.DatabaseConnectionEntity;
|
||||
import com.iqser.red.service.persistence.service.v1.api.shared.model.multitenancy.DatabaseConnection;
|
||||
|
||||
@ -8,75 +13,87 @@ import lombok.experimental.UtilityClass;
|
||||
@UtilityClass
|
||||
public class JDBCUtils {
|
||||
|
||||
public String buildJdbcUrl(DatabaseConnectionEntity databaseConnectionEntity) {
|
||||
|
||||
public String buildJdbcUrl(DatabaseConnectionEntity databaseConnectionEntity){
|
||||
StringBuilder sb = new StringBuilder("jdbc:")
|
||||
.append(databaseConnectionEntity.getDriver())
|
||||
.append("://")
|
||||
.append(databaseConnectionEntity.getHost())
|
||||
.append(':')
|
||||
.append(databaseConnectionEntity.getPort())
|
||||
.append('/')
|
||||
.append(databaseConnectionEntity.getDatabase());
|
||||
StringBuilder sb = createJdbcConnectionStringBuilder(databaseConnectionEntity.getDriver(),
|
||||
databaseConnectionEntity.getHost(),
|
||||
databaseConnectionEntity.getPort(),
|
||||
databaseConnectionEntity.getDatabase());
|
||||
|
||||
if(databaseConnectionEntity.getParams() != null) {
|
||||
sb.append('?');
|
||||
databaseConnectionEntity.getParams().forEach((k, v) -> sb.append('&').append(k).append(v));
|
||||
}
|
||||
Map<String, String> params = getConnectionParameters(databaseConnectionEntity);
|
||||
appendParams(sb, params);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
public String buildJdbcUrl(DatabaseConnection databaseConnection){
|
||||
StringBuilder sb = new StringBuilder("jdbc:")
|
||||
.append(databaseConnection.getDriver())
|
||||
.append("://")
|
||||
.append(databaseConnection.getHost())
|
||||
.append(':')
|
||||
.append(databaseConnection.getPort())
|
||||
.append('/')
|
||||
.append(databaseConnection.getDatabase());
|
||||
if(databaseConnection.getParams() != null) {
|
||||
sb.append('?');
|
||||
databaseConnection.getParams().forEach((k, v) -> sb.append('&').append(k).append(v));
|
||||
}
|
||||
private StringBuilder createJdbcConnectionStringBuilder(String driver, String host, String port, String database) {
|
||||
|
||||
return new StringBuilder("jdbc:").append(driver).append("://").append(host).append(':').append(port).append('/').append(database);
|
||||
}
|
||||
|
||||
|
||||
private Map<String, String> getConnectionParameters(DatabaseConnectionEntity databaseConnectionEntity) {
|
||||
|
||||
return Optional.ofNullable(databaseConnectionEntity.getParams()).orElseGet(HashMap::new);
|
||||
}
|
||||
|
||||
|
||||
public String buildJdbcUrl(DatabaseConnection databaseConnection) {
|
||||
|
||||
StringBuilder sb = createJdbcConnectionStringBuilder(databaseConnection.getDriver(),
|
||||
databaseConnection.getHost(),
|
||||
databaseConnection.getPort(),
|
||||
databaseConnection.getDatabase());
|
||||
Map<String, String> params = getConnectionParameters(databaseConnection);
|
||||
appendParams(sb, params);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
public String buildJdbcUrlWithSchema(DatabaseConnection databaseConnection){
|
||||
StringBuilder sb = new StringBuilder("jdbc:")
|
||||
.append(databaseConnection.getDriver())
|
||||
.append("://")
|
||||
.append(databaseConnection.getHost())
|
||||
.append(':')
|
||||
.append(databaseConnection.getPort())
|
||||
.append('/')
|
||||
.append(databaseConnection.getDatabase())
|
||||
.append('?')
|
||||
.append("currentSchema=")
|
||||
.append(databaseConnection.getSchema());
|
||||
if(databaseConnection.getParams() != null) {
|
||||
databaseConnection.getParams().forEach((k, v) -> sb.append('&').append(k).append(v));
|
||||
private void appendParams(StringBuilder sb, Map<String, String> params) {
|
||||
|
||||
if (!params.isEmpty()) {
|
||||
sb.append("?");
|
||||
List<String> paramsAsStrings = params.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toList();
|
||||
sb.append(String.join("&", paramsAsStrings));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public String buildJdbcUrlWithSchema(DatabaseConnection databaseConnection) {
|
||||
|
||||
return createJdbcConnectionString(databaseConnection.getDriver(),
|
||||
databaseConnection.getHost(),
|
||||
databaseConnection.getPort(),
|
||||
databaseConnection.getDatabase(),
|
||||
databaseConnection.getSchema(),
|
||||
getConnectionParameters(databaseConnection));
|
||||
}
|
||||
|
||||
|
||||
private static String createJdbcConnectionString(String driver, String host, String port, String database, String schema, Map<String, String> connectionParameters) {
|
||||
|
||||
StringBuilder sb = createJdbcConnectionStringBuilder(driver, host, port, database);
|
||||
connectionParameters.put("currentSchema", schema);
|
||||
appendParams(sb, connectionParameters);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public String buildJdbcUrlWithSchema(DatabaseConnectionEntity databaseConnection){
|
||||
StringBuilder sb = new StringBuilder("jdbc:")
|
||||
.append(databaseConnection.getDriver())
|
||||
.append("://")
|
||||
.append(databaseConnection.getHost())
|
||||
.append(':')
|
||||
.append(databaseConnection.getPort())
|
||||
.append('/')
|
||||
.append(databaseConnection.getDatabase())
|
||||
.append('?')
|
||||
.append("currentSchema=")
|
||||
.append(databaseConnection.getSchema());
|
||||
if(databaseConnection.getParams() != null) {
|
||||
databaseConnection.getParams().forEach((k, v) -> sb.append('&').append(k).append(v));
|
||||
}
|
||||
return sb.toString();
|
||||
|
||||
private Map<String, String> getConnectionParameters(DatabaseConnection databaseConnection) {
|
||||
|
||||
return Optional.ofNullable(databaseConnection.getParams()).orElseGet(HashMap::new);
|
||||
}
|
||||
|
||||
|
||||
public String buildJdbcUrlWithSchema(DatabaseConnectionEntity databaseConnection) {
|
||||
|
||||
return createJdbcConnectionString(databaseConnection.getDriver(),
|
||||
databaseConnection.getHost(),
|
||||
databaseConnection.getPort(),
|
||||
databaseConnection.getDatabase(),
|
||||
databaseConnection.getSchema(),
|
||||
getConnectionParameters(databaseConnection));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -12,20 +12,14 @@ import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.persistence.Column;
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.Table;
|
||||
import javax.transaction.Transactional;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.datasource.DataSourceUtils;
|
||||
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.iqser.red.service.persistence.management.v1.processor.service.persistence.mulitenancy.DynamicDataSourceBasedMultiTenantConnectionProvider;
|
||||
import com.iqser.red.service.persistence.management.v1.processor.utils.multitenancy.TenantContext;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -35,75 +29,64 @@ import lombok.SneakyThrows;
|
||||
@RequiredArgsConstructor
|
||||
public class JDBCWriteUtils {
|
||||
|
||||
private final String SQL_TEMPLATE = "INSERT INTO %s (%s) values (%s)";
|
||||
private static final String SQL_TEMPLATE = "INSERT INTO %s (%s) values (%s)";
|
||||
|
||||
private final Map<Class<?>, EntityMetadata> entityMetadataMap = new HashMap<>();
|
||||
|
||||
private final EntityManager entityManager;
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
@Transactional
|
||||
public <T> void saveBatch(final List<T> entities) {
|
||||
public <T> void saveBatch(List<T> entities) {
|
||||
|
||||
if (entities.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
var metadata = getEntityMetadata(entities.iterator().next());
|
||||
var metadata = getEntityMetadata(entities.get(0).getClass());
|
||||
|
||||
final int batchSize = 500;
|
||||
|
||||
var query = entityManager.createNativeQuery(metadata.getSqlStatement());
|
||||
for (int j = 0; j < entities.size(); j += batchSize) {
|
||||
|
||||
final List<T> batchList = entities.subList(j, Math.min(j + batchSize, entities.size()));
|
||||
|
||||
for (var entity : batchList) {
|
||||
int paramIndex = 1;
|
||||
for (var mapping : metadata.getFieldMethodMap().entrySet()) {
|
||||
query.setParameter(paramIndex++, mapping.getValue().invoke(entity));
|
||||
}
|
||||
|
||||
query.executeUpdate();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
entityManager.clear();
|
||||
jdbcTemplate.batchUpdate(metadata.getSqlStatement(), entities, getBatchSize(), metadata::setValues);
|
||||
}
|
||||
|
||||
|
||||
private <T> EntityMetadata getEntityMetadata(T entity) {
|
||||
private int getBatchSize() {
|
||||
|
||||
var existingMetadata = entityMetadataMap.get(entity.getClass());
|
||||
return environment.getProperty("spring.jpa.properties.hibernate.jdbc.batch_size", int.class, 500);
|
||||
}
|
||||
|
||||
|
||||
private EntityMetadata getEntityMetadata(Class<?> entityClass) {
|
||||
|
||||
var existingMetadata = entityMetadataMap.get(entityClass);
|
||||
if (existingMetadata != null) {
|
||||
return existingMetadata;
|
||||
}
|
||||
|
||||
var tableName = getTableName(entity);
|
||||
var args = getArgs(entity);
|
||||
var tableName = getTableName(entityClass);
|
||||
var args = getArgs(entityClass);
|
||||
var sql = String.format(SQL_TEMPLATE, tableName, String.join(", ", args.keySet()), args.keySet().stream().map(a -> "?").collect(Collectors.joining(", ")));
|
||||
|
||||
var metadata = new EntityMetadata(tableName, sql, args);
|
||||
entityMetadataMap.put(entity.getClass(), metadata);
|
||||
entityMetadataMap.put(entityClass, metadata);
|
||||
|
||||
return metadata;
|
||||
}
|
||||
|
||||
|
||||
private <T> String getTableName(T entity) {
|
||||
private String getTableName(Class<?> entityClass) {
|
||||
|
||||
var tableAnnot = entity.getClass().getDeclaredAnnotation(Table.class);
|
||||
var tableAnnot = entityClass.getDeclaredAnnotation(Table.class);
|
||||
return tableAnnot.name();
|
||||
}
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
private <T> Map<String, Method> getArgs(T entity) {
|
||||
private Map<String, Method> getArgs(Class<?> entityClass) {
|
||||
|
||||
var fields = entity.getClass().getDeclaredFields();
|
||||
var fields = entityClass.getDeclaredFields();
|
||||
Map<String, Method> entityMethodMap = new LinkedHashMap<>();
|
||||
for (var field : fields) {
|
||||
var annotations = field.getDeclaredAnnotations();
|
||||
@ -111,7 +94,7 @@ public class JDBCWriteUtils {
|
||||
if (annotation.annotationType().equals(Column.class)) {
|
||||
var columnAnnotation = (Column) annotation;
|
||||
var name = StringUtils.isEmpty(columnAnnotation.name()) ? toSnakeCase(field.getName()) : columnAnnotation.name();
|
||||
entityMethodMap.put(name, entity.getClass().getMethod(getMethodName(field)));
|
||||
entityMethodMap.put(name, entityClass.getMethod(getMethodName(field)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -144,6 +127,16 @@ public class JDBCWriteUtils {
|
||||
private String sqlStatement;
|
||||
private Map<String, Method> fieldMethodMap;
|
||||
|
||||
|
||||
@SneakyThrows
|
||||
public void setValues(PreparedStatement preparedStatement, Object entity) {
|
||||
|
||||
int paramIndex = 1;
|
||||
for (var mapping : getFieldMethodMap().entrySet()) {
|
||||
preparedStatement.setObject(paramIndex++, mapping.getValue().invoke(entity));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -22,7 +22,7 @@ spring:
|
||||
properties:
|
||||
hibernate:
|
||||
jdbc:
|
||||
batch_size: 50
|
||||
batch_size: 1000
|
||||
order_inserts: true
|
||||
order_updates: true
|
||||
cache:
|
||||
|
||||
@ -101,33 +101,33 @@ public class EntityPerformanceTest extends AbstractPersistenceServerServiceTest
|
||||
@Test
|
||||
public void testWritePerformance() {
|
||||
|
||||
var tenKEntries = generateEntries(10_000);
|
||||
final int numberOfEntries = 10_000;
|
||||
var entries = generateEntries(numberOfEntries);
|
||||
|
||||
var template = dossierTemplateTesterAndProvider.provideTestTemplate("test");
|
||||
var type1 = typeProvider.testAndProvideType(template, null, "t1");
|
||||
var type2 = typeProvider.testAndProvideType(template, null, "t2");
|
||||
|
||||
List<DictionaryEntryEntity> type1Entries = tenKEntries.stream().map(s -> new DictionaryEntryEntity(0, s, 1, false, type1.getTypeId())).collect(Collectors.toList());
|
||||
List<DictionaryEntryEntity> type1Entries = entries.stream().map(s -> new DictionaryEntryEntity(0, s, 1, false, type1.getTypeId())).collect(Collectors.toList());
|
||||
|
||||
List<DictionaryEntryEntity> type2Entries = tenKEntries.stream().map(s -> new DictionaryEntryEntity(0, s, 1, false, type2.getTypeId())).collect(Collectors.toList());
|
||||
List<DictionaryEntryEntity> type2Entries = entries.stream().map(s -> new DictionaryEntryEntity(0, s, 1, false, type2.getTypeId())).collect(Collectors.toList());
|
||||
|
||||
assertThat(entryRepository.count()).isEqualTo(0);
|
||||
long t1 = System.currentTimeMillis();
|
||||
entryRepository.saveAll(type1Entries);
|
||||
long jpaTime = System.currentTimeMillis() - t1;
|
||||
assertThat(entryRepository.count()).isEqualTo(10_000);
|
||||
assertThat(entryRepository.count()).isEqualTo(numberOfEntries);
|
||||
|
||||
t1 = System.currentTimeMillis();
|
||||
jdbcWriteUtils.saveBatch(type2Entries);
|
||||
long jdbcTime = System.currentTimeMillis() - t1;
|
||||
|
||||
assertThat(entryRepository.count()).isEqualTo(20_000);
|
||||
assertThat(entryRepository.count()).isEqualTo(2 * numberOfEntries);
|
||||
|
||||
// assertThat(jpaTime).isGreaterThan(jdbcTime);
|
||||
|
||||
System.out.println("JPA Time: " + jpaTime + "ms for 10k entries");
|
||||
System.out.println("JDBC Time: " + jdbcTime + "ms for 10k entries");
|
||||
|
||||
log.info("JPA Time: {} ms for {} entries", jpaTime, numberOfEntries);
|
||||
log.info("JDBC Time: {} ms for {} entries", jdbcTime, numberOfEntries);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@ spring:
|
||||
scheduler:
|
||||
instanceId: AUTO
|
||||
job-store-type: JDBC
|
||||
|
||||
main:
|
||||
allow-circular-references: true # FIXME
|
||||
jpa:
|
||||
@ -28,7 +27,7 @@ spring:
|
||||
properties:
|
||||
hibernate:
|
||||
jdbc:
|
||||
batch_size: 50
|
||||
batch_size: 1000
|
||||
order_inserts: true
|
||||
order_updates: true
|
||||
open-in-view: true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user