How to Insert Data into a Remote Hive Server from Spark

Spark is the buzzword in the world of Big Data now. So what makes Spark so unique?

As we know,

  • Spark is fast – it uses in-memory computation on special data objects called RDDs (Resilient Distributed Datasets)
  • Spark allows execution in multiple modes, i.e. standalone, local (without even a Hadoop server), or on a cluster through resource managers (Mesos, YARN)
  • Spark takes care of data lineage and fault recovery through a DAG (Directed Acyclic Graph), the blueprint for execution, from which lost data can be rebuilt at any point in case of failures
  • Easy APIs – high-level, easy-to-use APIs
  • Read from anywhere – Data can be read from different types of sources, e.g. files, JSON, and databases such as Cassandra
  • Write to anywhere – Result data can be saved in a wide range of formats
  • Multiple language support – Spark supports Scala, Java and Python. For people from a database and SQL background, Spark makes it simple to run SQL on structured RDDs (called DataFrames) through Spark SQL (the successor to Shark)
  • Spark is available for all kinds of processing, i.e. batch, streaming, machine learning and graphs
Running a full-fledged program with Spark is a two-step process:
  1. Build the code: compile and package it (Java/Scala), or write it in Python
  2. Submit the code to the cluster where Spark runs, using spark-submit (or the pyspark shell for interactive Python)

The challenge with this approach is that the developer builds the code locally, i.e. on his or her own computer, and then has to submit the job to the cluster. This becomes cumbersome if you have to develop, debug and test often. There are multiple ways to overcome this challenge:

  1. Have local infrastructure on your machine, i.e. a mini version of your cluster. This may not be practical in all cases, and although you can test functionality this way, you cannot test performance.
  2. Run Spark jobs remotely. That is much easier, right? Okay, let's see how to do it.
While developing a driver application, the configuration for Spark can be set through SparkConf.
val conf = new SparkConf().setAppName("My App Name").setMaster("local[8]")
Okay, so what does setMaster do?
As you know, a Spark program, i.e. your code/job, is called a driver program. When it is submitted, Spark creates a driver JVM and executor JVMs, and runs tasks (Spark terminology for units of work that execute on a thread pool inside each executor). The driver schedules and synchronizes the executors and their tasks. Going back to setMaster: that call sets the master URL, i.e. which cluster manager the program connects to and therefore where its executors are launched. If you are trying to connect to an external cluster, use setMaster to point to the cluster where the job has to run.
A standalone cluster is addressed with a master URL of this form:
spark://HOST:PORT – Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
When the standalone master itself is started, these settings control where it listens:
HOST/SPARK_MASTER_IP – Bind the master to a specific IP address, for example a public one.
PORT/SPARK_MASTER_PORT – Start the master on a different port (default: 7077).
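
To switch between a local run and a remote standalone cluster without editing the code, the master URL can be built from a setting. Below is a minimal sketch in plain Scala, assuming a hypothetical helper named MasterUrl (it is not part of Spark); the string it returns is what would be passed to setMaster.

```scala
// Hypothetical helper (not part of Spark): builds the string passed to setMaster.
object MasterUrl {
  // 7077 is the default port of a standalone master.
  val DefaultPort = 7077

  // Some(host) -> spark://host:port  (connect to a standalone cluster master)
  // None       -> local[n]           (run in-process with n worker threads)
  def resolve(host: Option[String], port: Int = DefaultPort, localThreads: Int = 8): String =
    host match {
      case Some(h) => s"spark://$h:$port"
      case None    => s"local[$localThreads]"
    }
}

// Usage sketch (needs Spark on the classpath; the host value is an assumption):
//   val conf = new SparkConf().setAppName("My App Name")
//     .setMaster(MasterUrl.resolve(Some("HOST")))
```

With no host configured the job keeps running locally, so the same driver code can be used for quick tests and for remote runs.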
3. The third way is an unusual one: you still run Spark locally but test the storage part remotely. For instance, you have Spark installed on your machine but do not have Hive. In this approach we run the job locally and store the result in a remote Hive. This method can be used when you just want to test whether data is getting stored in Hive. The code is commented to explain the steps in detail.
In the example, the data is sampled with the RDD's sample method, just to test the functional logic of the code. The idea is only to confirm that data can be saved from an RDD. Once that point is proved, the code has to be modified to persist data in Hive on a cluster.
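
Without replacement, sample keeps each element independently with a probability equal to the given fraction, so a fraction of .001 means roughly 0.1% of the rows, and the exact sample size varies from run to run. The following is a plain-Scala illustration of that behaviour, not Spark's own implementation; SampleSketch and bernoulliSample are hypothetical names.

```scala
import scala.util.Random

// Plain-Scala stand-in for sampling without replacement (illustration only).
object SampleSketch {
  // Keep each element independently with probability `fraction`.
  // The seed makes a run repeatable.
  def bernoulliSample[A](data: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }
}

// Usage sketch:
//   val picked = SampleSketch.bernoulliSample((1 to 100000).toVector, 0.001, seed = 42L)
//   picked.size is close to 100, but not guaranteed to be exactly 100
```

On a small input such a low fraction can easily return nothing at all, so an empty result in a local test is not necessarily a bug.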

Source data:

// All dependencies for the program
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManager

object Hive {
  // JDBC URL of the remote Hive server (not shown here; supply your own)
  def hiveJdbcUrl: String = ???

  // main function: just calls findCityAggCountandPersistHiveRemotely
  def main(args: Array[String]): Unit = {
    findCityAggCountandPersistHiveRemotely()
  }

  // Aggregate city counts and persist them in Hive
  def findCityAggCountandPersistHiveRemotely(): Unit = {
    // Set up Spark conf
    val conf = new SparkConf().setAppName("Save result to hive remotely").setMaster("local[8]")
    // Set up Spark context
    val sc = new SparkContext(conf)
    // Read the source file into an RDD. Here the path is local; ideally it will be an HDFS path
    val mydata = sc.textFile("src/main/resources/Master.csv")
    // Split each line on the delimiter and take index 6, which is the city name,
    // then count the rows per city, sort by count (ascending)
    // and take a sample (fraction .001, i.e. about 0.1% of the real output)
    val myRDD = mydata.map { mycity => mycity.split(",")(6) }.map { x => (x, 1) }
      .reduceByKey({ case (x, y) => x + y }).sortBy(f => f._2, true).sample(false, .001)
    // Set up the Hive driver (the HiveServer2 JDBC driver class)
    val drivername: String = "org.apache.hive.jdbc.HiveDriver"
    // Loop over each partition of the RDD to save results to Hive
    myRDD.foreachPartition { rows =>
      Class.forName(drivername)
      // Set up a ZooKeeper connection to Hive
      val con: Connection = DriverManager.getConnection(hiveJdbcUrl, "userid", "pwd")
      // Create a statement on the connection object
      val stmt = con.createStatement()
      // Loop over each row of the single partition
      for (eachrow <- rows) {
        // Form the SQL
        val query = "insert into playercount values ('" + eachrow._1 + "','" + eachrow._2 + "')"
        // Insert into the table: execute the statement
        stmt.executeUpdate(query)
      }
      // Close the connection object
      con.close()
    }
  }
}
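
Issuing one insert per row means one round trip to Hive for every city, and a city name that contains a quote breaks the concatenated SQL. A possible refinement is to build a single multi-row insert per partition. This sketch assumes a Hive version that accepts multi-row VALUES clauses (0.14 or later) and backslash escaping inside string literals; HiveInsertSql and its methods are hypothetical helpers, not part of any library.

```scala
// Hypothetical helpers for building HiveQL insert statements (illustration only).
object HiveInsertSql {
  // Wrap a value in single quotes, escaping backslashes and single quotes
  // (assumes Hive's backslash-style escaping in string literals).
  def quote(value: String): String =
    "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

  // Build one multi-row INSERT for a partition, or None when the partition is empty.
  // The count is quoted as a string because the target column is a STRING.
  def batchInsert(table: String, rows: Seq[(String, Int)]): Option[String] =
    if (rows.isEmpty) None
    else {
      val values = rows.map { case (city, count) =>
        s"(${quote(city)}, ${quote(count.toString)})"
      }
      Some(s"insert into $table values ${values.mkString(", ")}")
    }
}
```

Inside foreachPartition, the partition's iterator could be converted with toSeq and the result executed with something like `HiveInsertSql.batchInsert("playercount", partitionRows).foreach(sql => stmt.executeUpdate(sql))`. Very large partitions would need to be split into smaller groups first.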

The production version of the code has to be modified to execute on the cluster directly. Here I persist the data to a file, which is in turn loaded into Hive. Another approach could be to save directly into a Hive table if the table is pre-created.

val conf = new SparkConf().setAppName("Save result to hive remotely").setMaster("local[12]")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val mydata = sc.textFile("src/main/resources/Master.csv")
// Count the rows per city and write them out as comma-delimited lines
mydata.map { mycity => mycity.split(",")(6) }.map { x => (x, 1) }
  .reduceByKey({ case (x, y) => x + y })
  .map { case (city, count) => city + "," + count }.saveAsTextFile("hdfs://path")
sqlContext.sql("CREATE TABLE IF NOT EXISTS playercount (city string, cnt_of_players STRING) " +
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
// The file is on HDFS, so LOAD DATA is used without the LOCAL keyword
sqlContext.sql("LOAD DATA INPATH 'hdfs://path' INTO TABLE playercount")
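
Before running on the cluster, the aggregation and the format of the output lines can be checked on a few rows with plain Scala collections. Note that saveAsTextFile writes each element with toString, so a tuple would appear as (Boston,2); writing each pair as a delimited line gives Hive something it can split into columns. This is a sketch that assumes made-up sample rows and the hypothetical names CityCounts, countByCity and toLines.

```scala
// Plain-collections dry run of the city count (no Spark needed; illustration only).
object CityCounts {
  // Index 6 is assumed to hold the city name, as in the Spark job.
  val CityColumn = 6

  // Count rows per city: the collections analogue of map + reduceByKey.
  def countByCity(lines: Seq[String]): Map[String, Int] =
    lines.map(_.split(",")(CityColumn))
      .groupBy(identity)
      .map { case (city, hits) => city -> hits.size }

  // One delimited line per city, sorted by city name for stable output.
  def toLines(counts: Map[String, Int], delimiter: String = ","): Seq[String] =
    counts.toSeq.sortBy(_._1).map { case (city, n) => s"$city$delimiter$n" }
}

// Usage sketch with made-up rows:
//   val rows = Seq("1,a,b,c,d,e,Boston,x", "2,a,b,c,d,e,Denver,x", "3,a,b,c,d,e,Boston,x")
//   CityCounts.toLines(CityCounts.countByCity(rows))
```

The delimiter used here has to match the one declared for the Hive table; a table created without a ROW FORMAT clause expects Hive's default field delimiter (Ctrl-A) instead of a comma.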
Output Result:
