Apache Livy คืออะไร
Wuttichai Kaewlomsap@wuttichaihung
บทความนี้พูดถึงอะไรบ้าง
- What is Apache Livy ?
- ใช้ Apache livy ทำอะไร ?
- ขั้นตอนการใช้ Livy Rest API
- การใช้งาน Pylivy
- การใช้งานร่วมกับ Kerberos
What is Apache livy ?
เราเคยพูดกันถึงการส่งคำสั่ง HDFS Commands ด้วย Rest API ไปยัง Hadoop Cluster กันไปแล้ว ลิ้งนี้ ทีนี้ Apache Livy ก็เป็นเครื่องมือที่ให้ผู้ใช้งานส่งคำสั่งผ่าน Rest API เช่นกัน แต่สำหรับส่งไป Spark Cluster
ใช้ Apache livy ทำอะไร ?
เป้าหมายหลักคือการส่งงานไปให้ Spark ทำงาน หรือที่เราเรียกว่า Spark Submit Jobs แต่กว่าจะถึงจุดนั้นก็มีหลายขั้นตอนดังต่อไปนี้
ขั้นตอนการใช้ Livy Rest API
- จะส่งคำสั่งขอสร้าง Session ไปยัง Spark Cluster
ex1_create_session.py
import json, pprint, requests, textwrap
host = 'http://localhost:8998'
data = {'kind': 'pyspark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()
- รอจนกว่า Spark Cluster จะสร้าง Session เสร็จ ซึ่งเราต้องคอยเช็คว่าเมื่อไรสถานะ
state == "idle"
ก็คือพร้อมใช้งาน
ex2_get_session_status.py
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
r.json()
- เมื่อทุกอย่างพร้อมเราก็ submit job ด้วย code ที่เตรียมไว้
ex3_summit_job.py
code = textwrap.dedent("""
import random
NUM_SAMPLES = 100000
def sample(p):
x, y = random.random(), random.random()
return 1 if x*x + y*y < 1 else 0
count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
""")
data = {
'code': code
}
statements_url = session_url + '/statements'
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()
- หลังจากส่งไปแล้ว เราก็ต้องไปทวงถามวนๆไปว่าทำงานเสร็จหรือยัง ได้ผลลัพธ์เท่าไร
ex4_get_result.py
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
r.json()
- สุดท้ายเมื่อได้ผลลัพธ์ที่ต้องการ ก็ทำการปิด Session
ex5_close_session.py
session_url = 'http://localhost:8998/sessions/' + session_id
requests.delete(session_url, headers=headers)
อ่านข้อมูล API เพิ่มเติมได้
ง่ายกว่านี้มีไหม Pylivy
มีสิ pylivy ช่วยท่านได้
ex5_pylivy.py
from livy import LivySession
with LivySession.create(host) as session:
session.run(code)
เหลือเท่านี้เลย pylivy เป็นตัวที่รวมทั้ง 5 ขั้นตอนที่กล่าวมาไว้ให้ด้วยกันแล้ว ก็ช่วยให้เราสะดวกขึ้นเยอะมากๆ และนี้ก็เป็นอีกตัวอย่างการใช้ with statement อ่านเพิ่มเติม
การใช้งานร่วมกับ Kerberos
จะเพิ่มส่วนของการ auth auth_token = HTTPNegotiateAuth()
เข้าไป
ex6_livy_kerberos.py
from requests_negotiate import HTTPNegotiateAuth
auth_token = HTTPNegotiateAuth()
statements_url = session_url + '/statements'
r = requests.post(statements_url, data=json.dumps(data), headers=headers, auth=auth_token)
r.json()
# or
with LivySession.create(host, auth=auth_token) as session:
session.run(code)
ติดตามความรู้ Python ได้ที่ DataHungry
Wuttichai Kaewlomsap
Sr. Data Engineer