Skip to content

Commit 32c1af1

Browse files
committed
DATAES-799 - Support optimistic locking for full update scenario using seq_no + primary_term.
Implement seq_no + primary_term support for ReactiveElasticsearchTemplate
1 parent e38f941 commit 32c1af1

File tree

4 files changed

+171
-2
lines changed

4 files changed

+171
-2
lines changed

src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
2222
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
2323
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
24+
import org.springframework.data.elasticsearch.core.mapping.SeqNoPrimaryTerm;
2425
import org.springframework.data.mapping.IdentifierAccessor;
2526
import org.springframework.data.mapping.PersistentPropertyAccessor;
2627
import org.springframework.data.mapping.context.MappingContext;
@@ -35,6 +36,7 @@
3536
* @author Mark Paluch
3637
* @author Christoph Strobl
3738
* @author Peter-Josef Meisch
39+
* @author Roman Puchkovskiy
3840
* @since 3.2
3941
*/
4042
class EntityOperations {
@@ -256,6 +258,21 @@ interface AdaptibleEntity<T> extends Entity<T> {
256258
@Override
257259
@Nullable
258260
Number getVersion();
261+
262+
/**
263+
* Returns whether there is a property with type SeqNoPrimaryTerm in this entity.
264+
*
265+
* @return true if there is SeqNoPrimaryTerm property
266+
* @since 4.0
267+
*/
268+
boolean hasSeqNoPrimaryTerm();
269+
270+
/**
271+
* Returns SeqNoPropertyTerm for this entity.
272+
*
273+
* @return SeqNoPrimaryTerm, may be {@literal null}
274+
*/
275+
@Nullable SeqNoPrimaryTerm getSeqNoPrimaryTerm();
259276
}
260277

261278
/**
@@ -333,6 +350,16 @@ public Number getVersion() {
333350
return null;
334351
}
335352

353+
@Override
354+
public boolean hasSeqNoPrimaryTerm() {
355+
return false;
356+
}
357+
358+
@Override
359+
public SeqNoPrimaryTerm getSeqNoPrimaryTerm() {
360+
return null;
361+
}
362+
336363
/*
337364
* (non-Javadoc)
338365
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
@@ -588,6 +615,19 @@ public Number getVersion() {
588615
return propertyAccessor.getProperty(versionProperty, Number.class);
589616
}
590617

618+
@Override
619+
public boolean hasSeqNoPrimaryTerm() {
620+
return entity.hasSeqNoPrimaryTermProperty();
621+
}
622+
623+
@Override
624+
public SeqNoPrimaryTerm getSeqNoPrimaryTerm() {
625+
626+
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = entity.getRequiredSeqNoPrimaryTermProperty();
627+
628+
return propertyAccessor.getProperty(seqNoPrimaryTermProperty, SeqNoPrimaryTerm.class);
629+
}
630+
591631
/*
592632
* (non-Javadoc)
593633
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.elasticsearch.index.VersionType.*;
1919

20+
import org.springframework.data.elasticsearch.core.mapping.SeqNoPrimaryTerm;
2021
import reactor.core.publisher.Flux;
2122
import reactor.core.publisher.Mono;
2223
import reactor.util.function.Tuple2;
@@ -347,7 +348,23 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In
347348

348349
request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
349350

350-
if (entity.isVersionedEntity()) {
351+
boolean usingSeqNo = false;
352+
if (entity.hasSeqNoPrimaryTerm()) {
353+
SeqNoPrimaryTerm seqNoPrimaryTerm = entity.getSeqNoPrimaryTerm();
354+
355+
if (seqNoPrimaryTerm != null) {
356+
if (seqNoPrimaryTerm.getSequenceNumber() != null) {
357+
request.setIfSeqNo(seqNoPrimaryTerm.getSequenceNumber());
358+
usingSeqNo = true;
359+
}
360+
if (seqNoPrimaryTerm.getPrimaryTerm() != null) {
361+
request.setIfPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm());
362+
}
363+
}
364+
}
365+
366+
// seq_no and version are incompatible in the same request
367+
if (!usingSeqNo && entity.isVersionedEntity()) {
351368

352369
Number version = entity.getVersion();
353370

@@ -356,6 +373,7 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In
356373
request.versionType(EXTERNAL);
357374
}
358375
}
376+
359377
return request;
360378
}
361379

src/main/java/org/springframework/data/elasticsearch/core/mapping/ElasticsearchPersistentEntity.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,14 @@ public interface ElasticsearchPersistentEntity<T> extends PersistentEntity<T, El
117117
*/
118118
@Nullable
119119
ElasticsearchPersistentProperty getSeqNoPrimaryTermProperty();
120+
121+
default ElasticsearchPersistentProperty getRequiredSeqNoPrimaryTermProperty() {
122+
ElasticsearchPersistentProperty property = this.getSeqNoPrimaryTermProperty();
123+
if (property != null) {
124+
return property;
125+
} else {
126+
throw new IllegalStateException(String.format("Required SeqNoPrimaryTerm property not found for %s!",
127+
this.getType()));
128+
}
129+
}
120130
}

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import static java.util.Collections.*;
1819
import static org.assertj.core.api.Assertions.*;
1920
import static org.elasticsearch.index.query.QueryBuilders.*;
2021
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
@@ -24,8 +25,10 @@
2425
import lombok.Data;
2526
import lombok.EqualsAndHashCode;
2627
import lombok.NoArgsConstructor;
28+
import org.elasticsearch.index.query.IdsQueryBuilder;
2729
import org.springframework.dao.OptimisticLockingFailureException;
2830
import org.springframework.data.elasticsearch.core.mapping.SeqNoPrimaryTerm;
31+
import org.springframework.data.elasticsearch.core.query.Query;
2932
import reactor.core.publisher.Mono;
3033
import reactor.test.StepVerifier;
3134

@@ -858,6 +861,61 @@ void shouldReturnEmptyFluxOnSaveAllWithEmptyInput() {
858861
.verifyComplete();
859862
}
860863

864+
@Test // DATAES-799
865+
void getShouldReturnSeqNoPrimaryTerm() {
866+
OptimisticEntity original = new OptimisticEntity();
867+
original.setMessage("It's fine");
868+
OptimisticEntity saved = template.save(original).block();
869+
870+
template.get(saved.getId(), OptimisticEntity.class)
871+
.as(StepVerifier::create)
872+
.assertNext(this::assertThatSeqNoPrimaryTermIsFilled)
873+
.verifyComplete();
874+
}
875+
876+
private void assertThatSeqNoPrimaryTermIsFilled(OptimisticEntity retrieved) {
877+
assertThat(retrieved.seqNoPrimaryTerm).isNotNull();
878+
assertThat(retrieved.seqNoPrimaryTerm.getSequenceNumber()).isNotNull();
879+
assertThat(retrieved.seqNoPrimaryTerm.getSequenceNumber()).isNotNegative();
880+
assertThat(retrieved.seqNoPrimaryTerm.getPrimaryTerm()).isNotNull();
881+
assertThat(retrieved.seqNoPrimaryTerm.getPrimaryTerm()).isPositive();
882+
}
883+
884+
@Test // DATAES-799
885+
void multiGetShouldReturnSeqNoPrimaryTerm() {
886+
OptimisticEntity original = new OptimisticEntity();
887+
original.setMessage("It's fine");
888+
OptimisticEntity saved = template.save(original).block();
889+
890+
template.multiGet(multiGetQueryForOne(saved.getId()), OptimisticEntity.class, template.getIndexCoordinatesFor(OptimisticEntity.class))
891+
.as(StepVerifier::create)
892+
.assertNext(this::assertThatSeqNoPrimaryTermIsFilled)
893+
.verifyComplete();
894+
}
895+
896+
private Query multiGetQueryForOne(String id) {
897+
return new NativeSearchQueryBuilder().withIds(singletonList(id)).build();
898+
}
899+
900+
@Test // DATAES-799
901+
void searchShouldReturnSeqNoPrimaryTerm() {
902+
OptimisticEntity original = new OptimisticEntity();
903+
original.setMessage("It's fine");
904+
OptimisticEntity saved = template.save(original).block();
905+
906+
template.search(searchQueryForOne(saved.getId()), OptimisticEntity.class, template.getIndexCoordinatesFor(OptimisticEntity.class))
907+
.map(SearchHit::getContent)
908+
.as(StepVerifier::create)
909+
.assertNext(this::assertThatSeqNoPrimaryTermIsFilled)
910+
.verifyComplete();
911+
}
912+
913+
private Query searchQueryForOne(String id) {
914+
return new NativeSearchQueryBuilder()
915+
.withFilter(new IdsQueryBuilder().addIds(id))
916+
.build();
917+
}
918+
861919
@Test // DATAES-799
862920
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEntityWithSeqNoPrimaryTermProperty() {
863921
OptimisticEntity original = new OptimisticEntity();
@@ -876,6 +934,40 @@ void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEnt
876934
.expectError(OptimisticLockingFailureException.class)
877935
.verify();
878936
}
937+
938+
@Test // DATAES-799
939+
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnVersionedEntityWithSeqNoPrimaryTermProperty() {
940+
OptimisticAndVersionedEntity original = new OptimisticAndVersionedEntity();
941+
original.setMessage("It's fine");
942+
OptimisticAndVersionedEntity saved = template.save(original).block();
943+
944+
OptimisticAndVersionedEntity forEdit1 = template.get(saved.getId(), OptimisticAndVersionedEntity.class).block();
945+
OptimisticAndVersionedEntity forEdit2 = template.get(saved.getId(), OptimisticAndVersionedEntity.class).block();
946+
947+
forEdit1.setMessage("It'll be ok");
948+
template.save(forEdit1).block();
949+
950+
forEdit2.setMessage("It'll be great");
951+
template.save(forEdit2)
952+
.as(StepVerifier::create)
953+
.expectError(OptimisticLockingFailureException.class)
954+
.verify();
955+
}
956+
957+
@Test // DATAES-799
958+
void shouldAllowFullReplaceOfEntityWithBothSeqNoPrimaryTermAndVersion() {
959+
OptimisticAndVersionedEntity original = new OptimisticAndVersionedEntity();
960+
original.setMessage("It's fine");
961+
OptimisticAndVersionedEntity saved = template.save(original).block();
962+
963+
OptimisticAndVersionedEntity forEdit = template.get(saved.getId(), OptimisticAndVersionedEntity.class).block();
964+
965+
forEdit.setMessage("It'll be ok");
966+
template.save(forEdit)
967+
.as(StepVerifier::create)
968+
.expectNextCount(1)
969+
.verifyComplete();
970+
}
879971

880972
@Data
881973
@Document(indexName = "marvel")
@@ -952,10 +1044,19 @@ static class SampleEntity {
9521044
}
9531045

9541046
@Data
955-
@Document(indexName = DEFAULT_INDEX)
1047+
@Document(indexName = "test-index-reactive-optimistic-entity-template")
9561048
static class OptimisticEntity {
9571049
@Id private String id;
9581050
private String message;
9591051
private SeqNoPrimaryTerm seqNoPrimaryTerm;
9601052
}
1053+
1054+
@Data
1055+
@Document(indexName = "test-index-reactive-optimistic-and-versioned-entity-template")
1056+
static class OptimisticAndVersionedEntity {
1057+
@Id private String id;
1058+
private String message;
1059+
private SeqNoPrimaryTerm seqNoPrimaryTerm;
1060+
@Version private Long version;
1061+
}
9611062
}

0 commit comments

Comments
 (0)