SSE 协议与 Android 流式对话接入实践
Server-Sent Events 是什么、和 WebSocket 怎么选,以及用 OkHttp + Coroutines + Compose 在 Android 端实现 LLM 流式对话的完整落地路径。
为什么写这篇
接入大模型对话能力时,产品侧几乎都会要求 「打字机效果」——用户发一句,回答不是等整段 JSON 回来再展示,而是 token 逐字流出。后端若走 OpenAI 兼容接口,响应头里常见:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive这就是 SSE(Server-Sent Events)。它和 WebSocket 一样属于「长连接推送」,但语义更简单:服务端单向推、客户端用普通 HTTP POST 发消息。对「用户问一句、模型答一长段」的聊天场景,往往比 WebSocket 更省事。
做 Android 落地时,常见问题不是「懂不懂 SSE」,而是:
- OkHttp / Retrofit 默认按 完整 Response Body 解析,流式要另写
- 解析
data:行、处理event:/id:/ 空行分隔时容易踩坑 - 流式文本如何安全地喂给 ViewModel → Compose,并处理取消、重连、生命周期
这篇按 协议 → 选型 → Android 接入 → 对话模型实现 的顺序写,代码为可运行的骨架,可按项目替换为自家 BFF 或 OpenAI 兼容网关。
被问到时,我一般会先答这一句
SSE = 基于 HTTP 的服务端单向推送;客户端发请求用普通 POST,收回答用长连接读 text/event-stream 事件流。
| 维度 | 普通 HTTP | SSE | WebSocket |
|---|---|---|---|
| 方向 | 请求-响应,一问一答 | 服务端 → 客户端 单向推 | 双向 全双工 |
| 协议 | HTTP/1.1 或 HTTP/2 | HTTP,MIME text/event-stream | 先 HTTP Upgrade,再 WS 帧 |
| 客户端发送 | 每次新请求 | 通常另开 POST 发用户消息 | 同一连接随时发 |
| 断线重连 | 无标准 | 规范支持 Last-Event-ID | 需自建心跳与重连 |
| 代理 / 防火墙 | 友好 | 友好(仍是 HTTP) | 部分环境 Upgrade 被拦 |
| 典型场景 | CRUD | LLM 流式输出、进度条、日志 tail | IM、行情、协作编辑 |
更完整的 HTTP / WebSocket 分层对照见 网络协议面试笔记。
一、SSE 协议在说什么
SSE 由 HTML5 规范定义(WHATWG Server-sent events),核心约定:
- 响应
Content-Type为text/event-stream; charset=utf-8 - 连接保持打开,服务端持续写入 事件(event)
- 每个事件由若干行字段组成,空行 表示一个事件结束
- 客户端(浏览器用
EventSource;Android 无系统 API,需自己读流)按行解析
1.1 线上常见的事件格式
OpenAI 兼容流式接口通常长这样:
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"}}]}
data: [DONE]
规范里还可带:
| 字段 | 含义 |
|---|---|
data: | 有效载荷,可多行(会拼成一行) |
event: | 事件类型,默认 message |
id: | 事件 ID,断线重连时客户端可带 Last-Event-ID |
retry: | 建议重连间隔(毫秒) |
: 开头 | 注释行,用于保活(heartbeat) |
1.2 和 HTTP chunked 的关系
SSE 往往配合 Transfer-Encoding: chunked 使用:服务端不必等整段 body 生成完才返回,而是边生成边 flush。Android 端要用 流式读 ResponseBody.source(),不能 body.string() 一把梭——那会阻塞到连接关闭。
1.3 何时选 SSE,何时选 WebSocket
倾向 SSE:
- 主要是 服务端推文本流(LLM token、日志、任务进度)
- 希望复用现有 HTTP 网关、鉴权(Bearer Token)、CDN 边缘策略
- 客户端发消息频率低,用 REST POST 即可
倾向 WebSocket:
- 高频双向(聊天室、游戏、协同编辑)
- 需要二进制帧或自定义子协议
- 同一连接上要 multiplex 多种消息类型且延迟极敏感
LLM 对话在多数产品里:用户消息 POST + 回答 SSE,是性价比最高的组合。
二、对话 API 的两种常见形态
落地前先和后端对齐契约,Android 实现会差很多。
形态 A:单 POST 返回 SSE(最常见)
POST /v1/chat/completions HTTP/1.1
Authorization: Bearer <token>
Content-Type: application/json
{"model":"gpt-4o","stream":true,"messages":[{"role":"user","content":"你好"}]}响应体即为 SSE 流,直到 [DONE] 或连接关闭。
形态 B:POST 建会话 + GET 订阅流
POST /v1/chat/sessions → 返回 sessionId
GET /v1/chat/sessions/{id}/stream → SSE 订阅
POST /v1/chat/sessions/{id}/messages → 发用户消息适合多端同步、服务端排队推送。Android 侧多一个 订阅 Job 的生命周期管理。
下文以 形态 A 为例,形态 B 只需把 GET 长连接单独抽成 SseClient.subscribe()。
三、Android 端技术选型
| 组件 | 推荐 | 说明 |
|---|---|---|
| HTTP 客户端 | OkHttp 4.x | 流式读 body、取消 Call、与 Retrofit 共存 |
| 异步 | Kotlin Coroutines + Flow | 见 Kotlin Coroutine |
| JSON | kotlinx.serialization 或 Moshi | 按 delta.content 解析增量 |
| UI 状态 | MVVM + StateFlow | 见 MVVM、SSOT |
| UI | Jetpack Compose | LazyColumn + 自动滚到底 |
Retrofit 可以发 POST,但流式消费建议直接用 OkHttp,避免 Converter 把 Body 缓冲完。
四、分层设计:对话模型长什么样
┌─────────────────────────────────────────────────────────┐
│ ChatScreen (Compose) │
│ 输入框 / 消息列表 / 发送按钮 / 取消生成 │
└───────────────────────────┬─────────────────────────────┘
│ collect StateFlow
┌───────────────────────────▼─────────────────────────────┐
│ ChatViewModel │
│ ChatUiState (messages, input, isStreaming, error) │
│ sendMessage() / stopGeneration() │
└───────────────────────────┬─────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────┐
│ ChatRepository │
│ streamCompletion(prompt): Flow<ChatStreamEvent> │
└───────────────────────────┬─────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────┐
│ SseChatDataSource (OkHttp) │
│ 建立 POST → 逐行解析 data: → 发射 delta │
└─────────────────────────────────────────────────────────┘SSOT 原则:消息列表只在 ViewModel 里维护一份 List<ChatMessage>;SSE 每来一个 delta,更新最后一条 assistant 消息的内容,而不是 Compose 里拼字符串。
五、核心代码:SSE 解析与 Flow
5.1 数据模型
data class ChatMessage(
val id: String,
val role: Role,
val content: String,
val isStreaming: Boolean = false,
)
enum class Role { User, Assistant }
sealed interface ChatStreamEvent {
data class Delta(val text: String) : ChatStreamEvent
data object Done : ChatStreamEvent
data class Failed(val error: Throwable) : ChatStreamEvent
}5.2 OkHttp 流式读取 + 行解析
class SseChatDataSource(
private val okHttpClient: OkHttpClient,
private val baseUrl: String,
private val json: Json,
) {
fun streamChat(
messages: List<ChatMessage>,
model: String = "gpt-4o-mini",
): Flow<ChatStreamEvent> = callbackFlow {
val bodyJson = buildRequestJson(messages, model)
val request = Request.Builder()
.url("$baseUrl/v1/chat/completions")
.post(bodyJson.toRequestBody("application/json".toMediaType()))
.header("Accept", "text/event-stream")
.header("Authorization", "Bearer ${BuildConfig.LLM_API_KEY}")
.build()
val call = okHttpClient.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
trySend(ChatStreamEvent.Failed(e))
close()
}
override fun onResponse(call: Call, response: Response) {
response.use { resp ->
if (!resp.isSuccessful) {
trySend(ChatStreamEvent.Failed(HttpException(resp)))
close()
return
}
val source = resp.body?.source() ?: run {
trySend(ChatStreamEvent.Failed(IllegalStateException("empty body")))
close()
return
}
parseSse(source) { event ->
when {
event == "[DONE]" -> trySend(ChatStreamEvent.Done)
event.isNotEmpty() -> parseDelta(event)?.let {
trySend(ChatStreamEvent.Delta(it))
}
}
}
}
close()
}
})
awaitClose { call.cancel() }
}
private fun parseSse(
source: BufferedSource,
onData: (String) -> Unit,
) {
val dataLines = StringBuilder()
while (!source.exhausted()) {
val line = source.readUtf8Line() ?: break
when {
line.startsWith("data:") -> {
val payload = line.removePrefix("data:").trimStart()
if (dataLines.isNotEmpty()) dataLines.append('\n')
dataLines.append(payload)
}
line.isEmpty() -> {
if (dataLines.isNotEmpty()) {
onData(dataLines.toString())
dataLines.clear()
}
}
line.startsWith(":") -> { /* heartbeat,忽略 */ }
}
}
}
private fun parseDelta(jsonLine: String): String? {
return runCatching {
val element = json.parseToJsonElement(jsonLine)
element.jsonObject["choices"]
?.jsonArray?.firstOrNull()
?.jsonObject?.get("delta")
?.jsonObject?.get("content")
?.jsonPrimitive?.content
}.getOrNull()
}
}要点:
callbackFlow+awaitClose { call.cancel() }:用户点「停止生成」或离开页面时取消网络- 按行读,空行 flush 一个事件——和规范一致
- 不要用
readUtf8()读整个 body
5.3 Repository
class ChatRepository(private val remote: SseChatDataSource) {
fun streamReply(history: List<ChatMessage>): Flow<ChatStreamEvent> =
remote.streamChat(history)
}六、ViewModel:把流式事件变成 UI 状态
data class ChatUiState(
val messages: List<ChatMessage> = emptyList(),
val input: String = "",
val isStreaming: Boolean = false,
val error: String? = null,
)
class ChatViewModel(
private val repository: ChatRepository,
) : ViewModel() {
private val _uiState = MutableStateFlow(ChatUiState())
val uiState: StateFlow<ChatUiState> = _uiState.asStateFlow()
private var streamJob: Job? = null
fun onInputChange(text: String) {
_uiState.update { it.copy(input = text, error = null) }
}
fun sendMessage() {
val text = _uiState.value.input.trim()
if (text.isEmpty() || _uiState.value.isStreaming) return
val userMsg = ChatMessage(id = uuid(), role = Role.User, content = text)
val assistantPlaceholder = ChatMessage(
id = uuid(),
role = Role.Assistant,
content = "",
isStreaming = true,
)
_uiState.update {
it.copy(
messages = it.messages + userMsg + assistantPlaceholder,
input = "",
isStreaming = true,
error = null,
)
}
streamJob = viewModelScope.launch {
repository.streamReply(_uiState.value.messages.dropLast(1))
.catch { e ->
_uiState.update { it.copy(isStreaming = false, error = e.message) }
markAssistantDone()
}
.collect { event ->
when (event) {
is ChatStreamEvent.Delta -> appendAssistantDelta(event.text)
ChatStreamEvent.Done -> {
markAssistantDone()
_uiState.update { it.copy(isStreaming = false) }
}
is ChatStreamEvent.Failed -> {
_uiState.update {
it.copy(isStreaming = false, error = event.error.message)
}
markAssistantDone()
}
}
}
}
}
fun stopGeneration() {
streamJob?.cancel()
streamJob = null
_uiState.update { it.copy(isStreaming = false) }
markAssistantDone()
}
private fun appendAssistantDelta(delta: String) {
_uiState.update { state ->
val list = state.messages.toMutableList()
val last = list.last()
if (last.role == Role.Assistant && last.isStreaming) {
list[list.lastIndex] = last.copy(content = last.content + delta)
}
state.copy(messages = list)
}
}
private fun markAssistantDone() {
_uiState.update { state ->
val list = state.messages.toMutableList()
val last = list.lastOrNull() ?: return@update state
if (last.isStreaming) {
list[list.lastIndex] = last.copy(isStreaming = false)
}
state.copy(messages = list)
}
}
}工程上建议再补:
- 发送前把
messages快照化,避免 collect 过程中列表被改乱 - 弱网重试:仅对 用户消息已落库 的场景重试 assistant 流
- Token 刷新:401 时刷新 Bearer 后重放一次
七、Compose UI 骨架
@Composable
fun ChatScreen(viewModel: ChatViewModel) {
val state by viewModel.uiState.collectAsStateWithLifecycle()
val listState = rememberLazyListState()
LaunchedEffect(state.messages.size, state.messages.lastOrNull()?.content) {
if (state.messages.isNotEmpty()) {
listState.animateScrollToItem(state.messages.lastIndex)
}
}
Column(Modifier.fillMaxSize()) {
LazyColumn(
state = listState,
modifier = Modifier.weight(1f).padding(horizontal = 16.dp),
verticalArrangement = Arrangement.spacedBy(12.dp),
) {
items(state.messages, key = { it.id }) { msg ->
MessageBubble(message = msg)
}
}
state.error?.let { err ->
Text(err, color = MaterialTheme.colorScheme.error, modifier = Modifier.padding(8.dp))
}
Row(Modifier.padding(16.dp), verticalAlignment = Alignment.CenterVertically) {
OutlinedTextField(
value = state.input,
onValueChange = viewModel::onInputChange,
modifier = Modifier.weight(1f),
enabled = !state.isStreaming,
placeholder = { Text("输入消息…") },
)
Spacer(Modifier.width(8.dp))
if (state.isStreaming) {
TextButton(onClick = viewModel::stopGeneration) { Text("停止") }
} else {
Button(onClick = viewModel::sendMessage) { Text("发送") }
}
}
}
}流式输出时 content 不断变化,LaunchedEffect 依赖最后一项内容可做 自动滚到底;长文场景可改成「用户上滑后暂停自动滚动」。
八、生产环境必谈的五个坑
8.1 超时
OkHttp 默认 readTimeout 较短。SSE 长连接应 单独建 OkHttpClient:
OkHttpClient.Builder()
.readTimeout(0, TimeUnit.SECONDS) // 流式不限读超时
.connectTimeout(30, TimeUnit.SECONDS)
.build()或用 readTimeout 设很大(如 5 分钟),并按业务在空闲时主动 cancel。
8.2 线程与背压
onResponse 在 OkHttp 线程池执行。trySend 进 Flow 后,ViewModel 在 Main 更新 State;若 delta 极密,可在 Repository 用 flowOn(Dispatchers.IO),UI 侧 conflate() 或按帧合并(16ms 批量 append)减重组压力。
8.3 生命周期
viewModelScope会在ViewModelclear 时取消 collect,但务必awaitClose { call.cancel() }关掉 socket- 旋转屏幕若未 retain ViewModel,应依赖
SavedStateHandle或本地 DB 恢复消息,不要在配置变更时重发同一条用户消息
8.4 断线重连
规范允许客户端带 Last-Event-ID 重连。LLM 接口多数 不支持 从中间 token 续传,产品上通常:
- 展示已收内容
- 提示「生成中断」
- 用户点「继续」→ 把已生成文本作为 context 再发一轮(由后端定义)
若后端支持 id: 字段,可在 DataSource 里记录 lastEventId 并重试 GET。
8.5 安全
- API Key 不要硬编码在 APK;走 自己的 BFF 或短期 Token
- 日志里不要打印完整 SSE 流(含用户隐私)
- 若走 HTTPS,证书校验走系统默认即可;抓包调试仅限 debug
九、和 WebSocket 方案的 Android 侧对比
| 项目 | SSE + POST | WebSocket |
|---|---|---|
| 依赖 | OkHttp 即可 | OkHttp WebSocketListener 或 Scarlet 等 |
| 发送用户消息 | 新 POST | send() 文本帧 |
| 流式解析 | 按行拆 data: | 自定义 JSON 帧协议 |
| 取消生成 | Call.cancel() | close() 或发 cancel 指令 |
| 后台保活 | 系统可能杀长连接;需 FGS 或降级 | 同左,且心跳要自建 |
十、小结
| 概念 | 一句话 |
|---|---|
| SSE | HTTP 长连接 + text/event-stream,服务端单向推事件 |
| 对话流式 | POST 提问,响应体按 data: 行解析 token 增量 |
| Android 接入 | OkHttp 流式读 + Coroutines Flow + 取消 Call |
| 对话模型 | Repository 暴露 Flow,ViewModel 追加 assistant delta,Compose 订阅 StateFlow |
把 SSE 当成 「特殊格式的 HTTP 响应体」 而不是另一套 socket,就和现有 Retrofit 鉴权、网关、监控体系对齐了。实现上最难的往往不是协议,而是 流式状态怎么单一数据源地进 UI——按 MVVM 分层做,通常比在某个 Activity 里手写 StringBuilder 稳得多。
若你司 BFF 已是 WebSocket 推流,客户端模型可复用本文 ViewModel / UI,只替换 DataSource 层即可;协议可以换,「最后一条 assistant 消息增量更新」 这条状态机不变。