import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config._

object ProcessEmployee {

  def main(args: Array[String]): Unit = {
    // Point Spark at the local MongoDB instance and the hp.employee collection.
    val sparkConf = new SparkConf().setAppName("ProcessEmployee")
    sparkConf.setMaster("local[2]")
      .set("spark.mongodb.input.uri",
        "mongodb://127.0.0.1:27017/hp.employee?readPreference=primaryPreferred")

    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    // Override the collection and read preference for this particular load.
    val readConfig = ReadConfig(
      Map("collection" -> "employee", "readPreference.name" -> "secondaryPreferred"),
      Some(ReadConfig(sc)))

    val customRdd = MongoSpark.load(sc, readConfig)
    println(customRdd.count)
    println(customRdd.first.toJson)

    import hiveContext.implicits._

    // Map each MongoDB document onto the Employee case class and convert to a DataFrame.
    val employee = customRdd.map(e => Employee(
      e.get("name").toString.trim,
      e.get("emp_id").toString.trim,
      e.get("department").toString.trim,
      e.get("company").toString.trim,
      e.get("cc").toString.trim,
      e.get("sbu").toString.trim,
      e.get("landline_num").toString.trim,
      e.get("phone_num").toString.trim,
      e.get("nt_num").toString.trim,
      e.get("email").toString.trim,
      e.get("work_city").toString.trim,
      e.get("sup_emp_id").toString.trim)).toDF()

    employee.printSchema()
    employee.registerTempTable("employee")

    // Deduplicate the rows with a DISTINCT query before writing out.
    val employeeNew = hiveContext.sql(
      "select distinct emp_id, name, department, company, cc, sbu, landline_num, phone_num, nt_num, email, work_city, sup_emp_id from employee")
    employeeNew.show()

    // Persist the cleaned result as Parquet on the local file system.
    employeeNew.write.format("parquet").save("file:///D:/XXX.parquet")

    sc.stop()
  }
}

case class Employee(
  name: String, emp_id: String, department: String, company: String,
  cc: String, sbu: String, landline_num: String, phone_num: String,
  nt_num: String, email: String, work_city: String, sup_emp_id: String)
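// The job above assumes the MongoDB Spark connector and Spark Hive support are on the
// classpath. A minimal sbt sketch is below; the version numbers are assumptions for a
// Spark 1.6.x build (HiveContext/registerTempTable) and should be adjusted to your cluster:
//
// libraryDependencies ++= Seq(
//   "org.apache.spark"  %% "spark-core"            % "1.6.3" % "provided",
//   "org.apache.spark"  %% "spark-hive"            % "1.6.3" % "provided",
//   "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"
// )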
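// A quick way to sanity-check the output is to read the Parquet directory back and compare
// row counts. The sketch below is a hypothetical helper (object name and structure are not
// part of the original job); it only assumes the same output path used above.
object VerifyEmployeeParquet {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("VerifyEmployeeParquet").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // Read the Parquet output written by ProcessEmployee and inspect it.
    val written = hiveContext.read.parquet("file:///D:/XXX.parquet")
    written.printSchema()
    println(s"rows written: ${written.count}")

    sc.stop()
  }
}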