Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/claude.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ jobs:
- Exception handling patterns (checked vs unchecked, proper resource cleanup with try-with-resources)
- JetStream API correctness (subscription semantics, ack/nak behavior, consumer configuration)
- API compatibility with existing public interfaces

Do NOT comment on import style. Wildcard imports (e.g. `io.nats.client.support.*`, and static
wildcards) are an accepted convention in this project — never suggest converting wildcard imports
to specific imports, or flag them as inconsistent.
secrets:
claude_oauth_token: ${{ secrets.CLAUDE_OAUTH_TOKEN }}
gh_app_private_key: ${{ secrets.CLAUDE_GH_APP_PRIVATE_KEY }}
62 changes: 59 additions & 3 deletions src/main/java/io/nats/client/api/KeyValueConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
// limitations under the License.
package io.nats.client.api;

import io.nats.client.support.JsonValue;
import io.nats.client.support.JsonValueUtils;
import io.nats.client.support.NatsKeyValueUtil;
import io.nats.client.support.*;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import java.time.Duration;
import java.util.*;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonValueUtils.*;
import static io.nats.client.support.NatsJetStreamConstants.SERVER_DEFAULT_DUPLICATE_WINDOW_MS;
import static io.nats.client.support.NatsKeyValueUtil.*;
import static io.nats.client.support.Validator.*;
Expand Down Expand Up @@ -115,6 +115,62 @@ public JsonValue toJsonValue() {
return mb.toJsonValue();
}

/**
* Returns a KeyValueConfiguration deserialized from a JSON representation of a Key Value
* <b>builder</b> configuration, i.e. the values you would supply to the {@link Builder}.
* This is <b>not</b> the backing stream configuration, and it is <b>not</b> the JSON emitted
* by {@link #toJson()}. Accordingly, {@code name} is the bucket (simple) name, not the stream
* name, and the field names use the Key Value domain (for example {@code max_history_per_key},
* not the stream's {@code max_msgs_per_subject}).
*
* <p>If you instead have the full backing stream configuration JSON (for example as returned
* by the server), use {@link #instanceViaStreamConfig(String)} — at your own risk, since it
* bypasses the Key Value validation and field derivation performed here.
*
* @param json the json representing the Key Value builder configuration
* @return KeyValueConfiguration for the given json
* @throws JsonParseException if there is a problem parsing the json
* @see #instanceViaStreamConfig(String)
*/
public static KeyValueConfiguration instance(String json) throws JsonParseException {
JsonValue v = JsonParser.parse(json);
// read each field that has a builder setter, then build() so KV validation/derivation runs.
return new Builder()
.name(readString(v, NAME))
.description(readString(v, DESCRIPTION))
.maxHistoryPerKey(readInteger(v, MAX_HISTORY_PER_KEY, 1))
.maxBucketSize(readLong(v, MAX_BUCKET_SIZE, -1))
.maximumValueSize(readInteger(v, MAX_VALUE_SIZE, -1))
.ttl(readNanos(v, TTL))
.storageType(StorageType.get(readString(v, STORAGE)))
.replicas(readInteger(v, REPLICAS, 1))
.placement(Placement.optionalInstance(readValue(v, PLACEMENT)))
.republish(Republish.optionalInstance(readValue(v, REPUBLISH)))
.mirror(Mirror.optionalInstance(readValue(v, MIRROR)))
.sources(Source.optionalListOf(readValue(v, SOURCES)))
.compression(readBoolean(v, COMPRESSION))
.metadata(readStringStringMap(v, METADATA))
.limitMarker(readNanos(v, LIMIT_MARKER_TTL))
.build();
}

/**
* Returns a KeyValueConfiguration built from the full backing stream configuration JSON,
* for example the JSON of the bucket's backing stream as returned by the server. Here
* {@code name} is the stream name (such as {@code KV_bucketName}) and the field names are the
* stream's (for example {@code max_msgs_per_subject}). Use at your own risk: this trusts the
* supplied stream and bypasses the Key Value validation and field derivation that
* {@link #instance(String)} performs.
*
* @param json the json representing the backing stream configuration
* @return KeyValueConfiguration for the given backing stream json
* @throws JsonParseException if there is a problem parsing the json
* @see #instance(String)
*/
public static KeyValueConfiguration instanceViaStreamConfig(String json) throws JsonParseException {
return new KeyValueConfiguration(StreamConfiguration.instance(json));
}

/**
* Creates a builder for the Key Value Configuration.
* @return a KeyValueConfiguration Builder
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/io/nats/client/api/ObjectStoreConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
// limitations under the License.
package io.nats.client.api;

import io.nats.client.support.JsonParseException;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonValue;
import io.nats.client.support.NatsObjectStoreUtil;

import java.time.Duration;
import java.util.Map;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonValueUtils.*;
import static io.nats.client.support.NatsObjectStoreUtil.*;
import static io.nats.client.support.Validator.required;

Expand All @@ -41,6 +46,54 @@ public boolean isSealed() {
return sc.getSealed();
}

/**
* Returns an ObjectStoreConfiguration deserialized from a JSON representation of an Object Store
* <b>builder</b> configuration, i.e. the values you would supply to the {@link Builder}.
* This is <b>not</b> the backing stream configuration, and it is <b>not</b> the JSON emitted
* by {@link #toJson()}. Accordingly, {@code name} is the bucket (simple) name, not the stream
* name, and the field names use the Object Store domain.
*
* <p>If you instead have the full backing stream configuration JSON (for example as returned
* by the server), use {@link #instanceViaStreamConfig(String)} — at your own risk, since it
* bypasses the Object Store validation and field derivation performed here.
*
* @param json the json representing the Object Store builder configuration
* @return ObjectStoreConfiguration for the given json
* @throws JsonParseException if there is a problem parsing the json
* @see #instanceViaStreamConfig(String)
*/
public static ObjectStoreConfiguration instance(String json) throws JsonParseException {
JsonValue v = JsonParser.parse(json);
// read each field that has a builder setter, then build() so OS validation/derivation runs.
return new Builder()
.name(readString(v, NAME))
.description(readString(v, DESCRIPTION))
.maxBucketSize(readLong(v, MAX_BUCKET_SIZE, -1))
.ttl(readNanos(v, TTL))
.storageType(StorageType.get(readString(v, STORAGE)))
.replicas(readInteger(v, REPLICAS, 1))
.placement(Placement.optionalInstance(readValue(v, PLACEMENT)))
.compression(readBoolean(v, COMPRESSION))
.metadata(readStringStringMap(v, METADATA))
.build();
}

/**
* Returns an ObjectStoreConfiguration built from the full backing stream configuration JSON,
* for example the JSON of the bucket's backing stream as returned by the server. Here
* {@code name} is the stream name (such as {@code OBJ_bucketName}) and the field names are the
* stream's. Use at your own risk: this trusts the supplied stream and bypasses the Object Store
* validation and field derivation that {@link #instance(String)} performs.
*
* @param json the json representing the backing stream configuration
* @return ObjectStoreConfiguration for the given backing stream json
* @throws JsonParseException if there is a problem parsing the json
* @see #instance(String)
*/
public static ObjectStoreConfiguration instanceViaStreamConfig(String json) throws JsonParseException {
return new ObjectStoreConfiguration(StreamConfiguration.instance(json));
}

/**
* Creates a builder for the Object Store Configuration.
* @return an ObjectStoreConfiguration Builder
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,29 @@ public interface ApiConstants {
/** leader_since */ String LEADER_SINCE = "leader_since";
/** level */ String LEVEL = "level";
/** limit */ String LIMIT = "limit";
/** limit_marker_ttl */ String LIMIT_MARKER_TTL = "limit_marker_ttl";
/** limits */ String LIMITS = "limits";
/** link */ String LINK = "link";
/** lost */ String LOST = "lost";
/** max_ack_pending */ String MAX_ACK_PENDING = "max_ack_pending";
/** max_age */ String MAX_AGE = "max_age";
/** max_batch */ String MAX_BATCH = "max_batch";
/** max_bucket_size */ String MAX_BUCKET_SIZE = "max_bucket_size";
/** max_bytes */ String MAX_BYTES = "max_bytes";
/** max_bytes_required */ String MAX_BYTES_REQUIRED = "max_bytes_required";
/** max_consumers */ String MAX_CONSUMERS = "max_consumers";
/** max_chunk_size */ String MAX_CHUNK_SIZE = "max_chunk_size";
/** max_deliver */ String MAX_DELIVER = "max_deliver";
/** max_expires */ String MAX_EXPIRES = "max_expires";
/** max_history_per_key */ String MAX_HISTORY_PER_KEY = "max_history_per_key";
/** max_memory */ String MAX_MEMORY = "max_memory";
/** max_msg_size */ String MAX_MSG_SIZE = "max_msg_size";
/** max_msgs */ String MAX_MSGS = "max_msgs";
/** max_msgs_per_subject */ String MAX_MSGS_PER_SUB = "max_msgs_per_subject";
/** max_payload */ String MAX_PAYLOAD = "max_payload";
/** max_storage */ String MAX_STORAGE = "max_storage";
/** max_streams */ String MAX_STREAMS = "max_streams";
/** max_value_size */ String MAX_VALUE_SIZE = "max_value_size";
/** max_waiting */ String MAX_WAITING = "max_waiting"; // this is correct! the meaning name is different than the field name
/** min_pending */ String MIN_PENDING = "min_pending";
/** min_ack_pending */ String MIN_ACK_PENDING = "min_ack_pending";
Expand Down Expand Up @@ -231,6 +235,7 @@ public interface ApiConstants {
/** tls_available */ String TLS_AVAILABLE = "tls_available";
/** total */ String TOTAL = "total";
/** traffic_account */ String TRAFFIC_ACCOUNT = "traffic_account";
/** ttl */ String TTL = "ttl";
/** type */ String TYPE = "type";
/** up_to_seq */ String UP_TO_SEQ = "up_to_seq";
/** up_to_time */ String UP_TO_TIME = "up_to_time";
Expand Down
61 changes: 56 additions & 5 deletions src/test/java/io/nats/client/api/KeyValueConfigurationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,26 @@
package io.nats.client.api;

import io.nats.client.impl.JetStreamTestBase;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonValue;
import io.nats.client.support.JsonParseException;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import static io.nats.client.support.NatsConstants.EMPTY;
import static io.nats.client.utils.ResourceUtils.dataAsString;
import static org.junit.jupiter.api.Assertions.*;

public class KeyValueConfigurationTests extends JetStreamTestBase {

@Test
public void testConstruction() {
public void testConstruction() throws JsonParseException {
Placement p = Placement.builder().cluster("cluster").tags("a", "b").build();
Republish r = Republish.builder().source("src").destination("dest").headersOnly(true).build();
Map<String, String> metadata = new HashMap<>();
metadata.put("meta-key", "meta-value");
metadata.put("meta-key2", "meta-value2");

// builder
//noinspection deprecation
Expand All @@ -44,14 +49,22 @@ public void testConstruction() {
.placement(p)
.republish(r)
.compression(true)
.metadata(metadata)
.limitMarker(8888)
.build();
validate(bc);

validate(KeyValueConfiguration.builder(bc).build());

JsonValue jvSc = JsonParser.parseUnchecked(bc.getBackingConfig().toJson());
validate(new KeyValueConfiguration(StreamConfiguration.instance(jvSc)));
// instance(String) parses a JSON representation of the KV builder configuration:
// the bucket (simple) name and KV-domain field names, NOT the backing stream.
validate(KeyValueConfiguration.instance(dataAsString("KeyValueConfig.json")));
assertThrows(JsonParseException.class, () -> KeyValueConfiguration.instance("not json"));
// valid json with no name -> clean validation error
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.instance("{}"));

// instanceViaStreamConfig(String) builds from the full backing stream configuration JSON
validate(KeyValueConfiguration.instanceViaStreamConfig(bc.getBackingConfig().toJson()));

bc = KeyValueConfiguration.builder()
.name("bucketName")
Expand Down Expand Up @@ -82,10 +95,48 @@ private void validate(KeyValueConfiguration kvc) {
assertTrue(kvc.isCompressed());
assertNotNull(kvc.getLimitMarkerTtl());
assertEquals(8888, kvc.getLimitMarkerTtl().toMillis());
assertNotNull(kvc.getMetadata());
assertEquals(2, kvc.getMetadata().size());
assertEquals("meta-value", kvc.getMetadata().get("meta-key"));
assertEquals("meta-value2", kvc.getMetadata().get("meta-key2"));

assertTrue(kvc.toString().contains("bucketName"));
}

@Test
public void testInstanceMirrorAndSources() throws JsonParseException {
// mirror and sources are mutually exclusive, so exercise each on its own config.
// Both the builder-config instance(...) and the stream-config instanceViaStreamConfig(...)
// paths must deserialize them.
KeyValueConfiguration src = KeyValueConfiguration.builder()
.name("bucketName")
.sources(Source.builder().name("KV_sourceBucket").build())
.build();
validateSources(src);
validateSources(KeyValueConfiguration.instance(dataAsString("KeyValueConfigSources.json")));
validateSources(KeyValueConfiguration.instanceViaStreamConfig(src.getBackingConfig().toJson()));

KeyValueConfiguration mir = KeyValueConfiguration.builder()
.name("bucketName")
.mirror(Mirror.builder().name("KV_mirrorBucket").build())
.build();
validateMirror(mir);
validateMirror(KeyValueConfiguration.instance(dataAsString("KeyValueConfigMirror.json")));
validateMirror(KeyValueConfiguration.instanceViaStreamConfig(mir.getBackingConfig().toJson()));
}

private void validateSources(KeyValueConfiguration kvc) {
assertNull(kvc.getMirror());
assertNotNull(kvc.getSources());
assertEquals(1, kvc.getSources().size());
assertEquals("KV_sourceBucket", kvc.getSources().get(0).getName());
}

private void validateMirror(KeyValueConfiguration kvc) {
assertNotNull(kvc.getMirror());
assertEquals("KV_mirrorBucket", kvc.getMirror().getName());
}

@Test
public void testConstructionInvalidsCoverage() {
assertThrows(IllegalArgumentException.class, () -> KeyValueConfiguration.builder().build());
Expand Down
25 changes: 20 additions & 5 deletions src/test/java/io/nats/client/api/ObjectStoreApiTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
import io.nats.client.impl.Headers;
import io.nats.client.impl.JetStreamTestBase;
import io.nats.client.support.DateTimeUtils;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonValue;
import io.nats.client.support.JsonParseException;
import org.junit.jupiter.api.Test;

import java.time.Duration;
Expand All @@ -33,8 +32,11 @@
public class ObjectStoreApiTests extends JetStreamTestBase {

@Test
public void testConfigurationConstruction() {
public void testConfigurationConstruction() throws JsonParseException {
Placement p = Placement.builder().cluster("cluster").tags("a", "b").build();
Map<String, String> metadata = new HashMap<>();
metadata.put("meta-key", "meta-value");
metadata.put("meta-key2", "meta-value2");

// builder
ObjectStoreConfiguration osc = ObjectStoreConfiguration.builder("bucketName")
Expand All @@ -45,6 +47,7 @@ public void testConfigurationConstruction() {
.replicas(2)
.placement(p)
.compression(true)
.metadata(metadata)
.build();
validate(osc);

Expand All @@ -57,13 +60,21 @@ public void testConfigurationConstruction() {
.replicas(2)
.placement(p)
.compression(true)
.metadata(metadata)
.build();
validate(osc);

validate(ObjectStoreConfiguration.builder(osc).build());

JsonValue jvSc = JsonParser.parseUnchecked(osc.getBackingConfig().toJson());
validate(new ObjectStoreConfiguration(StreamConfiguration.instance(jvSc)));
// instance(String) parses a JSON representation of the OS builder configuration:
// the bucket (simple) name and OS-domain field names, NOT the backing stream.
validate(ObjectStoreConfiguration.instance(dataAsString("ObjectStoreConfig.json")));
assertThrows(JsonParseException.class, () -> ObjectStoreConfiguration.instance("not json"));
// valid json with no name -> clean validation error
assertThrows(IllegalArgumentException.class, () -> ObjectStoreConfiguration.instance("{}"));

// instanceViaStreamConfig(String) builds from the full backing stream configuration JSON
validate(ObjectStoreConfiguration.instanceViaStreamConfig(osc.getBackingConfig().toJson()));
}

private void validate(ObjectStoreConfiguration osc) {
Expand All @@ -78,6 +89,10 @@ private void validate(ObjectStoreConfiguration osc) {
assertNotNull(osc.getPlacement().getTags());
assertEquals(2, osc.getPlacement().getTags().size());
assertTrue(osc.isCompressed());
assertNotNull(osc.getMetadata());
assertEquals(2, osc.getMetadata().size());
assertEquals("meta-value", osc.getMetadata().get("meta-key"));
assertEquals("meta-value2", osc.getMetadata().get("meta-key2"));

assertTrue(osc.toString().contains("bucketName"));
}
Expand Down
25 changes: 25 additions & 0 deletions src/test/resources/data/KeyValueConfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "bucketName",
"description": "bucketDesc",
"max_history_per_key": 44,
"max_bucket_size": 555,
"max_value_size": 666,
"ttl": 777000000,
"storage": "memory",
"replicas": 2,
"placement": {
"cluster": "cluster",
"tags": ["a", "b"]
},
"republish": {
"src": "src",
"dest": "dest",
"headers_only": true
},
"compression": true,
"limit_marker_ttl": 8888000000,
"metadata": {
"meta-key": "meta-value",
"meta-key2": "meta-value2"
}
}
Loading
Loading