在如何在Spring Boot应用程序中使用Apache Kafka的后续操作(显示如何开始使用Spring Boot和ApacheKafka®)之后,这里我将演示如何在Spring Boot应用程序中启用Confluent Schema Registry和Avro序列化格式 。
使用Avro模式,您可以在微服务应用程序之间建立数据协定。
完整的源代码可在GitHub上下载。
先决条件
- Java 8+
- Confluent Platform 5.3或更高版本
- 可选:融合云帐户
让我们开始写作
与往常一样,我们将从生成项目启动器开始。 在此启动程序中,应启用“ Spring for Apache Kafka”和“ Spring Web Starter”。
<project>
<dependencies>
<!-- other dependencies -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId> (1)
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId> (2)
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId> (3)
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>5.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- other maven repositories the project -->
<repository>
<id>confluent</id> (4)
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<plugins>
<!-- other maven plugins in the project -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory> (5)<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</project>
- Confluent Schema Registry client
- Avro dependency
- Avro SerDes
- Confluent Maven repository
- Source directory where you put your Avro files and store generated Java POJOs
Spring Boot应用程序的架构
您的应用程序将包括以下组件:
use.avsc:一个Avro文件
SpringAvroApplication.java:您的应用程序的起点。 此类还包括应用程序正在使用的新主题的配置。
Producer.java:封装Kafka生产者的组件
Consumer.java:来自Kafka主题的消息的侦听器
KafkaController.java:一个RESTful控制器,它接受HTTP命令以在Kafka主题中发布消息
创建用户Avro文件
{
"namespace": "io.confluent.developer", (1)
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string",
"avro.java.string": "String"
},
{
"name": "age",
"type": "int"
}
]
}
avro-maven-plugin将在io.confluent.developer包中生成User POJO。 该POJO具有名称和年龄属性。
创建一个Spring Boot应用程序类
@SpringBootApplication
public class SpringAvroApplication {
@Value("${topic.name}") (1)
private String topicName;@Value("${topic.partitions-num}")
private Integer partitions;@Value("${topic.replication-factor}")
private short replicationFactor;
@Bean
NewTopic moviesTopic() { (2)
return new NewTopic(topicName, partitions, replicationFactor);
}
public static void main(String[] args) {
SpringApplication.run(SpringAvroApplication.class, args);
}}
这些是Spring从application.yaml文件注入的主题参数。
Spring Boot根据提供的配置创建一个新的Kafka主题。 作为应用程序开发人员,您负责创建主题,而不是依靠自动主题创建,这在生产环境中应该是错误的。
创建生产者组件
@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {@Value("${topic.name}") (1)
private String TOPIC;private final KafkaTemplate<String, User> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<String, User> kafkaTemplate) { (2)
this.kafkaTemplate = kafkaTemplate;
}void sendMessage(User user) {
this.kafkaTemplate.send(this.TOPIC, user.getName(), user); (3)
log.info(String.format("Produced user -> %s", user));
}
}
- 主题名称将从application.yaml中注入。
- Spring将使用application.yaml中提供的属性来初始化KafkaTemplate。
- 我们将使用用户作为键将消息发送到主题。
Spring在应用程序启动期间实例化所有这些组件,并且应用程序准备通过REST端点接收消息。 默认的HTTP端口是9080,可以在application.yaml配置文件中进行更改。
@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {
@Value("${topic.name}") (1)
private String topicName;@KafkaListener(topics = "users", groupId = "group_id") (2)
public void consume(ConsumerRecord<String, User> record) {
log.info(String.format("Consumed message -> %s", record.value()));
}
}
- 主题名称将从application.yaml中注入。
- 使用@KafkaListener注释,spring-kafka框架将实例化一个新的使用者。
创建KafkaController组件
@RestController
@RequestMapping(value = "/user") (1)
public class KafkaController {private final Producer producer;
@Autowired
KafkaController(Producer producer) { (2)
this.producer = producer;
}@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) {
this.producer.sendMessage(new User(name, age)); (3)
}
}
- KafkaController映射到/ user HTTP端点。
- Spring注入生产者组件。
- 当新请求到达/ user / publish端点时,生产者将其发送到Kafka。
运行示例
先决条件
提示:在本指南中,我假设您已安装Java开发工具包(JDK)。 如果您不这样做,我强烈建议您使用SDKMAN! 安装它。
您还需要在本地安装Confluent Platform 5.3或更高版本。 如果尚未安装,请遵循Confluent Platform快速入门。 确保还安装了Confluent CLI(请参阅快速入门本节中的步骤4)。
启动Kafka和Schema Registry
confluent local start schema-registry
Confluent CLI提供了用于管理本地Confluent Platform安装的本地模式。 Confluent CLI以正确的顺序启动每个组件。
您应该在终端中看到类似的输出。
构建并运行您的Spring Boot应用程序
在示例目录中,运行./mvnw clean package 以编译并生成可运行的JAR。 之后,您可以运行以下命令:
java -jar target/kafka-avro-0.0.1-SNAPSHOT.jar
测试生产者/消费者REST服务
为简单起见,我喜欢使用curl命令,但是您可以使用任何REST客户端(例如Postman或IntelliJ IDEA中的REST客户端):
curl -X POST -d 'name=vik&age=33' http://localhost:9080/user/publish
2019-06-06 22:52:59.485 INFO 28910 --- [nio-9080-exec-1] Producer Logger : Produced user -> {"name": "vik", "age": 33}
2019-06-06 22:52:59.559 INFO 28910 --- [ntainer#0-0-C-1] Consumer Logger : Consumed message -> {"name": "vik", "age": 33}
使用Confluent Cloud运行应用程序
要将此演示应用程序与Confluent Cloud一起使用,您将需要托管Schema Registry的端点和API密钥/秘密。 选择环境后,都可以从Confluent Cloud UI轻松检索两者。
必须至少创建一个Kafka集群才能访问您的托管模式注册表。 选择“架构注册表”选项后,您可以检索端点并创建新的API /秘密。
Confluent Cloud配置示例可以在application-cloud.yaml中找到:
topic:
name: users
partitions-num: 6
replication-factor: 3
server:
port: 9080
spring:
kafka:
bootstrap-servers:
- mybootstrap.confluent.cloud:9092 (1)
properties:
# CCloud broker connection parameters
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="ccloud_key" password="ccloud_secret"; (2)
security.protocol: SASL_SSL# CCloud Schema Registry Connection parameter
schema.registry.url: https://schema-registry.aws.confluent.cloud (3)
basic.auth.credentials.source: USER_INFO (4)
schema.registry.basic.auth.user.info: sr_ccloud_key:sr_ccloud_key (5)
consumer:
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
template:
default-topic:
logging:
level:
root: info
- 云引导服务器
- 经纪人密钥和秘密
- 融合云架构注册表URL
- 架构注册表身份验证配置
- Cloud Schema注册表项和机密
注意:确保用您的Confluent Cloud帐户中的实际值替换虚拟的登录名和密码信息。
要以云模式运行此应用程序,请激活云Spring配置文件。 在这种情况下,Spring Boot将选择application-cloud.yaml配置文件,其中包含与Confluent Cloud中数据的连接。
java -jar -Dspring.profiles.active=cloud target/kafka-avro-0.0.1-SNAPSHOT.jar
原文:https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial
本文:https://pub.intelligentx.net/how-use-schema-registry-and-avro-spring-boot-applications
讨论:请加入知识星球或者小红圈【首席架构师圈】
- 登录 发表评论
- 73 次浏览