diff --git a/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java b/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java index 3499bd0..7e692a6 100644 --- a/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java +++ b/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java @@ -1,6 +1,5 @@ package org.micord.service; -import org.micord.exceptions.FileNotModifiedException; import org.micord.models.*; import org.micord.utils.ConfigLoader; import org.springframework.beans.factory.annotation.Autowired; @@ -8,7 +7,6 @@ import org.springframework.stereotype.Service; import java.io.FileNotFoundException; import java.util.*; -import java.util.concurrent.CompletableFuture; @Service public class ApiService { diff --git a/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java b/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java index 0a5798d..cfb4eb8 100644 --- a/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java +++ b/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java @@ -30,69 +30,6 @@ public class S3Service { @Autowired private SqlAqlService sqlAndAqlService; - @Autowired - private HttpClient httpClient; - public void processS3Requests(List s3Requests, List ids) { - if (s3Requests != null) { - s3Requests.forEach(request -> { - List> futures = ids.stream() - .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id))) - .toList(); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenRun(() -> logger.info("Successfully processed all S3 requests.")) - .exceptionally(ex -> { - logger.error("Failed to process S3 requests", ex); - return null; - }); - }); - } - } - - private void processS3Request(S3Request request, String id) { - try { - List files = new ArrayList<>(); - - if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { - for (RequestArgument argument : request.getRequestArguments()) { - try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { - String query = argument.getRequestURL(); - List result = sqlAndAqlService.fetchFileListFromDatabaseSQL(connection, query); - if (result != null && !result.isEmpty()) { - files.addAll(result); - } - } catch (SQLException e) { - logger.error("Failed to execute query for RequestArgument", e); - } - } - } - - files.forEach(file -> { - HttpRequest httpRequest; - try { - httpRequest = S3HttpConnection.buildHttpRequest(request, file); - } catch (Exception e) { - throw new RuntimeException(e); - } - - httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()) - .thenAccept(response -> { - if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) { - logger.info("Successfully deleted object for ID {}", id); - } else { - logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode()); - } - }) - .exceptionally(ex -> { - logger.error("Failed to delete object for ID {}", id, ex); - return null; - }); - }); - - } catch (Exception e) { - logger.error("Failed to process S3 request for id: {}", id, e); - } - } } diff --git a/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java b/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java index 9ab9134..0255a2c 100644 --- a/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java +++ b/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java @@ -1,24 +1,9 @@ package org.micord.service; -import com.arangodb.ArangoCursor; -import com.arangodb.ArangoDBException; -import com.arangodb.ArangoDatabase; -import com.arangodb.entity.StreamTransactionEntity; -import com.arangodb.model.AqlQueryOptions; -import com.arangodb.model.StreamTransactionOptions; -import com.arangodb.model.TransactionOptions; -import org.micord.config.ArangoDBConnection; -import org.micord.config.DatabaseConnection; -import org.micord.models.AqlRequest; -import org.micord.models.RequestArgument; -import org.micord.models.Requests; -import org.micord.models.SqlRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - +import java.net.HttpURLConnection; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -27,6 +12,27 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoDBException; +import com.arangodb.ArangoDatabase; +import com.arangodb.entity.StreamTransactionEntity; +import com.arangodb.model.AqlQueryOptions; +import com.arangodb.model.StreamTransactionOptions; +import org.micord.config.ArangoDBConnection; +import org.micord.config.DatabaseConnection; +import org.micord.config.S3HttpConnection; +import org.micord.models.AqlRequest; +import org.micord.models.RequestArgument; +import org.micord.models.Requests; +import org.micord.models.S3Request; +import org.micord.models.SqlRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; /** * @author Maksim Tereshin @@ -34,183 +40,269 @@ import java.util.Map; @Service public class SqlAqlService { - private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class); + private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class); - @Autowired - private S3Service s3Service; + @Autowired + private HttpClient httpClient; - @Transactional - public void processSqlAndAqlRequests(Requests config, List ids) { - if (config.getSqlRequests() != null) { - for (SqlRequest request : config.getSqlRequests()) { - processSqlRequests(request, ids); - } - } + public void processS3Requests(List s3Requests, List ids) { + if (s3Requests != null) { + s3Requests.forEach(request -> { + List> futures = ids.stream() + .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id))) + .toList(); - if (config.getAqlRequests() != null) { - for (AqlRequest request : config.getAqlRequests()) { - processAqlRequests(request, ids); - } - } - s3Service.processS3Requests(config.getS3Requests(), ids); - } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenRun(() -> logger.info("Successfully processed all S3 requests.")) + .exceptionally(ex -> { + logger.error("Failed to process S3 requests", ex); + return null; + }); + }); + } + } - private void processSqlRequests(SqlRequest request, List ids) { - try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) { - String query = buildSqlQuery(request, String.join(",", ids)); - executeSqlQuery(connection, query); - } catch (SQLException e) { - logger.error("SQL query execution failed", e); - } - } + private void processS3Request(S3Request request, String id) { + try { + List files = new ArrayList<>(); - private String buildSqlQuery(SqlRequest request, String ids) { - StringBuilder clauseBuilder = new StringBuilder(); + if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { + for (RequestArgument argument : request.getRequestArguments()) { + try (Connection connection = DatabaseConnection.getConnection( + argument.getSqlConnectionParams())) { + String query = argument.getRequestURL(); + List result = fetchFileListFromDatabaseSQL(connection, query); + if (result != null && !result.isEmpty()) { + files.addAll(result); + } + } + catch (SQLException e) { + logger.error("Failed to execute query for RequestArgument", e); + } + } + } - if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { - for (RequestArgument argument : request.getRequestArguments()) { - clauseBuilder.append(argument.getId()).append(" IN (").append(ids).append(")"); + files.forEach(file -> { + HttpRequest httpRequest; + try { + httpRequest = S3HttpConnection.buildHttpRequest(request, file); + } + catch (Exception e) { + throw new RuntimeException(e); + } - if (argument.getSqlConnectionParams() != null) { - try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { - String query = argument.getRequestURL(); - List result = fetchFileListFromDatabaseSQL(connection, query); + httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()) + .thenAccept(response -> { + if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT + || response.statusCode() == HttpURLConnection.HTTP_OK) { + logger.info("Successfully deleted object for ID {}", id); + } + else { + logger.error("Failed to delete object for ID {}. Response code: {}", id, + response.statusCode() + ); + } + }) + .exceptionally(ex -> { + logger.error("Failed to delete object for ID {}", id, ex); + return null; + }); + }); - if (result != null && !result.isEmpty()) { - String resultSet = String.join(", ", result.stream() - .map(s -> "'" + s + "'") - .toArray(String[]::new)); - clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")"); - } - } catch (SQLException e) { - logger.error("Failed to execute query for RequestArgument", e); - } - } - } - } + } + catch (Exception e) { + logger.error("Failed to process S3 request for id: {}", id, e); + } + } - String clause = clauseBuilder.toString(); - return request.getRequestURL() - .replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase()) - .replace("${clause}", clause) - .replace("${ids}", ids); - } + @Transactional + public void processSqlAndAqlRequests(Requests config, List ids) { + if (config.getSqlRequests() != null) { + for (SqlRequest request : config.getSqlRequests()) { + processSqlRequests(request, ids); + } + } - private void executeSqlQuery(Connection connection, String query) throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement(query)) { - stmt.execute(); - } - } + if (config.getAqlRequests() != null) { + for (AqlRequest request : config.getAqlRequests()) { + processAqlRequests(request, ids); + } + } + processS3Requests(config.getS3Requests(), ids); + } - public List fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException { - List results = new ArrayList<>(); - try (PreparedStatement stmt = connection.prepareStatement(query); - ResultSet rs = stmt.executeQuery()) { - while (rs.next()) { - results.add(rs.getString(1)); // Fetch the first column - } - } - return results; - } + private void processSqlRequests(SqlRequest request, List ids) { + try (Connection connection = DatabaseConnection.getConnection( + request.getSqlConnectionParams())) { + String query = buildSqlQuery(request, String.join(",", ids)); + executeSqlQuery(connection, query); + } + catch (SQLException e) { + logger.error("SQL query execution failed", e); + } + } - private void processAqlRequests(AqlRequest request, List ids) { - ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams()); + private String buildSqlQuery(SqlRequest request, String ids) { + StringBuilder clauseBuilder = new StringBuilder(); - RequestArgument requestArgument = request.getRequestArguments().get(0); - List aqlCollectionRead = requestArgument.getAqlCollectionRead(); - String aqlCollectionWrite = requestArgument.getId(); + if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { + for (RequestArgument argument : request.getRequestArguments()) { + clauseBuilder.append(argument.getId()).append(" IN (").append(ids).append(")"); - StreamTransactionEntity tx = null; - try { - StreamTransactionOptions options = new StreamTransactionOptions() - .writeCollections(aqlCollectionWrite) - .readCollections(aqlCollectionRead.toArray(new String[0])); + if (argument.getSqlConnectionParams() != null) { + try (Connection connection = DatabaseConnection.getConnection( + argument.getSqlConnectionParams())) { + String query = argument.getRequestURL(); + List result = fetchFileListFromDatabaseSQL(connection, query); - tx = arangoDb.beginStreamTransaction(options); - String transactionId = tx.getId(); + if (result != null && !result.isEmpty()) { + String resultSet = String.join(", ", result.stream() + .map(s -> "'" + s + "'") + .toArray(String[]::new)); + clauseBuilder.append(" OR ") + .append(argument.getId()) + .append(" IN (") + .append(resultSet) + .append(")"); + } + } + catch (SQLException e) { + logger.error("Failed to execute query for RequestArgument", e); + } + } + } + } - logger.info("Stream transaction started with ID: {}", transactionId); + String clause = clauseBuilder.toString(); + return request.getRequestURL() + .replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase()) + .replace("${clause}", clause) + .replace("${ids}", ids); + } - List entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids, transactionId); - executeMainAqlRequest(arangoDb, request, entities, transactionId); + private void executeSqlQuery(Connection connection, String query) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(query)) { + stmt.execute(); + } + } - arangoDb.commitStreamTransaction(transactionId); - logger.info("Stream transaction with ID {} committed successfully", transactionId); + public List fetchFileListFromDatabaseSQL(Connection connection, String query) + throws SQLException { + List results = new ArrayList<>(); + try (PreparedStatement stmt = connection.prepareStatement(query); + ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + results.add(rs.getString(1)); // Fetch the first column + } + } + return results; + } - } catch (ArangoDBException e) { - if (tx != null) { - arangoDb.abortStreamTransaction(tx.getId()); - logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e); - } - throw new RuntimeException("Failed to execute AQL request within a stream transaction", e); - } + private void processAqlRequests(AqlRequest request, List ids) { + ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams()); - logger.info("Successfully executed AQL request"); - } + RequestArgument requestArgument = request.getRequestArguments().get(0); + List aqlCollectionRead = requestArgument.getAqlCollectionRead(); + String aqlCollectionWrite = requestArgument.getId(); - private List executeSelectAqlRequest(ArangoDatabase arangoDb, List requestArguments, List ids, String transactionId) { - List entityIdList = new ArrayList<>(); + StreamTransactionEntity tx = null; + try { + StreamTransactionOptions options = new StreamTransactionOptions() + .writeCollections(aqlCollectionWrite) + .readCollections(aqlCollectionRead.toArray(new String[0])); - RequestArgument argument = requestArguments.get(0); + tx = arangoDb.beginStreamTransaction(options); + String transactionId = tx.getId(); - String query = argument.getRequestURL(); - String entityType = argument.getId(); + logger.info("Stream transaction started with ID: {}", transactionId); - Map bindVars = new HashMap<>(); - bindVars.put("ids", ids); + List entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids, + transactionId + ); + executeMainAqlRequest(arangoDb, request, entities, transactionId); - AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); + arangoDb.commitStreamTransaction(transactionId); + logger.info("Stream transaction with ID {} committed successfully", transactionId); - try (ArangoCursor cursor = arangoDb.query(query, Map.class, bindVars, aqlQueryOptions)) { - while (cursor.hasNext()) { - Map result = cursor.next(); - switch (entityType) { - case "applicationId": - entityIdList.add((String) result.get("applicationId")); - break; - case "edgesId": - entityIdList.addAll((List) result.get("edgesId")); - break; - case "subjectId": - entityIdList.addAll((List) result.get("subjectId")); - break; - case "historyId": - entityIdList.addAll((List) result.get("historyId")); - break; - case "interdepreqId": - entityIdList.addAll((List) result.get("interdepreqId")); - break; - default: - throw new IllegalArgumentException("Invalid requestArgumentId: " + entityType); - } - } - } catch (Exception e) { - logger.error("Failed to execute AQL query", e); - } + } + catch (ArangoDBException e) { + if (tx != null) { + arangoDb.abortStreamTransaction(tx.getId()); + logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e); + } + throw new RuntimeException("Failed to execute AQL request within a stream transaction", e); + } - return entityIdList; - } + logger.info("Successfully executed AQL request"); + } - private void executeMainAqlRequest(ArangoDatabase arangoDb, AqlRequest request, List entityIdList, String transactionId) { - if (entityIdList == null || entityIdList.isEmpty()) { - logger.warn("No entities found for main AQL request."); - return; - } + private List executeSelectAqlRequest(ArangoDatabase arangoDb, + List requestArguments, + List ids, String transactionId) { + List entityIdList = new ArrayList<>(); - String entity = request.getRequestArguments().get(0).getId(); + RequestArgument argument = requestArguments.get(0); - Map bindVars = new HashMap<>(); - bindVars.put("ids", entityIdList); + String query = argument.getRequestURL(); + String entityType = argument.getId(); + + Map bindVars = new HashMap<>(); + bindVars.put("ids", ids); + + AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); + + try (ArangoCursor cursor = arangoDb.query(query, Map.class, bindVars, aqlQueryOptions)) { + while (cursor.hasNext()) { + Map result = cursor.next(); + switch (entityType) { + case "applicationId": + entityIdList.add((String) result.get("applicationId")); + break; + case "edgesId": + entityIdList.addAll((List) result.get("edgesId")); + break; + case "subjectId": + entityIdList.addAll((List) result.get("subjectId")); + break; + case "historyId": + entityIdList.addAll((List) result.get("historyId")); + break; + case "interdepreqId": + entityIdList.addAll((List) result.get("interdepreqId")); + break; + default: + throw new IllegalArgumentException("Invalid requestArgumentId: " + entityType); + } + } + } + catch (Exception e) { + logger.error("Failed to execute AQL query", e); + } + + return entityIdList; + } + + private void executeMainAqlRequest(ArangoDatabase arangoDb, AqlRequest request, + List entityIdList, String transactionId) { + if (entityIdList == null || entityIdList.isEmpty()) { + logger.warn("No entities found for main AQL request."); + return; + } + + String entity = request.getRequestArguments().get(0).getId(); + + Map bindVars = new HashMap<>(); + bindVars.put("ids", entityIdList); - String finalQuery = request.getRequestURL() - .replace("${entity}", entity); + String finalQuery = request.getRequestURL() + .replace("${entity}", entity); - AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); + AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); - arangoDb.query(finalQuery, null, bindVars, aqlQueryOptions); + arangoDb.query(finalQuery, null, bindVars, aqlQueryOptions); - logger.info("Successfully removed {}: {}", entity, entityIdList); - } + logger.info("Successfully removed {}: {}", entity, entityIdList); + } }