Skip to content

Commit e38f941

Browse files
committed
DATAES-799 - Support optimistic locking for full update scenario using seq_no + primary_term.
Make tests for ElasticsearchTemplate pass
1 parent a8643c4 commit e38f941

File tree

5 files changed

+126
-11
lines changed

5 files changed

+126
-11
lines changed

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslator.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.ElasticsearchStatusException;
2424
import org.elasticsearch.common.ValidationException;
25+
import org.elasticsearch.index.engine.VersionConflictEngineException;
2526
import org.elasticsearch.rest.RestStatus;
2627
import org.springframework.dao.DataAccessException;
2728
import org.springframework.dao.DataAccessResourceFailureException;
@@ -75,15 +76,22 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
7576
}
7677

7778
private boolean isSeqNoConflict(ElasticsearchException exception) {
78-
if (!(exception instanceof ElasticsearchStatusException)) {
79-
return false;
79+
80+
if (exception instanceof ElasticsearchStatusException) {
81+
ElasticsearchStatusException statusException = (ElasticsearchStatusException) exception;
82+
return statusException.status() == RestStatus.CONFLICT
83+
&& statusException.getMessage() != null
84+
&& statusException.getMessage().contains("type=version_conflict_engine_exception")
85+
&& statusException.getMessage().contains("version conflict, required seqNo");
86+
}
87+
88+
if (exception instanceof VersionConflictEngineException) {
89+
VersionConflictEngineException versionConflictEngineException = (VersionConflictEngineException) exception;
90+
return versionConflictEngineException.getMessage() != null
91+
&& versionConflictEngineException.getMessage().contains("version conflict, required seqNo");
8092
}
8193

82-
ElasticsearchStatusException statusException = (ElasticsearchStatusException) exception;
83-
return statusException.status() == RestStatus.CONFLICT
84-
&& statusException.getMessage() != null
85-
&& statusException.getMessage().contains("type=version_conflict_engine_exception")
86-
&& statusException.getMessage().contains("version conflict, required seqNo");
94+
return false;
8795
}
8896

8997
private boolean indexAvailable(ElasticsearchException ex) {

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.get.GetResponse;
2828
import org.elasticsearch.action.get.MultiGetRequestBuilder;
2929
import org.elasticsearch.action.index.IndexRequestBuilder;
30+
import org.elasticsearch.action.index.IndexResponse;
3031
import org.elasticsearch.action.search.MultiSearchRequest;
3132
import org.elasticsearch.action.search.MultiSearchResponse;
3233
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -89,6 +90,9 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
8990
private Client client;
9091
@Nullable private String searchTimeout;
9192

93+
// TODO: is it correct to use it here?
94+
private final ElasticsearchExceptionTranslator exceptionTranslator = new ElasticsearchExceptionTranslator();
95+
9296
// region Initialization
9397
public ElasticsearchTemplate(Client client) {
9498
this.client = client;
@@ -145,7 +149,14 @@ public String index(IndexQuery query, IndexCoordinates index) {
145149
maybeCallbackBeforeConvertWithQuery(query, index);
146150

147151
IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index);
148-
String documentId = indexRequestBuilder.execute().actionGet().getId();
152+
ActionFuture<IndexResponse> future = indexRequestBuilder.execute();
153+
IndexResponse response;
154+
try {
155+
response = future.actionGet();
156+
} catch (RuntimeException e) {
157+
throw translateException(e);
158+
}
159+
String documentId = response.getId();
149160

150161
// We should call this because we are not going through a mapper.
151162
Object queryObject = query.getObject();
@@ -360,4 +371,22 @@ public Client getClient() {
360371
return client;
361372
}
362373
// endregion
374+
375+
/**
376+
* translates an Exception if possible. Exceptions that are no {@link RuntimeException}s are wrapped in a
377+
* RuntimeException
378+
*
379+
* @param exception the Exception to map
380+
* @return the potentially translated RuntimeException.
381+
* @since 4.0
382+
*/
383+
private RuntimeException translateException(Exception exception) {
384+
385+
RuntimeException runtimeException = exception instanceof RuntimeException ? (RuntimeException) exception
386+
: new RuntimeException(exception.getMessage(), exception);
387+
RuntimeException potentiallyTranslatedException = exceptionTranslator
388+
.translateExceptionIfPossible(runtimeException);
389+
390+
return potentiallyTranslatedException != null ? potentiallyTranslatedException : runtimeException;
391+
}
363392
}

src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,13 @@ public IndexRequestBuilder indexRequestBuilder(Client client, IndexQuery query,
382382
indexRequestBuilder.setVersionType(versionType);
383383
}
384384

385+
if (query.getSeqNo() != null) {
386+
indexRequestBuilder.setIfSeqNo(query.getSeqNo());
387+
}
388+
if (query.getPrimaryTerm() != null) {
389+
indexRequestBuilder.setIfPrimaryTerm(query.getPrimaryTerm());
390+
}
391+
385392
return indexRequestBuilder;
386393
}
387394

@@ -808,6 +815,9 @@ private SearchRequestBuilder prepareSearchRequestBuilder(Query query, Client cli
808815
.setSearchType(query.getSearchType()) //
809816
.setVersion(true) //
810817
.setTrackScores(query.getTrackScores());
818+
if (hasSeqNoPrimaryTermProperty(clazz)) {
819+
searchRequestBuilder.seqNoAndPrimaryTerm(true);
820+
}
811821

812822
if (query.getSourceFilter() != null) {
813823
SourceFilter sourceFilter = query.getSourceFilter();

src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchExceptionTranslatorTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
import static org.assertj.core.api.Assertions.*;
44

55
import org.elasticsearch.ElasticsearchStatusException;
6+
import org.elasticsearch.index.engine.VersionConflictEngineException;
7+
import org.elasticsearch.index.shard.ShardId;
68
import org.elasticsearch.rest.RestStatus;
79
import org.junit.jupiter.api.Test;
810
import org.springframework.dao.DataAccessException;
911
import org.springframework.dao.OptimisticLockingFailureException;
1012

13+
import java.util.UUID;
14+
1115
/**
1216
* @author Roman Puchkovskiy
1317
*/
@@ -26,4 +30,17 @@ void shouldConvertElasticsearchStatusExceptionWithSeqNoConflictToOptimisticLocki
2630
assertThat(translated.getMessage()).startsWith("Cannot index a document due to seq_no+primary_term conflict");
2731
assertThat(translated.getCause()).isSameAs(ex);
2832
}
33+
34+
@Test // DATAES-799
35+
void shouldConvertVersionConflictEngineExceptionWithSeqNoConflictToOptimisticLockingFailureException() {
36+
VersionConflictEngineException ex = new VersionConflictEngineException(
37+
new ShardId("index", "uuid", 1), "exception-id",
38+
"Elasticsearch exception [type=version_conflict_engine_exception, reason=[WPUUsXEB6uuA6j8_A7AB]: version conflict, required seqNo [34], primary term [16]. current document has seqNo [35] and primary term [16]]");
39+
40+
DataAccessException translated = translator.translateExceptionIfPossible(ex);
41+
42+
assertThat(translated).isInstanceOf(OptimisticLockingFailureException.class);
43+
assertThat(translated.getMessage()).startsWith("Cannot index a document due to seq_no+primary_term conflict");
44+
assertThat(translated.getCause()).isSameAs(ex);
45+
}
2946
}

src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.Arrays;
2424
import java.util.HashSet;
2525

26+
import org.elasticsearch.action.index.IndexAction;
2627
import org.elasticsearch.action.index.IndexRequest;
28+
import org.elasticsearch.action.index.IndexRequestBuilder;
2729
import org.elasticsearch.action.search.SearchAction;
2830
import org.elasticsearch.action.search.SearchRequest;
2931
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -172,7 +174,23 @@ void shouldIncludeSeqNoAndPrimaryTermFromIndexQueryToIndexRequest() {
172174
}
173175

174176
@Test // DATAES-799
175-
void shouldNotRequestSeqNoAndPrimartyTermViaSearchRequestWhenEntityClassDoesNotContainSeqNoPrimaryTermProperty() {
177+
void shouldIncludeSeqNoAndPrimaryTermFromIndexQueryToIndexRequestBuilder() {
178+
when(client.prepareIndex(anyString(), anyString()))
179+
.thenReturn(new IndexRequestBuilder(client, IndexAction.INSTANCE));
180+
181+
IndexQuery query = new IndexQuery();
182+
query.setObject(new Person());
183+
query.setSeqNo(1L);
184+
query.setPrimaryTerm(2L);
185+
186+
IndexRequestBuilder builder = requestFactory.indexRequestBuilder(client, query, IndexCoordinates.of("persons"));
187+
188+
assertThat(builder.request().ifSeqNo()).isEqualTo(1L);
189+
assertThat(builder.request().ifPrimaryTerm()).isEqualTo(2L);
190+
}
191+
192+
@Test // DATAES-799
193+
void shouldNotRequestSeqNoAndPrimaryTermViaSearchRequestWhenEntityClassDoesNotContainSeqNoPrimaryTermProperty() {
176194
Query query = new NativeSearchQueryBuilder().build();
177195

178196
SearchRequest request = requestFactory.searchRequest(query, Person.class, IndexCoordinates.of("persons"));
@@ -181,7 +199,7 @@ void shouldNotRequestSeqNoAndPrimartyTermViaSearchRequestWhenEntityClassDoesNotC
181199
}
182200

183201
@Test // DATAES-799
184-
void shouldRequestSeqNoAndPrimartyTermViaSearchRequestWhenEntityClassContainsSeqNoPrimaryTermProperty() {
202+
void shouldRequestSeqNoAndPrimaryTermViaSearchRequestWhenEntityClassContainsSeqNoPrimaryTermProperty() {
185203
Query query = new NativeSearchQueryBuilder().build();
186204

187205
SearchRequest request = requestFactory.searchRequest(query, EntityWithSeqNoPrimaryTerm.class,
@@ -191,14 +209,47 @@ void shouldRequestSeqNoAndPrimartyTermViaSearchRequestWhenEntityClassContainsSeq
191209
}
192210

193211
@Test // DATAES-799
194-
void shouldNotRequestSeqNoAndPrimartyTermViaSearchRequestWhenEntityClassIsNull() {
212+
void shouldNotRequestSeqNoAndPrimaryTermViaSearchRequestWhenEntityClassIsNull() {
195213
Query query = new NativeSearchQueryBuilder().build();
196214

197215
SearchRequest request = requestFactory.searchRequest(query, null, IndexCoordinates.of("persons"));
198216

199217
assertThat(request.source().seqNoAndPrimaryTerm()).isNull();
200218
}
201219

220+
@Test // DATAES-799
221+
void shouldNotRequestSeqNoAndPrimaryTermViaSearchRequestBuilderWhenEntityClassDoesNotContainSeqNoPrimaryTermProperty() {
222+
when(client.prepareSearch(any())).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE));
223+
Query query = new NativeSearchQueryBuilder().build();
224+
225+
SearchRequestBuilder builder = requestFactory.searchRequestBuilder(client, query, Person.class,
226+
IndexCoordinates.of("persons"));
227+
228+
assertThat(builder.request().source().seqNoAndPrimaryTerm()).isNull();
229+
}
230+
231+
@Test // DATAES-799
232+
void shouldRequestSeqNoAndPrimaryTermViaSearchRequestBuilderWhenEntityClassContainsSeqNoPrimaryTermProperty() {
233+
when(client.prepareSearch(any())).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE));
234+
Query query = new NativeSearchQueryBuilder().build();
235+
236+
SearchRequestBuilder builder = requestFactory.searchRequestBuilder(client, query,
237+
EntityWithSeqNoPrimaryTerm.class, IndexCoordinates.of("seqNoPrimaryTerm"));
238+
239+
assertThat(builder.request().source().seqNoAndPrimaryTerm()).isTrue();
240+
}
241+
242+
@Test // DATAES-799
243+
void shouldNotRequestSeqNoAndPrimaryTermViaSearchRequestBuilderWhenEntityClassIsNull() {
244+
when(client.prepareSearch(any())).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE));
245+
Query query = new NativeSearchQueryBuilder().build();
246+
247+
SearchRequestBuilder builder = requestFactory.searchRequestBuilder(client, query, null,
248+
IndexCoordinates.of("persons"));
249+
250+
assertThat(builder.request().source().seqNoAndPrimaryTerm()).isNull();
251+
}
252+
202253
static class Person {
203254
@Nullable @Id String id;
204255
@Nullable @Field(name = "last-name") String lastName;

0 commit comments

Comments
 (0)