跳转到主要内容
Chinese, Simplified

Heroku最近宣布了对新的Heroku Kafka服务的早期访问,虽然我听说了关于ApacheKafka的一些好消息,但我没有玩过它,因为我太懒了,无法自己设置这种东西。现在我可以通过设置一个Heroku插件来设置Kafka集群,我想是时候试试了。

如果你不熟悉卡夫卡,这是一个下一代的信息系统。它使用pub-sub,横向扩展,并具有内置的消息持久性和传递保证。卡夫卡最初是在LinkedIn创建的,但现在几乎所有需要通过转换管道移动大量数据的进步企业都在使用它。

在学习卡夫卡的过程中,我想构建一个非常简单的东西:一个事件生产者,它只向卡夫卡主题发送随机数,一个事件消费者,它接收这些随机数,并通过WebSocket将它们发送到浏览器。我决定使用Play框架(Play Framework)和Akka流(Akka Streams)实现反应流(Reactive Streams.)。

在反应流中,有相当标准的“源”和“汇”,其中事件生产者是源,消费者是汇。“流”是源和汇之间的配对,具有可选的转换。在我的例子中,有两个应用程序,每个都有一个流。工作进程将向Kafka发送随机数,因此其源将定期生成随机数,其汇将为Kafka。在web进程中,源代码是Kafka,接收器是一个WebSocket,它将随机数推送到浏览器中。

下面是省略了一些必要配置的worker应用程序(查看完整的源代码check out the full source):

 

object RandomNumbers extends App {

  val tickSource = Source.tick(Duration.Zero, 500.milliseconds, Unit).map(_ => Random.nextInt().toString)

  kafka.sink("RandomNumbers").map { kafkaSink =>
    tickSource
      .map(new ProducerRecord[String, String]("RandomNumbers", _))
      .to(kafkaSink)
      .run()(app.materializer)
  }

}

 

tickSource是每500毫秒生成一个新的随机Int的源。该源连接到一个Kafka接收器,该接收器的流将Int转换为ProducerRecord(用于Kafka)。这使用了Reactive Kafka库,它是一个用于处理Kafka的Reactive Streams API。

在web应用程序方面,Play Framework内置了对WebSockets使用反应流的支持,因此我们只需要一个控制器方法,它从Kafka主题创建一个源,并将其挂接到WebSocket流(完整源):

def ws = WebSocket.acceptOrResult[Any, String] { _ =>
  kafka.source(Set("RandomNumbers")) match {
    case Failure(e) =>
      Future.successful(Left(InternalServerError("Could not connect to Kafka")))
    case Success(source) =>
      val flow = Flow.fromSinkAndSource(Sink.ignore, source.map(_.value))
      Future.successful(Right(flow))
  }
}

请注意,流有一个Sink.ignore,它只表示忽略WebSocket上的任何传入消息(从浏览器发送的消息)。Play会处理所有的底层内容,然后每当Kafka收到关于“随机数”主题的消息时,就会通过WebSocket发送出去。

这一切都奏效了!

hello-play-kafka

Check out the full source for instructions on how to get this example setup on your machine and on Heroku. Let me know how it goes!

 

原文:https://jamesward.com/2016/05/25/combining-reactive-streams-heroku-kafka-and-play-framework/

本文:

讨论:请加入知识星球或者微信圈子【首席架构师圈】

Tags
 
Article
知识星球
 
微信公众号
 
视频号