It is common to ingest a large amount of data into the Hadoop Distributed File System (HDFS) for analysis, and more often than not, that data needs to be periodically updated with new changes. For a long time, the most common way to do this was to use Apache Hive to incrementally merge new or updated records into an existing dataset. The same incremental merge can also be performed with Apache Spark. In this post, I review the Hive incremental merge and explore how to incrementally update data using Spark SQL and the Spark DataFrame API.
All of the code samples in this blog can be found in the repository linked at the end of this post.
Incremental Merge with Hive
Table: orders
order_no | customer_id | quantity | cost | order_date | last_updated_date |
001 | u1 | 1 | $15.00 | 03/01/2020 | 03/01/2020 |
002 | u2 | 1 | $30.00 | 04/01/2020 | 04/01/2020 |
Consider the orders table above. Now, suppose we have received a cost update for order number “002”, captured in the order_updates table below. We can run the following query to merge the two tables and produce the updated order_reconciled table shown after it.
Table: order_updates
order_no | customer_id | quantity | cost | order_date | last_updated_date |
002 | u2 | 1 | $20.00 | 04/01/2020 | 04/02/2020 |
File: reconcile_orders.hql
CREATE TABLE order_reconciled AS
SELECT unioned.*
FROM (
  SELECT * FROM orders x
  UNION ALL
  SELECT * FROM order_updates y
) unioned
JOIN (
  SELECT
    order_no,
    max(last_updated_date) AS max_date
  FROM (
    SELECT * FROM orders
    UNION ALL
    SELECT * FROM order_updates
  ) t
  GROUP BY order_no
) grouped
ON unioned.order_no = grouped.order_no
AND unioned.last_updated_date = grouped.max_date;
Table: order_reconciled
order_no | customer_id | quantity | cost | order_date | last_updated_date |
001 | u1 | 1 | $15.00 | 03/01/2020 | 03/01/2020 |
002 | u2 | 1 | $20.00 | 04/01/2020 | 04/02/2020 |
The query first combines the orders and order_updates tables with UNION ALL; this combined set appears twice, once with the alias unioned and once with the alias t inside the subquery. It then finds the latest version of each order_no by grouping t by order_no and taking MAX(last_updated_date), aliased grouped. Finally, grouped is joined back to unioned on order_no and unioned.last_updated_date = grouped.max_date, which keeps only the row with the latest last_updated_date for each order_no.
The above example merges the two datasets on a single group-by key, order_no. To merge on multiple keys, SELECT the key columns in the grouped subquery and GROUP BY those keys. Then, update the join conditions to match each key with its corresponding unioned column, as shown in the HQL below.
CREATE TABLE order_reconciled AS
SELECT unioned.*
FROM (...) unioned
JOIN (
  SELECT
    order_no,
    customer_id,
    max(last_updated_date) AS max_date
  FROM (...) t
  GROUP BY order_no, customer_id
) grouped
ON unioned.order_no = grouped.order_no
AND unioned.customer_id = grouped.customer_id
AND unioned.last_updated_date = grouped.max_date;
Please note that as of HDP 2.6, Hive's MERGE statement can be used to achieve this task instead, provided the target table is transactional (ACID). For more information, please see the HDP documentation.
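For reference, here is a minimal, illustrative sketch of what that might look like. It assumes orders is an ACID (transactional) table, and note that MERGE updates orders in place rather than producing a separate order_reconciled table:

-- Illustrative MERGE: apply order_updates directly to the orders table
MERGE INTO orders AS t
USING order_updates AS s
ON t.order_no = s.order_no
WHEN MATCHED THEN UPDATE SET
  quantity = s.quantity,
  cost = s.cost,
  order_date = s.order_date,
  last_updated_date = s.last_updated_date
WHEN NOT MATCHED THEN INSERT VALUES
  (s.order_no, s.customer_id, s.quantity, s.cost, s.order_date, s.last_updated_date);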
Now let’s see how we can do the same thing in Spark.
Incremental Merge with Apache Spark
Spark SQL lets you run SQL statements against structured data inside Spark programs. Here’s how we can use the same HQL above to update the data in Spark.
First, let’s create the data as DataFrames and register them as SQL temporary views.
import java.sql.Date

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._

def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
  spark.createDataFrame(sc.parallelize(rows), schema)
}

val schema = StructType(
  List(
    StructField("order_no", StringType, true),
    StructField("customer_id", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("cost", DoubleType, true),
    StructField("order_date", DateType, true),
    StructField("last_updated_date", DateType, true)
  )
)

// Create orders dataframe
val orders = Seq(
  Row("001", "u1", 1, 15.00, Date.valueOf("2020-03-01"), Date.valueOf("2020-03-01")),
  Row("002", "u2", 1, 30.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-01"))
)
val ordersDF = createDF(orders, schema)

// Create order_updates dataframe
val orderUpdates = Seq(
  Row("002", "u2", 1, 20.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-02"))
)
val orderUpdatesDF = createDF(orderUpdates, schema)

// Register temporary views
ordersDF.createOrReplaceTempView("orders")
orderUpdatesDF.createOrReplaceTempView("order_updates")
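To sanity-check the inputs before merging, both DataFrames can be displayed in the spark-shell (output omitted here):

// Quick look at the input data
ordersDF.show()
orderUpdatesDF.show()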
Now, we can use the same HQL with Spark SQL to incrementally merge the two DataFrames.
val orderReconciledDF = spark.sql( """ |SELECT unioned.* |FROM ( | SELECT * FROM orders x | UNION ALL | SELECT * FROM order_updates y |) unioned |JOIN |( | SELECT | order_no, | max(last_updated_date) as max_date | FROM ( | SELECT * FROM orders | UNION ALL | SELECT * FROM order_updates | ) t | GROUP BY | order_no |) grouped |ON | unioned.order_no = grouped.order_no AND | unioned.last_updated_date = grouped.max_date """.stripMargin )
Output:
scala> orderReconciledDF.show()
order_no | customer_id | quantity | cost | order_date | last_updated_date |
002 | u2 | 1 | 20.0 | 2020-04-01 | 2020-04-02 |
001 | u1 | 1 | 15.0 | 2020-03-01 | 2020-03-01 |
Using Spark DataFrame API
Spark SQL also allows users to manipulate data using functional transformations with the DataFrame API. This approach is also easier when dealing with multiple tables or composite keys, since the keys do not have to be hardcoded in the HQL. In the code below, note that the unioned variable is where the union step is performed. The following Scala code shows how to incrementally merge the orders datasets.
import org.apache.spark.sql.functions.max

// Merge keys and the column that determines the latest version
val keys = Seq("order_no", "customer_id")
val timestampCol = "last_updated_date"
val keysColumns = keys.map(ordersDF(_))

// Combine the two datasets
val unioned = ordersDF.union(orderUpdatesDF)

// Find the latest last_updated_date for each key combination
val grouped = unioned.groupBy(keysColumns: _*)
  .agg(
    max(timestampCol).as(timestampCol)
  )

// Join back to keep only the latest version of each order
val reconciled = grouped.join(unioned, keys :+ timestampCol)
Output:
scala> reconciled.show()
order_no | customer_id | last_updated_date | quantity | cost | order_date |
001 | u1 | 2020-03-01 | 1 | 15.0 | 2020-03-01 |
002 | u2 | 2020-04-02 | 1 | 20.0 | 2020-04-01 |
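Note that the join places the key and timestamp columns first, so the column order differs from the original orders table. If the original layout matters, the columns can be reselected before writing; a small sketch, assuming the DataFrames defined earlier are in scope:

import org.apache.spark.sql.functions.col

// Restore the column order of the original ordersDF schema
val reordered = reconciled.select(ordersDF.columns.map(col): _*)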
Writing the result back to Hive is simple, and it is more efficient with Spark. When a query runs in Hive, it is transformed into MapReduce jobs, which perform disk I/O operations and store intermediate results on disk. Spark processes data in-memory on a distributed cluster of machines and uses a more optimized Directed Acyclic Graph (DAG) execution engine to perform tasks.
import org.apache.spark.sql.SaveMode

reconciled.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("order_reconciled")
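To verify the write, the merged table can be read back from the metastore; this assumes the SparkSession was created with Hive support enabled:

// Read the persisted table back and display it
spark.table("order_reconciled").show()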
Spark Outperforms Hive
Spark provides flexibility and options for users to best express their data transformations, and its optimized execution engine outperforms Hive's MapReduce-based execution. The SQL query and the DataFrame API use the same execution engine when computing a result, so there should not be any performance difference between the two. However, when dealing with more tables and keys, the programmatic, functional style of the DataFrame API is easier to generalize and maintain than hardcoded HQL.
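As an illustration of that flexibility, the merge logic can be wrapped in a small reusable function. The incrementalMerge helper below is a hypothetical sketch, not part of the original code samples; it assumes both DataFrames share the same schema:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max}

// Hypothetical helper: for each combination of key columns, keep the row
// with the latest value in timestampCol.
def incrementalMerge(
    existing: DataFrame,
    updates: DataFrame,
    keys: Seq[String],
    timestampCol: String
): DataFrame = {
  val unioned = existing.union(updates)
  val latest = unioned
    .groupBy(keys.map(col): _*)
    .agg(max(timestampCol).as(timestampCol))
  latest.join(unioned, keys :+ timestampCol)
}

// Example usage with the DataFrames defined earlier
val reconciledOrders =
  incrementalMerge(ordersDF, orderUpdatesDF, Seq("order_no", "customer_id"), "last_updated_date")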
Check out http://bitbucket.org/bsinchai/spark_incremental_merge for full code samples.
phData builds and optimizes Spark execution engines that make a noticeable impact on performance. For more information, read about our AWS Data Engineering services. Or, if you have specific questions about your Spark jobs, reach out to info@phdata.io and one of our experts will be happy to help out!