diff --git a/сonfig-data-executor/pom.xml b/сonfig-data-executor/pom.xml
index 67f4a98..220c5f6 100644
--- a/сonfig-data-executor/pom.xml
+++ b/сonfig-data-executor/pom.xml
@@ -23,25 +23,57 @@
pom
import
+
+ org.springframework
+ spring-framework-bom
+ 6.1.12
+ import
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 3.3.3
+ pom
+ import
+
-
- org.jooq
- jooq
- 3.19.11
-
-
org.springframework.boot
spring-boot-starter-web
- 3.3.2
- org.springframework.boot
- spring-boot-starter-data-jpa
- 3.3.2
+ org.springframework
+ spring-tx
+
+
+ com.atomikos
+ transactions-jta
+ 6.0.0
+ jakarta
+
+
+ com.atomikos
+ transactions-jdbc
+ 6.0.0
+ jakarta
+
+
+ jakarta.transaction
+ jakarta.transaction-api
+
+
+ mysql
+ mysql-connector-java
+ 8.0.33
+ runtime
+
+
+ org.postgresql
+ postgresql
+ runtime
org.projectlombok
@@ -59,12 +91,6 @@
jaxb-runtime
2.3.1
-
- mysql
- mysql-connector-java
- 8.0.33
- runtime
-
com.arangodb
arangodb-java-driver
@@ -74,17 +100,6 @@
com.amazonaws
aws-java-sdk-s3
-
- com.atomikos
- transactions-jta
- 6.0.0
-
-
- javax.transaction
- javax.transaction-api
- 1.3
-
-
@@ -98,10 +113,10 @@
org.springframework.boot
spring-boot-maven-plugin
+ 3.3.3
org.micord.Main
-
@@ -112,4 +127,4 @@
-
\ No newline at end of file
+
diff --git a/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java b/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java
index 7f731eb..e6cb7e7 100644
--- a/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java
+++ b/сonfig-data-executor/src/main/java/org/micord/config/AtomikosConfig.java
@@ -2,9 +2,12 @@ package org.micord.config;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
+import jakarta.transaction.TransactionManager;
+import jakarta.transaction.UserTransaction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
+import org.springframework.transaction.jta.JtaTransactionManager;
/**
* @author Maksim Tereshin
@@ -13,14 +16,23 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableTransactionManagement
public class AtomikosConfig {
- @Bean(initMethod = "init", destroyMethod = "close")
- public UserTransactionManager atomikosTransactionManager() {
- return new UserTransactionManager();
+ @Bean
+ public UserTransaction userTransaction() throws Throwable {
+ UserTransactionImp userTransactionImp = new UserTransactionImp();
+ userTransactionImp.setTransactionTimeout(300);
+ return userTransactionImp;
}
@Bean
- public UserTransactionImp atomikosUserTransaction() {
- return new UserTransactionImp();
+ public TransactionManager atomikosTransactionManager() throws Throwable {
+ UserTransactionManager userTransactionManager = new UserTransactionManager();
+ userTransactionManager.setForceShutdown(true);
+ return userTransactionManager;
+ }
+
+ @Bean
+ public JtaTransactionManager transactionManager() throws Throwable {
+ return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
}
}
diff --git a/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java b/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java
index 0057bf0..d762779 100644
--- a/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java
+++ b/сonfig-data-executor/src/main/java/org/micord/config/DatabaseConnection.java
@@ -1,16 +1,22 @@
package org.micord.config;
+import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.micord.models.SqlConnectionParams;
+import javax.sql.DataSource;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
/**
* @author Maksim Tereshin
*/
public class DatabaseConnection {
+ private static final Map dataSources = new HashMap<>();
+
public static Connection getConnection(SqlConnectionParams params) throws SQLException {
try {
Class.forName(params.getJdbcDriverClassName());
@@ -18,7 +24,40 @@ public class DatabaseConnection {
throw new SQLException("Unable to load the JDBC driver class", e);
}
- return DriverManager.getConnection(params.getJdbcUrl(), params.getJdbcUsername(), params.getJdbcPassword());
+ return getXaDataSource(params).getConnection();
+ }
+
+ public static DataSource getXaDataSource(SqlConnectionParams params) {
+ String database = params.getJdbcDatabase();
+
+ if (!dataSources.containsKey(database)) {
+ AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
+ xaDataSource.setUniqueResourceName("jdbcDatasource_" + database);
+ xaDataSource.setXaDataSourceClassName(params.getJdbcXaDataSourceClassName());
+ xaDataSource.setPoolSize(Integer.parseInt(params.getJdbcXaDataSourcePoolSize()));
+
+ Properties xaProperties = loadDatabaseProperties(params);
+ xaDataSource.setXaProperties(xaProperties);
+
+ dataSources.put(database, xaDataSource);
+ }
+
+ return dataSources.get(database);
+ }
+
+ private static Properties loadDatabaseProperties(SqlConnectionParams params) {
+ Properties xaProperties = new Properties();
+ try {
+ xaProperties.setProperty("user", params.getJdbcUsername());
+ xaProperties.setProperty("password", params.getJdbcPassword());
+ xaProperties.setProperty("serverName", params.getJdbcHost());
+ xaProperties.setProperty("portNumber", String.valueOf(params.getJdbcPort()));
+ xaProperties.setProperty("databaseName", params.getJdbcDatabase());
+
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to load database properties", e);
+ }
+ return xaProperties;
}
}
diff --git a/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java b/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java
index 3ee7750..592abb7 100644
--- a/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java
+++ b/сonfig-data-executor/src/main/java/org/micord/config/S3HttpConnection.java
@@ -22,27 +22,43 @@ public class S3HttpConnection {
String host = connectionParams.getHost() + ":" + connectionParams.getPort();
String s3Key = connectionParams.getS3Key();
String s3Secret = connectionParams.getS3Secret();
+ String method = connectionParams.getMethod().toUpperCase();
+ String body = connectionParams.getBody();
- // The resource to be deleted, typically a file in the S3 bucket
String resource = "/" + file;
- String contentType = "application/octet-stream";
+ String contentType = connectionParams.getContentType();
String date = ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME);
- // Generate a signature for the DELETE request
- String signature = generateSignature("DELETE", contentType, date, resource, s3Secret);
+ String signature = generateSignature(method, contentType, date, resource, s3Secret);
- // Build and return the HTTP DELETE request
- return HttpRequest.newBuilder()
+ HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create("https://" + host + resource))
.header("Host", host)
.header("Date", date)
.header("Content-Type", contentType)
- .header("Authorization", "AWS " + s3Key + ":" + signature)
- .DELETE()
- .build();
+ .header("Authorization", "AWS " + s3Key + ":" + signature);
+
+
+ switch (method) {
+ case "DELETE":
+ requestBuilder.DELETE();
+ break;
+ case "GET":
+ requestBuilder.GET();
+ break;
+ case "PUT":
+ requestBuilder.PUT(HttpRequest.BodyPublishers.ofString(body != null ? body : ""));
+ break;
+ case "POST":
+ requestBuilder.POST(HttpRequest.BodyPublishers.ofString(body != null ? body : ""));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported HTTP method: " + method);
+ }
+
+ return requestBuilder.build();
}
- // Utility method to generate a signature for the S3-compatible request
private static String generateSignature(String method, String contentType, String date, String resource, String s3Secret) throws Exception {
String stringToSign = method + "\n" +
"\n" + // MD5 - not used for DELETE requests
@@ -50,7 +66,6 @@ public class S3HttpConnection {
date + "\n" +
resource;
- // HMAC-SHA1 signature generation
Mac mac = Mac.getInstance("HmacSHA1");
SecretKeySpec secretKey = new SecretKeySpec(s3Secret.getBytes(StandardCharsets.UTF_8), "HmacSHA1");
mac.init(secretKey);
diff --git a/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java b/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java
index 44951f4..3c69964 100644
--- a/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java
+++ b/сonfig-data-executor/src/main/java/org/micord/models/RequestArgument.java
@@ -13,6 +13,7 @@ import javax.xml.bind.annotation.XmlRootElement;
public class RequestArgument {
private String id;
+ private String aqlCollectionRead;
private String requestURL;
private SqlConnectionParams sqlConnectionParams;
@@ -26,6 +27,11 @@ public class RequestArgument {
return id;
}
+ @XmlElement(name = "AqlCollectionRead")
+ public String getAqlCollectionRead() {
+ return aqlCollectionRead;
+ }
+
@XmlElement(name = "RequestURL")
public String getRequestURL() {
return requestURL;
diff --git a/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java b/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java
index 6472ab9..29fc61b 100644
--- a/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java
+++ b/сonfig-data-executor/src/main/java/org/micord/models/S3ConnectionParams.java
@@ -14,6 +14,9 @@ public class S3ConnectionParams {
private String s3Secret;
private String host;
private String port;
+ private String contentType;
+ private String method;
+ private String body;
@XmlElement(name = "S3Key")
public String getS3Key() {
@@ -35,4 +38,19 @@ public class S3ConnectionParams {
return port;
}
+ @XmlElement(name = "ContentType")
+ public String getContentType() {
+ return contentType;
+ }
+
+ @XmlElement(name = "Method")
+ public String getMethod() {
+ return method;
+ }
+
+ @XmlElement(name = "Body")
+ public String getBody() {
+ return body;
+ }
+
}
diff --git a/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java b/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java
index 97cdeb1..cad71b3 100644
--- a/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java
+++ b/сonfig-data-executor/src/main/java/org/micord/models/SqlConnectionParams.java
@@ -7,15 +7,28 @@ import javax.xml.bind.annotation.XmlElement;
@Setter
public class SqlConnectionParams {
- private String jdbcUrl;
+ private String jdbcHost;
+ private String jdbcPort;
private String jdbcUsername;
private String jdbcPassword;
private String jdbcDriverClassName;
+ private String jdbcXaDataSourceClassName;
+ private String jdbcXaDataSourcePoolSize;
private String jdbcDatabase;
- @XmlElement(name = "JdbcUrl")
- public String getJdbcUrl() {
- return jdbcUrl;
+ @XmlElement(name = "JdbcXaDataSourcePoolSize")
+ public String getJdbcXaDataSourcePoolSize() {
+ return jdbcXaDataSourcePoolSize;
+ }
+
+ @XmlElement(name = "JdbcHost")
+ public String getJdbcHost() {
+ return jdbcHost;
+ }
+
+ @XmlElement(name = "JdbcPort")
+ public String getJdbcPort() {
+ return jdbcPort;
}
@XmlElement(name = "JdbcUsername")
@@ -33,6 +46,11 @@ public class SqlConnectionParams {
return jdbcDriverClassName;
}
+ @XmlElement(name = "JdbcXaDataSourceClassName")
+ public String getJdbcXaDataSourceClassName() {
+ return jdbcXaDataSourceClassName;
+ }
+
@XmlElement(name = "JdbcDatabase")
public String getJdbcDatabase() {
return jdbcDatabase;
diff --git a/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java b/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java
index 79820ea..b014ea7 100644
--- a/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java
+++ b/сonfig-data-executor/src/main/java/org/micord/service/ApiService.java
@@ -1,44 +1,26 @@
package org.micord.service;
-import com.arangodb.ArangoCursor;
-import com.arangodb.ArangoDatabase;
-import com.atomikos.icatch.jta.UserTransactionImp;
-import org.micord.config.ArangoDBConnection;
-import org.micord.config.DatabaseConnection;
-import org.micord.config.S3HttpConnection;
import org.micord.exceptions.FileNotModifiedException;
import org.micord.models.*;
import org.micord.utils.ConfigLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.FileNotFoundException;
-import java.net.HttpURLConnection;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@Service
public class ApiService {
- private static final Logger logger = LoggerFactory.getLogger(ApiService.class);
-
@Autowired
private ConfigLoader configLoader;
@Autowired
- private UserTransactionImp userTransaction;
+ private S3Service s3Service;
@Autowired
- private HttpClient httpClient;
+ private SqlAqlService sqlAndAqlService;
public CompletableFuture process(String methodName, List ids) throws FileNotFoundException, FileNotModifiedException {
Optional optionalConfig = configLoader.loadConfigIfModified(methodName);
@@ -49,181 +31,11 @@ public class ApiService {
Requests config = optionalConfig.get();
- processRequests(config, ids);
+ sqlAndAqlService.processSqlAndAqlRequests(config, ids);
- return CompletableFuture.completedFuture("Processing complete");
+ return CompletableFuture.runAsync(() -> {
+ s3Service.processS3Requests(config.getS3Requests(), ids);
+ }).thenApply(v -> "Processing complete");
}
- public void processRequests(Requests config, List ids) {
- try {
- userTransaction.begin();
-
- // Process SQL and AQL requests
- processSqlAndAqlRequests(config, ids);
-
- userTransaction.commit();
- logger.info("Successfully processed requests for all IDs.");
- } catch (Exception e) {
- try {
- userTransaction.rollback();
- logger.error("Transaction rolled back due to failure.", e);
- } catch (Exception rollbackException) {
- logger.error("Failed to rollback the transaction", rollbackException);
- }
- logger.error("Failed to process requests", e);
- }
-
- // Process S3 requests
- processS3Requests(config.getS3Requests(), ids);
- }
-
- private void processSqlAndAqlRequests(Requests config, List ids) {
- for (SqlRequest request : config.getSqlRequests()) {
- processSqlRequests(request, ids);
- }
-
- for (AqlRequest request : config.getAqlRequests()) {
- processAqlRequests(request, ids);
- }
- }
-
- private void processSqlRequests(SqlRequest request, List ids) {
- try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) {
- String query = buildSqlQuery(request, String.join(",", ids));
- executeSqlQuery(connection, query);
- } catch (SQLException e) {
- logger.error("SQL query execution failed", e);
- }
- }
-
- private String buildSqlQuery(SqlRequest request, String ids) {
- StringBuilder clauseBuilder = new StringBuilder("id IN (" + ids + ")");
-
- if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
- for (RequestArgument argument : request.getRequestArguments()) {
- try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
- String query = argument.getRequestURL();
- List result = fetchFileListFromDatabaseSQL(connection, query);
-
- if (result != null && !result.isEmpty()) {
- String resultSet = String.join(", ", result.stream()
- .map(s -> "'" + s + "'")
- .toArray(String[]::new));
- clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")");
- }
- } catch (SQLException e) {
- logger.error("Failed to execute query for RequestArgument", e);
- }
- }
- }
-
- String clause = clauseBuilder.toString();
- return request.getRequestURL().replace("${clause}", clause);
- }
-
- private void executeSqlQuery(Connection connection, String query) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement(query)) {
- stmt.execute();
- }
- }
-
- private List fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException {
- List results = new ArrayList<>();
- try (PreparedStatement stmt = connection.prepareStatement(query);
- ResultSet rs = stmt.executeQuery()) {
- while (rs.next()) {
- results.add(rs.getString(1)); // Fetch the first column
- }
- }
- return results;
- }
-
- // Process S3 Requests concurrently
- public void processS3Requests(List s3Requests, List ids) {
- s3Requests.forEach(request -> {
- List> futures = ids.stream()
- .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id)))
- .toList();
-
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .thenRun(() -> logger.info("Successfully processed all S3 requests."))
- .exceptionally(ex -> {
- logger.error("Failed to process S3 requests", ex);
- return null;
- });
- });
- }
-
- private void processS3Request(S3Request request, String id) {
- try {
- List files = new ArrayList<>();
-
- if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
- for (RequestArgument argument : request.getRequestArguments()) {
- try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
- String query = argument.getRequestURL();
- List result = fetchFileListFromDatabaseSQL(connection, query);
- if (result != null && !result.isEmpty()) {
- files.addAll(result);
- }
- } catch (SQLException e) {
- logger.error("Failed to execute query for RequestArgument", e);
- }
- }
- }
-
- files.forEach(file -> {
- HttpRequest httpRequest;
- try {
- httpRequest = S3HttpConnection.buildHttpRequest(request, file);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
- .thenAccept(response -> {
- if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) {
- logger.info("Successfully deleted object for ID {}", id);
- } else {
- logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode());
- }
- })
- .exceptionally(ex -> {
- logger.error("Failed to delete object for ID {}", id, ex);
- return null;
- });
- });
-
- } catch (Exception e) {
- logger.error("Failed to process S3 request for id: {}", id, e);
- }
- }
-
- // Process AQL requests
- private void processAqlRequests(AqlRequest request, List ids) {
- ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams());
- String aqlQuery = buildAqlQuery(request, ids);
- List result = executeAqlQueryForStrings(arangoDb, aqlQuery);
- logger.info("Successfully executed AQL request and retrieved results: {}", result);
- }
-
- private String buildAqlQuery(AqlRequest request, List ids) {
- String collection = request.getAqlConnectionParams().getCollection();
- String inClause = "FOR doc IN " + collection + " FILTER doc.id IN [" +
- String.join(", ", ids.stream().map(id -> "'" + id + "'").toArray(String[]::new)) +
- "] RETURN doc";
- return request.getRequestURL().replace("${collection}", collection).replace("${clause}", inClause);
- }
-
- private List executeAqlQueryForStrings(ArangoDatabase arangoDb, String query) {
- List results = new ArrayList<>();
- try (ArangoCursor cursor = arangoDb.query(query, null, null, null)) {
- while (cursor.hasNext()) {
- results.add(cursor.next());
- }
- } catch (Exception e) {
- logger.error("Failed to execute AQL query", e);
- }
- return results;
- }
}
\ No newline at end of file
diff --git a/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java b/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java
new file mode 100644
index 0000000..0a5798d
--- /dev/null
+++ b/сonfig-data-executor/src/main/java/org/micord/service/S3Service.java
@@ -0,0 +1,98 @@
+package org.micord.service;
+
+import org.micord.config.DatabaseConnection;
+import org.micord.config.S3HttpConnection;
+import org.micord.models.RequestArgument;
+import org.micord.models.S3Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.net.HttpURLConnection;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * @author Maksim Tereshin
+ */
+@Service
+public class S3Service {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3Service.class);
+
+ @Autowired
+ private SqlAqlService sqlAndAqlService;
+
+ @Autowired
+ private HttpClient httpClient;
+
+ public void processS3Requests(List s3Requests, List ids) {
+ if (s3Requests != null) {
+ s3Requests.forEach(request -> {
+ List> futures = ids.stream()
+ .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id)))
+ .toList();
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .thenRun(() -> logger.info("Successfully processed all S3 requests."))
+ .exceptionally(ex -> {
+ logger.error("Failed to process S3 requests", ex);
+ return null;
+ });
+ });
+ }
+ }
+
+ private void processS3Request(S3Request request, String id) {
+ try {
+ List files = new ArrayList<>();
+
+ if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
+ for (RequestArgument argument : request.getRequestArguments()) {
+ try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
+ String query = argument.getRequestURL();
+ List result = sqlAndAqlService.fetchFileListFromDatabaseSQL(connection, query);
+ if (result != null && !result.isEmpty()) {
+ files.addAll(result);
+ }
+ } catch (SQLException e) {
+ logger.error("Failed to execute query for RequestArgument", e);
+ }
+ }
+ }
+
+ files.forEach(file -> {
+ HttpRequest httpRequest;
+ try {
+ httpRequest = S3HttpConnection.buildHttpRequest(request, file);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
+ .thenAccept(response -> {
+ if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) {
+ logger.info("Successfully deleted object for ID {}", id);
+ } else {
+ logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode());
+ }
+ })
+ .exceptionally(ex -> {
+ logger.error("Failed to delete object for ID {}", id, ex);
+ return null;
+ });
+ });
+
+ } catch (Exception e) {
+ logger.error("Failed to process S3 request for id: {}", id, e);
+ }
+ }
+
+}
diff --git a/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java b/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java
new file mode 100644
index 0000000..527ba79
--- /dev/null
+++ b/сonfig-data-executor/src/main/java/org/micord/service/SqlAqlService.java
@@ -0,0 +1,204 @@
+package org.micord.service;
+
+import com.arangodb.ArangoCursor;
+import com.arangodb.ArangoDBException;
+import com.arangodb.ArangoDatabase;
+import com.arangodb.entity.StreamTransactionEntity;
+import com.arangodb.model.AqlQueryOptions;
+import com.arangodb.model.StreamTransactionOptions;
+import com.arangodb.model.TransactionOptions;
+import org.micord.config.ArangoDBConnection;
+import org.micord.config.DatabaseConnection;
+import org.micord.models.AqlRequest;
+import org.micord.models.RequestArgument;
+import org.micord.models.Requests;
+import org.micord.models.SqlRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Maksim Tereshin
+ */
+@Service
+public class SqlAqlService {
+
+ private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class);
+
+ @Transactional
+ public void processSqlAndAqlRequests(Requests config, List ids) {
+ if (config.getSqlRequests() != null) {
+ for (SqlRequest request : config.getSqlRequests()) {
+ processSqlRequests(request, ids);
+ }
+ }
+
+ if (config.getAqlRequests() != null) {
+ for (AqlRequest request : config.getAqlRequests()) {
+ processAqlRequests(request, ids);
+ }
+ }
+ }
+
+ private void processSqlRequests(SqlRequest request, List ids) {
+ try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) {
+ String query = buildSqlQuery(request, String.join(",", ids));
+ executeSqlQuery(connection, query);
+ } catch (SQLException e) {
+ logger.error("SQL query execution failed", e);
+ }
+ }
+
+ private String buildSqlQuery(SqlRequest request, String ids) {
+ StringBuilder clauseBuilder = new StringBuilder("id IN (" + ids + ")");
+
+ if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
+ for (RequestArgument argument : request.getRequestArguments()) {
+ try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
+ String query = argument.getRequestURL();
+ List result = fetchFileListFromDatabaseSQL(connection, query);
+
+ if (result != null && !result.isEmpty()) {
+ String resultSet = String.join(", ", result.stream()
+ .map(s -> "'" + s + "'")
+ .toArray(String[]::new));
+ clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")");
+ }
+ } catch (SQLException e) {
+ logger.error("Failed to execute query for RequestArgument", e);
+ }
+ }
+ }
+
+ String clause = clauseBuilder.toString();
+ return request.getRequestURL().replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase()).replace("${clause}", clause);
+ }
+
+ private void executeSqlQuery(Connection connection, String query) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement(query)) {
+ stmt.execute();
+ }
+ }
+
+ public List fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException {
+ List results = new ArrayList<>();
+ try (PreparedStatement stmt = connection.prepareStatement(query);
+ ResultSet rs = stmt.executeQuery()) {
+ while (rs.next()) {
+ results.add(rs.getString(1)); // Fetch the first column
+ }
+ }
+ return results;
+ }
+
+ private void processAqlRequests(AqlRequest request, List ids) {
+ ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams());
+
+ RequestArgument requestArgument = request.getRequestArguments().get(0);
+ String aqlCollectionRead = requestArgument.getAqlCollectionRead();
+ String aqlCollectionWrite = requestArgument.getId();
+
+ StreamTransactionEntity tx = null;
+ try {
+ StreamTransactionOptions options = new StreamTransactionOptions()
+ .writeCollections(aqlCollectionWrite)
+ .readCollections(aqlCollectionRead);
+
+ tx = arangoDb.beginStreamTransaction(options);
+ String transactionId = tx.getId();
+
+ logger.info("Stream transaction started with ID: {}", transactionId);
+
+ List entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids, transactionId);
+ executeMainAqlRequest(arangoDb, request, entities, transactionId);
+
+ arangoDb.commitStreamTransaction(transactionId);
+ logger.info("Stream transaction with ID {} committed successfully", transactionId);
+
+ } catch (ArangoDBException e) {
+ if (tx != null) {
+ arangoDb.abortStreamTransaction(tx.getId());
+ logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e);
+ }
+ throw new RuntimeException("Failed to execute AQL request within a stream transaction", e);
+ }
+
+ logger.info("Successfully executed AQL request");
+ }
+
+ private List executeSelectAqlRequest(ArangoDatabase arangoDb, List requestArguments, List ids, String transactionId) {
+ List entityIdList = new ArrayList<>();
+
+ RequestArgument argument = requestArguments.get(0);
+
+ String query = argument.getRequestURL();
+ String entityType = argument.getId();
+
+ Map bindVars = new HashMap<>();
+ bindVars.put("ids", ids);
+
+ AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId);
+
+ try (ArangoCursor