Spark操作Iceberg不仅可以使用SQL方式查询Iceberg中的数据,还可以使用DataFrame方式加载Iceberg表中的数据,可以通过spark.table(Iceberg表名)或者spark.read.format("iceberg").load("iceberg data path")来加载对应Iceberg表中的数据,操作如下:
在Iceberg中可以回滚快照,可以借助于Java 代码实现,Spark DataFrame Api 不能回滚快照,在Spark3.x版本之后,支持SQL回滚快照。回滚快照之后,Iceberg对应的表中会生成新的Snapshot-id,重新查询,回滚生效,具体操作如下:
//9.回滚到某个快照,rollbackTo(snapshot-id),指定的是固定的某个快照ID,回滚之后,会生成新的Snapshot-id, 重新查询生效。 val conf = new Configuration() val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg") catalog.setConf(conf) val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest")) table.manageSnapshots().rollbackTo(3368002881426159310L).commit()
//查询表 hadoop_prod.mydb.mytest 数据,已经是历史数据 spark.sql( """ |select * from hadoop_prod.mydb.mytest """.stripMargin).show(100)
结果如下:
九、合并Iceberg表的数据文件
针对Iceberg表每次commit都会生成一个parquet数据文件,有可能一张Iceberg表对应的数据文件非常多,那么我们通过Java Api 方式对Iceberg表可以进行数据文件合并,数据文件合并之后,会生成新的Snapshot且原有数据并不会被删除,如果要删除对应的数据文件需要通过“Expire Snapshots来实现”,具体操作如下:
//2) 合并小文件数据,Iceberg合并小文件时并不会删除被合并的文件,Compact是将小文件合并成大文件并创建新的Snapshot。如果要删除文件需要通过Expire Snapshots来实现,targetSizeInBytes 指定合并后的每个文件大小 val conf = new Configuration() val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg") val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest")) Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(1024)//1kb,指定生成合并之后文件大小
合并小文件后,Iceberg对应表元数据目录如下:
数据目录如下:
十、删除历史快照
目前我们可以通过Java Api 删除历史快照,可以通过指定时间戳,当前时间戳之前的所有快照都会被删除(如果指定时间比最后一个快照时间还大,会保留最新快照数据),可以通过查看最新元数据json文件来查找要指定的时间。例如,表mytest 最新的json元数据文件信息如下:
//11.删除历史快照,历史快照是通过ExpireSnapshot来实现的,设置需要删除多久的历史快照 val conf = new Configuration() val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg") val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest")) table.expireSnapshots().expireOlderThan(1640070000000L).commit()
CREATETABLE ${CataLog名称}.${库名}.${表名}( id bigint, name string ) using iceberg PARTITIONED BY( loc string ) TBLPROPERTIES ( 'write.metadata.delete-after-commit.enabled'=true, 'write.metadata.previous-version-max'=3 )