跳转到主要内容
SEO Title
How to Use Schema Registry and Avro in Spring Boot Applications

TL;DR

接下来介绍如何在您的Spring Boot应用程序中使用Apache Kafka,这将展示如何开始使用Spring Boot和Apache Kafka?,这里我将演示如何在您的Spring Boot应用程序中启用合流模式注册表和Avro序列化格式。

使用Avro模式,可以在微服务应用程序之间建立数据协定。

完整的源代码可以在GitHub上下载。

Version Date Date
v1.0 7/31/19 Initial revision

先决条件

我们开始写吧

一如既往,我们将从generating a project starter 开始。在这个启动程序中,您应该启用“Spring for Apache Kafka”和“Spring Web starter”

Starter

Figure 1. Generate a new project with Spring Initializer.

<project>
    <dependencies>
        <!-- other dependencies -->
        <dependency>
            <groupId>io.confluent</groupId>
            
<artifactId>kafka-schema-registry-client</artifactId>   (1)
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>   (2)
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>   (3)
            <version>5.2.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>5.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <repositories>
        <!-- other maven repositories the project -->
        <repository>
            <id>confluent</id>   (4)   
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
    <plugins>
        <!-- other maven plugins in the project -->
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory>   (5)

<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
                        <stringType>String</stringType>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</project>
  1. Confluent Schema Registry client
  2. Avro dependency
  3. Avro SerDes
  4. Confluent Maven repository
  5. Source directory where you put your Avro files and store generated Java POJOs

 

Spring Boot应用程序的体系结构

您的应用程序将包括以下组件:

  • use.avsc:一个Avro文件
  • SpringAvroApplication.java:应用程序的起点。这个类还包括应用程序正在使用的新主题的配置。
  • Producer.java:封装Kafka Producer的组件
  • Consumer.java:来自Kafka主题的消息的侦听器
  • java:一个RESTful控制器,它接受HTTP命令以便在Kafka主题中发布消息

创建用户Avro文件

{
  "namespace": "io.confluent.developer",   (1)
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string",
      "avro.java.string": "String"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}

avro maven插件将在io.confluent.developer包中生成用户POJO。此POJO具有名称和年龄属性。

创建Spring Boot应用程序类

@SpringBootApplication
public class SpringAvroApplication {

  
  @Value("${topic.name}")   (1)
  private String topicName;

  @Value("${topic.partitions-num}")
  private Integer partitions;

  @Value("${topic.replication-factor}")
  private short replicationFactor;

  
  @Bean
  NewTopic moviesTopic() {   (2)
    return new NewTopic(topicName, partitions, replicationFactor);
  }

  
  public static void main(String[] args) {
    SpringApplication.run(SpringAvroApplication.class, args);
  }

}

这些是Spring从application.yaml文件中注入的主题参数。

Spring Boot基于提供的配置创建了一个新的Kafka主题。作为应用程序开发人员,您负责创建主题,而不是依赖于自动创建主题,这在生产环境中应该是错误的。

创建生产者组件

@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {

  @Value("${topic.name}")   (1) 
  private String TOPIC;

  private final KafkaTemplate<String, User> kafkaTemplate;

  @Autowired
  public Producer(KafkaTemplate<String, User> kafkaTemplate) {   (2) 
    this.kafkaTemplate = kafkaTemplate;
  }

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.TOPIC, user.getName(), user);   (3)
    log.info(String.format("Produced user -> %s", user));
  }
}

将从application.yaml中注入主题名。

Spring将使用application.yaml中提供的属性初始化KafkaTemplate。

我们将使用 User 作为主键向主题发送消息。

Spring在应用程序启动期间实例化所有这些组件,应用程序准备好通过REST端点接收消息。默认的HTTP端口是9080,可以在application.yaml配置文件中进行更改。

创建消费者组件

@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {

  @Value("${topic.name}")   (1) 
  private String TOPIC;

  private final KafkaTemplate<String, User> kafkaTemplate;

  @Autowired
  public Producer(KafkaTemplate<String, User> kafkaTemplate) {   (2) 
    this.kafkaTemplate = kafkaTemplate;
  }

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.TOPIC, user.getName(), user);   (3)
    log.info(String.format("Produced user -> %s", user));
  }
}

主题名称将从application.yaml中注入。

使用@KafkaListener注释,spring kafka框架将实例化一个新的使用者。

创建KafkaController组件

@RestController
@RequestMapping(value = "/user")   (1)
public class KafkaController {

  private final Producer producer;

  @Autowired
  KafkaController(Producer producer) {   (2)
    this.producer = producer;
  }

  @PostMapping(value = "/publish")
  public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) {
    this.producer.sendMessage(new User(name, age));   (3)
  }
}

kafkaontroller映射到/user HTTP端点。

Spring注入producer组件。

当一个新请求到达/user/publish端点时,生产者将其发送给Kafka。

运行示例

先决条件

提示:在本指南中,我假设您已经安装了Java开发工具包(JDK)。如果你没有,我强烈建议使用SDKMAN!安装它。

您还需要在本地安装合流平台5.3或更新版本。如果你还没有它,跟随汇合平台快速启动。请确保也安装Confluent CLI(请参阅快速入门本节中的步骤4)。

启动Kafka和Schema注册表

confluent local start schema-registry

Confluent CLI提供用于管理本地Confluent平台安装的本地模式。合流CLI以正确的顺序启动每个组件。

Confluent CLI

您应该在终端中看到类似的输出。

构建和运行Spring Boot应用程序

在examples目录中,运行./mvnw clean package编译并生成一个可运行的JAR。之后,可以运行以下命令:

java-jar目标/kafka-avro-0.0.1-SNAPSHOT.jar

测试生产者/消费者REST服务

为了简单起见,我喜欢使用curl命令,但您可以使用任何REST客户端(如Postman或IntelliJ IDEA中的REST客户端):

curl -X POST -d “name=vik&age=33”  http://localhost:9080/user/publish

REST Client

2019-06-06 22:52:59.485  INFO 28910 --- [nio-9080-exec-1] Producer Logger                          : Produced user -> {"name": "vik", "age": 33}
2019-06-06 22:52:59.559  INFO 28910 --- [ntainer#0-0-C-1] Consumer Logger                          : Consumed message -> {"name": "vik", "age": 33}

使用Confluent Cloud运行应用程序

要将此演示应用程序与confluent cloud一起使用,您将需要托管架构注册表的端点和一个API键/密钥。一旦您选择了一个环境,就可以很容易地从confluent的cloud UI中检索到这两者。

Clusters

 

必须至少创建一个Kafka群集才能访问托管架构注册表。一旦选择Schema Registry选项,就可以检索端点并创建新的API/secret。

Schema Registry

confluent cloud 配置的示例可以在application-Cloud.yaml中找到:

topic:
  name: users
  partitions-num: 6
  replication-factor: 3
server:
  port: 9080
spring:
  kafka:
    bootstrap-servers:
      - mybootstrap.confluent.cloud:9092   (1)  
    properties:
      # CCloud broker connection parameters
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="ccloud_key" password="ccloud_secret";   (2) 
      security.protocol: SASL_SSL

      # CCloud Schema Registry Connection parameter
      schema.registry.url: https://schema-registry.aws.confluent.cloud   (3)  
      basic.auth.credentials.source: USER_INFO   (4)  
      schema.registry.basic.auth.user.info: sr_ccloud_key:sr_ccloud_key   (5) 
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      default-topic:
logging:
  level:
    root: info

 

  1. Cloud bootstrap server
  2. Broker key and secret
  3. Confluent Cloud Schema Registry URL
  4. Schema Registry authentication configuration
  5. Cloud Schema Registry key and secret
    • Note: Make sure to replace the dummy login and password information with actual values from your Confluent Cloud account.

要在云模式下运行此应用程序,请激活cloud Spring配置文件。在本例中,Spring Boot将获取application-cloud.yaml配置文件,该文件包含到合流云中数据的连接。

java -jar -Dspring.profiles.active=cloud target/kafka-avro-0.0.1-SNAPSHOT.jar

 

原文:https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/

本文:http://jiagoushi.pro/55555

讨论:请加入知识星球或者微信圈子【首席架构师圈】

,>,>,>,>

Tags