返回 Android
Android
15 分钟阅读

SSE 协议与 Android 流式对话接入实践

Server-Sent Events 是什么、和 WebSocket 怎么选,以及用 OkHttp + Coroutines + Compose 在 Android 端实现 LLM 流式对话的完整落地路径。

为什么写这篇

接入大模型对话能力时,产品侧几乎都会要求 「打字机效果」——用户发一句,回答不是等整段 JSON 回来再展示,而是 token 逐字流出。后端若走 OpenAI 兼容接口,响应头里常见:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

这就是 SSE(Server-Sent Events)。它和 WebSocket 一样属于「长连接推送」,但语义更简单:服务端单向推、客户端用普通 HTTP POST 发消息。对「用户问一句、模型答一长段」的聊天场景,往往比 WebSocket 更省事。

做 Android 落地时,常见问题不是「懂不懂 SSE」,而是:

  1. OkHttp / Retrofit 默认按 完整 Response Body 解析,流式要另写
  2. 解析 data: 行、处理 event: / id: / 空行分隔时容易踩坑
  3. 流式文本如何安全地喂给 ViewModel → Compose,并处理取消、重连、生命周期

这篇按 协议 → 选型 → Android 接入 → 对话模型实现 的顺序写,代码为可运行的骨架,可按项目替换为自家 BFF 或 OpenAI 兼容网关。


被问到时,我一般会先答这一句

SSE = 基于 HTTP 的服务端单向推送;客户端发请求用普通 POST,收回答用长连接读 text/event-stream 事件流。

维度普通 HTTPSSEWebSocket
方向请求-响应,一问一答服务端 → 客户端 单向推双向 全双工
协议HTTP/1.1 或 HTTP/2HTTP,MIME text/event-stream先 HTTP Upgrade,再 WS 帧
客户端发送每次新请求通常另开 POST 发用户消息同一连接随时发
断线重连无标准规范支持 Last-Event-ID需自建心跳与重连
代理 / 防火墙友好友好(仍是 HTTP)部分环境 Upgrade 被拦
典型场景CRUDLLM 流式输出、进度条、日志 tailIM、行情、协作编辑

更完整的 HTTP / WebSocket 分层对照见 网络协议面试笔记


一、SSE 协议在说什么

SSE 由 HTML5 规范定义(WHATWG Server-sent events),核心约定:

  1. 响应 Content-Typetext/event-stream; charset=utf-8
  2. 连接保持打开,服务端持续写入 事件(event)
  3. 每个事件由若干行字段组成,空行 表示一个事件结束
  4. 客户端(浏览器用 EventSource;Android 无系统 API,需自己读流)按行解析

1.1 线上常见的事件格式

OpenAI 兼容流式接口通常长这样:

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"}}]}
 
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"}}]}
 
data: [DONE]
 

规范里还可带:

字段含义
data:有效载荷,可多行(会拼成一行)
event:事件类型,默认 message
id:事件 ID,断线重连时客户端可带 Last-Event-ID
retry:建议重连间隔(毫秒)
: 开头注释行,用于保活(heartbeat)

1.2 和 HTTP chunked 的关系

SSE 往往配合 Transfer-Encoding: chunked 使用:服务端不必等整段 body 生成完才返回,而是边生成边 flush。Android 端要用 流式读 ResponseBody.source(),不能 body.string() 一把梭——那会阻塞到连接关闭。

1.3 何时选 SSE,何时选 WebSocket

倾向 SSE:

  • 主要是 服务端推文本流(LLM token、日志、任务进度)
  • 希望复用现有 HTTP 网关、鉴权(Bearer Token)、CDN 边缘策略
  • 客户端发消息频率低,用 REST POST 即可

倾向 WebSocket:

  • 高频双向(聊天室、游戏、协同编辑)
  • 需要二进制帧或自定义子协议
  • 同一连接上要 multiplex 多种消息类型且延迟极敏感

LLM 对话在多数产品里:用户消息 POST + 回答 SSE,是性价比最高的组合。


二、对话 API 的两种常见形态

落地前先和后端对齐契约,Android 实现会差很多。

形态 A:单 POST 返回 SSE(最常见)

POST /v1/chat/completions HTTP/1.1
Authorization: Bearer <token>
Content-Type: application/json
 
{"model":"gpt-4o","stream":true,"messages":[{"role":"user","content":"你好"}]}

响应体即为 SSE 流,直到 [DONE] 或连接关闭。

形态 B:POST 建会话 + GET 订阅流

POST /v1/chat/sessions        → 返回 sessionId
GET  /v1/chat/sessions/{id}/stream   → SSE 订阅
POST /v1/chat/sessions/{id}/messages → 发用户消息

适合多端同步、服务端排队推送。Android 侧多一个 订阅 Job 的生命周期管理。

下文以 形态 A 为例,形态 B 只需把 GET 长连接单独抽成 SseClient.subscribe()


三、Android 端技术选型

组件推荐说明
HTTP 客户端OkHttp 4.x流式读 body、取消 Call、与 Retrofit 共存
异步Kotlin Coroutines + FlowKotlin Coroutine
JSONkotlinx.serialization 或 Moshidelta.content 解析增量
UI 状态MVVM + StateFlowMVVMSSOT
UIJetpack ComposeLazyColumn + 自动滚到底

Retrofit 可以发 POST,但流式消费建议直接用 OkHttp,避免 Converter 把 Body 缓冲完。


四、分层设计:对话模型长什么样

┌─────────────────────────────────────────────────────────┐
│  ChatScreen (Compose)                                    │
│    输入框 / 消息列表 / 发送按钮 / 取消生成                  │
└───────────────────────────┬─────────────────────────────┘
                            │ collect StateFlow
┌───────────────────────────▼─────────────────────────────┐
│  ChatViewModel                                             │
│    ChatUiState (messages, input, isStreaming, error)       │
│    sendMessage() / stopGeneration()                        │
└───────────────────────────┬─────────────────────────────┘

┌───────────────────────────▼─────────────────────────────┐
│  ChatRepository                                            │
│    streamCompletion(prompt): Flow<ChatStreamEvent>         │
└───────────────────────────┬─────────────────────────────┘

┌───────────────────────────▼─────────────────────────────┐
│  SseChatDataSource (OkHttp)                                │
│    建立 POST → 逐行解析 data: → 发射 delta                 │
└─────────────────────────────────────────────────────────┘

SSOT 原则:消息列表只在 ViewModel 里维护一份 List<ChatMessage>;SSE 每来一个 delta,更新最后一条 assistant 消息的内容,而不是 Compose 里拼字符串。


五、核心代码:SSE 解析与 Flow

5.1 数据模型

data class ChatMessage(
    val id: String,
    val role: Role,
    val content: String,
    val isStreaming: Boolean = false,
)
 
enum class Role { User, Assistant }
 
sealed interface ChatStreamEvent {
    data class Delta(val text: String) : ChatStreamEvent
    data object Done : ChatStreamEvent
    data class Failed(val error: Throwable) : ChatStreamEvent
}

5.2 OkHttp 流式读取 + 行解析

class SseChatDataSource(
    private val okHttpClient: OkHttpClient,
    private val baseUrl: String,
    private val json: Json,
) {
    fun streamChat(
        messages: List<ChatMessage>,
        model: String = "gpt-4o-mini",
    ): Flow<ChatStreamEvent> = callbackFlow {
        val bodyJson = buildRequestJson(messages, model)
        val request = Request.Builder()
            .url("$baseUrl/v1/chat/completions")
            .post(bodyJson.toRequestBody("application/json".toMediaType()))
            .header("Accept", "text/event-stream")
            .header("Authorization", "Bearer ${BuildConfig.LLM_API_KEY}")
            .build()
 
        val call = okHttpClient.newCall(request)
 
        call.enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {
                trySend(ChatStreamEvent.Failed(e))
                close()
            }
 
            override fun onResponse(call: Call, response: Response) {
                response.use { resp ->
                    if (!resp.isSuccessful) {
                        trySend(ChatStreamEvent.Failed(HttpException(resp)))
                        close()
                        return
                    }
                    val source = resp.body?.source() ?: run {
                        trySend(ChatStreamEvent.Failed(IllegalStateException("empty body")))
                        close()
                        return
                    }
                    parseSse(source) { event ->
                        when {
                            event == "[DONE]" -> trySend(ChatStreamEvent.Done)
                            event.isNotEmpty() -> parseDelta(event)?.let {
                                trySend(ChatStreamEvent.Delta(it))
                            }
                        }
                    }
                }
                close()
            }
        })
 
        awaitClose { call.cancel() }
    }
 
    private fun parseSse(
        source: BufferedSource,
        onData: (String) -> Unit,
    ) {
        val dataLines = StringBuilder()
        while (!source.exhausted()) {
            val line = source.readUtf8Line() ?: break
            when {
                line.startsWith("data:") -> {
                    val payload = line.removePrefix("data:").trimStart()
                    if (dataLines.isNotEmpty()) dataLines.append('\n')
                    dataLines.append(payload)
                }
                line.isEmpty() -> {
                    if (dataLines.isNotEmpty()) {
                        onData(dataLines.toString())
                        dataLines.clear()
                    }
                }
                line.startsWith(":") -> { /* heartbeat,忽略 */ }
            }
        }
    }
 
    private fun parseDelta(jsonLine: String): String? {
        return runCatching {
            val element = json.parseToJsonElement(jsonLine)
            element.jsonObject["choices"]
                ?.jsonArray?.firstOrNull()
                ?.jsonObject?.get("delta")
                ?.jsonObject?.get("content")
                ?.jsonPrimitive?.content
        }.getOrNull()
    }
}

要点:

  • callbackFlow + awaitClose { call.cancel() }:用户点「停止生成」或离开页面时取消网络
  • 按行读,空行 flush 一个事件——和规范一致
  • 不要用 readUtf8() 读整个 body

5.3 Repository

class ChatRepository(private val remote: SseChatDataSource) {
    fun streamReply(history: List<ChatMessage>): Flow<ChatStreamEvent> =
        remote.streamChat(history)
}

六、ViewModel:把流式事件变成 UI 状态

data class ChatUiState(
    val messages: List<ChatMessage> = emptyList(),
    val input: String = "",
    val isStreaming: Boolean = false,
    val error: String? = null,
)
 
class ChatViewModel(
    private val repository: ChatRepository,
) : ViewModel() {
 
    private val _uiState = MutableStateFlow(ChatUiState())
    val uiState: StateFlow<ChatUiState> = _uiState.asStateFlow()
 
    private var streamJob: Job? = null
 
    fun onInputChange(text: String) {
        _uiState.update { it.copy(input = text, error = null) }
    }
 
    fun sendMessage() {
        val text = _uiState.value.input.trim()
        if (text.isEmpty() || _uiState.value.isStreaming) return
 
        val userMsg = ChatMessage(id = uuid(), role = Role.User, content = text)
        val assistantPlaceholder = ChatMessage(
            id = uuid(),
            role = Role.Assistant,
            content = "",
            isStreaming = true,
        )
 
        _uiState.update {
            it.copy(
                messages = it.messages + userMsg + assistantPlaceholder,
                input = "",
                isStreaming = true,
                error = null,
            )
        }
 
        streamJob = viewModelScope.launch {
            repository.streamReply(_uiState.value.messages.dropLast(1))
                .catch { e ->
                    _uiState.update { it.copy(isStreaming = false, error = e.message) }
                    markAssistantDone()
                }
                .collect { event ->
                    when (event) {
                        is ChatStreamEvent.Delta -> appendAssistantDelta(event.text)
                        ChatStreamEvent.Done -> {
                            markAssistantDone()
                            _uiState.update { it.copy(isStreaming = false) }
                        }
                        is ChatStreamEvent.Failed -> {
                            _uiState.update {
                                it.copy(isStreaming = false, error = event.error.message)
                            }
                            markAssistantDone()
                        }
                    }
                }
        }
    }
 
    fun stopGeneration() {
        streamJob?.cancel()
        streamJob = null
        _uiState.update { it.copy(isStreaming = false) }
        markAssistantDone()
    }
 
    private fun appendAssistantDelta(delta: String) {
        _uiState.update { state ->
            val list = state.messages.toMutableList()
            val last = list.last()
            if (last.role == Role.Assistant && last.isStreaming) {
                list[list.lastIndex] = last.copy(content = last.content + delta)
            }
            state.copy(messages = list)
        }
    }
 
    private fun markAssistantDone() {
        _uiState.update { state ->
            val list = state.messages.toMutableList()
            val last = list.lastOrNull() ?: return@update state
            if (last.isStreaming) {
                list[list.lastIndex] = last.copy(isStreaming = false)
            }
            state.copy(messages = list)
        }
    }
}

工程上建议再补:

  • 发送前把 messages 快照化,避免 collect 过程中列表被改乱
  • 弱网重试:仅对 用户消息已落库 的场景重试 assistant 流
  • Token 刷新:401 时刷新 Bearer 后重放一次

七、Compose UI 骨架

@Composable
fun ChatScreen(viewModel: ChatViewModel) {
    val state by viewModel.uiState.collectAsStateWithLifecycle()
    val listState = rememberLazyListState()
 
    LaunchedEffect(state.messages.size, state.messages.lastOrNull()?.content) {
        if (state.messages.isNotEmpty()) {
            listState.animateScrollToItem(state.messages.lastIndex)
        }
    }
 
    Column(Modifier.fillMaxSize()) {
        LazyColumn(
            state = listState,
            modifier = Modifier.weight(1f).padding(horizontal = 16.dp),
            verticalArrangement = Arrangement.spacedBy(12.dp),
        ) {
            items(state.messages, key = { it.id }) { msg ->
                MessageBubble(message = msg)
            }
        }
 
        state.error?.let { err ->
            Text(err, color = MaterialTheme.colorScheme.error, modifier = Modifier.padding(8.dp))
        }
 
        Row(Modifier.padding(16.dp), verticalAlignment = Alignment.CenterVertically) {
            OutlinedTextField(
                value = state.input,
                onValueChange = viewModel::onInputChange,
                modifier = Modifier.weight(1f),
                enabled = !state.isStreaming,
                placeholder = { Text("输入消息…") },
            )
            Spacer(Modifier.width(8.dp))
            if (state.isStreaming) {
                TextButton(onClick = viewModel::stopGeneration) { Text("停止") }
            } else {
                Button(onClick = viewModel::sendMessage) { Text("发送") }
            }
        }
    }
}

流式输出时 content 不断变化,LaunchedEffect 依赖最后一项内容可做 自动滚到底;长文场景可改成「用户上滑后暂停自动滚动」。


八、生产环境必谈的五个坑

8.1 超时

OkHttp 默认 readTimeout 较短。SSE 长连接应 单独建 OkHttpClient

OkHttpClient.Builder()
    .readTimeout(0, TimeUnit.SECONDS) // 流式不限读超时
    .connectTimeout(30, TimeUnit.SECONDS)
    .build()

或用 readTimeout 设很大(如 5 分钟),并按业务在空闲时主动 cancel。

8.2 线程与背压

onResponse 在 OkHttp 线程池执行。trySend 进 Flow 后,ViewModel 在 Main 更新 State;若 delta 极密,可在 Repository 用 flowOn(Dispatchers.IO),UI 侧 conflate() 或按帧合并(16ms 批量 append)减重组压力。

8.3 生命周期

  • viewModelScope 会在 ViewModel clear 时取消 collect,但务必 awaitClose { call.cancel() } 关掉 socket
  • 旋转屏幕若未 retain ViewModel,应依赖 SavedStateHandle 或本地 DB 恢复消息,不要在配置变更时重发同一条用户消息

8.4 断线重连

规范允许客户端带 Last-Event-ID 重连。LLM 接口多数 不支持 从中间 token 续传,产品上通常:

  1. 展示已收内容
  2. 提示「生成中断」
  3. 用户点「继续」→ 把已生成文本作为 context 再发一轮(由后端定义)

若后端支持 id: 字段,可在 DataSource 里记录 lastEventId 并重试 GET。

8.5 安全

  • API Key 不要硬编码在 APK;走 自己的 BFF 或短期 Token
  • 日志里不要打印完整 SSE 流(含用户隐私)
  • 若走 HTTPS,证书校验走系统默认即可;抓包调试仅限 debug

九、和 WebSocket 方案的 Android 侧对比

项目SSE + POSTWebSocket
依赖OkHttp 即可OkHttp WebSocketListener 或 Scarlet 等
发送用户消息新 POSTsend() 文本帧
流式解析按行拆 data:自定义 JSON 帧协议
取消生成Call.cancel()close() 或发 cancel 指令
后台保活系统可能杀长连接;需 FGS 或降级同左,且心跳要自建

十、小结

概念一句话
SSEHTTP 长连接 + text/event-stream,服务端单向推事件
对话流式POST 提问,响应体按 data: 行解析 token 增量
Android 接入OkHttp 流式读 + Coroutines Flow + 取消 Call
对话模型Repository 暴露 Flow,ViewModel 追加 assistant delta,Compose 订阅 StateFlow

把 SSE 当成 「特殊格式的 HTTP 响应体」 而不是另一套 socket,就和现有 Retrofit 鉴权、网关、监控体系对齐了。实现上最难的往往不是协议,而是 流式状态怎么单一数据源地进 UI——按 MVVM 分层做,通常比在某个 Activity 里手写 StringBuilder 稳得多。

若你司 BFF 已是 WebSocket 推流,客户端模型可复用本文 ViewModel / UI,只替换 DataSource 层即可;协议可以换,「最后一条 assistant 消息增量更新」 这条状态机不变。


延伸阅读

相关文章

Java / Kotlin
公开
13 分钟
Android面试常被问的网络协议问题
HTTP、HTTPS、gRPC、WebSocket 和 TCP 握手——Android 面试里常被一锅炖的网络题,按分层答法整理成一份能讲清楚的笔记。
Java / Kotlin
公开
9 分钟
Kotlin Coroutine
以图片处理串行流水线为例,对比 Callback、RxJava 与 Coroutines 三种异步写法
Android
公开
4 分钟
如何在Android开发中实现MVVM的架构
MVVM与Android的实践简要指导
Android
公开
26 分钟
Android 架构核心原则:单一数据源(SSOT)与单向数据流(UDF)实战指南
在Android开发中,随着应用复杂度提升,数据混乱、状态不一致、调试困难等问题频发,而单一数据源(Single Source of Truth, SSOT)与单向数据流(Unidirectional Data Flow, UDF)正是解决这些痛点的核心架构原则。