Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<version>RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- 许可证信息,这里是Apache 2.0的许可证,大家根据实际情况修改 -->
Expand Down Expand Up @@ -166,9 +172,17 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>8</source>
<target>8</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<!-- gpg plugin,用于签名认证 -->
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/io/github/ithamal/queue/demo/DemoApplication.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.github.ithamal.queue.demo;

import io.github.ithamal.queue.boot.ItQueueAutoConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;

@SpringBootApplication(exclude = {RedisAutoConfiguration.class, ItQueueAutoConfig.class})
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
167 changes: 167 additions & 0 deletions src/test/java/io/github/ithamal/queue/demo/QueueController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package io.github.ithamal.queue.demo;

import io.github.ithamal.queue.config.ConsumerSetting;
import io.github.ithamal.queue.config.ProducerSetting;
import io.github.ithamal.queue.core.*;
import io.github.ithamal.queue.handler.MessageHandler;
import io.github.ithamal.queue.service.ConsumerGroupContainer;
import io.github.ithamal.queue.service.impl.DefaultConsumerGroupContainer;
import io.github.ithamal.queue.support.jdk.JdkQueueFactory;
import org.springframework.web.bind.annotation.*;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/api")
public class QueueController {

private Producer producer;
private ConsumerGroupContainer container;
private final ConcurrentLinkedQueue<Map<String, Object>> consumedMessages = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Map<String, Object>> producedMessages = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Map<String, Object>> errorMessages = new ConcurrentLinkedQueue<>();

@PostConstruct
public void init() {
JdkQueueFactory queueFactory = new JdkQueueFactory();

// Producer setup
ProducerSetting producerSetting = new ProducerSetting();
producerSetting.setQueue("demo");
producerSetting.setImplClass("jdk");
producerSetting.afterProperties();
producer = queueFactory.createProducer(producerSetting);

// Consumer setup
ConsumerSetting consumerSetting = new ConsumerSetting();
consumerSetting.setQueue("demo");
consumerSetting.setGroupName("group1");
consumerSetting.setConsumerNum(2);
consumerSetting.setPollSize(5);
consumerSetting.setImplClass("jdk");
consumerSetting.afterProperties();

ConsumerGroup consumerGroup = queueFactory.createConsumerGroup(consumerSetting);
container = new DefaultConsumerGroupContainer(consumerGroup);
container.registerHandler((MessageHandler<String>) (messages, consumer) -> {
for (Message<?> message : messages) {
Map<String, Object> entry = new LinkedHashMap<>();
entry.put("id", message.getId());
entry.put("payload", message.getPayload());
entry.put("time", message.getTime());
entry.put("consumer", consumer.getName());
entry.put("receivedAt", System.currentTimeMillis());
consumedMessages.add(entry);
consumer.ack(Collections.singletonList(message.getId()));
}
});
container.start();
}

@PreDestroy
public void destroy() throws InterruptedException {
if (container != null) {
container.shutdown();
container.awaitTermination(5, TimeUnit.SECONDS);
}
}

@PostMapping("/send")
public Map<String, Object> sendMessage(@RequestBody Map<String, String> body) {
Map<String, Object> result = new LinkedHashMap<>();
try {
String content = body.getOrDefault("message", "");
if (content.isEmpty()) {
result.put("success", false);
result.put("error", "Message content cannot be empty");
return result;
}
SimpleMessage<String> msg = SimpleMessage.create(content);
producer.put(msg);

Map<String, Object> entry = new LinkedHashMap<>();
entry.put("id", msg.getId());
entry.put("payload", msg.getPayload());
entry.put("time", msg.getTime());
entry.put("sentAt", System.currentTimeMillis());
producedMessages.add(entry);

result.put("success", true);
result.put("message", entry);
} catch (Exception e) {
result.put("success", false);
result.put("error", e.getMessage());
Map<String, Object> err = new LinkedHashMap<>();
err.put("error", e.getMessage());
err.put("time", System.currentTimeMillis());
errorMessages.add(err);
}
return result;
}

@PostMapping("/send-batch")
public Map<String, Object> sendBatch(@RequestBody Map<String, Object> body) {
Map<String, Object> result = new LinkedHashMap<>();
try {
int count = (Integer) body.getOrDefault("count", 5);
String prefix = (String) body.getOrDefault("prefix", "batch-msg");
List<Map<String, Object>> sent = new ArrayList<>();
for (int i = 0; i < count; i++) {
String content = prefix + "-" + (i + 1);
SimpleMessage<String> msg = SimpleMessage.create(content);
producer.put(msg);
Map<String, Object> entry = new LinkedHashMap<>();
entry.put("id", msg.getId());
entry.put("payload", msg.getPayload());
entry.put("time", msg.getTime());
producedMessages.add(entry);
sent.add(entry);
}
result.put("success", true);
result.put("count", count);
result.put("messages", sent);
} catch (Exception e) {
result.put("success", false);
result.put("error", e.getMessage());
}
return result;
}

@GetMapping("/consumed")
public Map<String, Object> getConsumed() {
Map<String, Object> result = new LinkedHashMap<>();
result.put("count", consumedMessages.size());
result.put("messages", new ArrayList<>(consumedMessages));
return result;
}

@GetMapping("/produced")
public Map<String, Object> getProduced() {
Map<String, Object> result = new LinkedHashMap<>();
result.put("count", producedMessages.size());
result.put("messages", new ArrayList<>(producedMessages));
return result;
}

@GetMapping("/errors")
public Map<String, Object> getErrors() {
Map<String, Object> result = new LinkedHashMap<>();
result.put("count", errorMessages.size());
result.put("errors", new ArrayList<>(errorMessages));
return result;
}

@PostMapping("/clear")
public Map<String, Object> clear() {
consumedMessages.clear();
producedMessages.clear();
errorMessages.clear();
Map<String, Object> result = new LinkedHashMap<>();
result.put("success", true);
return result;
}
}
Loading