跳转到主要内容
Chinese, Simplified

在如何在Spring Boot应用程序中使用Apache Kafka的后续操作(显示如何开始使用Spring Boot和ApacheKafka®)之后,这里我将演示如何在Spring Boot应用程序中启用Confluent Schema Registry和Avro序列化格式 。

使用Avro模式,您可以在微服务应用程序之间建立数据协定。
完整的源代码可在GitHub上下载。

先决条件

 

  • Java 8+
  • Confluent Platform 5.3或更高版本
  • 可选:融合云帐户

让我们开始写作


与往常一样,我们将从生成项目启动器开始。 在此启动程序中,应启用“ Spring for Apache Kafka”和“ Spring Web Starter”。

Starter

 

<project>
    <dependencies>
        <!-- other dependencies -->
        <dependency>
            <groupId>io.confluent</groupId>
            
<artifactId>kafka-schema-registry-client</artifactId>   (1)
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>   (2)
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>   (3)
            <version>5.2.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>5.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <repositories>
        <!-- other maven repositories the project -->
        <repository>
            <id>confluent</id>   (4)   
            <url>https://packages.confluent.io/maven/</url&gt;
        </repository>
    </repositories>
    <plugins>
        <!-- other maven plugins in the project -->
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory>   (5)

<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
                        <stringType>String</stringType>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</project>

  1. Confluent Schema Registry client
  2. Avro dependency
  3. Avro SerDes
  4. Confluent Maven repository
  5. Source directory where you put your Avro files and store generated Java POJOs

Spring Boot应用程序的架构


您的应用程序将包括以下组件:

use.avsc:一个Avro文件
SpringAvroApplication.java:您的应用程序的起点。 此类还包括应用程序正在使用的新主题的配置。
Producer.java:封装Kafka生产者的组件
Consumer.java:来自Kafka主题的消息的侦听器
KafkaController.java:一个RESTful控制器,它接受HTTP命令以在Kafka主题中发布消息


创建用户Avro文件

{
  "namespace": "io.confluent.developer",   (1)
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string",
      "avro.java.string": "String"
    },
    {
      "name": "age",
      "type": "int"
    }
  ]
}

avro-maven-plugin将在io.confluent.developer包中生成User POJO。 该POJO具有名称和年龄属性。


创建一个Spring Boot应用程序类

@SpringBootApplication
public class SpringAvroApplication {

  
  @Value("${topic.name}")   (1)
  private String topicName;

  @Value("${topic.partitions-num}")
  private Integer partitions;

  @Value("${topic.replication-factor}")
  private short replicationFactor;

  
  @Bean
  NewTopic moviesTopic() {   (2)
    return new NewTopic(topicName, partitions, replicationFactor);
  }

  
  public static void main(String[] args) {
    SpringApplication.run(SpringAvroApplication.class, args);
  }

}

这些是Spring从application.yaml文件注入的主题参数。
Spring Boot根据提供的配置创建一个新的Kafka主题。 作为应用程序开发人员,您负责创建主题,而不是依靠自动主题创建,这在生产环境中应该是错误的。


创建生产者组件

@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {

  @Value("${topic.name}")   (1) 
  private String TOPIC;

  private final KafkaTemplate<String, User> kafkaTemplate;

  @Autowired
  public Producer(KafkaTemplate<String, User> kafkaTemplate) {   (2) 
    this.kafkaTemplate = kafkaTemplate;
  }

  void sendMessage(User user) {
    this.kafkaTemplate.send(this.TOPIC, user.getName(), user);   (3)
    log.info(String.format("Produced user -> %s", user));
  }
}

  1. 主题名称将从application.yaml中注入。
  2. Spring将使用application.yaml中提供的属性来初始化KafkaTemplate。
  3. 我们将使用用户作为键将消息发送到主题。

Spring在应用程序启动期间实例化所有这些组件,并且应用程序准备通过REST端点接收消息。 默认的HTTP端口是9080,可以在application.yaml配置文件中进行更改。

@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {

  
  @Value("${topic.name}")   (1) 
  private String topicName;

  @KafkaListener(topics = "users", groupId = "group_id")   (2)
  public void consume(ConsumerRecord<String, User> record) {
    log.info(String.format("Consumed message -> %s", record.value()));
  }
}

  1. 主题名称将从application.yaml中注入。
  2. 使用@KafkaListener注释,spring-kafka框架将实例化一个新的使用者。


创建KafkaController组件

@RestController
@RequestMapping(value = "/user")   (1)
public class KafkaController {

  private final Producer producer;

  @Autowired
  KafkaController(Producer producer) {   (2)
    this.producer = producer;
  }

  @PostMapping(value = "/publish")
  public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) {
    this.producer.sendMessage(new User(name, age));   (3)
  }
}

  1. KafkaController映射到/ user HTTP端点。
  2. Spring注入生产者组件。
  3. 当新请求到达/ user / publish端点时,生产者将其发送到Kafka。

运行示例


先决条件


提示:在本指南中,我假设您已安装Java开发工具包(JDK)。 如果您不这样做,我强烈建议您使用SDKMAN! 安装它。

您还需要在本地安装Confluent Platform 5.3或更高版本。 如果尚未安装,请遵循Confluent Platform快速入门。 确保还安装了Confluent CLI(请参阅快速入门本节中的步骤4)。

启动Kafka和Schema Registry

confluent local start schema-registry

Confluent CLI提供了用于管理本地Confluent Platform安装的本地模式。 Confluent CLI以正确的顺序启动每个组件。

Confluent CLI

您应该在终端中看到类似的输出。

构建并运行您的Spring Boot应用程序


在示例目录中,运行./mvnw clean package 以编译并生成可运行的JAR。 之后,您可以运行以下命令:

java -jar target/kafka-avro-0.0.1-SNAPSHOT.jar

测试生产者/消费者REST服务


为简单起见,我喜欢使用curl命令,但是您可以使用任何REST客户端(例如Postman或IntelliJ IDEA中的REST客户端):

curl -X POST -d 'name=vik&age=33' http://localhost:9080/user/publish

REST Client

2019-06-06 22:52:59.485  INFO 28910 --- [nio-9080-exec-1] Producer Logger                          : Produced user -> {"name": "vik", "age": 33}
2019-06-06 22:52:59.559  INFO 28910 --- [ntainer#0-0-C-1] Consumer Logger                          : Consumed message -> {"name": "vik", "age": 33}

 

使用Confluent Cloud运行应用程序


要将此演示应用程序与Confluent Cloud一起使用,您将需要托管Schema Registry的端点和API密钥/秘密。 选择环境后,都可以从Confluent Cloud UI轻松检索两者。

Clusters

必须至少创建一个Kafka集群才能访问您的托管模式注册表。 选择“架构注册表”选项后,您可以检索端点并创建新的API /秘密。

Schema Registry

Confluent Cloud配置示例可以在application-cloud.yaml中找到:

topic:
  name: users
  partitions-num: 6
  replication-factor: 3
server:
  port: 9080
spring:
  kafka:
    bootstrap-servers:
      - mybootstrap.confluent.cloud:9092   (1)  
    properties:
      # CCloud broker connection parameters
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="ccloud_key" password="ccloud_secret";   (2) 
      security.protocol: SASL_SSL

      # CCloud Schema Registry Connection parameter
      schema.registry.url: https://schema-registry.aws.confluent.cloud   (3)  
      basic.auth.credentials.source: USER_INFO   (4)  
      schema.registry.basic.auth.user.info: sr_ccloud_key:sr_ccloud_key   (5) 
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      default-topic:
logging:
  level:
    root: info

 

  1. 云引导服务器
  2. 经纪人密钥和秘密
  3. 融合云架构注册表URL
  4. 架构注册表身份验证配置
  5. Cloud Schema注册表项和机密

注意:确保用您的Confluent Cloud帐户中的实际值替换虚拟的登录名和密码信息。
要以云模式运行此应用程序,请激活云Spring配置文件。 在这种情况下,Spring Boot将选择application-cloud.yaml配置文件,其中包含与Confluent Cloud中数据的连接。

java -jar -Dspring.profiles.active=cloud target/kafka-avro-0.0.1-SNAPSHOT.jar

 

原文:https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial

本文:https://pub.intelligentx.net/how-use-schema-registry-and-avro-spring-boot-applications

讨论:请加入知识星球或者小红圈【首席架构师圈】

 

Article
知识星球
 
微信公众号
 
视频号