Skip to content

✨ Creating tables in PySpark Azure

Python
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Build the session for this notebook, reusing an existing one if present.
session_builder = SparkSession.builder.appName("Delong-Synapse")
spark = session_builder.getOrCreate()

# Run a small job against the session.
spark.range(1, 2000).count()
Python
# sc = spark._jsc.sc() # spark config

# Runtime configuration of the active session, taken from the public
# `spark.conf` accessor rather than the private `_conf` attribute.
# Reference: https://spark.apache.org/docs/latest/configuration.html
spark_conf = spark.conf
properties = [
    'spark.executor.cores',
    'spark.executor.instances',
    'spark.executor.memory',
    'spark.driver.maxResultSize',
    'spark.sql.warehouse.dir',  # location for managed table data and metadata
]

for prop in properties:
    # A default of None lets the loop report unset properties instead of
    # raising on the first key that has no value.
    print(f'{prop}: {spark_conf.get(prop, None)}')

Creating Tables

Python
# Where the source file lives in Azure Blob Storage
account_name = 'td04agc96tgi00sa'
container_name = 'test'
# relative_path = ''
file_name = 'sales.csv'

# Credential obtained through the linked service; it is registered below
# as the SAS token for the container.
linked_service_name = 'ls_blob'
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

wasbs_path = f'wasbs://{container_name}@{account_name}.blob.core.windows.net/{file_name}'
print(wasbs_path)

sas_config_key = f'fs.azure.sas.{container_name}.{account_name}.blob.core.windows.net'
spark.conf.set(sas_config_key, blob_sas_token)

# Read the CSV with a header row and let Spark infer the column types.
csv_reader = spark.read.option('header', 'True').option('inferSchema', "true")
df = csv_reader.csv(wasbs_path)
display(df)
Python
from pyspark.sql.functions import to_date, col

# Each entry is (new column name, source column to drop, expression for the
# new column). Entries are applied in order, so the new columns are appended
# in exactly this sequence.
derived_columns = [
    ("order_id", "Order ID", df["Order ID"]),
    ("order_date", "Order Date", to_date(col("Order Date"), "M/d/yyyy")),
    ("item_type", "Item Type", df["Item Type"]),
    ("sales_channel", "Sales Channel", df["Sales Channel"]),
    ("units_sold", "Units Sold", df["Units Sold"].cast('float')),
    ("unit_price", "Unit Price", df["Unit Price"].cast('float')),
    ("total_revenue", "Total Revenue", df["Total Revenue"].cast('float')),
]

df_final = df
for target_name, source_name, expression in derived_columns:
    df_final = df_final.withColumn(target_name, expression).drop(source_name)

# Discard the columns that are not carried forward, then remove duplicate rows.
unused_columns = ("Region", "Country", "Order Priority", "Ship Date", "Total Profit", "Total Cost", "Unit Cost")
df_final = df_final.drop(*unused_columns).distinct()

display(df_final)

Create temporary view and parquet file for managed and unmanaged tables

Python
# Expose the DataFrame to SQL under a temporary view name and preview it.
df_final.createOrReplaceTempView('df_final_view')
spark.sql("Select * from df_final_view").show(5)

# Persist a copy as Parquet. The format is stated explicitly so the output
# does not depend on the session's `spark.sql.sources.default` setting.
(
    df_final.repartition(2)
    .write.format("parquet")
    .mode("overwrite")
    .save("wasbs://test@td04agc96tgi00sa.blob.core.windows.net/df_final.parquet")
)

Create Managed Tables

Method 1

Python
# No path is given, so no storage location has to be chosen for this table.
managed_writer = df_final.write.mode("overwrite")
managed_writer.saveAsTable("salesTable_managed1")

# Show the tables registered in the session catalog.
spark.catalog.listTables()

Method 2

SQL Command

SQL
-- Create the table from the temp view in one statement (CTAS);
-- nothing happens if the table already exists.
CREATE TABLE IF NOT EXISTS salesTable_managed2
AS
    SELECT *
    FROM df_final_view
Python
# The same CREATE TABLE ... AS SELECT statement, issued from Python.
ctas_statement = "CREATE TABLE IF NOT EXISTS salesTable_managed2 AS SELECT * FROM df_final_view"
spark.sql(ctas_statement)

Method 3

Create the table and then insert the data. Use this approach if you have to change column types, or replace or append data.

SQL
-- Declare the table with explicit column types, then load it from the temp view.
CREATE TABLE salestable_managed3 (
    order_id INT,
    order_date DATE,
    item_type STRING,
    sales_channel STRING,
    units_sold FLOAT,
    unit_price FLOAT,
    total_revenue FLOAT
);

-- INSERT INTO assigns values by position, so the columns are listed explicitly
-- in the table's declared order instead of relying on SELECT *.
INSERT INTO salestable_managed3
SELECT
    order_id,
    order_date,
    item_type,
    sales_channel,
    units_sold,
    unit_price,
    total_revenue
FROM df_final_view
Python
# List the tables registered in the session catalog.
spark.catalog.listTables()

Unmanaged Table

Method 1

Python
# Passing an explicit path option places the table's files at that location.
external_path = "wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged1"
external_writer = df_final.repartition(2).write.mode("overwrite").option("path", external_path)
external_writer.saveAsTable("salesTable_unmanaged1")

Method 2

Specify the location as part of the SQL query.

SQL
-- External table filled from the temp view; fields are comma-delimited and
-- the files are placed at the given LOCATION.
CREATE EXTERNAL TABLE IF NOT EXISTS salesTable_unmanaged2
    ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
    LOCATION 'wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged2'
AS
SELECT *
FROM df_final_view

Method 3

Use this method if the table data already exists at the location.

SQL
-- Define a Delta table with an explicit schema over the given storage path.
CREATE TABLE salestable_unmanaged3 (
    order_id      INT,
    order_date    DATE,
    item_type     STRING,
    sales_channel STRING,
    units_sold    FLOAT,
    unit_price    FLOAT,
    total_revenue FLOAT
)
USING DELTA
OPTIONS (path "wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged3");

-- Preview the first ten rows.
SELECT *
FROM salesTable_unmanaged3
LIMIT 10;
Python
# List the tables registered in the session catalog.
spark.catalog.listTables()

Delete tables

Python
# Remove the storage folder that holds the external tables' files; the
# second argument (True) is passed to request removal of its contents too.
filestore_root = "wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/"
mssparkutils.fs.rm(filestore_root, True)

Executing the DROP command deletes the metadata for both types of tables, but deletes the data only for managed tables; the data of unmanaged (external) tables is preserved in its external location.

SQL
-- DROP TABLE salestable_managed1;
-- DROP TABLE salestable_unmanaged1;