Logo

Apache Livy คืออะไร

avatar
Wuttichai Kaewlomsap@wuttichaihung
python-apache-livy

บทความนี้พูดถึงอะไรบ้าง

  1. What is Apache Livy ?
  2. ใช้ Apache livy ทำอะไร ?
  3. ขั้นตอนการใช้ Livy Rest API
  4. การใช้งาน Pylivy
  5. การใช้งานร่วมกับ Kerberos

What is Apache livy ?

เราเคยพูดกันถึงการส่งคำสั่ง HDFS Commands ด้วย Rest API ไปยัง Hadoop Cluster กันไปแล้ว ลิ้งนี้ ทีนี้ Apache Livy ก็เป็นเครื่องมือที่ให้ผู้ใช้งานส่งคำสั่งผ่าน Rest API เช่นกัน แต่สำหรับส่งไป Spark Cluster

ใช้ Apache livy ทำอะไร ?

เป้าหมายหลักคือการส่งงานไปให้ Spark ทำงาน หรือที่เราเรียกว่า Spark Submit Jobs แต่กว่าจะถึงจุดนั้นก็มีหลายขั้นตอนดังต่อไปนี้

ขั้นตอนการใช้ Livy Rest API

  1. จะส่งคำสั่งขอสร้าง Session ไปยัง Spark Cluster
ex1_create_session.py
import json, pprint, requests, textwrap
host = 'http://localhost:8998'
data = {'kind': 'pyspark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.json()
  1. รอจนกว่า Spark Cluster จะสร้าง Session เสร็จ ซึ่งเราต้องคอยเช็คว่าเมื่อไรสถานะ state == "idle" ก็คือพร้อมใช้งาน
ex2_get_session_status.py
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
r.json()
  1. เมื่อทุกอย่างพร้อมเราก็ submit job ด้วย code ที่เตรียมไว้
ex3_summit_job.py
code = textwrap.dedent("""
    import random
    NUM_SAMPLES = 100000
    def sample(p):
      x, y = random.random(), random.random()
      return 1 if x*x + y*y < 1 else 0

    count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
    print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
    """)
data = {
  'code': code
}

statements_url = session_url + '/statements'
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()
  1. หลังจากส่งไปแล้ว เราก็ต้องไปทวงถามวนๆไปว่าทำงานเสร็จหรือยัง ได้ผลลัพธ์เท่าไร
ex4_get_result.py
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
r.json()
  1. สุดท้ายเมื่อได้ผลลัพธ์ที่ต้องการ ก็ทำการปิด Session
ex5_close_session.py
session_url = 'http://localhost:8998/sessions/' + session_id
requests.delete(session_url, headers=headers)

อ่านข้อมูล API เพิ่มเติมได้

ง่ายกว่านี้มีไหม Pylivy

มีสิ pylivy ช่วยท่านได้

ex5_pylivy.py
from livy import LivySession

with LivySession.create(host) as session:
    session.run(code)

เหลือเท่านี้เลย pylivy เป็นตัวที่รวมทั้ง 5 ขั้นตอนที่กล่าวมาไว้ให้ด้วยกันแล้ว ก็ช่วยให้เราสะดวกขึ้นเยอะมากๆ และนี้ก็เป็นอีกตัวอย่างการใช้ with statement อ่านเพิ่มเติม

การใช้งานร่วมกับ Kerberos

จะเพิ่มส่วนของการ auth auth_token = HTTPNegotiateAuth() เข้าไป

ex6_livy_kerberos.py
from requests_negotiate import HTTPNegotiateAuth
auth_token = HTTPNegotiateAuth()

statements_url = session_url + '/statements'
r = requests.post(statements_url, data=json.dumps(data), headers=headers, auth=auth_token)
r.json()
# or
with LivySession.create(host, auth=auth_token) as session:
    session.run(code)

ติดตามความรู้ Python ได้ที่ DataHungry

avatar

Wuttichai Kaewlomsap

Sr. Data Engineer