From 5ec47515b5992003604b98f1324ee31eb3c2a27b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 10 Apr 2026 09:27:42 +0000 Subject: [PATCH] feat: add web demo for producer/consumer testing with JDK queue Co-Authored-By: kenlin --- pom.xml | 14 + .../ithamal/queue/demo/DemoApplication.java | 14 + .../ithamal/queue/demo/QueueController.java | 167 ++++++++++ src/test/resources/static/index.html | 290 ++++++++++++++++++ 4 files changed, 485 insertions(+) create mode 100644 src/test/java/io/github/ithamal/queue/demo/DemoApplication.java create mode 100644 src/test/java/io/github/ithamal/queue/demo/QueueController.java create mode 100644 src/test/resources/static/index.html diff --git a/pom.xml b/pom.xml index 5a76e11..009181d 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,12 @@ RELEASE test + + org.springframework.boot + spring-boot-starter-web + 2.7.12 + test + @@ -166,9 +172,17 @@ org.apache.maven.plugins maven-compiler-plugin + 3.11.0 8 8 + + + org.projectlombok + lombok + 1.18.26 + + diff --git a/src/test/java/io/github/ithamal/queue/demo/DemoApplication.java b/src/test/java/io/github/ithamal/queue/demo/DemoApplication.java new file mode 100644 index 0000000..536abaa --- /dev/null +++ b/src/test/java/io/github/ithamal/queue/demo/DemoApplication.java @@ -0,0 +1,14 @@ +package io.github.ithamal.queue.demo; + +import io.github.ithamal.queue.boot.ItQueueAutoConfig; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; + +@SpringBootApplication(exclude = {RedisAutoConfiguration.class, ItQueueAutoConfig.class}) +public class DemoApplication { + + public static void main(String[] args) { + SpringApplication.run(DemoApplication.class, args); + } +} diff --git a/src/test/java/io/github/ithamal/queue/demo/QueueController.java b/src/test/java/io/github/ithamal/queue/demo/QueueController.java new file mode 100644 index 0000000..3f7c1a9 --- /dev/null +++ b/src/test/java/io/github/ithamal/queue/demo/QueueController.java @@ -0,0 +1,167 @@ +package io.github.ithamal.queue.demo; + +import io.github.ithamal.queue.config.ConsumerSetting; +import io.github.ithamal.queue.config.ProducerSetting; +import io.github.ithamal.queue.core.*; +import io.github.ithamal.queue.handler.MessageHandler; +import io.github.ithamal.queue.service.ConsumerGroupContainer; +import io.github.ithamal.queue.service.impl.DefaultConsumerGroupContainer; +import io.github.ithamal.queue.support.jdk.JdkQueueFactory; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +@RestController +@RequestMapping("/api") +public class QueueController { + + private Producer producer; + private ConsumerGroupContainer container; + private final ConcurrentLinkedQueue> consumedMessages = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue> producedMessages = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue> errorMessages = new ConcurrentLinkedQueue<>(); + + @PostConstruct + public void init() { + JdkQueueFactory queueFactory = new JdkQueueFactory(); + + // Producer setup + ProducerSetting producerSetting = new ProducerSetting(); + producerSetting.setQueue("demo"); + producerSetting.setImplClass("jdk"); + producerSetting.afterProperties(); + producer = queueFactory.createProducer(producerSetting); + + // Consumer setup + ConsumerSetting consumerSetting = new ConsumerSetting(); + consumerSetting.setQueue("demo"); + consumerSetting.setGroupName("group1"); + consumerSetting.setConsumerNum(2); + consumerSetting.setPollSize(5); + consumerSetting.setImplClass("jdk"); + consumerSetting.afterProperties(); + + ConsumerGroup consumerGroup = queueFactory.createConsumerGroup(consumerSetting); + container = new DefaultConsumerGroupContainer(consumerGroup); + container.registerHandler((MessageHandler) (messages, consumer) -> { + for (Message message : messages) { + Map entry = new LinkedHashMap<>(); + entry.put("id", message.getId()); + entry.put("payload", message.getPayload()); + entry.put("time", message.getTime()); + entry.put("consumer", consumer.getName()); + entry.put("receivedAt", System.currentTimeMillis()); + consumedMessages.add(entry); + consumer.ack(Collections.singletonList(message.getId())); + } + }); + container.start(); + } + + @PreDestroy + public void destroy() throws InterruptedException { + if (container != null) { + container.shutdown(); + container.awaitTermination(5, TimeUnit.SECONDS); + } + } + + @PostMapping("/send") + public Map sendMessage(@RequestBody Map body) { + Map result = new LinkedHashMap<>(); + try { + String content = body.getOrDefault("message", ""); + if (content.isEmpty()) { + result.put("success", false); + result.put("error", "Message content cannot be empty"); + return result; + } + SimpleMessage msg = SimpleMessage.create(content); + producer.put(msg); + + Map entry = new LinkedHashMap<>(); + entry.put("id", msg.getId()); + entry.put("payload", msg.getPayload()); + entry.put("time", msg.getTime()); + entry.put("sentAt", System.currentTimeMillis()); + producedMessages.add(entry); + + result.put("success", true); + result.put("message", entry); + } catch (Exception e) { + result.put("success", false); + result.put("error", e.getMessage()); + Map err = new LinkedHashMap<>(); + err.put("error", e.getMessage()); + err.put("time", System.currentTimeMillis()); + errorMessages.add(err); + } + return result; + } + + @PostMapping("/send-batch") + public Map sendBatch(@RequestBody Map body) { + Map result = new LinkedHashMap<>(); + try { + int count = (Integer) body.getOrDefault("count", 5); + String prefix = (String) body.getOrDefault("prefix", "batch-msg"); + List> sent = new ArrayList<>(); + for (int i = 0; i < count; i++) { + String content = prefix + "-" + (i + 1); + SimpleMessage msg = SimpleMessage.create(content); + producer.put(msg); + Map entry = new LinkedHashMap<>(); + entry.put("id", msg.getId()); + entry.put("payload", msg.getPayload()); + entry.put("time", msg.getTime()); + producedMessages.add(entry); + sent.add(entry); + } + result.put("success", true); + result.put("count", count); + result.put("messages", sent); + } catch (Exception e) { + result.put("success", false); + result.put("error", e.getMessage()); + } + return result; + } + + @GetMapping("/consumed") + public Map getConsumed() { + Map result = new LinkedHashMap<>(); + result.put("count", consumedMessages.size()); + result.put("messages", new ArrayList<>(consumedMessages)); + return result; + } + + @GetMapping("/produced") + public Map getProduced() { + Map result = new LinkedHashMap<>(); + result.put("count", producedMessages.size()); + result.put("messages", new ArrayList<>(producedMessages)); + return result; + } + + @GetMapping("/errors") + public Map getErrors() { + Map result = new LinkedHashMap<>(); + result.put("count", errorMessages.size()); + result.put("errors", new ArrayList<>(errorMessages)); + return result; + } + + @PostMapping("/clear") + public Map clear() { + consumedMessages.clear(); + producedMessages.clear(); + errorMessages.clear(); + Map result = new LinkedHashMap<>(); + result.put("success", true); + return result; + } +} diff --git a/src/test/resources/static/index.html b/src/test/resources/static/index.html new file mode 100644 index 0000000..95e3e25 --- /dev/null +++ b/src/test/resources/static/index.html @@ -0,0 +1,290 @@ + + + + + + itqueue Demo - Producer/Consumer + + + +
+

itqueue Demo

+

In-memory message queue with JDK backend - Producer / Consumer demonstration

+
+ +
+ +
+
+
+
0
+
Messages Produced
+
+
+
0
+
Messages Consumed
+
+
+
0
+
Errors
+
+
+
+ + +
+

Producer

+
+ + +
+
+ +
+ +
+ +
+ +
+ + +
+
+
+ +
+ +

Produced Messages

+
+
No messages produced yet
+
+
+ + +
+

Consumer

+

+ + Consumer group group1 active with 2 consumers, auto-polling +

+
+ + +
+ +

Consumed Messages

+
+
No messages consumed yet
+
+
+ + +
+

Activity Log

+
+
[--:--:--] INFO System ready. Waiting for messages...
+
+
+
+ + + +