跳转到主要内容
Chinese, Simplified

在开始使用Cassandra和时间序列数据时,人们面临的最大挑战之一是理解编写工作负载对集群的影响。过快地写入单个分区可能会创建热点,从而限制向外扩展的能力。分区太大可能会导致修复、流和读取性能方面的问题。从大分区的中间读取会带来很大的开销,并导致GC压力的增加。Cassandra 4.0应该可以提高大分区的性能,但是它不能完全解决我已经提到的其他问题。在可预见的未来,我们将需要考虑它们的性能影响,并相应地进行计划。

在这篇文章中,我将讨论一种常见的Cassandra数据建模技术,称为bucketing。Bucketing是一种策略,让我们可以控制每个分区中存储多少数据,以及将写出的数据分散到整个集群。这篇文章将讨论两种形式的攻击。当数据模型需要进一步扩展时,可以结合使用这些技术。读者应该已经熟悉了分区的解剖和基本的CQL命令。

当我们第一次使用Cassandra学习数据建模时,我们可能会看到如下内容:

CREATE TABLE raw_data (
    sensor text,
    ts timeuuid,
    readint int,
    primary key(sensor, ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
  AND compaction = {'class': 'TimeWindowCompactionStrategy', 
                    'compaction_window_size': 1, 
                    'compaction_window_unit': 'DAYS'};

 

这是存储一些非常简单的传感器数据的一个很好的第一个数据模型。通常我们收集的数据要比整数复杂得多,但在这篇文章中,我们将关注键。我们利用TWCS作为压缩战略。TWCS将帮助我们处理压缩大分区的开销,这将使我们的CPU和I/O处于控制之下。不幸的是,它仍然有一些明显的限制。如果我们不使用TTL,那么当我们接收更多数据时,我们的分区大小将无限地持续增长。如上所述,在修复、流化或从任意时间片读取数据时,大分区会带来很大的开销。

为了分解这个大分区,我们将利用第一种形式的bucketing。我们将根据时间窗口将我们的分区分成更小的分区。理想的大小是将分区保持在100MB以下。例如,如果我们每天存储50-75MB的数据,那么每天每个传感器一个分区就是一个不错的选择。只要分区不超过100MB,我们也可以简单地使用周(从某个纪元开始)、月和年。无论选择什么,留一点增长空间是个好主意。

为此,我们将向分区键添加另一个组件。修改之前的数据模型,我们将添加一个day字段:

CREATE TABLE raw_data_by_day (
sensor text,
day text,
ts timeuuid,
reading int,
primary key((sensor, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
       AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                     'compaction_window_unit': 'DAYS', 
                     'compaction_window_size': 1};

插入到表中需要使用date和now()值(你也可以在你的应用代码中生成一个TimeUUID):

INSERT INTO raw_data_by_day (sensor, day, ts, reading) 
VALUES ('mysensor', '2017-01-01', now(), 10);

这是限制每个分区的数据量的一种方法。为了跨多天获取大量数据,您需要每天发出一个查询。这样查询的好处在于,我们可以将工作分散到整个集群,而不是要求单个节点执行大量工作。我们还可以通过依赖驱动程序中的异步调用并行地发出这些查询。对于这种用例,Python驱动程序甚至有一个方便的辅助函数:

from itertools import product
from cassandra.concurrent import execute_concurrent_with_args

days = ["2017-07-01", "2017-07-12", "2017-07-03"]  # collecting three days worth of data
session  = Cluster(["127.0.0.1"]).connect("blog")
prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")

args = product(["mysensor"], days) 
# args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')

# driver handles concurrency for you
results = execute_concurrent_with_args(session, prepared, args)

# Results:
#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]

这种技术的一种变体是每个时间窗口使用不同的表。例如,每月使用一个表意味着每年有12个表:

CREATE TABLE raw_data_may_2017 (
    sensor text,
    ts timeuuid,
    reading int,
    primary key(sensor, ts)
) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                     'compaction_window_unit': 'DAYS', 
                     'compaction_window_size': 1};

这种策略的主要好处是有助于存档和快速删除旧数据。例如,在每个月的开始,我们可以将上个月的数据以拼花的格式归档到HDFS或S3中,利用便宜的存储来进行分析。当我们不再需要Cassandra中的数据时,我们可以简单地删除表。您可能会看到,在创建和删除表时需要进行一些额外的维护,因此,这种方法实际上只有在需要归档时才有用。还有其他存档数据的方法,因此这种类型的bucketing可能是不必要的。

上面的策略主要是防止分区在长时间内变得太大。如果我们有一个可预测的工作负载和有很小变化的分区大小,这是很好的。我们可能会摄入太多的信息,以至于单个节点无法写出数据,或者一小部分对象的摄入率要高得多。Twitter就是一个很好的例子,有些人拥有数千万的追随者,但这并不常见。对于我们需要大规模使用的这些类型的账户,通常会有一个单独的代码路径

第二种技术在任何给定时间使用多个分区将插入扇出到整个集群。这个策略的好处是,我们可以使用一个分区来处理小卷,使用多个分区来处理大卷。

我们在这个设计中所做的权衡是在读取时我们需要使用散射聚集,这有明显的更高的开销。这可能会使分页更加困难。我们需要能够跟踪我们为每个小发明摄取了多少数据。这是为了确保我们可以选择正确数量的分区来使用。如果我们使用太多的桶,我们就会在很多分区上执行很多非常小的读取操作。如果桶太少,我们会得到非常大的分区,这些分区不能很好地压缩、修复、流处理,并且读取性能很差。

在这个例子中,我们将研究一个理论模型,它适用于那些在Twitter这样的社交网络上关注大量用户的人。大多数帐户都可以使用一个单独的分区来接收消息,但有些人/机器人可能会关注数百万个帐户。

免责声明:我不知道Twitter实际上是如何存储他们的数据的,这只是一个简单的例子来讨论。

CREATE TABLE tweet_stream (
    account text,
    day text,
    bucket int,
    ts timeuuid,
    message text,
    primary key((account, day, bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
         AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                       'compaction_window_unit': 'DAYS', 
                       'compaction_window_size': 1};

 

这个数据模型扩展了前面的数据模型,将bucket添加到分区键中。现在,每天都可以从多个桶中获取数据。当需要读取时,我们需要从所有分区中获取所需的结果。为了演示,我们将插入一些数据到我们的分区:

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');

如果我们想要十个最新的消息,我们可以这样做:

from itertools import chain
from cassandra.util import unix_time_from_uuid1

prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")
# let's get 10 buckets 
partitions = range(10)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

args = product(["jon_haddad"], ["2017-07-01"], partitions)

result = execute_concurrent_with_args(session, prepared, args)

# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]

results = [x.result_or_exc for x in result]

# append all the results together
data = chain(*results)
            
sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)            

# newest stuff first
# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),
#  Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),
#  Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),
#  Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]

这个例子只使用了10个项目,所以我们可以作为懒惰的程序员,合并列表,然后对它们排序。如果我们想获取更多的元素我们就需要k路归并算法。我们将在以后的博客中进一步讨论这个话题。

此时,您应该对如何围绕集群分发数据和请求有了更好的理解,这使得集群可以比使用单个分区时扩展得更大。记住每个问题都是不同的,没有万能的解决方案。

 

原文:https://thelastpickle.com/blog/2017/08/02/time-series-data-modeling-massive-scale.html

本文:http://jiagoushi.pro/node/1348

讨论:请加入知识星球【首席架构师圈】或者小号【jiagoushi_pro】或者QQ群【1110777】

Tags
 
Article
知识星球
 
微信公众号
 
视频号