✨ Creating tables in PySpark Azure
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("Delong-Synapse") \
    .getOrCreate()
spark.range(1, 2000).count()
Python
spark_conf = spark.sparkContext.getConf()  # Spark config: https://spark.apache.org/docs/latest/configuration.html
properties = [
    'spark.executor.cores',
    'spark.executor.instances',
    'spark.executor.memory',
    'spark.driver.maxResultSize',
    'spark.sql.warehouse.dir'  # location for managed table data and metadata
]
for prop in properties:
    print(f'{prop}: {spark_conf.get(prop)}')
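Executor settings like those above are fixed when the pool launches, but runtime SQL properties can still be changed on the live session. A minimal sketch using spark.conf.set (spark.sql.shuffle.partitions is just an illustrative property):
Python
# Adjust a runtime SQL property on the current session and read it back
spark.conf.set('spark.sql.shuffle.partitions', '8')
print(spark.conf.get('spark.sql.shuffle.partitions'))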
Creating Tables
Python
from notebookutils import mssparkutils  # preloaded in Synapse notebooks; imported here for clarity

# Azure storage access info
account_name = 'td04agc96tgi00sa'
container_name = 'test'
# relative_path = ''
file_name = 'sales.csv'
linked_service_name = 'ls_blob'
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
wasbs_path = f'wasbs://{container_name}@{account_name}.blob.core.windows.net/{file_name}'
print(wasbs_path)
spark.conf.set(f'fs.azure.sas.{container_name}.{account_name}.blob.core.windows.net', blob_sas_token)
df = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv(wasbs_path)
display(df)
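Before reshaping the columns, a quick schema check confirms what inferSchema produced; a minimal sketch using the DataFrame just loaded:
Python
# Verify the column names and the types inferred from sales.csv
df.printSchema()
print(f'Rows read: {df.count()}')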
Python
from pyspark.sql.functions import to_date, col
df_final = (df.withColumn("order_id", df["Order ID"]).drop("Order ID")
            .withColumn("order_date", to_date(col("Order Date"), "M/d/yyyy")).drop("Order Date")
            .withColumn("item_type", df["Item Type"]).drop("Item Type")
            .withColumn("sales_channel", df["Sales Channel"]).drop("Sales Channel")
            .withColumn("units_sold", df["Units Sold"].cast('float')).drop("Units Sold")
            .withColumn("unit_price", df["Unit Price"].cast('float')).drop("Unit Price")
            .withColumn("total_revenue", df["Total Revenue"].cast('float')).drop("Total Revenue")
            .drop("Region", "Country", "Order Priority", "Ship Date", "Total Profit", "Total Cost", "Unit Cost")
            .distinct()
)
display(df_final)
Create a temporary view and a Parquet file to use for the managed and unmanaged tables below.
Python
df_final.createOrReplaceTempView('df_final_view')
spark.sql("SELECT * FROM df_final_view").show(5)
df_final.repartition(2).write.mode("overwrite").save("wasbs://test@td04agc96tgi00sa.blob.core.windows.net/df_final.parquet")
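As a sanity check, the Parquet output can be read back from the same path; a minimal sketch:
Python
# Read the freshly written Parquet files back and confirm the row count
df_check = spark.read.parquet("wasbs://test@td04agc96tgi00sa.blob.core.windows.net/df_final.parquet")
print(df_check.count())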
Create Managed Tables
Method 1
Python
df_final.write.mode("overwrite").saveAsTable("salesTable_managed1") # No path required
spark.catalog.listTables() # list managed tables
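The managed table can be queried by name immediately, and DESCRIBE EXTENDED shows where its data landed under spark.sql.warehouse.dir; a minimal sketch:
Python
# No path needed: managed tables are resolved through the metastore
spark.sql("SELECT * FROM salesTable_managed1 LIMIT 5").show()
# The Location row points under spark.sql.warehouse.dir
spark.sql("DESCRIBE EXTENDED salesTable_managed1").show(truncate=False)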
Method 2
SQL command executed through spark.sql()
Python
# Or using spark.sql()
spark.sql("CREATE TABLE IF NOT EXISTS salesTable_managed2 AS SELECT * FROM df_final_view")
Method 3
Create the table first and then insert the data. Use this approach if you need to change column types, or to replace or append data (see the sketch after the table definition below).
SQL
CREATE TABLE salestable_managed3 (
    order_id INT,
    order_date DATE,
    item_type STRING,
    sales_channel STRING,
    units_sold FLOAT,
    unit_price FLOAT,
    total_revenue FLOAT
);

INSERT INTO salestable_managed3
SELECT * FROM df_final_view;
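Once the schema is fixed, later loads can append or replace the data; a minimal sketch via spark.sql() (both statements are standard Spark SQL):
Python
# Append additional rows to the existing table
spark.sql("INSERT INTO salestable_managed3 SELECT * FROM df_final_view")
# Replace the table contents entirely
spark.sql("INSERT OVERWRITE TABLE salestable_managed3 SELECT * FROM df_final_view")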
Unmanaged Tables
Method 1
Python
df_final.repartition(2).write.mode("overwrite") \
    .option("path", "wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged1") \
    .saveAsTable("salesTable_unmanaged1")
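spark.catalog can confirm that the table was registered as EXTERNAL rather than MANAGED; a minimal sketch:
Python
# tableType is 'EXTERNAL' for tables saved with an explicit path
for t in spark.catalog.listTables():
    print(t.name, t.tableType)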
Method 2
Specify the location as part of the SQL query.
SQL
CREATE EXTERNAL TABLE IF NOT EXISTS salesTable_unmanaged2
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 'wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged2'
AS SELECT * FROM df_final_view;
Method 3
Use this approach if the data already exists at the target location.
SQL
CREATE TABLE salestable_unmanaged3 (
    order_id INT,
    order_date DATE,
    item_type STRING,
    sales_channel STRING,
    units_sold FLOAT,
    unit_price FLOAT,
    total_revenue FLOAT
)
USING DELTA OPTIONS (path "wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged3");

SELECT * FROM salesTable_unmanaged3 LIMIT 10;
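Because the table is backed by a Delta location, the same data can also be loaded directly by path; a minimal sketch, assuming the Delta Lake support bundled with Synapse Spark pools:
Python
# Bypass the metastore and read the Delta files directly
df_delta = spark.read.format("delta") \
    .load("wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged3")
display(df_delta)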
Delete Tables
Executing a DROP command deletes the metadata for both types of tables, but it deletes the underlying data only for managed tables; for unmanaged (external) tables, the data is preserved at the external location.
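A minimal sketch of the difference, dropping the tables created above through spark.sql():
Python
# Managed: DROP removes both the metadata and the data under spark.sql.warehouse.dir
spark.sql("DROP TABLE IF EXISTS salesTable_managed1")
# Unmanaged: DROP removes only the metadata; the files stay in blob storage
spark.sql("DROP TABLE IF EXISTS salesTable_unmanaged1")
# The preserved files can still be read directly by path
# (saveAsTable above wrote them in the default Parquet format)
df_check = spark.read.parquet("wasbs://test@td04agc96tgi00sa.blob.core.windows.net/filestore/tables/salesTable_unmanaged1")
display(df_check)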