update config executor
This commit is contained in:
parent
fad38c6b15
commit
5d834578a2
11 changed files with 482 additions and 248 deletions
|
|
@ -23,25 +23,57 @@
|
|||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-framework-bom</artifactId>
|
||||
<version>6.1.12</version>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>3.3.3</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.jooq</groupId>
|
||||
<artifactId>jooq</artifactId>
|
||||
<version>3.19.11</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<version>3.3.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
<version>3.3.2</version>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-tx</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.atomikos</groupId>
|
||||
<artifactId>transactions-jta</artifactId>
|
||||
<version>6.0.0</version>
|
||||
<classifier>jakarta</classifier>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.atomikos</groupId>
|
||||
<artifactId>transactions-jdbc</artifactId>
|
||||
<version>6.0.0</version>
|
||||
<classifier>jakarta</classifier>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>jakarta.transaction</groupId>
|
||||
<artifactId>jakarta.transaction-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>8.0.33</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
|
|
@ -59,12 +91,6 @@
|
|||
<artifactId>jaxb-runtime</artifactId>
|
||||
<version>2.3.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>8.0.33</version>
|
||||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.arangodb</groupId>
|
||||
<artifactId>arangodb-java-driver</artifactId>
|
||||
|
|
@ -74,17 +100,6 @@
|
|||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-s3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.atomikos</groupId>
|
||||
<artifactId>transactions-jta</artifactId>
|
||||
<version>6.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.transaction</groupId>
|
||||
<artifactId>javax.transaction-api</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
<repositories>
|
||||
<repository>
|
||||
|
|
@ -98,10 +113,10 @@
|
|||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<version>3.3.3</version>
|
||||
<configuration>
|
||||
<mainClass>org.micord.Main</mainClass>
|
||||
</configuration>
|
||||
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
|
|
|
|||
|
|
@ -2,9 +2,12 @@ package org.micord.config;
|
|||
|
||||
import com.atomikos.icatch.jta.UserTransactionImp;
|
||||
import com.atomikos.icatch.jta.UserTransactionManager;
|
||||
import jakarta.transaction.TransactionManager;
|
||||
import jakarta.transaction.UserTransaction;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||
import org.springframework.transaction.jta.JtaTransactionManager;
|
||||
|
||||
/**
|
||||
* @author Maksim Tereshin
|
||||
|
|
@ -13,14 +16,23 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
|
|||
@EnableTransactionManagement
|
||||
public class AtomikosConfig {
|
||||
|
||||
@Bean(initMethod = "init", destroyMethod = "close")
|
||||
public UserTransactionManager atomikosTransactionManager() {
|
||||
return new UserTransactionManager();
|
||||
@Bean
|
||||
public UserTransaction userTransaction() throws Throwable {
|
||||
UserTransactionImp userTransactionImp = new UserTransactionImp();
|
||||
userTransactionImp.setTransactionTimeout(300);
|
||||
return userTransactionImp;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public UserTransactionImp atomikosUserTransaction() {
|
||||
return new UserTransactionImp();
|
||||
public TransactionManager atomikosTransactionManager() throws Throwable {
|
||||
UserTransactionManager userTransactionManager = new UserTransactionManager();
|
||||
userTransactionManager.setForceShutdown(true);
|
||||
return userTransactionManager;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public JtaTransactionManager transactionManager() throws Throwable {
|
||||
return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,16 +1,22 @@
|
|||
package org.micord.config;
|
||||
|
||||
import com.atomikos.jdbc.AtomikosDataSourceBean;
|
||||
import org.micord.models.SqlConnectionParams;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* @author Maksim Tereshin
|
||||
*/
|
||||
public class DatabaseConnection {
|
||||
|
||||
private static final Map<String, DataSource> dataSources = new HashMap<>();
|
||||
|
||||
public static Connection getConnection(SqlConnectionParams params) throws SQLException {
|
||||
try {
|
||||
Class.forName(params.getJdbcDriverClassName());
|
||||
|
|
@ -18,7 +24,40 @@ public class DatabaseConnection {
|
|||
throw new SQLException("Unable to load the JDBC driver class", e);
|
||||
}
|
||||
|
||||
return DriverManager.getConnection(params.getJdbcUrl(), params.getJdbcUsername(), params.getJdbcPassword());
|
||||
return getXaDataSource(params).getConnection();
|
||||
}
|
||||
|
||||
public static DataSource getXaDataSource(SqlConnectionParams params) {
|
||||
String database = params.getJdbcDatabase();
|
||||
|
||||
if (!dataSources.containsKey(database)) {
|
||||
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
|
||||
xaDataSource.setUniqueResourceName("jdbcDatasource_" + database);
|
||||
xaDataSource.setXaDataSourceClassName(params.getJdbcXaDataSourceClassName());
|
||||
xaDataSource.setPoolSize(Integer.parseInt(params.getJdbcXaDataSourcePoolSize()));
|
||||
|
||||
Properties xaProperties = loadDatabaseProperties(params);
|
||||
xaDataSource.setXaProperties(xaProperties);
|
||||
|
||||
dataSources.put(database, xaDataSource);
|
||||
}
|
||||
|
||||
return dataSources.get(database);
|
||||
}
|
||||
|
||||
private static Properties loadDatabaseProperties(SqlConnectionParams params) {
|
||||
Properties xaProperties = new Properties();
|
||||
try {
|
||||
xaProperties.setProperty("user", params.getJdbcUsername());
|
||||
xaProperties.setProperty("password", params.getJdbcPassword());
|
||||
xaProperties.setProperty("serverName", params.getJdbcHost());
|
||||
xaProperties.setProperty("portNumber", String.valueOf(params.getJdbcPort()));
|
||||
xaProperties.setProperty("databaseName", params.getJdbcDatabase());
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to load database properties", e);
|
||||
}
|
||||
return xaProperties;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,27 +22,43 @@ public class S3HttpConnection {
|
|||
String host = connectionParams.getHost() + ":" + connectionParams.getPort();
|
||||
String s3Key = connectionParams.getS3Key();
|
||||
String s3Secret = connectionParams.getS3Secret();
|
||||
String method = connectionParams.getMethod().toUpperCase();
|
||||
String body = connectionParams.getBody();
|
||||
|
||||
// The resource to be deleted, typically a file in the S3 bucket
|
||||
String resource = "/" + file;
|
||||
String contentType = "application/octet-stream";
|
||||
String contentType = connectionParams.getContentType();
|
||||
String date = ZonedDateTime.now().format(DateTimeFormatter.RFC_1123_DATE_TIME);
|
||||
|
||||
// Generate a signature for the DELETE request
|
||||
String signature = generateSignature("DELETE", contentType, date, resource, s3Secret);
|
||||
String signature = generateSignature(method, contentType, date, resource, s3Secret);
|
||||
|
||||
// Build and return the HTTP DELETE request
|
||||
return HttpRequest.newBuilder()
|
||||
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
|
||||
.uri(URI.create("https://" + host + resource))
|
||||
.header("Host", host)
|
||||
.header("Date", date)
|
||||
.header("Content-Type", contentType)
|
||||
.header("Authorization", "AWS " + s3Key + ":" + signature)
|
||||
.DELETE()
|
||||
.build();
|
||||
.header("Authorization", "AWS " + s3Key + ":" + signature);
|
||||
|
||||
|
||||
switch (method) {
|
||||
case "DELETE":
|
||||
requestBuilder.DELETE();
|
||||
break;
|
||||
case "GET":
|
||||
requestBuilder.GET();
|
||||
break;
|
||||
case "PUT":
|
||||
requestBuilder.PUT(HttpRequest.BodyPublishers.ofString(body != null ? body : ""));
|
||||
break;
|
||||
case "POST":
|
||||
requestBuilder.POST(HttpRequest.BodyPublishers.ofString(body != null ? body : ""));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported HTTP method: " + method);
|
||||
}
|
||||
|
||||
return requestBuilder.build();
|
||||
}
|
||||
|
||||
// Utility method to generate a signature for the S3-compatible request
|
||||
private static String generateSignature(String method, String contentType, String date, String resource, String s3Secret) throws Exception {
|
||||
String stringToSign = method + "\n" +
|
||||
"\n" + // MD5 - not used for DELETE requests
|
||||
|
|
@ -50,7 +66,6 @@ public class S3HttpConnection {
|
|||
date + "\n" +
|
||||
resource;
|
||||
|
||||
// HMAC-SHA1 signature generation
|
||||
Mac mac = Mac.getInstance("HmacSHA1");
|
||||
SecretKeySpec secretKey = new SecretKeySpec(s3Secret.getBytes(StandardCharsets.UTF_8), "HmacSHA1");
|
||||
mac.init(secretKey);
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import javax.xml.bind.annotation.XmlRootElement;
|
|||
public class RequestArgument {
|
||||
|
||||
private String id;
|
||||
private String aqlCollectionRead;
|
||||
private String requestURL;
|
||||
private SqlConnectionParams sqlConnectionParams;
|
||||
|
||||
|
|
@ -26,6 +27,11 @@ public class RequestArgument {
|
|||
return id;
|
||||
}
|
||||
|
||||
@XmlElement(name = "AqlCollectionRead")
|
||||
public String getAqlCollectionRead() {
|
||||
return aqlCollectionRead;
|
||||
}
|
||||
|
||||
@XmlElement(name = "RequestURL")
|
||||
public String getRequestURL() {
|
||||
return requestURL;
|
||||
|
|
|
|||
|
|
@ -14,6 +14,9 @@ public class S3ConnectionParams {
|
|||
private String s3Secret;
|
||||
private String host;
|
||||
private String port;
|
||||
private String contentType;
|
||||
private String method;
|
||||
private String body;
|
||||
|
||||
@XmlElement(name = "S3Key")
|
||||
public String getS3Key() {
|
||||
|
|
@ -35,4 +38,19 @@ public class S3ConnectionParams {
|
|||
return port;
|
||||
}
|
||||
|
||||
@XmlElement(name = "ContentType")
|
||||
public String getContentType() {
|
||||
return contentType;
|
||||
}
|
||||
|
||||
@XmlElement(name = "Method")
|
||||
public String getMethod() {
|
||||
return method;
|
||||
}
|
||||
|
||||
@XmlElement(name = "Body")
|
||||
public String getBody() {
|
||||
return body;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,15 +7,28 @@ import javax.xml.bind.annotation.XmlElement;
|
|||
@Setter
|
||||
public class SqlConnectionParams {
|
||||
|
||||
private String jdbcUrl;
|
||||
private String jdbcHost;
|
||||
private String jdbcPort;
|
||||
private String jdbcUsername;
|
||||
private String jdbcPassword;
|
||||
private String jdbcDriverClassName;
|
||||
private String jdbcXaDataSourceClassName;
|
||||
private String jdbcXaDataSourcePoolSize;
|
||||
private String jdbcDatabase;
|
||||
|
||||
@XmlElement(name = "JdbcUrl")
|
||||
public String getJdbcUrl() {
|
||||
return jdbcUrl;
|
||||
@XmlElement(name = "JdbcXaDataSourcePoolSize")
|
||||
public String getJdbcXaDataSourcePoolSize() {
|
||||
return jdbcXaDataSourcePoolSize;
|
||||
}
|
||||
|
||||
@XmlElement(name = "JdbcHost")
|
||||
public String getJdbcHost() {
|
||||
return jdbcHost;
|
||||
}
|
||||
|
||||
@XmlElement(name = "JdbcPort")
|
||||
public String getJdbcPort() {
|
||||
return jdbcPort;
|
||||
}
|
||||
|
||||
@XmlElement(name = "JdbcUsername")
|
||||
|
|
@ -33,6 +46,11 @@ public class SqlConnectionParams {
|
|||
return jdbcDriverClassName;
|
||||
}
|
||||
|
||||
@XmlElement(name = "JdbcXaDataSourceClassName")
|
||||
public String getJdbcXaDataSourceClassName() {
|
||||
return jdbcXaDataSourceClassName;
|
||||
}
|
||||
|
||||
@XmlElement(name = "JdbcDatabase")
|
||||
public String getJdbcDatabase() {
|
||||
return jdbcDatabase;
|
||||
|
|
|
|||
|
|
@ -1,44 +1,26 @@
|
|||
package org.micord.service;
|
||||
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.ArangoDatabase;
|
||||
import com.atomikos.icatch.jta.UserTransactionImp;
|
||||
import org.micord.config.ArangoDBConnection;
|
||||
import org.micord.config.DatabaseConnection;
|
||||
import org.micord.config.S3HttpConnection;
|
||||
import org.micord.exceptions.FileNotModifiedException;
|
||||
import org.micord.models.*;
|
||||
import org.micord.utils.ConfigLoader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
@Service
|
||||
public class ApiService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(ApiService.class);
|
||||
|
||||
@Autowired
|
||||
private ConfigLoader configLoader;
|
||||
|
||||
@Autowired
|
||||
private UserTransactionImp userTransaction;
|
||||
private S3Service s3Service;
|
||||
|
||||
@Autowired
|
||||
private HttpClient httpClient;
|
||||
private SqlAqlService sqlAndAqlService;
|
||||
|
||||
public CompletableFuture<String> process(String methodName, List<String> ids) throws FileNotFoundException, FileNotModifiedException {
|
||||
Optional<Requests> optionalConfig = configLoader.loadConfigIfModified(methodName);
|
||||
|
|
@ -49,181 +31,11 @@ public class ApiService {
|
|||
|
||||
Requests config = optionalConfig.get();
|
||||
|
||||
processRequests(config, ids);
|
||||
sqlAndAqlService.processSqlAndAqlRequests(config, ids);
|
||||
|
||||
return CompletableFuture.completedFuture("Processing complete");
|
||||
return CompletableFuture.runAsync(() -> {
|
||||
s3Service.processS3Requests(config.getS3Requests(), ids);
|
||||
}).thenApply(v -> "Processing complete");
|
||||
}
|
||||
|
||||
public void processRequests(Requests config, List<String> ids) {
|
||||
try {
|
||||
userTransaction.begin();
|
||||
|
||||
// Process SQL and AQL requests
|
||||
processSqlAndAqlRequests(config, ids);
|
||||
|
||||
userTransaction.commit();
|
||||
logger.info("Successfully processed requests for all IDs.");
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
userTransaction.rollback();
|
||||
logger.error("Transaction rolled back due to failure.", e);
|
||||
} catch (Exception rollbackException) {
|
||||
logger.error("Failed to rollback the transaction", rollbackException);
|
||||
}
|
||||
logger.error("Failed to process requests", e);
|
||||
}
|
||||
|
||||
// Process S3 requests
|
||||
processS3Requests(config.getS3Requests(), ids);
|
||||
}
|
||||
|
||||
private void processSqlAndAqlRequests(Requests config, List<String> ids) {
|
||||
for (SqlRequest request : config.getSqlRequests()) {
|
||||
processSqlRequests(request, ids);
|
||||
}
|
||||
|
||||
for (AqlRequest request : config.getAqlRequests()) {
|
||||
processAqlRequests(request, ids);
|
||||
}
|
||||
}
|
||||
|
||||
private void processSqlRequests(SqlRequest request, List<String> ids) {
|
||||
try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) {
|
||||
String query = buildSqlQuery(request, String.join(",", ids));
|
||||
executeSqlQuery(connection, query);
|
||||
} catch (SQLException e) {
|
||||
logger.error("SQL query execution failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildSqlQuery(SqlRequest request, String ids) {
|
||||
StringBuilder clauseBuilder = new StringBuilder("id IN (" + ids + ")");
|
||||
|
||||
if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
|
||||
for (RequestArgument argument : request.getRequestArguments()) {
|
||||
try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
|
||||
String query = argument.getRequestURL();
|
||||
List<String> result = fetchFileListFromDatabaseSQL(connection, query);
|
||||
|
||||
if (result != null && !result.isEmpty()) {
|
||||
String resultSet = String.join(", ", result.stream()
|
||||
.map(s -> "'" + s + "'")
|
||||
.toArray(String[]::new));
|
||||
clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Failed to execute query for RequestArgument", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String clause = clauseBuilder.toString();
|
||||
return request.getRequestURL().replace("${clause}", clause);
|
||||
}
|
||||
|
||||
private void executeSqlQuery(Connection connection, String query) throws SQLException {
|
||||
try (PreparedStatement stmt = connection.prepareStatement(query)) {
|
||||
stmt.execute();
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException {
|
||||
List<String> results = new ArrayList<>();
|
||||
try (PreparedStatement stmt = connection.prepareStatement(query);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
results.add(rs.getString(1)); // Fetch the first column
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// Process S3 Requests concurrently
|
||||
public void processS3Requests(List<S3Request> s3Requests, List<String> ids) {
|
||||
s3Requests.forEach(request -> {
|
||||
List<CompletableFuture<Void>> futures = ids.stream()
|
||||
.map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id)))
|
||||
.toList();
|
||||
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
||||
.thenRun(() -> logger.info("Successfully processed all S3 requests."))
|
||||
.exceptionally(ex -> {
|
||||
logger.error("Failed to process S3 requests", ex);
|
||||
return null;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private void processS3Request(S3Request request, String id) {
|
||||
try {
|
||||
List<String> files = new ArrayList<>();
|
||||
|
||||
if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
|
||||
for (RequestArgument argument : request.getRequestArguments()) {
|
||||
try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
|
||||
String query = argument.getRequestURL();
|
||||
List<String> result = fetchFileListFromDatabaseSQL(connection, query);
|
||||
if (result != null && !result.isEmpty()) {
|
||||
files.addAll(result);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Failed to execute query for RequestArgument", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
files.forEach(file -> {
|
||||
HttpRequest httpRequest;
|
||||
try {
|
||||
httpRequest = S3HttpConnection.buildHttpRequest(request, file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
|
||||
.thenAccept(response -> {
|
||||
if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) {
|
||||
logger.info("Successfully deleted object for ID {}", id);
|
||||
} else {
|
||||
logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode());
|
||||
}
|
||||
})
|
||||
.exceptionally(ex -> {
|
||||
logger.error("Failed to delete object for ID {}", id, ex);
|
||||
return null;
|
||||
});
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to process S3 request for id: {}", id, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Process AQL requests
|
||||
private void processAqlRequests(AqlRequest request, List<String> ids) {
|
||||
ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams());
|
||||
String aqlQuery = buildAqlQuery(request, ids);
|
||||
List<String> result = executeAqlQueryForStrings(arangoDb, aqlQuery);
|
||||
logger.info("Successfully executed AQL request and retrieved results: {}", result);
|
||||
}
|
||||
|
||||
private String buildAqlQuery(AqlRequest request, List<String> ids) {
|
||||
String collection = request.getAqlConnectionParams().getCollection();
|
||||
String inClause = "FOR doc IN " + collection + " FILTER doc.id IN [" +
|
||||
String.join(", ", ids.stream().map(id -> "'" + id + "'").toArray(String[]::new)) +
|
||||
"] RETURN doc";
|
||||
return request.getRequestURL().replace("${collection}", collection).replace("${clause}", inClause);
|
||||
}
|
||||
|
||||
private List<String> executeAqlQueryForStrings(ArangoDatabase arangoDb, String query) {
|
||||
List<String> results = new ArrayList<>();
|
||||
try (ArangoCursor<String> cursor = arangoDb.query(query, null, null, null)) {
|
||||
while (cursor.hasNext()) {
|
||||
results.add(cursor.next());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to execute AQL query", e);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,98 @@
|
|||
package org.micord.service;
|
||||
|
||||
import org.micord.config.DatabaseConnection;
|
||||
import org.micord.config.S3HttpConnection;
|
||||
import org.micord.models.RequestArgument;
|
||||
import org.micord.models.S3Request;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpResponse;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* @author Maksim Tereshin
|
||||
*/
|
||||
@Service
|
||||
public class S3Service {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(S3Service.class);
|
||||
|
||||
@Autowired
|
||||
private SqlAqlService sqlAndAqlService;
|
||||
|
||||
@Autowired
|
||||
private HttpClient httpClient;
|
||||
|
||||
public void processS3Requests(List<S3Request> s3Requests, List<String> ids) {
|
||||
if (s3Requests != null) {
|
||||
s3Requests.forEach(request -> {
|
||||
List<CompletableFuture<Void>> futures = ids.stream()
|
||||
.map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id)))
|
||||
.toList();
|
||||
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
||||
.thenRun(() -> logger.info("Successfully processed all S3 requests."))
|
||||
.exceptionally(ex -> {
|
||||
logger.error("Failed to process S3 requests", ex);
|
||||
return null;
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void processS3Request(S3Request request, String id) {
|
||||
try {
|
||||
List<String> files = new ArrayList<>();
|
||||
|
||||
if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
|
||||
for (RequestArgument argument : request.getRequestArguments()) {
|
||||
try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
|
||||
String query = argument.getRequestURL();
|
||||
List<String> result = sqlAndAqlService.fetchFileListFromDatabaseSQL(connection, query);
|
||||
if (result != null && !result.isEmpty()) {
|
||||
files.addAll(result);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Failed to execute query for RequestArgument", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
files.forEach(file -> {
|
||||
HttpRequest httpRequest;
|
||||
try {
|
||||
httpRequest = S3HttpConnection.buildHttpRequest(request, file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
|
||||
.thenAccept(response -> {
|
||||
if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) {
|
||||
logger.info("Successfully deleted object for ID {}", id);
|
||||
} else {
|
||||
logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode());
|
||||
}
|
||||
})
|
||||
.exceptionally(ex -> {
|
||||
logger.error("Failed to delete object for ID {}", id, ex);
|
||||
return null;
|
||||
});
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to process S3 request for id: {}", id, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,204 @@
|
|||
package org.micord.service;
|
||||
|
||||
import com.arangodb.ArangoCursor;
|
||||
import com.arangodb.ArangoDBException;
|
||||
import com.arangodb.ArangoDatabase;
|
||||
import com.arangodb.entity.StreamTransactionEntity;
|
||||
import com.arangodb.model.AqlQueryOptions;
|
||||
import com.arangodb.model.StreamTransactionOptions;
|
||||
import com.arangodb.model.TransactionOptions;
|
||||
import org.micord.config.ArangoDBConnection;
|
||||
import org.micord.config.DatabaseConnection;
|
||||
import org.micord.models.AqlRequest;
|
||||
import org.micord.models.RequestArgument;
|
||||
import org.micord.models.Requests;
|
||||
import org.micord.models.SqlRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author Maksim Tereshin
|
||||
*/
|
||||
@Service
|
||||
public class SqlAqlService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class);
|
||||
|
||||
@Transactional
|
||||
public void processSqlAndAqlRequests(Requests config, List<String> ids) {
|
||||
if (config.getSqlRequests() != null) {
|
||||
for (SqlRequest request : config.getSqlRequests()) {
|
||||
processSqlRequests(request, ids);
|
||||
}
|
||||
}
|
||||
|
||||
if (config.getAqlRequests() != null) {
|
||||
for (AqlRequest request : config.getAqlRequests()) {
|
||||
processAqlRequests(request, ids);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processSqlRequests(SqlRequest request, List<String> ids) {
|
||||
try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) {
|
||||
String query = buildSqlQuery(request, String.join(",", ids));
|
||||
executeSqlQuery(connection, query);
|
||||
} catch (SQLException e) {
|
||||
logger.error("SQL query execution failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildSqlQuery(SqlRequest request, String ids) {
|
||||
StringBuilder clauseBuilder = new StringBuilder("id IN (" + ids + ")");
|
||||
|
||||
if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
|
||||
for (RequestArgument argument : request.getRequestArguments()) {
|
||||
try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
|
||||
String query = argument.getRequestURL();
|
||||
List<String> result = fetchFileListFromDatabaseSQL(connection, query);
|
||||
|
||||
if (result != null && !result.isEmpty()) {
|
||||
String resultSet = String.join(", ", result.stream()
|
||||
.map(s -> "'" + s + "'")
|
||||
.toArray(String[]::new));
|
||||
clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Failed to execute query for RequestArgument", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String clause = clauseBuilder.toString();
|
||||
return request.getRequestURL().replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase()).replace("${clause}", clause);
|
||||
}
|
||||
|
||||
private void executeSqlQuery(Connection connection, String query) throws SQLException {
|
||||
try (PreparedStatement stmt = connection.prepareStatement(query)) {
|
||||
stmt.execute();
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException {
|
||||
List<String> results = new ArrayList<>();
|
||||
try (PreparedStatement stmt = connection.prepareStatement(query);
|
||||
ResultSet rs = stmt.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
results.add(rs.getString(1)); // Fetch the first column
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private void processAqlRequests(AqlRequest request, List<String> ids) {
|
||||
ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams());
|
||||
|
||||
RequestArgument requestArgument = request.getRequestArguments().get(0);
|
||||
String aqlCollectionRead = requestArgument.getAqlCollectionRead();
|
||||
String aqlCollectionWrite = requestArgument.getId();
|
||||
|
||||
StreamTransactionEntity tx = null;
|
||||
try {
|
||||
StreamTransactionOptions options = new StreamTransactionOptions()
|
||||
.writeCollections(aqlCollectionWrite)
|
||||
.readCollections(aqlCollectionRead);
|
||||
|
||||
tx = arangoDb.beginStreamTransaction(options);
|
||||
String transactionId = tx.getId();
|
||||
|
||||
logger.info("Stream transaction started with ID: {}", transactionId);
|
||||
|
||||
List<String> entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids, transactionId);
|
||||
executeMainAqlRequest(arangoDb, request, entities, transactionId);
|
||||
|
||||
arangoDb.commitStreamTransaction(transactionId);
|
||||
logger.info("Stream transaction with ID {} committed successfully", transactionId);
|
||||
|
||||
} catch (ArangoDBException e) {
|
||||
if (tx != null) {
|
||||
arangoDb.abortStreamTransaction(tx.getId());
|
||||
logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e);
|
||||
}
|
||||
throw new RuntimeException("Failed to execute AQL request within a stream transaction", e);
|
||||
}
|
||||
|
||||
logger.info("Successfully executed AQL request");
|
||||
}
|
||||
|
||||
private List<String> executeSelectAqlRequest(ArangoDatabase arangoDb, List<RequestArgument> requestArguments, List<String> ids, String transactionId) {
|
||||
List<String> entityIdList = new ArrayList<>();
|
||||
|
||||
RequestArgument argument = requestArguments.get(0);
|
||||
|
||||
String query = argument.getRequestURL();
|
||||
String entityType = argument.getId();
|
||||
|
||||
Map<String, Object> bindVars = new HashMap<>();
|
||||
bindVars.put("ids", ids);
|
||||
|
||||
AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId);
|
||||
|
||||
try (ArangoCursor<Map> cursor = arangoDb.query(query, Map.class, bindVars, aqlQueryOptions)) {
|
||||
while (cursor.hasNext()) {
|
||||
Map result = cursor.next();
|
||||
switch (entityType) {
|
||||
case "applicationId":
|
||||
entityIdList.add((String) result.get("applicationId"));
|
||||
break;
|
||||
case "edgesId":
|
||||
entityIdList.addAll((List<String>) result.get("edgesId"));
|
||||
break;
|
||||
case "subjectId":
|
||||
entityIdList.addAll((List<String>) result.get("subjectId"));
|
||||
break;
|
||||
case "historyId":
|
||||
entityIdList.addAll((List<String>) result.get("historyId"));
|
||||
break;
|
||||
case "interdepreqId":
|
||||
entityIdList.addAll((List<String>) result.get("interdepreqId"));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid requestArgumentId: " + entityType);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to execute AQL query", e);
|
||||
}
|
||||
|
||||
return entityIdList;
|
||||
}
|
||||
|
||||
private void executeMainAqlRequest(ArangoDatabase arangoDb, AqlRequest request, List<String> entityIdList, String transactionId) {
|
||||
if (entityIdList == null || entityIdList.isEmpty()) {
|
||||
logger.warn("No entities found for main AQL request.");
|
||||
return;
|
||||
}
|
||||
|
||||
String entity = request.getRequestArguments().get(0).getId();
|
||||
|
||||
Map<String, Object> bindVars = new HashMap<>();
|
||||
bindVars.put("ids", entityIdList);
|
||||
|
||||
|
||||
String finalQuery = request.getRequestURL()
|
||||
.replace("${entity}", entity);
|
||||
|
||||
AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId);
|
||||
|
||||
arangoDb.query(finalQuery, null, bindVars, aqlQueryOptions);
|
||||
|
||||
logger.info("Successfully removed {}: {}", entity, entityIdList);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,6 +1,5 @@
|
|||
configDirectory: /Users/maksim/Projects/Micord/configs
|
||||
|
||||
|
||||
#spring:
|
||||
# datasource:
|
||||
# url: jdbc:mysql://localhost:3306/micord?useSSL=false
|
||||
|
|
@ -14,5 +13,3 @@ configDirectory: /Users/maksim/Projects/Micord/configs
|
|||
# properties:
|
||||
# hibernate:
|
||||
# dialect: org.hibernate.dialect.MySQLDialect
|
||||
server:
|
||||
port: 8090
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue