- Sometimes you need to create denormalized data from normalized data, for instance if you have data that looks like
CREATE TABLE flat (
  propertyId string,
  propertyName string,
  roomname1 string,
  roomsize1 int,
  roomname2 string,
  roomsize2 int,
  ..
)
but we want something like
CREATE TABLE nested (
  propertyId string,
  propertyName string,
  rooms array<struct<roomname:string,roomsize:int>>
)
This can be done with a pretty horrific query, but we want to do it in spark sql by manipulating the rows programmatically.
Let’s go through it step by step: we’ll load data from a CSV file with a flat structure and insert it into a nested Hive table.
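For reference, the data.csv loaded below would look something like this (the header names are assumed to match the flat schema above; the two rows are reconstructed from the query output shown at the end of the post):

propertyId,propertyName,roomname1,roomsize1,roomname2,roomsize2
bhaa123,My house,kitchen,134,bedroom,345
pasa372,Other house,living room,433,bedroom,332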
These commands can be run from spark-shell. Later, when we write the buildRecord() function, we’ll have to wrap everything in an object because any code that is going to be executed in the workers needs to extend the Serializable trait.
import com.databricks.spark.csv._
import org.apache.spark.sql.Row

// build hive context
val hc = new org.apache.spark.sql.hive.HiveContext(sc)

// load data (flat)
val data = hc.csvFile("hdfs:///data/data.csv")

// make hive aware of our RDD as a table
hc.registerRDDAsTable(data, "data")
Notice that we used
hc.registerRDDAsTable(data, "data")
instead of
data.registerTempTable("data")
I found that in Spark 1.2.1, registerTempTable won’t make the table visible to Hive: if you want to transfer data between actual Hive tables and temporary tables, you have to use registerRDDAsTable or you’ll get a ‘table not found’ error from Hive.
SchemaRDDs return data as objects of class Row. Row is also the form in which SchemaRDDs expect to receive data, and Hive tables are basically one form of SchemaRDD.
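For instance, once the data is loaded as above, individual fields of a Row can be read by position (a minimal sketch, assuming the flat layout described earlier):

val first: Row = data.first()    // take one Row from the flat SchemaRDD
val id   = first.getString(0)    // propertyId
val name = first.getString(1)    // propertyName
println(s"$id -> $name")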
If an RDD built from a CSV file had the same schema as the destination table, we could just do something like
hc.sql("insert into table1 select * from table2")
but in this case, before inserting, we have to transform the data so it has the same structure as the table we want to put it in.
Note also that the structure of each output record is two scalars followed by an array of structs (two per record in this example).
We want to store it in a nested Hive table, so we create one:
hc.sql("""CREATE TABLE IF NOT EXISTS nested ( propertyId string, propertyName string, rooms array<struct<roomname:string,roomsize:int>> ) STORED AS PARQUET """)
We can then build the record as:
// this builds a nested record
def buildRecord(r: Row): Row = {
  println(r)
  var res = Seq[Any]()
  // take the first two elements
  res = res ++ r.slice(0, 2)   // now res = [ 'some id', 'some name' ]

  // this will contain all the array elements
  var ary = Seq[Any]()
  // we assume there are 2 groups of room columns
  for (i <- 0 to 1) {
    // 0-based indexes: takes (2,3), (4,5), ..
    // and converts to the appropriate type
    ary = ary :+ Row(r.getString(2 + 2 * i), r.getString(2 + 1 + 2 * i).toInt)
  }
  // add the array as an element and return the row
  res = res :+ ary
  Row.fromSeq(res)
}

val nestedRDD = data.map(buildRecord(_))
Notice a few things here:
- we had to convert the data appropriately. CSV files have a header with the field names, but not the types, so we must know in advance how to convert the data. This could also be done with a case class in Scala (see the sketch after this list).
- we convert the scalars directly; when we have an array we just build a sequence (in this case a list), and when we have a struct we use a Row.
- Rows can be built in two ways: either as Row(element1, element2, ...), or, when building them from a sequence, with Row.fromSeq as above.
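As a rough sketch of the case class idea mentioned above (RoomRec and its fromStrings helper are hypothetical names, not part of the original code; it assumes the org.apache.spark.sql.Row import from earlier), the per-room conversion could be centralized like this:

// hypothetical case class describing one room; it centralizes the
// string -> typed conversion of the two CSV columns for a room
case class RoomRec(roomname: String, roomsize: Int)

object RoomRec {
  def fromStrings(name: String, size: String): RoomRec = RoomRec(name, size.toInt)
}

// the two equivalent ways of building a Row mentioned above
val room   = RoomRec.fromStrings("kitchen", "134")
val asRow1 = Row(room.roomname, room.roomsize)              // varargs form
val asRow2 = Row.fromSeq(Seq(room.roomname, room.roomsize)) // from a sequence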
Assuming the table called ‘nested’ was created with the CREATE TABLE statement above, we can use it to infer its schema and apply that schema to the newly built RDD.
// copy schema from the hive table and apply it to the RDD
val nested = hc.sql("select * from nested limit 0")
val nestedRDDwithSchema = hc.applySchema(nestedRDD, nested.schema)
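As a quick sanity check (not in the original post), we can print the schema that was just applied; it should list propertyId, propertyName and the rooms array of structs:

// should show propertyId, propertyName and rooms: array<struct<roomname:string,roomsize:int>>
nestedRDDwithSchema.printSchema()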
Now we can insert, after registering the new RDD as a table:
hc.registerRDDAsTable(nestedRDDwithSchema, "nestedrdd")
hc.sql("insert into nested select * from nestedrdd")
et voilà !
Now the data is available in Hive/Parquet/Spark SQL as the table nested:
hive> select * from nested;
OK
bhaa123    My house       [{"roomname":"kitchen","roomsize":134},{"roomname":"bedroom","roomsize":345}]
pasa372    Other house    [{"roomname":"living room","roomsize":433},{"roomname":"bedroom","roomsize":332}]
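Since nested is a regular Hive table, the array can also be queried and flattened back, for example with a LATERAL VIEW explode (a small illustrative query, not part of the original post):

hc.sql("""
  SELECT propertyName, r.roomname, r.roomsize
  FROM nested
  LATERAL VIEW explode(rooms) t AS r
""").collect().foreach(println)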
Let’s see the complete code:
import com.databricks.spark.csv._
import org.apache.spark.sql.Row
import org.apache.spark.{SparkConf, SparkContext}

object nesting extends Serializable {

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf())
    val hc = new org.apache.spark.sql.hive.HiveContext(sc)

    // Change this to your file location
    val data = hc.csvFile("file:///data/data.csv")
    hc.registerRDDAsTable(data, "data")

    hc.sql("""
      CREATE TABLE IF NOT EXISTS nested (
        propertyId string,
        propertyName string,
        rooms array<struct<roomname:string,roomsize:int>>
      )
      STORED AS PARQUET
    """)

    val nestedRDD = data.map(buildRecord(_))

    // after the map, Spark does not know the schema
    // of the resulting RDD. We can just copy it from the
    // hive table using applySchema
    val nested = hc.sql("select * from nested limit 0")
    val schemaNestedRDD = hc.applySchema(nestedRDD, nested.schema)

    hc.registerRDDAsTable(schemaNestedRDD, "schemanestedrdd")
    hc.sql("insert overwrite table nested select * from schemanestedrdd")
  }

  // builds a nested record from a flat one
  def buildRecord(r: Row): Row = {
    println(r)
    var res = Seq[Any]()
    // take the first two elements
    res = res ++ r.slice(0, 2)   // now res = [ 'some id', 'some name' ]

    // this will contain all the array elements
    var ary = Seq[Any]()
    // we assume there are 2 groups of room columns
    for (i <- 0 to 1) {
      // 0-based indexes: takes (2,3), (4,5), ..
      // and converts to the appropriate type
      ary = ary :+ Row(r.getString(2 + 2 * i), r.getString(2 + 1 + 2 * i).toInt)
    }
    // add the array as an element and return the row
    res = res :+ ary
    Row.fromSeq(res)
  }
}
The code is available on github here: https://github.com/rcongiu/spark-nested-example
Posted on April 4, 2015 by Roberto Congiu