Spark SQL
Spark SQL provides a special type of RDD called SchemaRDD. A SchemaRDD is an RDD of Row objects, each representing a record. A SchemaRDD also knows the schema (i.e., the data fields) of its rows. While SchemaRDDs look like regular RDDs, internally they store data in a more efficient manner, taking advantage of their schema. SchemaRDDs can be created from external data sources, from the results of queries, or from regular RDDs.
If you can’t include the Hive dependencies, use the artifact ID spark-sql_2.10 instead of spark-hive_2.10.
When programming against Spark SQL we have two entry points, depending on whether we need Hive support. The recommended entry point is HiveContext, which provides access to HiveQL and other Hive-dependent functionality. The more basic SQLContext provides a subset of Spark SQL's functionality that does not depend on Hive. The separation exists for users who might have conflicts with including all of the Hive dependencies. Using a HiveContext does not require an existing Hive setup.
Note that if you don’t have an existing Hive installation, Spark SQL will create its own Hive metastore (metadata DB) in your program’s work directory, called metastore_db. In addition, if you attempt to create tables using HiveQL’s CREATE TABLE statement (not CREATE EXTERNAL TABLE), they will be placed in the /user/hive/warehouse directory on your default filesystem (either your local filesystem, or HDFS if you have an hdfs-site.xml on your classpath).
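A minimal initialization sketch (using the Spark 1.x Java API that the rest of this section assumes; the application name is an arbitrary placeholder):
//Constructing a Spark SQL context in Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
SparkConf conf = new SparkConf().setAppName("SparkSQLExample");
JavaSparkContext sc = new JavaSparkContext(conf);
// Recommended entry point: HiveQL support, no existing Hive installation required
HiveContext hiveCtx = new HiveContext(sc);
// Alternative entry point when the Hive dependencies cannot be included:
// SQLContext sqlCtx = new SQLContext(sc);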
//Loading and querying tweets in Java
SchemaRDD input = hiveCtx.jsonFile(inputFile);
// Register the input schema RDD
input.registerTempTable("tweets");
// Select tweets based on the retweetCount
SchemaRDD topTweets = hiveCtx.sql(
  "SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10");
SchemaRDDs (DataFrames in Spark 1.3 and later)
Both loading data and executing queries return SchemaRDDs. SchemaRDDs are similar to tables in a traditional database: under the hood, a SchemaRDD is an RDD composed of Row objects with additional schema information describing the type of each column. Row objects are just wrappers around arrays of basic types (e.g., integers and strings).
You can register any SchemaRDD as a temporary table to query it via HiveContext.sql or SQLContext.sql. You do so using the SchemaRDD’s registerTempTable() method.
Temp tables are local to the HiveContext or SQLContext being used, and they go away when your application exits.
Row objects represent records inside SchemaRDDs, and are simply fixed-length arrays of fields. In Scala/Java, Row objects have a number of getter functions to obtain the value of each field given its index. The standard getter, get (or apply in Scala), takes a column number and returns an Object (Any in Scala) that we are responsible for casting to the correct type. For Boolean, Byte, Double, Float, Int, Long, Short, and String there is a typed getter, getType() (for example, getInt(), getString()), which returns the value as that type.
//Accessing the text column (also first column) in the topTweets SchemaRDD in Java
JavaRDD<String> topTweetText = topTweets.toJavaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return row.getString(0);
  }});
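The same pattern applies to the typed getters for the other column types; a small sketch (the column positions, and the assumption that retweetCount was inferred as a long, are illustrative):
//Accessing fields by index with typed and generic getters in Java
Row first = topTweets.toJavaRDD().first();
String text = first.getString(0);  // column 0 holds the tweet text
long retweets = first.getLong(1);  // column 1 holds retweetCount, assumed inferred as a long
Object raw = first.get(1);         // generic getter returns Object; cast it yourself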
//Hive load in Java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SchemaRDD;
HiveContext hiveCtx = new HiveContext(sc);
SchemaRDD rows = hiveCtx.sql("SELECT key, value FROM mytable");
JavaRDD<Integer> keys = rows.toJavaRDD().map(new Function<Row, Integer>() {
  public Integer call(Row row) { return row.getInt(0); }
});
//Input records
{"name": "Holden"}
{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}
//Loading JSON with Spark SQL in Java
SchemaRDD input = hiveCtx.jsonFile(jsonFile);
//Resulting schema from printSchema()
root
|-- knows: struct (nullable = true)
| |-- friends: array (nullable = true)
| | |-- element: string (containsNull = false)
|-- lovesPandas: boolean (nullable = true)
|-- name: string (nullable = true)
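Nested fields can then be referenced with dot syntax inside the SQL; a sketch, assuming the JSON input above has been registered as a temp table named people:
//Querying nested JSON fields in Java
input.registerTempTable("people");
SchemaRDD pandaLovers = hiveCtx.sql(
  "SELECT name, knows.friends FROM people WHERE lovesPandas = true");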
//Creating a SchemaRDD from a JavaBean in Java
class HappyPerson implements Serializable {
  private String name;
  private String favouriteBeverage;
  public HappyPerson() {}
  public HappyPerson(String n, String b) {
    name = n; favouriteBeverage = b;
  }
  public String getName() { return name; }
  public void setName(String n) { name = n; }
  public String getFavouriteBeverage() { return favouriteBeverage; }
  public void setFavouriteBeverage(String b) { favouriteBeverage = b; }
}
...
ArrayList<HappyPerson> peopleList = new ArrayList<HappyPerson>();
peopleList.add(new HappyPerson("holden", "coffee"));
JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);
SchemaRDD happyPeopleSchemaRDD = hiveCtx.applySchema(happyPeopleRDD, HappyPerson.class);
happyPeopleSchemaRDD.registerTempTable("happy_people");
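Once registered, the bean-backed table can be queried like any other temp table; a small usage sketch:
//Querying the happy_people temp table in Java
SchemaRDD coffeeDrinkers = hiveCtx.sql(
  "SELECT name FROM happy_people WHERE favouriteBeverage = 'coffee'");
for (Row row : coffeeDrinkers.toJavaRDD().collect()) {
  System.out.println(row.getString(0));
}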
//Java UDF imports
// Import UDF function class and DataTypes
// Note: these import paths may change in a future release
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
//Example : Java string length UDF
hiveCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() {
  @Override
  public Integer call(String str) throws Exception {
    return str.length();
  }
}, DataTypes.IntegerType);
SchemaRDD tweetLength = hiveCtx.sql(
  "SELECT stringLengthJava(text) FROM tweets LIMIT 10");
List<Row> lengths = tweetLength.toJavaRDD().collect();
for (Row row : lengths) {
  System.out.println(row.get(0));
}
To make a Hive UDF available, simply call hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function").
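A sketch of that call, using a hypothetical UDF class (org.example.hive.MyUpper is a placeholder, not a real class):
//Using an existing Hive UDF in Java
hiveCtx.sql("CREATE TEMPORARY FUNCTION myUpper AS 'org.example.hive.MyUpper'");
SchemaRDD uppered = hiveCtx.sql("SELECT myUpper(text) FROM tweets LIMIT 10");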
There are several ways to interact with Spark SQL, including SQL, the DataFrames API, and the Datasets API.
The table below summarises which data types are available in each language, organised by Spark version.