webflux
2025/9/24大约 1 分钟约 364 字
使用场景
前端需要实时接收后端数据
ServerSentEvents 后端推送事件 ssemaven 依赖 springboot-webflux
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>发布订阅模式 publisher
@Component
@Slf4j
public class EventPublisher {
// 重播模式 支持设置缓存数量
// private final Sinks.Many<String> eventSink = Sinks.many().replay().limit(10);
// 多播模式 支持设置背压缓存数量
// private final Sinks.Many<String> eventSink = Sinks.many().multicast().onBackpressureBuffer();
// 重播模式,只保留最新的 最接近websocket的实时发送
private final Sinks.Many<String> eventSink = Sinks.many().replay().latest();
// 发布事件(账户ID + 消息)
public void push(String accountId, String message) {
eventSink.tryEmitNext(accountId + ":" + message);
}
// 获取事件流
public Flux<String> stream() {
return eventSink.asFlux();
}
// 模拟添加数据
@PostConstruct
public void init() {
push("123", "1hello world"+ UUID.randomUUID().toString());
push("123", "2hello world"+ UUID.randomUUID().toString());
push("123", "3hello world"+ UUID.randomUUID().toString());
CompletableFuture.runAsync(() -> {
int i = 5;
while (true){
push("123", "%s hello world".formatted(i++)+ UUID.randomUUID().toString());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}控制层代码
@RestController
@RequestMapping("/flux")
@Slf4j
@RequiredArgsConstructor
public class FluxController {
private final EventPublisher eventPublisher;
// 开启sse
@GetMapping(value = "/account/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream(@PathVariable String id) {
log.info("request flux/account user:{}", id);
var userId = id;
return eventPublisher.stream()
.filter(msg -> {
return msg.startsWith(userId + ":");
}).onBackpressureDrop()
.map(msg -> msg.substring(userId.length() + 1))
;
}
}前端vue部分
// utils/sse.js
export function createSSE(url, onMessage) {
let es = new EventSource(url);
let retryDelay = 1000;
// 策略模式处理数据 e.data
es.onmessage = onMessage;
es.onerror = () => {
es.close();
setTimeout(() => {
es = createSSE(url, onMessage); // 递归重连
retryDelay = Math.min(retryDelay * 2, 30000); // 上限30秒
}, retryDelay);
};
return {
close: () => es.close()
};
}<script>
import { createSSE } from '@/utils/sse';
export default {
mounted() {
this.sse = createSSE('/api/sse', (e) => {
this.data = JSON.parse(e.data);
});
},
beforeDestroy() {
this.sse?.close();
}
};
</script>
<template>
</template>