Refactor deletion logic in DeletionService to improve batch processing and error handling for vector store deletions
This commit is contained in:
@@ -7,6 +7,8 @@ import java.util.concurrent.CompletableFuture;
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.ai.document.Document;
|
||||||
|
import org.springframework.ai.vectorstore.SearchRequest;
|
||||||
import org.springframework.ai.vectorstore.VectorStore;
|
import org.springframework.ai.vectorstore.VectorStore;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||||
@@ -17,10 +19,8 @@ import com.olympus.apollo.exception.vectorStoreMetaDetailsEmptyException;
|
|||||||
import com.olympus.apollo.repository.KSDocumentRepository;
|
import com.olympus.apollo.repository.KSDocumentRepository;
|
||||||
import com.olympus.apollo.repository.KSGitInfoRepository;
|
import com.olympus.apollo.repository.KSGitInfoRepository;
|
||||||
import com.olympus.apollo.repository.KSGitIngestionInfoRepository;
|
import com.olympus.apollo.repository.KSGitIngestionInfoRepository;
|
||||||
import com.olympus.apollo.repository.KSIngestionInfoRepository;
|
|
||||||
import com.olympus.apollo.repository.KSTextsRepository;
|
import com.olympus.apollo.repository.KSTextsRepository;
|
||||||
import com.olympus.apollo.repository.KSVideoRepository;
|
import com.olympus.apollo.repository.KSVideoRepository;
|
||||||
import com.olympus.apollo.repository.VectorStoreRepository;
|
|
||||||
import com.olympus.dto.DeleteGitRepoDetailsRequest;
|
import com.olympus.dto.DeleteGitRepoDetailsRequest;
|
||||||
import com.olympus.dto.DeletionRequest;
|
import com.olympus.dto.DeletionRequest;
|
||||||
import com.olympus.dto.ResultDTO;
|
import com.olympus.dto.ResultDTO;
|
||||||
@@ -42,18 +42,12 @@ public class DeletionService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private KSTextsRepository ksTextsRepository;
|
private KSTextsRepository ksTextsRepository;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private KSIngestionInfoRepository ksIngestionInfoRepository;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KSGitIngestionInfoRepository ksGitIngestionInfoRepository;
|
private KSGitIngestionInfoRepository ksGitIngestionInfoRepository;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private KSGitInfoRepository ksGitInfoRepository;
|
private KSGitInfoRepository ksGitInfoRepository;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private VectorStoreRepository vectorStoreRepository;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SimpMessagingTemplate simpMessagingTemplate;
|
private SimpMessagingTemplate simpMessagingTemplate;
|
||||||
|
|
||||||
@@ -79,26 +73,45 @@ public class DeletionService {
|
|||||||
@Async("asyncTaskExecutor")
|
@Async("asyncTaskExecutor")
|
||||||
public void deleteRecordsOnlyFromVectorStore(DeletionRequest deletionRequest) {
|
public void deleteRecordsOnlyFromVectorStore(DeletionRequest deletionRequest) {
|
||||||
try {
|
try {
|
||||||
|
KSDocument ksDocument = ksDocumentRepository.findById(deletionRequest.getKsDocumentId()).orElseThrow();
|
||||||
KSDocument ksDocument = ksDocumentRepository.findById(deletionRequest.getKsDocumentId()).get();
|
|
||||||
ksDocument.setIngestionStatus("DELETING");
|
ksDocument.setIngestionStatus("DELETING");
|
||||||
ksDocumentRepository.save(ksDocument);
|
ksDocumentRepository.save(ksDocument);
|
||||||
|
|
||||||
String rag_filter = "KsDocumentId=='"+deletionRequest.getKsDocumentId()+"'";
|
|
||||||
|
|
||||||
logger.info("Starting deletion");
|
logger.info("Starting deletion");
|
||||||
vectorStore.delete(rag_filter);
|
|
||||||
|
|
||||||
//elimino dal vectorStore ma mantengo il record
|
String rag_filter = "KsDocumentId=='" + deletionRequest.getKsDocumentId() + "'";
|
||||||
|
|
||||||
|
|
||||||
|
SearchRequest searchRequest = SearchRequest.builder()
|
||||||
|
.query(" ")
|
||||||
|
.filterExpression(rag_filter)
|
||||||
|
.topK(Integer.MAX_VALUE)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<String> idsToDelete = vectorStore.similaritySearch(searchRequest)
|
||||||
|
.stream()
|
||||||
|
.map(Document::getId)
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
logger.info("Found {} documents to delete for KsDocumentId: {}", idsToDelete.size(), deletionRequest.getKsDocumentId());
|
||||||
|
|
||||||
|
//Batch per eliminare i file con più richieste
|
||||||
|
final int DELETE_BATCH_SIZE = 500;
|
||||||
|
for (int i = 0; i < idsToDelete.size(); i += DELETE_BATCH_SIZE) {
|
||||||
|
int end = Math.min(i + DELETE_BATCH_SIZE, idsToDelete.size());
|
||||||
|
List<String> batch = idsToDelete.subList(i, end);
|
||||||
|
logger.info("Deleting batch from {} to {}", i, end);
|
||||||
|
vectorStore.delete(batch);
|
||||||
|
}
|
||||||
|
|
||||||
ksDocument.setIngestionStatus("LOADED");
|
ksDocument.setIngestionStatus("LOADED");
|
||||||
Date now = new Date();
|
ksDocument.setIngestionDate(new Date());
|
||||||
ksDocument.setIngestionDate(now);
|
|
||||||
|
|
||||||
ksDocumentRepository.save(ksDocument);
|
ksDocumentRepository.save(ksDocument);
|
||||||
|
|
||||||
logger.info("KSDocument with id {} deleted from VectorStore successfully.", deletionRequest.getKsDocumentId());
|
logger.info("KSDocument with id {} deleted from VectorStore successfully.", deletionRequest.getKsDocumentId());
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("An error occurred while deleting records: ", e+" "+Thread.currentThread().getName());
|
logger.error("An error occurred while deleting records: ", e);
|
||||||
throw new RuntimeException("An error occurred while deleting records", e);
|
throw new RuntimeException("An error occurred while deleting records", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user