This commit is contained in:
kochetkov 2024-09-19 15:36:45 +03:00
parent 702920da9f
commit 8fe886c626
3 changed files with 254 additions and 227 deletions

View file

@ -1,6 +1,5 @@
package org.micord.service; package org.micord.service;
import org.micord.exceptions.FileNotModifiedException;
import org.micord.models.*; import org.micord.models.*;
import org.micord.utils.ConfigLoader; import org.micord.utils.ConfigLoader;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -8,7 +7,6 @@ import org.springframework.stereotype.Service;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
@Service @Service
public class ApiService { public class ApiService {

View file

@ -30,69 +30,6 @@ public class S3Service {
@Autowired @Autowired
private SqlAqlService sqlAndAqlService; private SqlAqlService sqlAndAqlService;
@Autowired
private HttpClient httpClient;
public void processS3Requests(List<S3Request> s3Requests, List<String> ids) {
if (s3Requests != null) {
s3Requests.forEach(request -> {
List<CompletableFuture<Void>> futures = ids.stream()
.map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id)))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> logger.info("Successfully processed all S3 requests."))
.exceptionally(ex -> {
logger.error("Failed to process S3 requests", ex);
return null;
});
});
}
}
private void processS3Request(S3Request request, String id) {
try {
List<String> files = new ArrayList<>();
if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
for (RequestArgument argument : request.getRequestArguments()) {
try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) {
String query = argument.getRequestURL();
List<String> result = sqlAndAqlService.fetchFileListFromDatabaseSQL(connection, query);
if (result != null && !result.isEmpty()) {
files.addAll(result);
}
} catch (SQLException e) {
logger.error("Failed to execute query for RequestArgument", e);
}
}
}
files.forEach(file -> {
HttpRequest httpRequest;
try {
httpRequest = S3HttpConnection.buildHttpRequest(request, file);
} catch (Exception e) {
throw new RuntimeException(e);
}
httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
.thenAccept(response -> {
if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT || response.statusCode() == HttpURLConnection.HTTP_OK) {
logger.info("Successfully deleted object for ID {}", id);
} else {
logger.error("Failed to delete object for ID {}. Response code: {}", id, response.statusCode());
}
})
.exceptionally(ex -> {
logger.error("Failed to delete object for ID {}", id, ex);
return null;
});
});
} catch (Exception e) {
logger.error("Failed to process S3 request for id: {}", id, e);
}
}
} }

View file

@ -1,24 +1,9 @@
package org.micord.service; package org.micord.service;
import com.arangodb.ArangoCursor; import java.net.HttpURLConnection;
import com.arangodb.ArangoDBException; import java.net.http.HttpClient;
import com.arangodb.ArangoDatabase; import java.net.http.HttpRequest;
import com.arangodb.entity.StreamTransactionEntity; import java.net.http.HttpResponse;
import com.arangodb.model.AqlQueryOptions;
import com.arangodb.model.StreamTransactionOptions;
import com.arangodb.model.TransactionOptions;
import org.micord.config.ArangoDBConnection;
import org.micord.config.DatabaseConnection;
import org.micord.models.AqlRequest;
import org.micord.models.RequestArgument;
import org.micord.models.Requests;
import org.micord.models.SqlRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
@ -27,6 +12,27 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import com.arangodb.ArangoCursor;
import com.arangodb.ArangoDBException;
import com.arangodb.ArangoDatabase;
import com.arangodb.entity.StreamTransactionEntity;
import com.arangodb.model.AqlQueryOptions;
import com.arangodb.model.StreamTransactionOptions;
import org.micord.config.ArangoDBConnection;
import org.micord.config.DatabaseConnection;
import org.micord.config.S3HttpConnection;
import org.micord.models.AqlRequest;
import org.micord.models.RequestArgument;
import org.micord.models.Requests;
import org.micord.models.S3Request;
import org.micord.models.SqlRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/** /**
* @author Maksim Tereshin * @author Maksim Tereshin
@ -34,183 +40,269 @@ import java.util.Map;
@Service @Service
public class SqlAqlService { public class SqlAqlService {
private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class); private static final Logger logger = LoggerFactory.getLogger(SqlAqlService.class);
@Autowired @Autowired
private S3Service s3Service; private HttpClient httpClient;
@Transactional public void processS3Requests(List<S3Request> s3Requests, List<String> ids) {
public void processSqlAndAqlRequests(Requests config, List<String> ids) { if (s3Requests != null) {
if (config.getSqlRequests() != null) { s3Requests.forEach(request -> {
for (SqlRequest request : config.getSqlRequests()) { List<CompletableFuture<Void>> futures = ids.stream()
processSqlRequests(request, ids); .map(id -> CompletableFuture.runAsync(() -> processS3Request(request, id)))
} .toList();
}
if (config.getAqlRequests() != null) { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
for (AqlRequest request : config.getAqlRequests()) { .thenRun(() -> logger.info("Successfully processed all S3 requests."))
processAqlRequests(request, ids); .exceptionally(ex -> {
} logger.error("Failed to process S3 requests", ex);
} return null;
s3Service.processS3Requests(config.getS3Requests(), ids); });
} });
}
}
private void processSqlRequests(SqlRequest request, List<String> ids) { private void processS3Request(S3Request request, String id) {
try (Connection connection = DatabaseConnection.getConnection(request.getSqlConnectionParams())) { try {
String query = buildSqlQuery(request, String.join(",", ids)); List<String> files = new ArrayList<>();
executeSqlQuery(connection, query);
} catch (SQLException e) {
logger.error("SQL query execution failed", e);
}
}
private String buildSqlQuery(SqlRequest request, String ids) { if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
StringBuilder clauseBuilder = new StringBuilder(); for (RequestArgument argument : request.getRequestArguments()) {
try (Connection connection = DatabaseConnection.getConnection(
argument.getSqlConnectionParams())) {
String query = argument.getRequestURL();
List<String> result = fetchFileListFromDatabaseSQL(connection, query);
if (result != null && !result.isEmpty()) {
files.addAll(result);
}
}
catch (SQLException e) {
logger.error("Failed to execute query for RequestArgument", e);
}
}
}
if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) { files.forEach(file -> {
for (RequestArgument argument : request.getRequestArguments()) { HttpRequest httpRequest;
clauseBuilder.append(argument.getId()).append(" IN (").append(ids).append(")"); try {
httpRequest = S3HttpConnection.buildHttpRequest(request, file);
}
catch (Exception e) {
throw new RuntimeException(e);
}
if (argument.getSqlConnectionParams() != null) { httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString())
try (Connection connection = DatabaseConnection.getConnection(argument.getSqlConnectionParams())) { .thenAccept(response -> {
String query = argument.getRequestURL(); if (response.statusCode() == HttpURLConnection.HTTP_NO_CONTENT
List<String> result = fetchFileListFromDatabaseSQL(connection, query); || response.statusCode() == HttpURLConnection.HTTP_OK) {
logger.info("Successfully deleted object for ID {}", id);
}
else {
logger.error("Failed to delete object for ID {}. Response code: {}", id,
response.statusCode()
);
}
})
.exceptionally(ex -> {
logger.error("Failed to delete object for ID {}", id, ex);
return null;
});
});
if (result != null && !result.isEmpty()) { }
String resultSet = String.join(", ", result.stream() catch (Exception e) {
.map(s -> "'" + s + "'") logger.error("Failed to process S3 request for id: {}", id, e);
.toArray(String[]::new)); }
clauseBuilder.append(" OR ").append(argument.getId()).append(" IN (").append(resultSet).append(")"); }
}
} catch (SQLException e) {
logger.error("Failed to execute query for RequestArgument", e);
}
}
}
}
String clause = clauseBuilder.toString(); @Transactional
return request.getRequestURL() public void processSqlAndAqlRequests(Requests config, List<String> ids) {
.replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase()) if (config.getSqlRequests() != null) {
.replace("${clause}", clause) for (SqlRequest request : config.getSqlRequests()) {
.replace("${ids}", ids); processSqlRequests(request, ids);
} }
}
private void executeSqlQuery(Connection connection, String query) throws SQLException { if (config.getAqlRequests() != null) {
try (PreparedStatement stmt = connection.prepareStatement(query)) { for (AqlRequest request : config.getAqlRequests()) {
stmt.execute(); processAqlRequests(request, ids);
} }
} }
processS3Requests(config.getS3Requests(), ids);
}
public List<String> fetchFileListFromDatabaseSQL(Connection connection, String query) throws SQLException { private void processSqlRequests(SqlRequest request, List<String> ids) {
List<String> results = new ArrayList<>(); try (Connection connection = DatabaseConnection.getConnection(
try (PreparedStatement stmt = connection.prepareStatement(query); request.getSqlConnectionParams())) {
ResultSet rs = stmt.executeQuery()) { String query = buildSqlQuery(request, String.join(",", ids));
while (rs.next()) { executeSqlQuery(connection, query);
results.add(rs.getString(1)); // Fetch the first column }
} catch (SQLException e) {
} logger.error("SQL query execution failed", e);
return results; }
} }
private void processAqlRequests(AqlRequest request, List<String> ids) { private String buildSqlQuery(SqlRequest request, String ids) {
ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams()); StringBuilder clauseBuilder = new StringBuilder();
RequestArgument requestArgument = request.getRequestArguments().get(0); if (request.getRequestArguments() != null && !request.getRequestArguments().isEmpty()) {
List<String> aqlCollectionRead = requestArgument.getAqlCollectionRead(); for (RequestArgument argument : request.getRequestArguments()) {
String aqlCollectionWrite = requestArgument.getId(); clauseBuilder.append(argument.getId()).append(" IN (").append(ids).append(")");
StreamTransactionEntity tx = null; if (argument.getSqlConnectionParams() != null) {
try { try (Connection connection = DatabaseConnection.getConnection(
StreamTransactionOptions options = new StreamTransactionOptions() argument.getSqlConnectionParams())) {
.writeCollections(aqlCollectionWrite) String query = argument.getRequestURL();
.readCollections(aqlCollectionRead.toArray(new String[0])); List<String> result = fetchFileListFromDatabaseSQL(connection, query);
tx = arangoDb.beginStreamTransaction(options); if (result != null && !result.isEmpty()) {
String transactionId = tx.getId(); String resultSet = String.join(", ", result.stream()
.map(s -> "'" + s + "'")
.toArray(String[]::new));
clauseBuilder.append(" OR ")
.append(argument.getId())
.append(" IN (")
.append(resultSet)
.append(")");
}
}
catch (SQLException e) {
logger.error("Failed to execute query for RequestArgument", e);
}
}
}
}
logger.info("Stream transaction started with ID: {}", transactionId); String clause = clauseBuilder.toString();
return request.getRequestURL()
.replace("${DB}", request.getSqlConnectionParams().getJdbcDatabase())
.replace("${clause}", clause)
.replace("${ids}", ids);
}
List<String> entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids, transactionId); private void executeSqlQuery(Connection connection, String query) throws SQLException {
executeMainAqlRequest(arangoDb, request, entities, transactionId); try (PreparedStatement stmt = connection.prepareStatement(query)) {
stmt.execute();
}
}
arangoDb.commitStreamTransaction(transactionId); public List<String> fetchFileListFromDatabaseSQL(Connection connection, String query)
logger.info("Stream transaction with ID {} committed successfully", transactionId); throws SQLException {
List<String> results = new ArrayList<>();
try (PreparedStatement stmt = connection.prepareStatement(query);
ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
results.add(rs.getString(1)); // Fetch the first column
}
}
return results;
}
} catch (ArangoDBException e) { private void processAqlRequests(AqlRequest request, List<String> ids) {
if (tx != null) { ArangoDatabase arangoDb = ArangoDBConnection.getConnection(request.getAqlConnectionParams());
arangoDb.abortStreamTransaction(tx.getId());
logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e);
}
throw new RuntimeException("Failed to execute AQL request within a stream transaction", e);
}
logger.info("Successfully executed AQL request"); RequestArgument requestArgument = request.getRequestArguments().get(0);
} List<String> aqlCollectionRead = requestArgument.getAqlCollectionRead();
String aqlCollectionWrite = requestArgument.getId();
private List<String> executeSelectAqlRequest(ArangoDatabase arangoDb, List<RequestArgument> requestArguments, List<String> ids, String transactionId) { StreamTransactionEntity tx = null;
List<String> entityIdList = new ArrayList<>(); try {
StreamTransactionOptions options = new StreamTransactionOptions()
.writeCollections(aqlCollectionWrite)
.readCollections(aqlCollectionRead.toArray(new String[0]));
RequestArgument argument = requestArguments.get(0); tx = arangoDb.beginStreamTransaction(options);
String transactionId = tx.getId();
String query = argument.getRequestURL(); logger.info("Stream transaction started with ID: {}", transactionId);
String entityType = argument.getId();
Map<String, Object> bindVars = new HashMap<>(); List<String> entities = executeSelectAqlRequest(arangoDb, request.getRequestArguments(), ids,
bindVars.put("ids", ids); transactionId
);
executeMainAqlRequest(arangoDb, request, entities, transactionId);
AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); arangoDb.commitStreamTransaction(transactionId);
logger.info("Stream transaction with ID {} committed successfully", transactionId);
try (ArangoCursor<Map> cursor = arangoDb.query(query, Map.class, bindVars, aqlQueryOptions)) { }
while (cursor.hasNext()) { catch (ArangoDBException e) {
Map result = cursor.next(); if (tx != null) {
switch (entityType) { arangoDb.abortStreamTransaction(tx.getId());
case "applicationId": logger.error("Stream transaction with ID {} aborted due to an error", tx.getId(), e);
entityIdList.add((String) result.get("applicationId")); }
break; throw new RuntimeException("Failed to execute AQL request within a stream transaction", e);
case "edgesId": }
entityIdList.addAll((List<String>) result.get("edgesId"));
break;
case "subjectId":
entityIdList.addAll((List<String>) result.get("subjectId"));
break;
case "historyId":
entityIdList.addAll((List<String>) result.get("historyId"));
break;
case "interdepreqId":
entityIdList.addAll((List<String>) result.get("interdepreqId"));
break;
default:
throw new IllegalArgumentException("Invalid requestArgumentId: " + entityType);
}
}
} catch (Exception e) {
logger.error("Failed to execute AQL query", e);
}
return entityIdList; logger.info("Successfully executed AQL request");
} }
private void executeMainAqlRequest(ArangoDatabase arangoDb, AqlRequest request, List<String> entityIdList, String transactionId) { private List<String> executeSelectAqlRequest(ArangoDatabase arangoDb,
if (entityIdList == null || entityIdList.isEmpty()) { List<RequestArgument> requestArguments,
logger.warn("No entities found for main AQL request."); List<String> ids, String transactionId) {
return; List<String> entityIdList = new ArrayList<>();
}
String entity = request.getRequestArguments().get(0).getId(); RequestArgument argument = requestArguments.get(0);
Map<String, Object> bindVars = new HashMap<>(); String query = argument.getRequestURL();
bindVars.put("ids", entityIdList); String entityType = argument.getId();
Map<String, Object> bindVars = new HashMap<>();
bindVars.put("ids", ids);
AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId);
try (ArangoCursor<Map> cursor = arangoDb.query(query, Map.class, bindVars, aqlQueryOptions)) {
while (cursor.hasNext()) {
Map result = cursor.next();
switch (entityType) {
case "applicationId":
entityIdList.add((String) result.get("applicationId"));
break;
case "edgesId":
entityIdList.addAll((List<String>) result.get("edgesId"));
break;
case "subjectId":
entityIdList.addAll((List<String>) result.get("subjectId"));
break;
case "historyId":
entityIdList.addAll((List<String>) result.get("historyId"));
break;
case "interdepreqId":
entityIdList.addAll((List<String>) result.get("interdepreqId"));
break;
default:
throw new IllegalArgumentException("Invalid requestArgumentId: " + entityType);
}
}
}
catch (Exception e) {
logger.error("Failed to execute AQL query", e);
}
return entityIdList;
}
private void executeMainAqlRequest(ArangoDatabase arangoDb, AqlRequest request,
List<String> entityIdList, String transactionId) {
if (entityIdList == null || entityIdList.isEmpty()) {
logger.warn("No entities found for main AQL request.");
return;
}
String entity = request.getRequestArguments().get(0).getId();
Map<String, Object> bindVars = new HashMap<>();
bindVars.put("ids", entityIdList);
String finalQuery = request.getRequestURL() String finalQuery = request.getRequestURL()
.replace("${entity}", entity); .replace("${entity}", entity);
AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId); AqlQueryOptions aqlQueryOptions = new AqlQueryOptions().streamTransactionId(transactionId);
arangoDb.query(finalQuery, null, bindVars, aqlQueryOptions); arangoDb.query(finalQuery, null, bindVars, aqlQueryOptions);
logger.info("Successfully removed {}: {}", entity, entityIdList); logger.info("Successfully removed {}: {}", entity, entityIdList);
} }
} }