Skip to content

Commit bdb6348

Browse files
committed
Common DataWithMediaType class and common synchronization for ResponseBodyEmitter/SseEmitter
Issue: SPR-13223 Issue: SPR-13224
1 parent 1fcd465 commit bdb6348

File tree

3 files changed

+124
-107
lines changed

3 files changed

+124
-107
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java

Lines changed: 83 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

1919
import java.io.IOException;
20-
import java.util.LinkedHashMap;
21-
import java.util.Map;
20+
import java.util.LinkedHashSet;
21+
import java.util.Set;
2222

2323
import org.springframework.http.MediaType;
2424
import org.springframework.http.server.ServerHttpResponse;
@@ -54,18 +54,18 @@
5454
* </pre>
5555
*
5656
* @author Rossen Stoyanchev
57+
* @author Juergen Hoeller
5758
* @since 4.2
5859
*/
5960
public class ResponseBodyEmitter {
6061

6162
private final Long timeout;
6263

63-
private volatile Handler handler;
64+
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<DataWithMediaType>(8);
6465

65-
/* Cache for objects sent before handler is set. */
66-
private final Map<Object, MediaType> initHandlerCache = new LinkedHashMap<Object, MediaType>(10);
66+
private Handler handler;
6767

68-
private volatile boolean complete;
68+
private boolean complete;
6969

7070
private Throwable failure;
7171

@@ -110,32 +110,29 @@ public Long getTimeout() {
110110
protected void extendResponse(ServerHttpResponse outputMessage) {
111111
}
112112

113-
void initialize(Handler handler) throws IOException {
114-
synchronized (this) {
115-
this.handler = handler;
116-
for (Map.Entry<Object, MediaType> entry : this.initHandlerCache.entrySet()) {
117-
try {
118-
sendInternal(entry.getKey(), entry.getValue());
119-
}
120-
catch (Throwable ex) {
121-
return;
122-
}
123-
}
124-
if (this.complete) {
125-
if (this.failure != null) {
126-
this.handler.completeWithError(this.failure);
127-
}
128-
else {
129-
this.handler.complete();
130-
}
131-
}
132-
if (this.timeoutCallback != null) {
133-
this.handler.onTimeout(this.timeoutCallback);
113+
synchronized void initialize(Handler handler) throws IOException {
114+
this.handler = handler;
115+
116+
for (DataWithMediaType sendAttempt : this.earlySendAttempts) {
117+
sendInternal(sendAttempt.getData(), sendAttempt.getMediaType());
118+
}
119+
this.earlySendAttempts.clear();
120+
121+
if (this.complete) {
122+
if (this.failure != null) {
123+
this.handler.completeWithError(this.failure);
134124
}
135-
if (this.completionCallback != null) {
136-
this.handler.onCompletion(this.completionCallback);
125+
else {
126+
this.handler.complete();
137127
}
138128
}
129+
130+
if (this.timeoutCallback != null) {
131+
this.handler.onTimeout(this.timeoutCallback);
132+
}
133+
if (this.completionCallback != null) {
134+
this.handler.onCompletion(this.completionCallback);
135+
}
139136
}
140137

141138
/**
@@ -159,33 +156,29 @@ public void send(Object object) throws IOException {
159156
* @throws IOException raised when an I/O error occurs
160157
* @throws java.lang.IllegalStateException wraps any other errors
161158
*/
162-
public void send(Object object, MediaType mediaType) throws IOException {
159+
public synchronized void send(Object object, MediaType mediaType) throws IOException {
163160
Assert.state(!this.complete, "ResponseBodyEmitter is already set complete");
164161
sendInternal(object, mediaType);
165162
}
166163

167164
private void sendInternal(Object object, MediaType mediaType) throws IOException {
168-
if (object == null) {
169-
return;
170-
}
171-
if (this.handler == null) {
172-
synchronized (this) {
173-
if (this.handler == null) {
174-
this.initHandlerCache.put(object, mediaType);
175-
return;
165+
if (object != null) {
166+
if (this.handler != null) {
167+
try {
168+
this.handler.send(object, mediaType);
169+
}
170+
catch (IOException ex) {
171+
this.handler.completeWithError(ex);
172+
throw ex;
173+
}
174+
catch (Throwable ex) {
175+
this.handler.completeWithError(ex);
176+
throw new IllegalStateException("Failed to send " + object, ex);
176177
}
177178
}
178-
}
179-
try {
180-
this.handler.send(object, mediaType);
181-
}
182-
catch (IOException ex){
183-
this.handler.completeWithError(ex);
184-
throw ex;
185-
}
186-
catch (Throwable ex){
187-
this.handler.completeWithError(ex);
188-
throw new IllegalStateException("Failed to send " + object, ex);
179+
else {
180+
this.earlySendAttempts.add(new DataWithMediaType(object, mediaType));
181+
}
189182
}
190183
}
191184

@@ -194,12 +187,10 @@ private void sendInternal(Object object, MediaType mediaType) throws IOException
194187
* <p>A dispatch is made into the app server where Spring MVC completes
195188
* asynchronous request processing.
196189
*/
197-
public void complete() {
198-
synchronized (this) {
199-
this.complete = true;
200-
if (this.handler != null) {
201-
this.handler.complete();
202-
}
190+
public synchronized void complete() {
191+
this.complete = true;
192+
if (this.handler != null) {
193+
this.handler.complete();
203194
}
204195
}
205196

@@ -208,26 +199,22 @@ public void complete() {
208199
* <p>A dispatch is made into the app server where Spring MVC will pass the
209200
* exception through its exception handling mechanism.
210201
*/
211-
public void completeWithError(Throwable ex) {
212-
synchronized (this) {
213-
this.complete = true;
214-
this.failure = ex;
215-
if (this.handler != null) {
216-
this.handler.completeWithError(ex);
217-
}
202+
public synchronized void completeWithError(Throwable ex) {
203+
this.complete = true;
204+
this.failure = ex;
205+
if (this.handler != null) {
206+
this.handler.completeWithError(ex);
218207
}
219208
}
220209

221210
/**
222211
* Register code to invoke when the async request times out. This method is
223212
* called from a container thread when an async request times out.
224213
*/
225-
public void onTimeout(Runnable callback) {
226-
synchronized (this) {
227-
this.timeoutCallback = callback;
228-
if (this.handler != null) {
229-
this.handler.onTimeout(callback);
230-
}
214+
public synchronized void onTimeout(Runnable callback) {
215+
this.timeoutCallback = callback;
216+
if (this.handler != null) {
217+
this.handler.onTimeout(callback);
231218
}
232219
}
233220

@@ -237,12 +224,10 @@ public void onTimeout(Runnable callback) {
237224
* reason including timeout and network error. This method is useful for
238225
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
239226
*/
240-
public void onCompletion(Runnable callback) {
241-
synchronized (this) {
242-
this.completionCallback = callback;
243-
if (this.handler != null) {
244-
this.handler.onCompletion(callback);
245-
}
227+
public synchronized void onCompletion(Runnable callback) {
228+
this.completionCallback = callback;
229+
if (this.handler != null) {
230+
this.handler.onCompletion(callback);
246231
}
247232
}
248233

@@ -263,4 +248,28 @@ interface Handler {
263248
void onCompletion(Runnable callback);
264249
}
265250

251+
252+
/**
253+
* Simple struct for a data entry.
254+
*/
255+
static class DataWithMediaType {
256+
257+
private final Object data;
258+
259+
private final MediaType mediaType;
260+
261+
public DataWithMediaType(Object data, MediaType mediaType) {
262+
this.data = data;
263+
this.mediaType = mediaType;
264+
}
265+
266+
public Object getData() {
267+
return this.data;
268+
}
269+
270+
public MediaType getMediaType() {
271+
return this.mediaType;
272+
}
273+
}
274+
266275
}

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import java.io.IOException;
2020
import java.nio.charset.Charset;
2121
import java.util.Collections;
22-
import java.util.LinkedHashMap;
23-
import java.util.Map;
22+
import java.util.LinkedHashSet;
23+
import java.util.Set;
2424

2525
import org.springframework.http.HttpHeaders;
2626
import org.springframework.http.MediaType;
@@ -31,6 +31,7 @@
3131
* <a href="http://www.w3.org/TR/eventsource/">Server-Sent Events</a>.
3232
*
3333
* @author Rossen Stoyanchev
34+
* @author Juergen Hoeller
3435
* @since 4.2
3536
*/
3637
public class SseEmitter extends ResponseBodyEmitter {
@@ -41,6 +42,7 @@ public class SseEmitter extends ResponseBodyEmitter {
4142
@Override
4243
protected void extendResponse(ServerHttpResponse outputMessage) {
4344
super.extendResponse(outputMessage);
45+
4446
HttpHeaders headers = outputMessage.getHeaders();
4547
if (headers.getContentType() == null) {
4648
headers.setContentType(new MediaType("text", "event-stream"));
@@ -75,14 +77,12 @@ public void send(Object object) throws IOException {
7577
* @param object the object to write
7678
* @param mediaType a MediaType hint for selecting an HttpMessageConverter
7779
* @throws IOException raised when an I/O error occurs
78-
* @throws java.lang.IllegalStateException wraps any other errors
7980
*/
8081
@Override
8182
public void send(Object object, MediaType mediaType) throws IOException {
82-
if (object == null) {
83-
return;
83+
if (object != null) {
84+
send(event().data(object, mediaType));
8485
}
85-
send(event().data(object, mediaType));
8686
}
8787

8888
/**
@@ -95,18 +95,19 @@ public void send(Object object, MediaType mediaType) throws IOException {
9595
* </pre>
9696
* @param builder a builder for an SSE formatted event.
9797
* @throws IOException raised when an I/O error occurs
98-
* @throws java.lang.IllegalStateException wraps any other errors
9998
*/
10099
public void send(SseEventBuilder builder) throws IOException {
101-
Map<Object, MediaType> map = builder.build();
102-
for (Map.Entry<Object, MediaType> entry : map.entrySet()) {
103-
super.send(entry.getKey(), entry.getValue());
100+
Set<DataWithMediaType> dataToSend = ((SseEventBuilderImpl) builder).build();
101+
synchronized (this) {
102+
for (DataWithMediaType entry : dataToSend) {
103+
super.send(entry.getData(), entry.getMediaType());
104+
}
104105
}
105106
}
106107

107108

108109
public static SseEventBuilder event() {
109-
return new DefaultSseEventBuilder();
110+
return new SseEventBuilderImpl();
110111
}
111112

112113

@@ -144,22 +145,15 @@ public interface SseEventBuilder {
144145
* Add an SSE "data" line.
145146
*/
146147
SseEventBuilder data(Object object, MediaType mediaType);
147-
148-
/**
149-
* Return a map with objects that represent the data to be written to
150-
* the response as well as the required SSE text formatting that
151-
* surrounds it.
152-
*/
153-
Map<Object, MediaType> build();
154148
}
155149

156150

157151
/**
158152
* Default implementation of SseEventBuilder.
159153
*/
160-
private static class DefaultSseEventBuilder implements SseEventBuilder {
154+
private static class SseEventBuilderImpl implements SseEventBuilder {
161155

162-
private final Map<Object, MediaType> map = new LinkedHashMap<Object, MediaType>(4);
156+
private final Set<DataWithMediaType> dataToSend = new LinkedHashSet<DataWithMediaType>(4);
163157

164158
private StringBuilder sb;
165159

@@ -196,34 +190,33 @@ public SseEventBuilder data(Object object) {
196190
public SseEventBuilder data(Object object, MediaType mediaType) {
197191
append("data:");
198192
saveAppendedText();
199-
this.map.put(object, mediaType);
193+
this.dataToSend.add(new DataWithMediaType(object, mediaType));
200194
append("\n");
201195
return this;
202196
}
203197

204-
DefaultSseEventBuilder append(String text) {
198+
SseEventBuilderImpl append(String text) {
205199
if (this.sb == null) {
206200
this.sb = new StringBuilder();
207201
}
208202
this.sb.append(text);
209203
return this;
210204
}
211205

212-
private void saveAppendedText() {
213-
if (this.sb != null) {
214-
this.map.put(this.sb.toString(), TEXT_PLAIN);
215-
this.sb = null;
206+
Set<DataWithMediaType> build() {
207+
if ((this.sb == null || this.sb.length() == 0) && this.dataToSend.isEmpty()) {
208+
return Collections.<DataWithMediaType>emptySet();
216209
}
210+
append("\n");
211+
saveAppendedText();
212+
return this.dataToSend;
217213
}
218214

219-
@Override
220-
public Map<Object, MediaType> build() {
221-
if (this.sb == null || this.sb.length() == 0 && this.map.isEmpty()) {
222-
return Collections.<Object, MediaType>emptyMap();
215+
private void saveAppendedText() {
216+
if (this.sb != null) {
217+
this.dataToSend.add(new DataWithMediaType(this.sb.toString(), TEXT_PLAIN));
218+
this.sb = null;
223219
}
224-
append("\n");
225-
saveAppendedText();
226-
return this.map;
227220
}
228221
}
229222

0 commit comments

Comments
 (0)