From 5d834578a23043b37f183cba8c66f90446fb019d Mon Sep 17 00:00:00 2001 From: kochetkov Date: Wed, 11 Sep 2024 12:48:19 +0300 Subject: [PATCH] update config executor --- сonfig-data-executor/pom.xml | 73 ++++--- .../java/org/micord/config/AtomikosConfig.java | 22 +- .../org/micord/config/DatabaseConnection.java | 43 +++- .../org/micord/config/S3HttpConnection.java | 37 +++- .../org/micord/models/RequestArgument.java | 6 + .../org/micord/models/S3ConnectionParams.java | 18 ++ .../org/micord/models/SqlConnectionParams.java | 26 ++- .../java/org/micord/service/ApiService.java | 200 +---------------- .../java/org/micord/service/S3Service.java | 98 +++++++++ .../java/org/micord/service/SqlAqlService.java | 204 ++++++++++++++++++ .../src/main/resources/application.yml | 3 - 11 files changed, 482 insertions(+), 248 deletions(-) create mode 100644 сonfig-data-executor/src/main/java/org/micord/service/S3Service.java create mode 100644 сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java diff --git a/сonfig-data-executor/pom.xml b/сonfig-data-executor/pom.xml index 67f4a98..220c5f6 100644 --- a/сonfig-data-executor/pom.xml +++ b/сonfig-data-executor/pom.xml @@ -23,25 +23,57 @@ pom import + + org.springframework + spring-framework-bom + 6.1.12 + import + + + org.springframework.boot + spring-boot-dependencies + 3.3.3 + pom + import + - - org.jooq - jooq - 3.19.11 - - org.springframework.boot spring-boot-starter-web - 3.3.2 - org.springframework.boot - spring-boot-starter-data-jpa - 3.3.2 + org.springframework + spring-tx + + + com.atomikos + transactions-jta + 6.0.0 + jakarta + + + com.atomikos + transactions-jdbc + 6.0.0 + jakarta + + + jakarta.transaction + jakarta.transaction-api + + + mysql + mysql-connector-java + 8.0.33 + runtime + + + org.postgresql + postgresql + runtime org.projectlombok @@ -59,12 +91,6 @@ jaxb-runtime 2.3.1 - - mysql - mysql-connector-java - 8.0.33 - runtime - com.arangodb arangodb-java-driver @@ -74,17 +100,6 @@ com.amazonaws aws-java-sdk-s3 - - com.atomikos - transactions-jta - 6.0.0 - - - javax.transaction - javax.transaction-api - 1.3 - - @@ -98,10 +113,10 @@ org.springframework.boot spring-boot-maven-plugin + 3.3.3 org.micord.Main - @@ -112,4 +127,4 @@ - \ No newline at end of file + diff --git a/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java b/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java index 7f731eb..e6cb7e7 100644 --- a/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java +++ b/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java @@ -2,9 +2,12 @@ package org.micord.config; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; +import jakarta.transaction.TransactionManager; +import jakarta.transaction.UserTransaction; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.jta.JtaTransactionManager; /** * @author Maksim Tereshin @@ -13,14 +16,23 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; @EnableTransactionManagement public class AtomikosConfig { - @Bean(initMethod = "init", destroyMethod = "close") - public UserTransactionManager atomikosTransactionManager() { - return new UserTransactionManager(); + @Bean + public UserTransaction userTransaction() throws Throwable { + UserTransactionImp userTransactionImp = new UserTransactionImp(); + userTransactionImp.setTransactionTimeout(300); + return userTransactionImp; } @Bean - public UserTransactionImp atomikosUserTransaction() { - return new UserTransactionImp(); + public TransactionManager atomikosTransactionManager() throws Throwable { + UserTransactionManager userTransactionManager = new UserTransactionManager(); + userTransactionManager.setForceShutdown(true); + return userTransactionManager; + } + + @Bean + public JtaTransactionManager transactionManager() throws Throwable { + return new JtaTransactionManager(userTransaction(), atomikosTransactionManager()); } } diff --git a/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java b/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java index 0057bf0..d762779 100644 --- a/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java +++ b/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java @@ -1,16 +1,22 @@ package org.micord.config; +import com.atomikos.jdbc.AtomikosDataSourceBean; import org.micord.models.SqlConnectionParams; +import javax.sql.DataSource; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; /** * @author Maksim Tereshin */ public class DatabaseConnection { + private static final Map dataSources = new HashMap<>(); + public static Connection getConnection(SqlConnectionParams params) throws SQLException { try { Class.forName(params.getJdbcDriverClassName()); @@ -18,7 +24,40 @@ public class DatabaseConnection { throw new SQLException("Unable to load the JDBC driver class", e); } - return DriverManager.getConnection(params.getJdbcUrl(), params.getJdbcUsername(), params.getJdbcPassword()); + return getXaDataSource(params).getConnection(); + } + + public static DataSource getXaDataSource(SqlConnectionParams params) { + String database = params.getJdbcDatabase(); + + if (!dataSources.containsKey(database)) { + AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); + xaDataSource.setUniqueResourceName("jdbcDatasource_" + database); + xaDataSource.setXaDataSourceClassName(params.getJdbcXaDataSourceClassName()); + xaDataSource.setPoolSize(Integer.parseInt(params.getJdbcXaDataSourcePoolSize())); + + Properties xaProperties = loadDatabaseProperties(params); + xaDataSource.setXaProperties(xaProperties); + + dataSources.put(database, xaDataSource); + } + + return dataSources.get(database); + } + + private static Properties loadDatabaseProperties(SqlConnectionParams params) { + Properties xaProperties = new Properties(); + try { + xaProperties.setProperty("user", params.getJdbcUsername()); + xaProperties.setProperty("password", params.getJdbcPassword()); + xaProperties.setProperty("serverName", params.getJdbcHost()); + xaProperties.setProperty("portNumber", String.valueOf(params.getJdbcPort())); + xaProperties.setProperty("databaseName", params.getJdbcDatabase()); + + } catch (Exception e) { + throw new RuntimeException("Failed to load database properties", e); + } + return xaProperties; } } diff --git a/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java b/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java index 3ee7750..592abb7 100644 --- a/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java +++ b/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java @@ -22,27 +22,43 @@ public class S3HttpConnection { String host = connectionParams.getHost() + ":" + connectionParams.getPort(); String s3Key = connectionParams.getS3Key(); String s3Secret = connectionParams.getS3Secret(); + String method = connectionParams.getMethod().toUpperCase(); + String body = connectionParams.getBody(); - // The resource to be deleted, typically a file in the S3 bucket String resource = "/" + file; - String contentType = "application/octet-stream"; + String contentType = connectionParams.getContentType(); String date = ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME); - // Generate a signature for the DELETE request - String signature = generateSignature("DELETE", contentType, date, resource, s3Secret); + String signature = generateSignature(method, contentType, date, resource, s3Secret); - // Build and return the HTTP DELETE request - return HttpRequest.newBuilder() + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() .uri(URI.create("https://" + host + resource)) .header("Host", host) .header("Date", date) .header("Content-Type", contentType) - .header("Authorization", "AWS " + s3Key + ":" + signature) - .DELETE() - .build(); + .header("Authorization", "AWS " + s3Key + ":" + signature); + + + switch (method) { + case "DELETE": + requestBuilder.DELETE(); + break; + case "GET": + requestBuilder.GET(); + break; + case "PUT": + requestBuilder.PUT(HttpRequest.BodyPublishers.ofString(body != null ? body : "")); + break; + case "POST": + requestBuilder.POST(HttpRequest.BodyPublishers.ofString(body != null ? body : "")); + break; + default: + throw new IllegalArgumentException("Unsupported HTTP method: " + method); + } + + return requestBuilder.build(); } - // Utility method to generate a signature for the S3-compatible request private static String generateSignature(String method, String contentType, String date, String resource, String s3Secret) throws Exception { String stringToSign = method + "\n" + "\n" + // MD5 - not used for DELETE requests @@ -50,7 +66,6 @@ public class S3HttpConnection { date + "\n" + resource; - // HMAC-SHA1 signature generation Mac mac = Mac.getInstance("HmacSHA1"); SecretKeySpec secretKey = new SecretKeySpec(s3Secret.getBytes(StandardCharsets.UTF_8), "HmacSHA1"); mac.init(secretKey); diff --git a/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java b/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java index 44951f4..3c69964 100644 --- a/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java +++ b/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java @@ -13,6 +13,7 @@ import javax.xml.bind.annotation.XmlRootElement; public class RequestArgument { private String id; + private String aqlCollectionRead; private String requestURL; private SqlConnectionParams sqlConnectionParams; @@ -26,6 +27,11 @@ public class RequestArgument { return id; } + @XmlElement(name = "AqlCollectionRead") + public String getAqlCollectionRead() { + return aqlCollectionRead; + } + @XmlElement(name = "RequestURL") public String getRequestURL() { return requestURL; diff --git a/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java b/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java index 6472ab9..29fc61b 100644 --- a/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java +++ b/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java @@ -14,6 +14,9 @@ public class S3ConnectionParams { private String s3Secret; private String host; private String port; + private String contentType; + private String method; + private String body; @XmlElement(name = "S3Key") public String getS3Key() { @@ -35,4 +38,19 @@ public class S3ConnectionParams { return port; } + @XmlElement(name = "ContentType") + public String getContentType() { + return contentType; + } + + @XmlElement(name = "Method") + public String getMethod() { + return method; + } + + @XmlElement(name = "Body") + public String getBody() { + return body; + } + } diff --git a/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java b/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java index 97cdeb1..cad71b3 100644 --- a/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java +++ b/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java @@ -7,15 +7,28 @@ import javax.xml.bind.annotation.XmlElement; @Setter public class SqlConnectionParams { - private String jdbcUrl; + private String jdbcHost; + private String jdbcPort; private String jdbcUsername; private String jdbcPassword; private String jdbcDriverClassName; + private String jdbcXaDataSourceClassName; + private String jdbcXaDataSourcePoolSize; private String jdbcDatabase; - @XmlElement(name = "JdbcUrl") - public String getJdbcUrl() { - return jdbcUrl; + @XmlElement(name = "JdbcXaDataSourcePoolSize") + public String getJdbcXaDataSourcePoolSize() { + return jdbcXaDataSourcePoolSize; + } + + @XmlElement(name = "JdbcHost") + public String getJdbcHost() { + return jdbcHost; + } + + @XmlElement(name = "JdbcPort") + public String getJdbcPort() { + return jdbcPort; } @XmlElement(name = "JdbcUsername") @@ -33,6 +46,11 @@ public class SqlConnectionParams { return jdbcDriverClassName; } + @XmlElement(name = "JdbcXaDataSourceClassName") + public String getJdbcXaDataSourceClassName() { + return jdbcXaDataSourceClassName; + } + @XmlElement(name = "JdbcDatabase") public String getJdbcDatabase() { return jdbcDatabase; diff --git a/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java b/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java index 79820ea..b014ea7 100644 --- a/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java +++ b/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java @@ -1,44 +1,26 @@ package org.micord.service; -import com.arangodb.ArangoCursor; -import com.arangodb.ArangoDatabase; -import com.atomikos.icatch.jta.UserTransactionImp; -import org.micord.config.ArangoDBConnection; -import org.micord.config.DatabaseConnection; -import org.micord.config.S3HttpConnection; import org.micord.exceptions.FileNotModifiedException; import org.micord.models.*; import org.micord.utils.ConfigLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.FileNotFoundException; -import java.net.HttpURLConnection; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.*; import java.util.concurrent.CompletableFuture; @Service public class ApiService { - private static final Logger logger = LoggerFactory.getLogger(ApiService.class); - @Autowired private ConfigLoader configLoader; @Autowired - private UserTransactionImp userTransaction; + private S3Service s3Service; @Autowired - private HttpClient httpClient; + private SqlAqlService sqlAndAqlService; public CompletableFuture process(String methodName, List ids) throws FileNotFoundException, FileNotModifiedException { Optional optionalConfig = configLoader.loadConfigIfModified(methodName); @@ -49,181 +31,11 @@ public class ApiService { Requests config = optionalConfig.get(); - processRequests(config, ids); + sqlAndAqlService.processSqlAndAqlRequests(config, ids); - return CompletableFuture.completedFuture("Processing complete"); + return CompletableFuture.runAsync(() -> { + s3Service.processS3Requests(config.getS3Requests(), ids); + }).thenApply(v -> "Processing complete"); } - public void processRequests(Requests config, List ids) { - try { - userTransaction.begin(); - - // Process SQL and AQL requests - processSqlAndAqlRequests(config, ids); - - userTransaction.commit(); - logger.info("Successfully processed requests for all IDs."); - } catch (Exception e) { - try { - userTransaction.rollback(); - logger.error("Transaction rolled back due to failure.", e); - } catch (Exception rollbackException) { - logger.error("Failed to rollback the transaction", rollbackException); - } - logger.error("Failed to process requests", e); - } - - // Process S3 requests - processS3Requests(config.getS3Requests(), ids); - } - - private void processSqlAndAqlRequests(Requests config, List ids) { - for (SqlRequest request : config.getSqlRequests()) { - processSqlRequests(request, ids); - } - - for (AqlRequest request : config.getAqlRequests()) { - processAqlRequests(request, ids); - } - } - - private void processSqlRequests(SqlRequest request, List ids) { - try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) { - String query = buildSqlQuery(request, String.join(",", ids)); - executeSqlQuery(connection, query); - } catch (SQLException e) { - logger.error("SQL query execution failed", e); - } - } - - private String buildSqlQuery(SqlRequest request, String ids) { - StringBuilder clauseBuilder = new StringBuilder("id IN (" + ids + ")"); - - if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { - for (RequestArgument argument : request.getRequestArguments()) { - try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { - String query = argument.getRequestURL(); - List result = fetchFileListFromDatabaseSQL(connection, query); - - if (result != null && !result.isEmpty()) { - String resultSet = String.join(", ", result.stream() - .map(s -> "'" + s + "'") - .toArray(String[]::new)); - clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")"); - } - } catch (SQLException e) { - logger.error("Failed to execute query for RequestArgument", e); - } - } - } - - String clause = clauseBuilder.toString(); - return request.getRequestURL().replace("${clause}", clause); - } - - private void executeSqlQuery(Connection connection, String query) throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement(query)) { - stmt.execute(); - } - } - - private List fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException { - List results = new ArrayList<>(); - try (PreparedStatement stmt = connection.prepareStatement(query); - ResultSet rs = stmt.executeQuery()) { - while (rs.next()) { - results.add(rs.getString(1)); // Fetch the first column - } - } - return results; - } - - // Process S3 Requests concurrently - public void processS3Requests(List s3Requests, List ids) { - s3Requests.forEach(request -> { - List> futures = ids.stream() - .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id))) - .toList(); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenRun(() -> logger.info("Successfully processed all S3 requests.")) - .exceptionally(ex -> { - logger.error("Failed to process S3 requests", ex); - return null; - }); - }); - } - - private void processS3Request(S3Request request, String id) { - try { - List files = new ArrayList<>(); - - if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { - for (RequestArgument argument : request.getRequestArguments()) { - try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { - String query = argument.getRequestURL(); - List result = fetchFileListFromDatabaseSQL(connection, query); - if (result != null && !result.isEmpty()) { - files.addAll(result); - } - } catch (SQLException e) { - logger.error("Failed to execute query for RequestArgument", e); - } - } - } - - files.forEach(file -> { - HttpRequest httpRequest; - try { - httpRequest = S3HttpConnection.buildHttpRequest(request, file); - } catch (Exception e) { - throw new RuntimeException(e); - } - - httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()) - .thenAccept(response -> { - if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) { - logger.info("Successfully deleted object for ID {}", id); - } else { - logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode()); - } - }) - .exceptionally(ex -> { - logger.error("Failed to delete object for ID {}", id, ex); - return null; - }); - }); - - } catch (Exception e) { - logger.error("Failed to process S3 request for id: {}", id, e); - } - } - - // Process AQL requests - private void processAqlRequests(AqlRequest request, List ids) { - ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams()); - String aqlQuery = buildAqlQuery(request, ids); - List result = executeAqlQueryForStrings(arangoDb, aqlQuery); - logger.info("Successfully executed AQL request and retrieved results: {}", result); - } - - private String buildAqlQuery(AqlRequest request, List ids) { - String collection = request.getAqlConnectionParams().getCollection(); - String inClause = "FOR doc IN " + collection + " FILTER doc.id IN [" + - String.join(", ", ids.stream().map(id -> "'" + id + "'").toArray(String[]::new)) + - "] RETURN doc"; - return request.getRequestURL().replace("${collection}", collection).replace("${clause}", inClause); - } - - private List executeAqlQueryForStrings(ArangoDatabase arangoDb, String query) { - List results = new ArrayList<>(); - try (ArangoCursor cursor = arangoDb.query(query, null, null, null)) { - while (cursor.hasNext()) { - results.add(cursor.next()); - } - } catch (Exception e) { - logger.error("Failed to execute AQL query", e); - } - return results; - } } \ No newline at end of file diff --git a/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java b/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java new file mode 100644 index 0000000..0a5798d --- /dev/null +++ b/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java @@ -0,0 +1,98 @@ +package org.micord.service; + +import org.micord.config.DatabaseConnection; +import org.micord.config.S3HttpConnection; +import org.micord.models.RequestArgument; +import org.micord.models.S3Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.net.HttpURLConnection; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * @author Maksim Tereshin + */ +@Service +public class S3Service { + + private static final Logger logger = LoggerFactory.getLogger(S3Service.class); + + @Autowired + private SqlAqlService sqlAndAqlService; + + @Autowired + private HttpClient httpClient; + + public void processS3Requests(List s3Requests, List ids) { + if (s3Requests != null) { + s3Requests.forEach(request -> { + List> futures = ids.stream() + .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id))) + .toList(); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenRun(() -> logger.info("Successfully processed all S3 requests.")) + .exceptionally(ex -> { + logger.error("Failed to process S3 requests", ex); + return null; + }); + }); + } + } + + private void processS3Request(S3Request request, String id) { + try { + List files = new ArrayList<>(); + + if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { + for (RequestArgument argument : request.getRequestArguments()) { + try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { + String query = argument.getRequestURL(); + List result = sqlAndAqlService.fetchFileListFromDatabaseSQL(connection, query); + if (result != null && !result.isEmpty()) { + files.addAll(result); + } + } catch (SQLException e) { + logger.error("Failed to execute query for RequestArgument", e); + } + } + } + + files.forEach(file -> { + HttpRequest httpRequest; + try { + httpRequest = S3HttpConnection.buildHttpRequest(request, file); + } catch (Exception e) { + throw new RuntimeException(e); + } + + httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()) + .thenAccept(response -> { + if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) { + logger.info("Successfully deleted object for ID {}", id); + } else { + logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode()); + } + }) + .exceptionally(ex -> { + logger.error("Failed to delete object for ID {}", id, ex); + return null; + }); + }); + + } catch (Exception e) { + logger.error("Failed to process S3 request for id: {}", id, e); + } + } + +} diff --git a/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java b/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java new file mode 100644 index 0000000..527ba79 --- /dev/null +++ b/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java @@ -0,0 +1,204 @@ +package org.micord.service; + +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDBException; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.StreamTransactionEntity; +import com.arangodb.model.AqlQueryOptions; +import com.arangodb.model.StreamTransactionOptions; +import com.arangodb.model.TransactionOptions; +import org.micord.config.ArangoDBConnection; +import org.micord.config.DatabaseConnection; +import org.micord.models.AqlRequest; +import org.micord.models.RequestArgument; +import org.micord.models.Requests; +import org.micord.models.SqlRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Maksim Tereshin + */ +@Service +public class SqlAqlService { + + private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class); + + @Transactional + public void processSqlAndAqlRequests(Requests config, List ids) { + if (config.getSqlRequests() != null) { + for (SqlRequest request : config.getSqlRequests()) { + processSqlRequests(request, ids); + } + } + + if (config.getAqlRequests() != null) { + for (AqlRequest request : config.getAqlRequests()) { + processAqlRequests(request, ids); + } + } + } + + private void processSqlRequests(SqlRequest request, List ids) { + try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) { + String query = buildSqlQuery(request, String.join(",", ids)); + executeSqlQuery(connection, query); + } catch (SQLException e) { + logger.error("SQL query execution failed", e); + } + } + + private String buildSqlQuery(SqlRequest request, String ids) { + StringBuilder clauseBuilder = new StringBuilder("id IN (" + ids + ")"); + + if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { + for (RequestArgument argument : request.getRequestArguments()) { + try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { + String query = argument.getRequestURL(); + List result = fetchFileListFromDatabaseSQL(connection, query); + + if (result != null && !result.isEmpty()) { + String resultSet = String.join(", ", result.stream() + .map(s -> "'" + s + "'") + .toArray(String[]::new)); + clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")"); + } + } catch (SQLException e) { + logger.error("Failed to execute query for RequestArgument", e); + } + } + } + + String clause = clauseBuilder.toString(); + return request.getRequestURL().replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase()).replace("${clause}", clause); + } + + private void executeSqlQuery(Connection connection, String query) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(query)) { + stmt.execute(); + } + } + + public List fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException { + List results = new ArrayList<>(); + try (PreparedStatement stmt = connection.prepareStatement(query); + ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + results.add(rs.getString(1)); // Fetch the first column + } + } + return results; + } + + private void processAqlRequests(AqlRequest request, List ids) { + ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams()); + + RequestArgument requestArgument = request.getRequestArguments().get(0); + String aqlCollectionRead = requestArgument.getAqlCollectionRead(); + String aqlCollectionWrite = requestArgument.getId(); + + StreamTransactionEntity tx = null; + try { + StreamTransactionOptions options = new StreamTransactionOptions() + .writeCollections(aqlCollectionWrite) + .readCollections(aqlCollectionRead); + + tx = arangoDb.beginStreamTransaction(options); + String transactionId = tx.getId(); + + logger.info("Stream transaction started with ID: {}", transactionId); + + List entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids, transactionId); + executeMainAqlRequest(arangoDb, request, entities, transactionId); + + arangoDb.commitStreamTransaction(transactionId); + logger.info("Stream transaction with ID {} committed successfully", transactionId); + + } catch (ArangoDBException e) { + if (tx != null) { + arangoDb.abortStreamTransaction(tx.getId()); + logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e); + } + throw new RuntimeException("Failed to execute AQL request within a stream transaction", e); + } + + logger.info("Successfully executed AQL request"); + } + + private List executeSelectAqlRequest(ArangoDatabase arangoDb, List requestArguments, List ids, String transactionId) { + List entityIdList = new ArrayList<>(); + + RequestArgument argument = requestArguments.get(0); + + String query = argument.getRequestURL(); + String entityType = argument.getId(); + + Map bindVars = new HashMap<>(); + bindVars.put("ids", ids); + + AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); + + try (ArangoCursor cursor = arangoDb.query(query, Map.class, bindVars, aqlQueryOptions)) { + while (cursor.hasNext()) { + Map result = cursor.next(); + switch (entityType) { + case "applicationId": + entityIdList.add((String) result.get("applicationId")); + break; + case "edgesId": + entityIdList.addAll((List) result.get("edgesId")); + break; + case "subjectId": + entityIdList.addAll((List) result.get("subjectId")); + break; + case "historyId": + entityIdList.addAll((List) result.get("historyId")); + break; + case "interdepreqId": + entityIdList.addAll((List) result.get("interdepreqId")); + break; + default: + throw new IllegalArgumentException("Invalid requestArgumentId: " + entityType); + } + } + } catch (Exception e) { + logger.error("Failed to execute AQL query", e); + } + + return entityIdList; + } + + private void executeMainAqlRequest(ArangoDatabase arangoDb, AqlRequest request, List entityIdList, String transactionId) { + if (entityIdList == null || entityIdList.isEmpty()) { + logger.warn("No entities found for main AQL request."); + return; + } + + String entity = request.getRequestArguments().get(0).getId(); + + Map bindVars = new HashMap<>(); + bindVars.put("ids", entityIdList); + + + String finalQuery = request.getRequestURL() + .replace("${entity}", entity); + + AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); + + arangoDb.query(finalQuery, null, bindVars, aqlQueryOptions); + + logger.info("Successfully removed {}: {}", entity, entityIdList); + } + +} diff --git a/сonfig-data-executor/src/main/resources/application.yml b/сonfig-data-executor/src/main/resources/application.yml index 7c65eb4..f64b73f 100644 --- a/сonfig-data-executor/src/main/resources/application.yml +++ b/сonfig-data-executor/src/main/resources/application.yml @@ -1,6 +1,5 @@ configDirectory: /Users/maksim/Projects/Micord/configs - #spring: # datasource: # url: jdbc:mysql://localhost:3306/micord?useSSL=false @@ -14,5 +13,3 @@ configDirectory: /Users/maksim/Projects/Micord/configs # properties: # hibernate: # dialect: org.hibernate.dialect.MySQLDialect -server: - port: 8090