I'm very new to the ETL world, and I wish to implement incremental data loading with Cassandra 3.7 and Spark. I'm aware that later versions of Cassandra do support CDC, but I can only use Cassandra 3.7. Is there a method through which I can track only the changed records and load them with Spark, thereby performing incremental data loading?
If it can't be done on the Cassandra side, any suggestions for the Spark side are also welcome :)
This is quite a broad topic, and an efficient solution will depend on the amount of data in your tables, the table structure, how data is inserted/updated, etc. The specific solution may also depend on the version of Spark available. One downside of a Spark-only approach is that you can't easily detect deletes of data without keeping a complete copy of the previous state, so that you can generate a diff between the two states.
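For example, a diff between two full snapshots can be computed with plain Dataframe operations. This is only a sketch: the table/keyspace names, the snapshot path, and the pk column are placeholders, not something prescribed by SCC.

import org.apache.spark.sql.cassandra._

// current state read from Cassandra, previous state saved by the last run
// (the parquet path is a hypothetical example)
val current  = spark.read.cassandraFormat("tbl", "test").load()
val previous = spark.read.parquet("/data/tbl_snapshot")

// rows that are new or changed since the last snapshot
val insertedOrUpdated = current.except(previous)
// partition keys that existed before but are gone now (i.e., deletes)
val deletedKeys = previous.select("pk").except(current.select("pk"))

// save the current state as the snapshot for the next run
current.write.mode("overwrite").parquet("/data/tbl_snapshot_new")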
In all cases you'll need to perform a full table scan to find changed entries, but if your table is organized specifically for this task, you can avoid reading all of the data. For example, if you have a table with the following structure:
create table test.tbl (
  pk int,
  ts timestamp,
  v1 ...,
  v2 ...,
  primary key(pk, ts));
then if you do the following query:
import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("tbl", "test").load()
val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")
then the Spark Cassandra Connector will push this query down to Cassandra and will read only the data where ts is in the given time range. You can check this by executing filtered.explain and verifying that both time filters are marked with the * symbol.
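Built on that pushdown, one incremental run can look roughly like the sketch below. The checkpoint handling and the target path are assumptions for illustration, not part of SCC.

import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.functions.col

// lower bound read from your own checkpoint store (assumed value)
val lastRunTs = java.sql.Timestamp.valueOf("2019-03-10 14:41:34")
val thisRunTs = new java.sql.Timestamp(System.currentTimeMillis())

// only the (pushed-down) time range is read from Cassandra
val changed = spark.read.cassandraFormat("tbl", "test").load()
  .filter(col("ts") > lastRunTs && col("ts") <= thisRunTs)

// load the increment into a hypothetical target, then persist thisRunTs
// as the checkpoint for the next run
changed.write.mode("append").parquet("/data/tbl_increments")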
Another way to detect changes is to retrieve the write time from Cassandra and filter out the changes based on that information. Fetching the writetime is supported in the RDD API for all recent versions of SCC, and is supported in the Dataframe API since the release of SCC 2.5.0 (requires at least Spark 2.4, although it may work with 2.3 as well). After fetching this information you can apply filters on the data and extract the changes, as in the sketch below. But you need to keep in mind that the writetime function will generate an error when it's applied to a collection column (list/map/set), and will/may return null for a column with a user-defined type.
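A minimal sketch with the RDD API, assuming the example table above; the column alias and the checkpoint value are arbitrary placeholders:

import com.datastax.spark.connector._

// fetch pk, ts, v1 plus the write time of v1 for every row (still a full scan)
val rdd = sc.cassandraTable("test", "tbl")
  .select("pk", "ts", "v1", "v1".writeTime as "v1_writetime")

// Cassandra write times are microseconds since the epoch;
// keep only the rows written after the previous run
val lastRunMicros = 1552230094373000L
val changed = rdd.filter(row => row.getLong("v1_writetime") > lastRunMicros)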
P.S. Even if you had CDC enabled, it's not a trivial task to use it correctly.
For CDC you may look for presentations from the 2019 DataStax Accelerate conference - there were several talks on that topic.