Skip to content

DATAES-799 - Support optimistic locking for full update scenario using seq_no + primary_term #441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
092e79b
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 25, 2020
72d6177
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 25, 2020
d45997e
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 25, 2020
c209fb4
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 25, 2020
7ab32c4
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
7742d8c
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
a4be9df
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
a8643c4
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
e38f941
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
32c1af1
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
f330010
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 26, 2020
73b2132
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 27, 2020
dbd5e96
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 27, 2020
58fae7e
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 27, 2020
672ba9c
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 27, 2020
bcf64b0
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 27, 2020
5cb14d5
Make logger field uppercase
rpuch Apr 28, 2020
99a6817
Clarify the warning message
rpuch Apr 28, 2020
3df1529
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 28, 2020
5e030ea
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 28, 2020
bddf1d3
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 28, 2020
282f3fe
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 28, 2020
4b1d779
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 28, 2020
d49e307
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 28, 2020
2fbca4f
DATAES-799 - Support optimistic locking for full update scenario usin…
rpuch Apr 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.util.CloseableIterator;
Expand Down Expand Up @@ -438,18 +439,40 @@ private Long getEntityVersion(Object entity) {
return null;
}

@Nullable
private SeqNoPrimaryTerm getEntitySeqNoPrimaryTerm(Object entity) {
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
ElasticsearchPersistentProperty property = persistentEntity.getSeqNoPrimaryTermProperty();

if (property != null) {
Object seqNoPrimaryTerm = persistentEntity.getPropertyAccessor(entity).getProperty(property);

if (seqNoPrimaryTerm != null && SeqNoPrimaryTerm.class.isAssignableFrom(seqNoPrimaryTerm.getClass())) {
return (SeqNoPrimaryTerm) seqNoPrimaryTerm;
}
}

return null;
}

private <T> IndexQuery getIndexQuery(T entity) {
String id = getEntityId(entity);

if (id != null) {
id = elasticsearchConverter.convertId(id);
}

return new IndexQueryBuilder() //
IndexQueryBuilder builder = new IndexQueryBuilder() //
.withId(id) //
.withVersion(getEntityVersion(entity)) //
.withObject(entity) //
.build();
.withObject(entity);
SeqNoPrimaryTerm seqNoPrimaryTerm = getEntitySeqNoPrimaryTerm(entity);
if (seqNoPrimaryTerm != null) {
builder.withSeqNoPrimaryTerm(seqNoPrimaryTerm);
} else {
// version cannot be used together with seq_no and primary_term
builder.withVersion(getEntityVersion(entity));
}
return builder.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
* @param clazz the type of the object to be returned
* @param index the index from which the object is read.
* @return the found object
* @deprecated since 4.0, use {@link #getById(String, Class, IndexCoordinates)}
* @deprecated since 4.0, use {@link #get(String, Class, IndexCoordinates)}
*/
@Deprecated
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.rest.RestStatus;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
Expand All @@ -35,6 +38,7 @@
/**
* @author Christoph Strobl
* @author Peter-Josef Meisch
* @author Roman Puchkovskiy
* @since 3.2
*/
public class ElasticsearchExceptionTranslator implements PersistenceExceptionTranslator {
Expand All @@ -50,6 +54,12 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return new NoSuchIndexException(ObjectUtils.nullSafeToString(elasticsearchException.getMetadata("es.index")),
ex);
}

if (isSeqNoConflict(elasticsearchException)) {
return new OptimisticLockingFailureException("Cannot index a document due to seq_no+primary_term conflict",
elasticsearchException);
}

return new UncategorizedElasticsearchException(ex.getMessage(), ex);
}

Expand All @@ -65,6 +75,25 @@ public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return null;
}

private boolean isSeqNoConflict(ElasticsearchException exception) {

if (exception instanceof ElasticsearchStatusException) {
ElasticsearchStatusException statusException = (ElasticsearchStatusException) exception;
return statusException.status() == RestStatus.CONFLICT
&& statusException.getMessage() != null
&& statusException.getMessage().contains("type=version_conflict_engine_exception")
&& statusException.getMessage().contains("version conflict, required seqNo");
}

if (exception instanceof VersionConflictEngineException) {
VersionConflictEngineException versionConflictEngineException = (VersionConflictEngineException) exception;
return versionConflictEngineException.getMessage() != null
&& versionConflictEngineException.getMessage().contains("version conflict, required seqNo");
}

return false;
}

private boolean indexAvailable(ElasticsearchException ex) {

List<String> metadata = ex.getMetadata("es.index_uuid");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
private Client client;
@Nullable private String searchTimeout;

private final ElasticsearchExceptionTranslator exceptionTranslator = new ElasticsearchExceptionTranslator();

// region Initialization
public ElasticsearchTemplate(Client client) {
this.client = client;
Expand Down Expand Up @@ -145,7 +148,14 @@ public String index(IndexQuery query, IndexCoordinates index) {
maybeCallbackBeforeConvertWithQuery(query, index);

IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index);
String documentId = indexRequestBuilder.execute().actionGet().getId();
ActionFuture<IndexResponse> future = indexRequestBuilder.execute();
IndexResponse response;
try {
response = future.actionGet();
} catch (RuntimeException e) {
throw translateException(e);
}
String documentId = response.getId();

// We should call this because we are not going through a mapper.
Object queryObject = query.getObject();
Expand Down Expand Up @@ -360,4 +370,22 @@ public Client getClient() {
return client;
}
// endregion

/**
* translates an Exception if possible. Exceptions that are no {@link RuntimeException}s are wrapped in a
* RuntimeException
*
* @param exception the Exception to map
* @return the potentially translated RuntimeException.
* @since 4.0
*/
private RuntimeException translateException(Exception exception) {

RuntimeException runtimeException = exception instanceof RuntimeException ? (RuntimeException) exception
: new RuntimeException(exception.getMessage(), exception);
RuntimeException potentiallyTranslatedException = exceptionTranslator
.translateExceptionIfPossible(runtimeException);

return potentiallyTranslatedException != null ? potentiallyTranslatedException : runtimeException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.context.MappingContext;
Expand All @@ -35,6 +36,7 @@
* @author Mark Paluch
* @author Christoph Strobl
* @author Peter-Josef Meisch
* @author Roman Puchkovskiy
* @since 3.2
*/
class EntityOperations {
Expand Down Expand Up @@ -256,6 +258,21 @@ interface AdaptibleEntity<T> extends Entity<T> {
@Override
@Nullable
Number getVersion();

/**
* Returns whether there is a property with type SeqNoPrimaryTerm in this entity.
*
* @return true if there is SeqNoPrimaryTerm property
* @since 4.0
*/
boolean hasSeqNoPrimaryTerm();

/**
* Returns SeqNoPropertyTerm for this entity.
*
* @return SeqNoPrimaryTerm, may be {@literal null}
*/
@Nullable SeqNoPrimaryTerm getSeqNoPrimaryTerm();
}

/**
Expand Down Expand Up @@ -333,6 +350,16 @@ public Number getVersion() {
return null;
}

@Override
public boolean hasSeqNoPrimaryTerm() {
return false;
}

@Override
public SeqNoPrimaryTerm getSeqNoPrimaryTerm() {
return null;
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
Expand Down Expand Up @@ -588,6 +615,19 @@ public Number getVersion() {
return propertyAccessor.getProperty(versionProperty, Number.class);
}

@Override
public boolean hasSeqNoPrimaryTerm() {
return entity.hasSeqNoPrimaryTermProperty();
}

@Override
public SeqNoPrimaryTerm getSeqNoPrimaryTerm() {

ElasticsearchPersistentProperty seqNoPrimaryTermProperty = entity.getRequiredSeqNoPrimaryTermProperty();

return propertyAccessor.getProperty(seqNoPrimaryTermProperty, SeqNoPrimaryTerm.class);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.support.VersionInfo;
Expand Down Expand Up @@ -347,7 +348,19 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In

request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);

if (entity.isVersionedEntity()) {
boolean usingSeqNo = false;
if (entity.hasSeqNoPrimaryTerm()) {
SeqNoPrimaryTerm seqNoPrimaryTerm = entity.getSeqNoPrimaryTerm();

if (seqNoPrimaryTerm != null) {
request.setIfSeqNo(seqNoPrimaryTerm.getSequenceNumber());
request.setIfPrimaryTerm(seqNoPrimaryTerm.getPrimaryTerm());
usingSeqNo = true;
}
}

// seq_no and version are incompatible in the same request
if (!usingSeqNo && entity.isVersionedEntity()) {

Number version = entity.getVersion();

Expand All @@ -356,6 +369,7 @@ private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, In
request.versionType(EXTERNAL);
}
}

return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
*
* @author Peter-Josef Meisch
* @author Sascha Woo
* @author Roman Puchkovskiy
* @since 4.0
*/
class RequestFactory {
Expand Down Expand Up @@ -342,6 +343,13 @@ public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
indexRequest.versionType(versionType);
}

if (query.getSeqNo() != null) {
indexRequest.setIfSeqNo(query.getSeqNo());
}
if (query.getPrimaryTerm() != null) {
indexRequest.setIfPrimaryTerm(query.getPrimaryTerm());
}

return indexRequest;
}

Expand Down Expand Up @@ -374,6 +382,13 @@ public IndexRequestBuilder indexRequestBuilder(Client client, IndexQuery query,
indexRequestBuilder.setVersionType(versionType);
}

if (query.getSeqNo() != null) {
indexRequestBuilder.setIfSeqNo(query.getSeqNo());
}
if (query.getPrimaryTerm() != null) {
indexRequestBuilder.setIfPrimaryTerm(query.getPrimaryTerm());
}

return indexRequestBuilder;
}

Expand Down Expand Up @@ -618,6 +633,9 @@ private SearchRequest prepareSearchRequest(Query query, @Nullable Class<?> clazz
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.version(true);
sourceBuilder.trackScores(query.getTrackScores());
if (hasSeqNoPrimaryTermProperty(clazz)) {
sourceBuilder.seqNoAndPrimaryTerm(true);
}

if (query.getSourceFilter() != null) {
SourceFilter sourceFilter = query.getSourceFilter();
Expand Down Expand Up @@ -681,7 +699,20 @@ private SearchRequest prepareSearchRequest(Query query, @Nullable Class<?> clazz
return request;
}

@SuppressWarnings("unchecked")
private boolean hasSeqNoPrimaryTermProperty(@Nullable Class<?> entityClass) {

if (entityClass == null) {
return false;
}

if (!elasticsearchConverter.getMappingContext().hasPersistentEntityFor(entityClass)) {
return false;
}

ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext().getRequiredPersistentEntity(entityClass);
return entity.hasSeqNoPrimaryTermProperty();
}

public PutMappingRequest putMappingRequest(IndexCoordinates index, Document mapping) {
PutMappingRequest request = new PutMappingRequest(index.getIndexName());
request.source(mapping);
Expand Down Expand Up @@ -784,6 +815,9 @@ private SearchRequestBuilder prepareSearchRequestBuilder(Query query, Client cli
.setSearchType(query.getSearchType()) //
.setVersion(true) //
.setTrackScores(query.getTrackScores());
if (hasSeqNoPrimaryTermProperty(clazz)) {
searchRequestBuilder.seqNoAndPrimaryTerm(true);
}

if (query.getSourceFilter() != null) {
SourceFilter sourceFilter = query.getSourceFilter();
Expand Down
Loading