Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkgs/ok_http/android/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ group = "com.example.ok_http"
version = "1.0"

buildscript {
ext.kotlin_version = '1.8.22'
ext.kotlin_version = '2.2.20'
repositories {
google()
mavenCentral()
}

dependencies {
// The Android Gradle Plugin knows how to build native code with the NDK.
classpath("com.android.tools.build:gradle:8.1.2")
classpath("com.android.tools.build:gradle:8.11.1")
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
Expand Down Expand Up @@ -42,7 +42,7 @@ android {

// Bumping the plugin compileSdk version requires all clients of this plugin
// to bump the version in their app.
compileSdk = 35
compileSdk = 36

// Use the NDK version
// declared in /android/app/build.gradle file of the Flutter project.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.example.ok_http

import java.io.IOException
import java.io.InputStream
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

Expand All @@ -24,7 +23,9 @@ interface DataCallback {
* Provides functions to read data from an InputStream asynchronously.
*/
class AsyncInputStreamReader {
private val executorService: ExecutorService = Executors.newSingleThreadExecutor()
companion object {
private val executor = Executors.newCachedThreadPool()
}

/**
* Reads data from an InputStream asynchronously using an executor service.
Expand All @@ -35,7 +36,7 @@ class AsyncInputStreamReader {
* @return Future<*>
*/
fun readAsync(inputStream: InputStream, callback: DataCallback): Future<*> {
return executorService.submit {
return executor.submit {
try {
val buffer = ByteArray(4096)
var bytesRead: Int
Expand All @@ -56,8 +57,4 @@ class AsyncInputStreamReader {
}
}
}

fun shutdown() {
executorService.shutdown()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@ class FixedResponseX509ExtendedKeyManager(
private val alias: String,
) : X509ExtendedKeyManager() {

override fun getClientAliases(keyType: String, issuers: Array<Principal>?) = arrayOf(alias)
override fun getClientAliases(keyType: String?, issuers: Array<Principal?>?) = arrayOf(alias)

override fun chooseClientAlias(
keyType: Array<String>,
issuers: Array<Principal>?,
keyType: Array<String?>?,
issuers: Array<Principal?>?,
socket: Socket?,
) = alias

override fun getServerAliases(keyType: String, issuers: Array<Principal>?) = arrayOf(alias)
override fun getServerAliases(keyType: String?, issuers: Array<Principal?>?) = arrayOf(alias)

override fun chooseServerAlias(
keyType: String,
issuers: Array<Principal>?,
keyType: String?,
issuers: Array<Principal?>?,
socket: Socket?,
) = alias

override fun getCertificateChain(alias: String) = certificateChain
override fun getCertificateChain(alias: String?) = certificateChain

override fun getPrivateKey(alias: String) = privateKey
override fun getPrivateKey(alias: String?) = privateKey

override fun chooseEngineClientAlias(
keyType: Array<String?>?,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2026, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

package com.example.ok_http

import okhttp3.MediaType
import okhttp3.RequestBody
import okio.BufferedSink
import okio.Pipe
import okio.buffer
import java.io.IOException
import java.util.concurrent.Executors

/**
* A [RequestBody] that receives data incrementally from Dart via [writeChunk],
* using an [okio.Pipe] for backpressure between the Dart write side and the
* OkHttp read side.
*
* Flow:
* 1. OkHttp calls [writeTo] on its dispatcher thread, which blocks reading
* from the pipe source until data is available or the source is closed.
* 2. Dart calls [writeChunk] to push data into the pipe sink. The write
* happens on a dedicated executor thread so the Dart isolate is never
* blocked by pipe backpressure.
* 3. [WriteCallback.onWriteComplete] fires after each chunk, signaling Dart
* to send the next chunk.
* 4. [finish] closes the pipe sink, causing [writeTo]'s writeAll to complete.
* 5. [cancel] aborts the pipe for early termination (e.g., server
* responded 413 before body was fully sent).
*/
class StreamingRequestBody(
private val mediaType: MediaType?,
private val length: Long,
bufferSize: Long = 65536
) : RequestBody() {
companion object {
private val executor = Executors.newCachedThreadPool()
}

private val pipe = Pipe(bufferSize)
private val bufferedSink = pipe.sink.buffer()

override fun contentType(): MediaType? = mediaType

override fun contentLength(): Long = length

override fun isOneShot(): Boolean = true

override fun writeTo(sink: BufferedSink) {
sink.writeAll(pipe.source)
}

/**
* Write a chunk of data to the pipe asynchronously.
* The callback fires when the write completes, providing backpressure.
*/
fun writeChunk(data: ByteArray, length: Int, callback: WriteCallback) {
executor.submit {
try {
bufferedSink.write(data, 0, length)
callback.onWriteComplete()
} catch (e: IOException) {
callback.onError(e)
}
}
}

/**
* Signal that all data has been written. Closes the pipe sink,
* which causes [writeTo]'s writeAll to complete.
*/
fun finish() {
executor.submit { bufferedSink.close() }
}

/**
* Cancel the streaming and close resources.
*/
fun cancel() {
try { pipe.cancel() } catch (_: Exception) {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2026, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

package com.example.ok_http

import java.io.IOException

/**
* Callback interface utilized by [StreamingRequestBody].
*
* Signals to the Dart side when a chunk write has completed
* or an error has occurred, enabling backpressure-aware streaming.
*/
interface WriteCallback {
fun onWriteComplete()
fun onError(e: IOException)
}
31 changes: 29 additions & 2 deletions pkgs/ok_http/example/integration_test/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ Future<void> testConformance() async {
try {
testAll(
OkHttpClient.new,
canStreamRequestBody: false,
canStreamRequestBody: true,
preservesMethodCase: true,
supportsFoldedHeaders: false,
canSendCookieHeaders: true,
canReceiveSetCookieHeaders: true,
correctlyHandlesNullHeaderValues: false,
supportsAbort: true,
);
} finally {
HttpClientRequestProfile.profilingEnabled = profile;
Expand All @@ -42,16 +43,42 @@ Future<void> testConformance() async {
try {
testAll(
OkHttpClient.new,
canStreamRequestBody: false,
canStreamRequestBody: true,
preservesMethodCase: true,
supportsFoldedHeaders: false,
canSendCookieHeaders: true,
canReceiveSetCookieHeaders: true,
correctlyHandlesNullHeaderValues: false,
supportsAbort: true,
);
} finally {
HttpClientRequestProfile.profilingEnabled = profile;
}
});
});

group('ok_http client fromJniGlobalRef', () {
final owners = <OkHttpClient>[];

tearDownAll(() {
for (final owner in owners) {
owner.close();
}
});

testAll(
() {
final owner = OkHttpClient();
owners.add(owner);
return OkHttpClient.fromJniGlobalRef(owner.nativeReference);
},
canStreamRequestBody: true,
preservesMethodCase: true,
supportsFoldedHeaders: false,
canSendCookieHeaders: true,
canReceiveSetCookieHeaders: true,
correctlyHandlesNullHeaderValues: false,
supportsAbort: true,
);
});
}
2 changes: 2 additions & 0 deletions pkgs/ok_http/jnigen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ classes:
- "com.example.ok_http.RedirectInterceptor"
- "com.example.ok_http.AsyncInputStreamReader"
- "com.example.ok_http.DataCallback"
- "com.example.ok_http.WriteCallback"
- "com.example.ok_http.StreamingRequestBody"
- "okhttp3.WebSocket"
- "com.example.ok_http.WebSocketListenerProxy"
- "com.example.ok_http.FixedResponseX509ExtendedKeyManager"
Expand Down
Loading