前言
在 Http
请求场景中, 有些 数据流
场景, 如 chatGPT
对话中的回复。
实现
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 <?xml version="1.0" encoding="UTF-8"?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > cn.idea360</groupId > <artifactId > sse-demo</artifactId > <version > 0.0.1-SNAPSHOT</version > <name > sse-demo</name > <description > sse-demo</description > <properties > <java.version > 1.8</java.version > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <project.reporting.outputEncoding > UTF-8</project.reporting.outputEncoding > <spring-boot.version > 2.6.13</spring-boot.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > com.squareup.okhttp3</groupId > <artifactId > okhttp-sse</artifactId > <version > 3.14.9</version > </dependency > </dependencies > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-dependencies</artifactId > <version > ${spring-boot.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.8.1</version > <configuration > <source > 1.8</source > <target > 1.8</target > <encoding > UTF-8</encoding > </configuration > </plugin > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <version > ${spring-boot.version}</version > <configuration > <mainClass > cn.idea360.sse.SseDemoApplication</mainClass > <skip > true</skip > </configuration > <executions > <execution > <id > repackage</id > <goals > <goal > repackage</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
application.properties
1 2 3 4 server.port =8080 server.servlet.encoding.charset =UTF-8 server.servlet.encoding.force-response =true
web请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 package cn.idea360.sse.demos.web;import com.fasterxml.jackson.core.JsonProcessingException;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import okhttp3.*;import okhttp3.sse.EventSource;import okhttp3.sse.EventSourceListener;import okhttp3.sse.EventSources;import org.springframework.web.bind.annotation.CrossOrigin;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.PostConstruct;import java.io.IOException;import java.time.Duration;import java.util.Objects;import java.util.UUID;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.function.Consumer;@Slf4j @RestController public class BasicController { private final ExecutorService executor = Executors.newSingleThreadExecutor(); private OkHttpClient okHttpClient; @PostConstruct public void init () { Runtime.getRuntime().addShutdownHook(new Thread(() -> { executor.shutdown(); try { executor.awaitTermination(1 , TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e.toString()); } })); okHttpClient = defaultClient(UUID.randomUUID().toString().replace("-" , "" ), Duration.ofSeconds(60 )); } private OkHttpClient defaultClient (String token, Duration timeout) { return new OkHttpClient.Builder() .addInterceptor(new AuthenticationInterceptor(token)) .connectionPool(new ConnectionPool(5 , 1 , TimeUnit.SECONDS)) .connectTimeout(timeout) .readTimeout(timeout) .writeTimeout(timeout) .build(); } private static class AuthenticationInterceptor implements Interceptor { private final String token; public AuthenticationInterceptor (String token) { Objects.requireNonNull(token, "token required" ); this .token = token; } @Override public Response intercept (Chain chain) throws IOException { Request request = chain.request() .newBuilder() .header("Authorization" , "Bearer " + token) .build(); return chain.proceed(request); } } @GetMapping("/proxy") @CrossOrigin public SseEmitter proxy () throws JsonProcessingException { Request request = new Request.Builder().url("http://127.0.0.1:8080/sse" ).build(); EventSource.Factory factory = EventSources.createFactory(okHttpClient); SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); factory.newEventSource(request, new EventSourceListener() { private String completeMessage = "" ; @Setter private Consumer<String> onComplate = s -> log.info("completeMessage: {}" , s); @Override public void onOpen (EventSource eventSource, Response response) { log.info("onOpen" ); } @Override public void onEvent (EventSource eventSource, String id, String type, String data) { log.info("onEvent: {}" , data); if ("[DONE]" .equals(data)) { onComplate.accept(completeMessage); return ; } try { if (Objects.nonNull(data)) { completeMessage += data; emitter.send(data); } } catch (Exception e) { log.error("emitter send err" , e); emitter.completeWithError(e); } } @Override public void onClosed (EventSource eventSource) { log.info("onClosed" ); emitter.complete(); } @Override public void onFailure (EventSource eventSource, Throwable t, Response response) { log.info("onFailure" ); emitter.completeWithError(t); } }); return emitter; } @GetMapping("/sse") @CrossOrigin public SseEmitter streamDateTime () { SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE); sseEmitter.onCompletion(() -> log.info("SseEmitter is completed" )); sseEmitter.onTimeout(() -> log.info("SseEmitter is timed out" )); sseEmitter.onError(throwable -> log.info("SseEmitter got error:" , throwable)); executor.execute(() -> { for (int i = 0 ; i < 7 ; i++) { try { sseEmitter.send(i); Thread.sleep(1000 ); } catch (IOException | InterruptedException e) { log.error("sse err." , e); sseEmitter.completeWithError(e); } } try { sseEmitter.send("[DONE]" ); } catch (IOException e) { log.error("sse err." , e); sseEmitter.completeWithError(e); } sseEmitter.complete(); }); log.info("Controller exits" ); return sseEmitter; } }
注意 SseEmitter 本质上还是http请求, 在流输出完后记得执行 sseEmitter.complete()
断开连接。
简单应用场景
异步通知 : 例如提交异步任务后, 首先注册个监听回调. 后台在完成任务后通过 SseEmitter.send()
推送事件, 然后执行 sseEmitter.complete()
关闭本次回调连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j @RestController @RequestMapping("/events") @RequiredArgsConstructor public class EventController { public static final String MEMBER_ID_HEADER = "MemberId" ; private final EmitterService emitterService; private final NotificationService notificationService; @GetMapping public SseEmitter subscribeToEvents (@RequestHeader(name = MEMBER_ID_HEADER) String memberId) { log.debug("Subscribing member with id {}" , memberId); return emitterService.createEmitter(memberId); } @PostMapping @ResponseStatus(HttpStatus.ACCEPTED) public void publishEvent (@RequestHeader(name = MEMBER_ID_HEADER) String memberId, @RequestBody EventDto event) { log.debug("Publishing event {} for member with id {}" , event, memberId); notificationService.sendNotification(memberId, event); } }