Code comparison: Pandas, PySpark, and Apache Hive
Introduction
In the world of big data, data manipulation is a crucial task for any data engineer or data scientist. There are many tools available for data manipulation, including Pandas, PySpark, and Apache Hive. Each tool has its own strengths and weaknesses, and choosing the right tool for the job can make a big difference in productivity and performance.
When to use PySpark, Pandas, or Apache Hive
Pandas is a good choice when you need to manipulate and analyze smaller datasets that fit into memory on a single machine. It provides a wide range of functions for working with data, and for small to medium-sized datasets it is often quicker to work with and more convenient than SQL.
PySpark is a good choice when you need to process very large datasets that cannot fit into memory on a single machine. PySpark can distribute the computation across a cluster of machines, allowing you to process datasets that are too big for a single machine.
Apache Hive is a good choice when you need to store and manage large datasets in a distributed computing environment. Hive provides an SQL-like interface to Hadoop, allowing you to query and analyze data stored in Hadoop using familiar SQL syntax. Hive is often used for batch processing of large datasets, and is especially useful for ETL (Extract, Transform, Load) workflows.
In general, the choice of tool will depend on the size of your dataset, the complexity of your analysis, and the resources available to you. If you are working with very large datasets, PySpark or Hive may be the best choice, while if you are working with smaller datasets, Pandas may be more appropriate.
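One rough way to judge whether a dataset "fits into memory" is to load a small sample into Pandas and extrapolate its footprint to the full row count. The sketch below assumes the sample is representative of the whole table; the helper name `estimated_bytes` and the sample data are hypothetical and only serve as illustration.

```python
import pandas as pd


def estimated_bytes(sample: pd.DataFrame, total_rows: int) -> int:
    """Extrapolate the in-memory size of a full dataset from a sample."""
    # deep=True also counts the Python strings held in object columns;
    # index=False ignores the fixed overhead of the sample's index.
    per_row = sample.memory_usage(index=False, deep=True).sum() / len(sample)
    return int(per_row * total_rows)


# Hypothetical sample: two int64 columns, i.e. 16 bytes per row
sample = pd.DataFrame({"id": range(1000), "age": [30] * 1000}, dtype="int64")
estimate = estimated_bytes(sample, total_rows=1_000_000)
print(f"Estimated size: {estimate / 1e6:.0f} MB")
```

If the estimate approaches the RAM of the machine, PySpark or Hive is likely the safer choice, since Pandas operations often need additional working memory on top of the data itself.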
Terms of comparison
- Data selection and filtering
- Data aggregation and grouping
- Data joining and merging
- Data transformation and cleaning
- Insert Into
- Create table as Select
Code comparison table
Type | Pandas | PySpark | Hive |
---|---|---|---|
Select | df = pd.read_sql("SELECT * FROM db.mytable", con=conn) | df = spark.sql("SELECT * FROM db.mytable") | SELECT * FROM db.mytable; |
Filter | df[df['age'] > 30] | df.filter(df.age > 30) | SELECT * FROM db.mytable WHERE age > 30; |
New column | df['new_col'] = df['col1'] + 5 | df = df.withColumn('new_col', col('col1') + 5) | SELECT *, col1 + 5 AS new_col FROM db.mytable; |
Join | pd.merge(df1, df2, on='id', how='left') | df = df1.join(df2, on='id', how='left') | SELECT * FROM db.table1 t1 LEFT JOIN db.table2 t2 ON t1.id = t2.id; |
Group By | df.groupby('category')['sales'].sum() | df.groupBy('category').agg({'sales': 'sum'}) | SELECT category, SUM(sales) FROM db.mytable GROUP BY category; |
Order By | df.sort_values('col1', ascending=False) | df.orderBy('col1', ascending=False) | SELECT * FROM db.mytable ORDER BY col1 DESC; |
Union | df = pd.concat([df1, df2], ignore_index=True) | df = df1.union(df2) | SELECT * FROM db.table1 UNION ALL SELECT * FROM db.table2; |
Insert | df.to_sql('my_table', con=conn, if_exists='append', index=False) | df.write.insertInto("my_table", overwrite=False) | INSERT INTO my_table SELECT * FROM table1; |
Create AS Select | df.to_sql('my_table', con=conn, if_exists='fail', index=False) | df.write.saveAsTable("my_table") | CREATE TABLE my_table AS SELECT * FROM table1; |
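
To try the Pandas column of the table without a database, the same operations can be run on small in-memory frames. This is a minimal sketch; the frames `people` and `cities` and their values are made up for illustration.

```python
import pandas as pd

# Hypothetical toy data standing in for db.table1 and db.table2
people = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "age": [25, 35, 45, 55],
    "category": ["a", "b", "a", "b"],
    "sales": [10, 20, 30, 40],
})
cities = pd.DataFrame({"id": [2, 3, 5], "city": ["Oslo", "Lima", "Pune"]})

# Filter: keep rows with age > 30
over_30 = people[people["age"] > 30]

# New column: assign() returns a new frame instead of mutating the slice
over_30 = over_30.assign(new_col=over_30["sales"] + 5)

# Left join on the shared key; unmatched rows get NaN in "city"
joined = over_30.merge(cities, on="id", how="left")

# Group by + sum, then order descending
totals = (
    joined.groupby("category", as_index=False)["sales"].sum()
    .sort_values("sales", ascending=False)
)

# Union: concat keeps duplicate rows, like UNION ALL in Hive
stacked = pd.concat([people, people], ignore_index=True)

print(totals)
```

Note that `pd.concat` and PySpark's `union` both keep duplicate rows, which is why the Hive counterpart is `UNION ALL` rather than `UNION`.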
Example ETL task
Pandas
import pandas as pd
from sqlalchemy import create_engine
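# Set up connection to Hive using SQLAlchemy (the hive:// dialect is provided by the PyHive package;
# replace user, password, host, port, and db with real values)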
engine = create_engine('hive://user:password@host:port/db')
# 1. Select data
df1 = pd.read_sql('SELECT * FROM table1', engine)
df2 = pd.read_sql('SELECT * FROM table2', engine)
# 2. Filter Row
df1 = df1[df1['age'] > 30].copy()  # copy so the column assignments below do not act on a view
# 3. Create new column
df1['year_retire'] = 60 - df1['age']
# 4. Data transformation and cleaning
df1['created_at'] = pd.to_datetime(df1['created_at'])
df1['first_name'] = df1['first_name'].str.replace("_", "", regex=False)  # vectorized, tolerates missing values
# 5. Merge the two datasets
df = pd.merge(df1, df2, on='id', how='left')
# 6. Group value by id
df = df.groupby('id')['year_retire'].min().reset_index()
# 7. Order value
df = df.sort_values('id', ascending=False)
# 8. Create as Select
df.to_sql('new_table', con=engine, if_exists='fail', index=False)
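Step 8 relies on `if_exists='fail'` to mimic CREATE TABLE AS SELECT: the write succeeds only when the table does not exist yet, whereas `if_exists='append'` behaves like INSERT INTO. The sketch below illustrates the difference, assuming an in-memory SQLite database as a local stand-in for Hive (an assumption for illustration, not part of the original setup).

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # assumption: SQLite stands in for Hive
df = pd.DataFrame({"id": [3, 2], "year_retire": [5, 19]})

# The first write creates the table (CTAS-like behavior).
df.to_sql("new_table", conn, if_exists="fail", index=False)

# A second write with if_exists="fail" is rejected because the table exists.
try:
    df.to_sql("new_table", conn, if_exists="fail", index=False)
    second_write_failed = False
except ValueError:
    second_write_failed = True

# if_exists="append" adds the rows to the existing table (INSERT INTO-like).
df.to_sql("new_table", conn, if_exists="append", index=False)
row_count = int(pd.read_sql("SELECT COUNT(*) AS n FROM new_table", conn)["n"].iloc[0])

print(second_write_failed, row_count)
```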
PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date
# create spark session
spark = SparkSession.builder.appName('myApp').getOrCreate()
# 1. Select data
df1 = spark.sql("SELECT * FROM db.table1")
df2 = spark.sql("SELECT * FROM db.table2")
# 2. Filter Row
df1 = df1.filter(col('age') > 30)
# 3. Create new column
df1 = df1.withColumn('year_retire', 60 - col('age'))
# 4. Data transformation and cleaning
df1 = df1.withColumn('created_at', to_date(col('created_at'), 'yyyy-MM-dd'))
df1 = df1.withColumn('first_name', regexp_replace(col('first_name'), "_", ""))
# 5. Merge the two datasets
df = df1.join(df2, on='id', how='left')  # joining on the column name keeps a single 'id' column
# 6. Group value by id
df = df.groupBy('id').agg({'year_retire': 'min'}).withColumnRenamed('min(year_retire)', 'year_retire')
# 7. Order value
df = df.orderBy(col('id').desc())
# 8. Create as Select
df.write.saveAsTable("db.new_table", mode='errorifexists')  # fail if the table already exists, like CTAS
Apache Hive
CREATE TABLE new_table AS -- 8. Create as Select
WITH temp_table1 AS (
    SELECT -- 1. Select data
        id,
        age,
        60 - age AS year_retire, -- 3. Create new column
        CAST(created_at AS TIMESTAMP) AS created_at, -- 4. Data transformation and cleaning
        regexp_replace(first_name, '_', '') AS first_name
    FROM
        db.table1
    WHERE -- 2. Filter Row
        age > 30
)
SELECT
    t1.id,
    MIN(t1.year_retire) AS year_retire
FROM
    temp_table1 AS t1
LEFT JOIN -- 5. Merge the two datasets
    db.table2 AS t2 ON t2.id = t1.id
GROUP BY -- 6. Group value by id
    t1.id
ORDER BY -- 7. Order value
    t1.id DESC;
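The overall shape of this query (a CREATE TABLE ... AS statement wrapping a CTE, a left join, and a grouped aggregate) can be smoke-tested locally from Python. The sketch below assumes SQLite as a stand-in engine and made-up rows; SQLite is not Hive, so the Hive-specific pieces (`regexp_replace`, the `TIMESTAMP` cast, the `db.` prefix) are left out.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # assumption: SQLite stands in for Hive

# Hypothetical sample rows for the two source tables
conn.executescript("""
    CREATE TABLE table1 (id INTEGER, age INTEGER, first_name TEXT);
    CREATE TABLE table2 (id INTEGER, dept TEXT);
    INSERT INTO table1 VALUES (1, 28, 'Ann_'), (2, 41, '_Bob'), (3, 55, 'Cy');
    INSERT INTO table2 VALUES (2, 'ops'), (3, 'eng');
""")

# Same structure as the Hive statement: filter and derive in the CTE,
# then join, group, and order in the outer SELECT.
conn.execute("""
    CREATE TABLE new_table AS
    WITH temp_table1 AS (
        SELECT id, 60 - age AS year_retire
        FROM table1
        WHERE age > 30
    )
    SELECT t1.id, MIN(t1.year_retire) AS year_retire
    FROM temp_table1 AS t1
    LEFT JOIN table2 AS t2 ON t2.id = t1.id
    GROUP BY t1.id
    ORDER BY t1.id DESC
""")

rows = conn.execute(
    "SELECT id, year_retire FROM new_table ORDER BY id DESC"
).fetchall()
print(rows)
```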
Summary
This comparison article provides an overview of data manipulation in three popular tools: Pandas, PySpark, and Apache Hive. By providing code examples and discussing the pros and cons of each approach, the article aims to help data engineers and data scientists choose the best tool for their specific use case.
Wuttichai Kaewlomsap
Sr. Data Engineer