Code comparison: Pandas, PySpark, and Apache Hive

Introduction

Data manipulation is a core task for data engineers and data scientists, and Pandas, PySpark, and Apache Hive are three widely used tools for it. Each has its own strengths and weaknesses, and choosing the right one for the job can make a big difference in productivity and performance.

When to use PySpark, Pandas, or Apache Hive

  • Pandas is a good choice when you need to manipulate and analyze datasets small enough to fit into memory on a single machine. It provides a wide range of functions for working with data, and for small to medium-sized datasets it is often more convenient than writing SQL.

  • PySpark is a good choice when you need to process datasets too large to fit into memory on a single machine. It distributes the computation across a cluster of machines, so the work is not limited by the resources of one node.

  • Apache Hive is a good choice when you need to store and manage large datasets in a distributed computing environment. Hive provides an SQL-like interface to Hadoop, allowing you to query and analyze data stored in Hadoop using familiar SQL syntax. Hive is often used for batch processing of large datasets, and is especially useful for ETL (Extract, Transform, Load) workflows.

In general, the choice of tool will depend on the size of your dataset, the complexity of your analysis, and the resources available to you. If you are working with very large datasets, PySpark or Hive may be the best choice, while if you are working with smaller datasets, Pandas may be more appropriate.
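
As a rough illustration of the size criterion, the sketch below estimates how much memory a full dataset would need from a small sample and suggests a tool accordingly. The function names, the memory budget, and the returned labels are hypothetical, chosen only for this example; a real decision would also weigh the complexity of the analysis and the available infrastructure.

```python
import pandas as pd


def estimated_bytes(sample: pd.DataFrame, total_rows: int) -> int:
    """Extrapolate the in-memory size of the full dataset from a sample."""
    # deep=True also counts the Python objects held in object (string) columns
    per_row = sample.memory_usage(deep=True).sum() / len(sample)
    return int(per_row * total_rows)


def suggest_tool(sample: pd.DataFrame, total_rows: int, budget_bytes: int) -> str:
    """Hypothetical rule of thumb: Pandas if the data fits the memory budget."""
    if estimated_bytes(sample, total_rows) <= budget_bytes:
        return "pandas"
    return "pyspark-or-hive"


# A small sample standing in for the first rows of a larger table
sample = pd.DataFrame({"id": range(1000), "age": [30] * 1000})

small_case = suggest_tool(sample, total_rows=1_000, budget_bytes=1_000_000)
large_case = suggest_tool(sample, total_rows=10**9, budget_bytes=1_000_000)
```

Pandas often needs several times the raw data size in working memory during joins and group-bys, so in practice the budget should be set well below the machine's total RAM.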

Terms of comparison

  • Data selection and filtering
  • Data aggregation and grouping
  • Data joining and merging
  • Data transformation and cleaning
  • Insert Into
  • Create table as Select

Code comparison table

| Type | Pandas | PySpark | Hive |
| --- | --- | --- | --- |
| Select | df = pd.read_sql("SELECT * FROM db.mytable", con=conn) | df = spark.sql("SELECT * FROM db.mytable") | SELECT * FROM db.mytable; |
| Filter | df[df['age'] > 30] | df.filter(df.age > 30) | SELECT * FROM db.mytable WHERE age > 30; |
| New column | df['new_col'] = df['col1'] + 5 | df = df.withColumn('new_col', col('col1') + 5) | SELECT *, col1 + 5 AS new_col FROM db.mytable; |
| Join | pd.merge(df1, df2, on='id', how='left') | df = df1.join(df2, on='id', how='left') | SELECT * FROM db.table1 t1 LEFT JOIN db.table2 t2 ON t1.id = t2.id; |
| Group By | df.groupby('category')['sales'].sum() | df.groupBy('category').agg({'sales': 'sum'}) | SELECT category, SUM(sales) FROM db.mytable GROUP BY category; |
| Order By | df.sort_values('col1', ascending=False) | df.orderBy(col('col1').desc()) | SELECT * FROM db.mytable ORDER BY col1 DESC; |
| Union | df = pd.concat([df1, df2]) | df = df1.union(df2) | SELECT * FROM db.table1 UNION ALL SELECT * FROM db.table2; |
| Insert | df.to_sql('my_table', con=conn, if_exists='append', index=False) | df.write.insertInto("db.table1", overwrite=False) | INSERT INTO TABLE my_table SELECT * FROM table1; |
| Create As Select | df.to_sql('my_table', con=conn, if_exists='fail', index=False) | df.write.saveAsTable("employee") | CREATE TABLE my_table AS SELECT * FROM table1; |
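
To make the Pandas column of the table concrete, here is a minimal sketch that runs the in-memory operations on toy data. The DataFrames `people` and `cities` and their columns are invented for illustration and are not part of the examples in this article.

```python
import pandas as pd

# Toy data, invented for this example
people = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "age": [25, 35, 45, 55],
    "category": ["a", "b", "a", "b"],
    "sales": [10, 20, 30, 40],
})
cities = pd.DataFrame({"id": [1, 2, 3], "city": ["Oslo", "Lima", "Pune"]})

# Filter: keep rows where age is above 30
older = people[people["age"] > 30]

# New column: derive a value from an existing column
with_col = people.assign(new_col=people["sales"] + 5)

# Join: left join keeps every row of people, even without a matching city
joined = pd.merge(people, cities, on="id", how="left")

# Group By: total sales per category
totals = people.groupby("category")["sales"].sum()

# Order By: sort descending by age
ordered = people.sort_values("age", ascending=False)

# Union: stack two DataFrames, keeping duplicates (like UNION ALL)
stacked = pd.concat([people, people], ignore_index=True)
```

Note that `pd.concat` keeps duplicate rows, which matches PySpark's `union` and SQL's `UNION ALL`; call `drop_duplicates()` afterwards for `UNION` (distinct) semantics.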

Example ETL task

Pandas

example_pandas.py
import pandas as pd
from sqlalchemy import create_engine

# Set up a connection to Hive using SQLAlchemy
# (the hive:// dialect is provided by the PyHive package)
engine = create_engine('hive://user:password@host:port/db')

# 1. Select data
df1 = pd.read_sql('SELECT * FROM table1', engine)
df2 = pd.read_sql('SELECT * FROM table2', engine)

# 2. Filter rows (copy so the assignments below do not modify a view)
df1 = df1[df1['age'] > 30].copy()

# 3. Create new column
df1['year_retire'] = 60 - df1['age']

# 4. Data transformation and cleaning
df1['created_at'] = pd.to_datetime(df1['created_at'])
df1['first_name'] = df1['first_name'].str.replace('_', '', regex=False)

# 5. Merge the two datasets
df = pd.merge(df1, df2, on='id', how='left')

# 6. Group by id
df = df.groupby('id')['year_retire'].min().reset_index()

# 7. Order values
df = df.sort_values('id', ascending=False)

# 8. Create as Select (fails if the table already exists)
df.to_sql('new_table', con=engine, if_exists='fail', index=False)
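
The pipeline above needs a running Hive server. To try the same eight steps locally, the sketch below swaps Hive for an in-memory SQLite database from Python's standard library. The sample rows and the `dept` column are invented for illustration; only the connection differs from the Hive version.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for Hive (an assumption for local testing only)
conn = sqlite3.connect(":memory:")

# Seed two small source tables with invented rows
pd.DataFrame({
    "id": [1, 2, 3],
    "age": [28, 41, 55],
    "first_name": ["a_nn", "b_ob", "c_y"],
    "created_at": ["2023-01-01", "2023-02-01", "2023-03-01"],
}).to_sql("table1", conn, index=False)
pd.DataFrame({
    "id": [2, 3, 3],
    "dept": ["x", "y", "z"],
}).to_sql("table2", conn, index=False)

# 1. Select data
df1 = pd.read_sql("SELECT * FROM table1", conn)
df2 = pd.read_sql("SELECT * FROM table2", conn)

# 2. Filter rows
df1 = df1[df1["age"] > 30].copy()

# 3. Create new column
df1["year_retire"] = 60 - df1["age"]

# 4. Data transformation and cleaning
df1["created_at"] = pd.to_datetime(df1["created_at"])
df1["first_name"] = df1["first_name"].str.replace("_", "", regex=False)

# 5. Merge the two datasets
df = pd.merge(df1, df2, on="id", how="left")

# 6. Group by id
df = df.groupby("id")["year_retire"].min().reset_index()

# 7. Order values
df = df.sort_values("id", ascending=False)

# 8. Create as Select (fails if the table already exists)
df.to_sql("new_table", conn, if_exists="fail", index=False)

result = pd.read_sql("SELECT * FROM new_table", conn)
```

Because `if_exists="fail"` is used, running step 8 a second time against the same connection raises a `ValueError` instead of overwriting the table.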

PySpark

example_pyspark.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min as spark_min, regexp_replace, to_date

# Create a Spark session; enableHiveSupport() lets Spark read and write Hive tables
spark = SparkSession.builder.appName('myApp').enableHiveSupport().getOrCreate()

# 1. Select data
df1 = spark.sql("SELECT * FROM db.table1")
df2 = spark.sql("SELECT * FROM db.table2")

# 2. Filter rows
df1 = df1.filter(col('age') > 30)

# 3. Create new column
df1 = df1.withColumn('year_retire', 60 - col('age'))

# 4. Data transformation and cleaning
df1 = df1.withColumn('created_at', to_date(col('created_at'), 'yyyy-MM-dd'))
df1 = df1.withColumn('first_name', regexp_replace(col('first_name'), '_', ''))

# 5. Merge the two datasets (joining on the column name keeps a single id column)
df = df1.join(df2, on='id', how='left')

# 6. Group by id (assign the result; DataFrames are immutable)
df = df.groupBy('id').agg(spark_min('year_retire').alias('year_retire'))

# 7. Order values
df = df.orderBy(col('id').desc())

# 8. Create as Select (fails if the table already exists)
df.write.saveAsTable("db.new_table", mode='errorifexists')

Apache Hive

example_hive.sql
CREATE TABLE new_table AS -- 8. Create as Select
WITH temp_table1 AS (
    SELECT -- 1. Select data
        id,
        age,
        60 - age AS year_retire, -- 3. Create new column
        CAST(created_at AS TIMESTAMP) AS created_at, -- 4. Data transformation and cleaning
        regexp_replace(first_name, '_', '') AS first_name
    FROM
        db.table1
    WHERE -- 2. Filter rows
        age > 30
)
SELECT
    t1.id,
    MIN(t1.year_retire) AS year_retire
FROM
    temp_table1 AS t1
LEFT JOIN -- 5. Merge the two datasets
    db.table2 AS t2 ON t2.id = t1.id
GROUP BY -- 6. Group by id
    t1.id
ORDER BY -- 7. Order values
    id DESC;
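
The logic of this query (filter, derive a column, join, group, order, create as select) can be checked without a Hive cluster. The sketch below runs an equivalent statement from Python against an in-memory SQLite database. SQLite is a stand-in here, not Hive: its dialect differs (for example, it has no built-in `regexp_replace`), so the cleaning columns are left out, and the sample rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite stands in for Hive (an assumption for local testing only)
conn = sqlite3.connect(":memory:")

conn.executescript("""
-- Invented sample data for the two source tables
CREATE TABLE table1 (id INTEGER, age INTEGER, first_name TEXT, created_at TEXT);
CREATE TABLE table2 (id INTEGER, dept TEXT);
INSERT INTO table1 VALUES (1, 28, 'a_nn', '2023-01-01'),
                          (2, 41, 'b_ob', '2023-02-01'),
                          (3, 55, 'c_y',  '2023-03-01');
INSERT INTO table2 VALUES (2, 'x'), (3, 'y'), (3, 'z');

-- Create as Select with a CTE, mirroring the ETL steps
CREATE TABLE new_table AS
WITH temp_table1 AS (
    SELECT id, 60 - age AS year_retire
    FROM table1
    WHERE age > 30
)
SELECT t1.id, MIN(t1.year_retire) AS year_retire
FROM temp_table1 AS t1
LEFT JOIN table2 AS t2 ON t2.id = t1.id
GROUP BY t1.id
ORDER BY t1.id DESC;
""")

# Read the new table back; the explicit ORDER BY makes the row order deterministic
rows = conn.execute(
    "SELECT id, year_retire FROM new_table ORDER BY id DESC"
).fetchall()
```

The row with `age` 28 is filtered out, and the duplicate match for `id` 3 in `table2` collapses to one row after grouping.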

Summary

This comparison article provides an overview of data manipulation in three popular tools: Pandas, PySpark, and Apache Hive. By providing code examples and discussing the pros and cons of each approach, the article aims to help data engineers and data scientists choose the best tool for their specific use case.