import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class IcebergSparkApp {
public static void main(String[] args) {
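// The catalog configured below is a Hadoop catalog: Iceberg stores table metadata directly
// under the configured warehouse path, so "path/to/your/warehouse" is a placeholder that must
// point at a real local/HDFS/S3 location before running.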
SparkSession spark = SparkSession.builder()
.appName("IcebergSparkApp")
.config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.my_catalog.type", "hadoop")
.config("spark.sql.catalog.my_catalog.warehouse", "path/to/your/warehouse")
.getOrCreate();
switch (args[2]) {
case "C":
// Create a table with merge-on-read deletes enabled
TableCreation.createTable(spark, "my_catalog.my_table", args[3], args[4]);
break;
case "S": {
// Select rows matching a condition and show them
Dataset<Row> result = SelectOperation.performSelect(spark, "my_catalog.my_table", args[3]);
result.show();
break;
}
case "U":
// Upsert rows via MERGE INTO
UpsertOperation.performUpsert(spark, "my_catalog.my_table", args[3]);
break;
case "D":
// Delete rows matching a condition
DeleteOperation.performDelete(spark, "my_catalog.my_table", args[3]);
break;
default:
System.err.println("Unknown operation: " + args[2]);
break;
}
spark.stop();
}
}
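// Illustrative invocations (hypothetical jar name and schema; argument positions follow the switch
// above, which reads args[2] as the operation code and args[3]/args[4] as operation-specific inputs,
// while args[0] and args[1] are never read by this main method). A matching iceberg-spark-runtime
// artifact must be on the classpath, e.g. via spark-submit --packages:
//
//   spark-submit --class IcebergSparkApp app.jar x y C "id BIGINT, name STRING, value DOUBLE" id
//   spark-submit --class IcebergSparkApp app.jar x y S "value > 50.0"
//   spark-submit --class IcebergSparkApp app.jar x y U "1, 'Alice', 100.0"
//   spark-submit --class IcebergSparkApp app.jar x y D "id = 1"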
public class TableCreation {
public static void createTable(SparkSession spark, String tableName, String schema, String partitionColumn) {
String createTableQuery = String.format(
"CREATE TABLE IF NOT EXISTS %s (%s) " +
"USING iceberg " +
"PARTITIONED BY (%s) " +
"TBLPROPERTIES ('write.delete.mode'='merge-on-read')",
tableName, schema, partitionColumn
);
spark.sql(createTableQuery);
}
}
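// Illustrative call (the schema and partition column are assumptions, chosen to match the
// (id, name, value) columns referenced by UpsertOperation below):
//   TableCreation.createTable(spark, "my_catalog.my_table",
//       "id BIGINT, name STRING, value DOUBLE", "id");
// The table property 'write.delete.mode'='merge-on-read' makes DELETEs write delete files that are
// applied at read time, instead of rewriting whole data files as copy-on-write would.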
public class SelectOperation {
public static Dataset<Row> performSelect(SparkSession spark, String tableName, String condition) {
String query = String.format("SELECT * FROM %s WHERE %s", tableName, condition);
return spark.sql(query);
}
}
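// Illustrative call (the condition string is an assumption; it is interpolated verbatim into the
// WHERE clause):
//   Dataset<Row> rows = SelectOperation.performSelect(spark, "my_catalog.my_table", "value > 50.0");
//   rows.show();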
public class UpsertOperation {
public static void performUpsert(SparkSession spark, String tableName, String sourceData) {
String upsertQuery = String.format(
"MERGE INTO %s t " +
"USING (SELECT * FROM VALUES (%s) AS v(id, name, value)) s " +
"ON t.id = s.id " +
"WHEN MATCHED THEN UPDATE SET t.name = s.name, t.value = s.value " +
"WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)",
tableName, sourceData);
spark.sql(upsertQuery);
}
}
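// Illustrative call (single assumed row; sourceData is spliced into VALUES (...), so it must be a
// comma-separated list of literals in (id, name, value) order):
//   UpsertOperation.performUpsert(spark, "my_catalog.my_table", "1, 'Alice', 100.0");
// which expands to ... USING (SELECT * FROM VALUES (1, 'Alice', 100.0) AS v(id, name, value)) s ...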
public class DeleteOperation {
public static void performDelete(SparkSession spark, String tableName, String condition) {
String deleteQuery = String.format("DELETE FROM %s WHERE %s", tableName, condition);
spark.sql(deleteQuery);
}
}
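// Illustrative call (the condition string is an assumption):
//   DeleteOperation.performDelete(spark, "my_catalog.my_table", "id = 1");
// Because the table was created with 'write.delete.mode'='merge-on-read', this records the removed
// rows in delete files that are merged with the data files when the table is read.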