Skip to content

Commit ec7414c

Browse files
authored
DATAES-817 - StreamQueries does only delete the last scrollid. (#449)
Original PR: #449
1 parent 3c9b0a7 commit ec7414c

File tree

8 files changed

+186
-71
lines changed

8 files changed

+186
-71
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@
3636
import java.net.InetSocketAddress;
3737
import java.nio.charset.StandardCharsets;
3838
import java.time.Duration;
39-
import java.util.ArrayList;
4039
import java.util.Collection;
41-
import java.util.Collections;
42-
import java.util.List;
4340
import java.util.Map.Entry;
4441
import java.util.Optional;
4542
import java.util.concurrent.TimeUnit;
@@ -93,7 +90,6 @@
9390
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
9491
import org.elasticsearch.rest.BytesRestResponse;
9592
import org.elasticsearch.rest.RestStatus;
96-
import org.elasticsearch.search.Scroll;
9793
import org.elasticsearch.search.SearchHit;
9894
import org.elasticsearch.search.SearchHits;
9995
import org.elasticsearch.search.aggregations.Aggregation;
@@ -105,6 +101,7 @@
105101
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
106102
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
107103
import org.springframework.data.elasticsearch.client.util.NamedXContents;
104+
import org.springframework.data.elasticsearch.client.util.ScrollState;
108105
import org.springframework.data.util.Lazy;
109106
import org.springframework.http.HttpHeaders;
110107
import org.springframework.http.HttpMethod;
@@ -115,7 +112,6 @@
115112
import org.springframework.util.Assert;
116113
import org.springframework.util.ObjectUtils;
117114
import org.springframework.util.ReflectionUtils;
118-
import org.springframework.util.StringUtils;
119115
import org.springframework.web.client.HttpClientErrorException;
120116
import org.springframework.web.client.HttpServerErrorException;
121117
import org.springframework.web.reactive.function.BodyExtractors;
@@ -835,8 +831,7 @@ private <T> Publisher<? extends T> handleClientError(String logId, Request reque
835831
.error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
836832
}
837833
return Mono.just(content);
838-
})
839-
.doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
834+
}).doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
840835
.flatMap(content -> doDecode(response, responseType, content));
841836
}
842837

@@ -893,42 +888,5 @@ public Collection<ElasticsearchHost> hosts() {
893888
}
894889
}
895890

896-
/**
897-
* Mutable state object holding scrollId to be used for {@link SearchScrollRequest#scroll(Scroll)}
898-
*
899-
* @author Christoph Strobl
900-
* @since 3.2
901-
*/
902-
private static class ScrollState {
903-
904-
private final Object lock = new Object();
905-
906-
private final List<String> pastIds = new ArrayList<>(1);
907-
@Nullable private String scrollId;
908-
909-
@Nullable
910-
String getScrollId() {
911-
return scrollId;
912-
}
913-
914-
List<String> getScrollIds() {
915-
916-
synchronized (lock) {
917-
return Collections.unmodifiableList(new ArrayList<>(pastIds));
918-
}
919-
}
920-
921-
void updateScrollId(String scrollId) {
922-
923-
if (StringUtils.hasText(scrollId)) {
924-
925-
synchronized (lock) {
926-
927-
this.scrollId = scrollId;
928-
pastIds.add(scrollId);
929-
}
930-
}
931-
}
932-
}
933891
// endregion
934892
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.client.util;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.LinkedHashSet;
21+
import java.util.List;
22+
import java.util.Set;
23+
24+
import org.elasticsearch.action.search.SearchScrollRequest;
25+
import org.elasticsearch.search.Scroll;
26+
import org.springframework.lang.Nullable;
27+
import org.springframework.util.StringUtils;
28+
29+
/**
30+
* Mutable state object holding scrollId to be used for {@link SearchScrollRequest#scroll(Scroll)}
31+
*
32+
* @author Christoph Strobl
33+
* @author Peter-Josef Meisch
34+
* @since 3.2
35+
*/
36+
public class ScrollState {
37+
38+
private final Object lock = new Object();
39+
40+
private final Set<String> pastIds = new LinkedHashSet<>();
41+
@Nullable private String scrollId;
42+
43+
public ScrollState() {}
44+
45+
public ScrollState(String scrollId) {
46+
updateScrollId(scrollId);
47+
}
48+
49+
@Nullable
50+
public String getScrollId() {
51+
return scrollId;
52+
}
53+
54+
public List<String> getScrollIds() {
55+
56+
synchronized (lock) {
57+
return Collections.unmodifiableList(new ArrayList<>(pastIds));
58+
}
59+
}
60+
61+
public void updateScrollId(String scrollId) {
62+
63+
if (StringUtils.hasText(scrollId)) {
64+
65+
synchronized (lock) {
66+
67+
this.scrollId = scrollId;
68+
pastIds.add(scrollId);
69+
}
70+
}
71+
}
72+
}

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.Arrays;
20+
import java.util.Collections;
2021
import java.util.HashMap;
2122
import java.util.Iterator;
2223
import java.util.List;
@@ -341,7 +342,14 @@ abstract protected <T> SearchScrollHits<T> searchScrollContinue(@Nullable String
341342
/*
342343
* internal use only, not for public API
343344
*/
344-
abstract protected void searchScrollClear(String scrollId);
345+
protected void searchScrollClear(String scrollId) {
346+
searchScrollClear(Collections.singletonList(scrollId));
347+
}
348+
349+
/*
350+
* internal use only, not for public API
351+
*/
352+
abstract protected void searchScrollClear(List<String> scrollIds);
345353

346354
abstract protected MultiSearchResponse.Item[] getMultiSearchResult(MultiSearchRequest request);
347355
// endregion

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,9 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
299299
}
300300

301301
@Override
302-
public void searchScrollClear(String scrollId) {
302+
public void searchScrollClear(List<String> scrollIds) {
303303
ClearScrollRequest request = new ClearScrollRequest();
304-
request.addScrollId(scrollId);
304+
request.scrollIds(scrollIds);
305305
execute(client -> client.clearScroll(request, RequestOptions.DEFAULT));
306306
}
307307

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,8 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
321321
}
322322

323323
@Override
324-
public void searchScrollClear(String scrollId) {
325-
client.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
324+
public void searchScrollClear(List<String> scrollIds) {
325+
client.prepareClearScroll().setScrollIds(scrollIds).execute().actionGet();
326326
}
327327

328328
@Override

src/main/java/org/springframework/data/elasticsearch/core/StreamQueries.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616
package org.springframework.data.elasticsearch.core;
1717

1818
import java.util.Iterator;
19+
import java.util.List;
1920
import java.util.NoSuchElementException;
2021
import java.util.function.Consumer;
2122
import java.util.function.Function;
2223

2324
import org.elasticsearch.search.aggregations.Aggregations;
25+
import org.springframework.data.elasticsearch.client.util.ScrollState;
2426
import org.springframework.lang.Nullable;
2527
import org.springframework.util.Assert;
2628

@@ -38,12 +40,12 @@ abstract class StreamQueries {
3840
*
3941
* @param searchHits the initial hits
4042
* @param continueScrollFunction function to continue scrolling applies to the current scrollId.
41-
* @param clearScrollConsumer consumer to clear the scroll context by accepting the current scrollId.
43+
* @param clearScrollConsumer consumer to clear the scroll context by accepting the scrollIds to clear.
4244
* @param <T>
4345
* @return the {@link SearchHitsIterator}.
4446
*/
4547
static <T> SearchHitsIterator<T> streamResults(SearchScrollHits<T> searchHits,
46-
Function<String, SearchScrollHits<T>> continueScrollFunction, Consumer<String> clearScrollConsumer) {
48+
Function<String, SearchScrollHits<T>> continueScrollFunction, Consumer<List<String>> clearScrollConsumer) {
4749

4850
Assert.notNull(searchHits, "searchHits must not be null.");
4951
Assert.notNull(searchHits.getScrollId(), "scrollId of searchHits must not be null.");
@@ -59,17 +61,17 @@ static <T> SearchHitsIterator<T> streamResults(SearchScrollHits<T> searchHits,
5961

6062
// As we couldn't retrieve single result with scroll, store current hits.
6163
private volatile Iterator<SearchHit<T>> scrollHits = searchHits.iterator();
62-
private volatile String scrollId = searchHits.getScrollId();
6364
private volatile boolean continueScroll = scrollHits.hasNext();
65+
private volatile ScrollState scrollState = new ScrollState(searchHits.getScrollId());
6466

6567
@Override
6668
public void close() {
6769

6870
try {
69-
clearScrollConsumer.accept(scrollId);
71+
clearScrollConsumer.accept(scrollState.getScrollIds());
7072
} finally {
7173
scrollHits = null;
72-
scrollId = null;
74+
scrollState = null;
7375
}
7476
}
7577

@@ -102,9 +104,9 @@ public boolean hasNext() {
102104
}
103105

104106
if (!scrollHits.hasNext()) {
105-
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollId);
107+
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
106108
scrollHits = nextPage.iterator();
107-
scrollId = nextPage.getScrollId();
109+
scrollState.updateScrollId(nextPage.getScrollId());
108110
continueScroll = scrollHits.hasNext();
109111
}
110112

@@ -127,6 +129,5 @@ public void remove() {
127129
}
128130

129131
// utility constructor
130-
private StreamQueries() {
131-
}
132+
private StreamQueries() {}
132133
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.client.util;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import java.util.Arrays;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
/**
25+
* @author Peter-Josef Meisch
26+
*/
27+
class ScrollStateTest {
28+
29+
@Test // DATAES-817
30+
void shouldReturnLastSetScrollId() {
31+
ScrollState scrollState = new ScrollState();
32+
33+
scrollState.updateScrollId("id-1");
34+
scrollState.updateScrollId("id-2");
35+
36+
assertThat(scrollState.getScrollId()).isEqualTo("id-2");
37+
}
38+
39+
@Test
40+
void shouldReturnUniqueListOfUsedScrollIdsInCorrectOrder() {
41+
42+
ScrollState scrollState = new ScrollState();
43+
44+
scrollState.updateScrollId("id-1");
45+
scrollState.updateScrollId("id-2");
46+
scrollState.updateScrollId("id-1");
47+
scrollState.updateScrollId("id-3");
48+
scrollState.updateScrollId("id-2");
49+
50+
assertThat(scrollState.getScrollIds()).isEqualTo(Arrays.asList("id-1", "id-2", "id-3"));
51+
}
52+
}

0 commit comments

Comments
 (0)