springcloud stream消费者和生产者以及消费组配置
- 2021-02-04 17:36:00
- admin 原创
- 3650
springcloud stream的配置
1、首先是pom文件的配置
<modelVersion>4.0.0</modelVersion> <groupId>com.didispace</groupId> <artifactId>stream-exception-handler-2</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Finchley.SR1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>2、application.yml配置
server:
port: 8080
spring:
application:
name: stream-exception-handler-2
cloud:
stream:
bindings:
example-topic-input:
consumer:
max-attempts: 1 #重试次数,1是从1开始的
destination: test-topic
group: stream-exception-handler #消费组
example-topic-output:
destination: test-topic
rabbitmq:
host: localhost
password: guest
port: 5672
username: guest
3、生产者和消费者代码
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return "ok"; } } /** * 消息消费逻辑 */ @Slf4j @Component static class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received payload : " + payload); //throw new RuntimeException("Message consumer failed!"); } /** * 消息消费失败的降级处理逻辑) * * @param message */ @ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors") public void error(Message<?> message) { log.info("Message consumer failed, call fallback!"); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); } }备注:当有自定义业务处理逻辑的时候,重试次数有三次,自定义处理逻辑是最后执行的,只执行一次
发表评论
文章分类
联系方式
联系人: | 郑州-小万 |
---|---|
电话: | 13803993919 |
Email: | 1027060531@qq.com |
QQ: | 1027060531 |
网址: | www.wanhejia.com |
Update Required
To play the media you will need to either update your browser to a recent version or update your Flash plugin.