Spark Example: Reading from Hive
Syntax reference: spark-submit, official Spark site
A PySpark program that reads a Hive table => read_hive_table.py
import sys

from pyspark.sql import SparkSession
# Custom function to read a Hive table
def FetchHiveTable():
    fetch_sql = "select * from car_master.electric_cars"
    # Caution: collect() pulls every row into driver memory.
    table_res = spark.sql(fetch_sql).collect()
    print(table_res)
    for row in table_res:
        car_model_name = row["car_model"]
        car_price = row["price_in_usd"]
        print("car model name : " + car_model_name)
        # str() guards against a TypeError when the price column is numeric
        print("car price : " + str(car_price))
    print("for loop finished")
# Main program starts here
if __name__ == "__main__":
    appname = "ExtractCars"
    # Create a Spark session with Hive support enabled
    spark = (SparkSession.builder
             .appName(appname)
             .config("hive.metastore.uris", "thrift://localhost:9083")
             .enableHiveSupport()
             .getOrCreate())
    print("Spark application name: " + appname)
    FetchHiveTable()
    spark.stop()
    sys.exit(0)
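Because collect() materializes the whole result set on the driver, it is usually safer to keep the data as a DataFrame and only bring back what you need. A minimal sketch of two alternatives, reusing the table and column names from the program above:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("ExtractCarsSafe")
         .enableHiveSupport()
         .getOrCreate())

df = spark.sql("select car_model, price_in_usd from car_master.electric_cars")

# Option 1: inspect a sample without loading the full table on the driver.
df.show(20)

# Option 2: stream rows back one partition at a time instead of all at once.
for row in df.toLocalIterator():
    print("car model name : " + row["car_model"])
    print("car price : " + str(row["price_in_usd"]))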
Shell script that invokes the PySpark program => test_script.sh
#!/bin/bash
echo "Info: Setting global variables"
export SPARK_MAJOR_VERSION=3
export SPARK_HOME=/usr/local/lib/spark
export PATH=$SPARK_HOME/bin:$PATH
spark-submit ./read_hive_table.py
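On a real cluster, the spark-submit line usually carries a master and resource flags as well. A hedged example; the YARN master, deploy mode, and resource values below are placeholders, not settings from the original script:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 2g \
  ./read_hive_table.py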
Connecting to Hive and running a SELECT
from pyspark.sql import SparkSession
sparkSession = (SparkSession
.builder
.appName('example-pyspark-read-and-write-from-hive')
.enableHiveSupport()
.getOrCreate()
)
# data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
# df = sparkSession.createDataFrame(data)
## Write into Hive
# df.write.saveAsTable('example')
df_load = sparkSession.sql('SELECT * FROM example')
df_load.show()
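The commented lines above hint at the write path. A minimal sketch of round-tripping a DataFrame through Hive; the column names and the overwrite mode are assumptions for the demo, not part of the original snippet:

# Create a small DataFrame and save it as a Hive table.
data = [('First', 1), ('Second', 2), ('Third', 3)]
df = sparkSession.createDataFrame(data, ['label', 'value'])

# mode('overwrite') replaces the table if it already exists;
# use 'append' instead to add rows to an existing table.
df.write.mode('overwrite').saveAsTable('example')

# Read it back with SQL, as above.
sparkSession.sql('SELECT * FROM example').show()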