
Creating a partition in Delta Lake with Spark

How to create a partition and insert into Delta Lake

To create a partitioned table in Delta Lake and insert data into it, you can use the following steps:

  1. Create a table with the partition column declared in the schema and referenced in PARTITIONED BY, in this case "event_dt":
    SQL
      spark.sql("""
      CREATE TABLE table_name (
      ...,
      event_dt STRING
      )
      USING DELTA
      PARTITIONED BY (event_dt)
      """)
    
  2. Insert data into the table, using partitionBy to name the partition column (the partition value for each row comes from its event_dt): data.write.format("delta").partitionBy("event_dt").mode("append").save("/path/to/table_name")
  3. You can also use the insertInto method to write into the existing table: data.write.mode("append").insertInto("table_name") (insertInto matches columns by position, so the DataFrame's column order must match the table's)
  4. Now you can query the data by event_dt: spark.sql("SELECT * FROM table_name WHERE event_dt = '2022-01-01'")

Note

You can also write data.write.partitionBy("event_dt").save("/path/to/table_name", format="delta", mode="append") instead of data.write.format("delta").partitionBy("event_dt").mode("append").save("/path/to/table_name"); the two calls are equivalent.

It's important to note that when you partition the table by 'event_dt', it will create a subdirectory for each unique value of 'event_dt', so queries that filter on that column can skip the non-matching subdirectories entirely (partition pruning).
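
Putting the steps above together, here is a minimal end-to-end sketch. It assumes an active SparkSession named spark with Delta Lake configured and a DataFrame data that already contains an event_dt column; the event_id and payload columns stand in for the elided "..." and, like the table name and path, are placeholders.

Python
# Create the partitioned table once; LOCATION ties the table to the path used
# by the path-based writes below (an assumption about the table layout).
spark.sql("""
CREATE TABLE IF NOT EXISTS table_name (
  event_id BIGINT,
  payload STRING,
  event_dt STRING
)
USING DELTA
PARTITIONED BY (event_dt)
LOCATION '/path/to/table_name'
""")

# Append rows; Delta writes them under event_dt=<value> subdirectories.
(data.write
    .format("delta")
    .partitionBy("event_dt")
    .mode("append")
    .save("/path/to/table_name"))

# Queries that filter on the partition column only read the matching folders.
spark.sql("SELECT * FROM table_name WHERE event_dt = '2022-01-01'").show()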

How to partition by event_dt and symbol for stock data to make queries performant

To partition a Delta Lake table by multiple columns, such as "event_dt" and "symbol" for stock data, you can use the following steps:

  1. Create a table with both partition columns declared in the schema and referenced in PARTITIONED BY, in this case "event_dt" and "symbol":
    SQL
      spark.sql("""
      CREATE TABLE table_name (
      ...,
      event_dt STRING,
      symbol STRING
      )
      USING DELTA
      PARTITIONED BY (event_dt, symbol)
      """)
    
  2. Insert data into the table, naming both partition columns: data.write.format("delta").partitionBy("event_dt", "symbol").mode("append").save("/path/to/table_name")
  3. You can also use the insertInto method to write into the existing table: data.write.mode("append").insertInto("table_name")
  4. Now you can query the data by event_dt and symbol: spark.sql("SELECT * FROM table_name WHERE event_dt = '2022-01-01' AND symbol = 'AAPL'")

It's important to note that when you partition the table by 'event_dt' and 'symbol', it will create a nested subdirectory for each unique combination of 'event_dt' and 'symbol', which allows queries that filter on those columns to skip the rest of the data.

Also, you can use data.write.partitionBy("event_dt","symbol").save("/path/to/table_name", format="delta", mode="append") instead of data.write.format("delta").partitionBy("event_dt","symbol").mode("append").save("/path/to/table_name"); the two calls are equivalent.

It is advisable to partition on the columns that are most frequently used in filter conditions or join conditions.
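
As a concrete illustration of the two-column layout, here is a hedged sketch with a couple of toy stock rows (the close_price column and its values are purely illustrative placeholders; the path is the placeholder used throughout this page):

Python
# Toy stock rows; close_price is an illustrative placeholder column.
stock_data = spark.createDataFrame(
    [("2022-01-01", "AAPL", 182.01),
     ("2022-01-01", "MSFT", 334.75)],
    ["event_dt", "symbol", "close_price"],
)

# Writes land in nested folders: .../event_dt=2022-01-01/symbol=AAPL/...
(stock_data.write
    .format("delta")
    .partitionBy("event_dt", "symbol")
    .mode("append")
    .save("/path/to/table_name"))

# Both partition filters can be pruned, so only one leaf directory is read.
(spark.read.format("delta").load("/path/to/table_name")
    .where("event_dt = '2022-01-01' AND symbol = 'AAPL'")
    .show())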

What's the performance impact if we query only by event_dt

If you query a Delta Lake table that is partitioned by both "event_dt" and "symbol" but only include a filter on "event_dt", the query still benefits from partition pruning on "event_dt", so performance will remain relatively good.

When you filter on only one of the partition columns, the query engine will scan only the partitions that match the filter condition, which will significantly reduce the amount of data that needs to be scanned. For example, if you have 100 partitions in total and 90 of them are filtered out based on the event_dt, only 10 partitions will be scanned.

However, if you filter on both partition columns, the performance will be even better. This is because the query engine will be able to scan even fewer partitions, reducing the amount of data that needs to be read and processed.

Also, because the table is partitioned by 'event_dt' and then 'symbol', there is a subdirectory for each unique combination of the two. When you filter only on one partition column, the engine still has to list and read every 'symbol' subdirectory under each matching 'event_dt' directory, which means more I/O than when both partition filters are applied.

In short, partitioning can speed up the query performance even when you filter on only one partition column, but the performance will be even better when filtering on both partition columns.
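
If you want to see how much pruning a given filter achieves, one simple check is to compare the physical plans. This is a sketch assuming the event_dt/symbol layout above; the exact plan text varies by Spark and Delta version.

Python
df = spark.read.format("delta").load("/path/to/table_name")

# Filter on event_dt only: every symbol= subdirectory under the matching
# event_dt= directories still has to be listed and read.
df.where("event_dt = '2022-01-01'").explain()

# Filter on both partition columns: the scan narrows to a single leaf folder.
df.where("event_dt = '2022-01-01' AND symbol = 'AAPL'").explain()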

What's the best practice to partition time series data like stock data

When partitioning time series data like stock data, it's generally best practice to partition on the time-based column, such as "event_dt" or "timestamp", as well as any other columns that are frequently used in filter conditions or join conditions.

Here are some best practices to consider when partitioning time series data:

  1. Partition on the time-based column: This allows for efficient querying of data based on time ranges, as well as efficient archiving of older data.
  2. Consider partitioning on additional columns: For example, if you frequently filter or join on a stock symbol column, you should also partition on that column.
  3. Choose the right granularity for partitioning: The granularity of the partitioning should match the granularity of the queries that will be run against the data. For example, if most of your queries are at a daily granularity, then partitioning on the day is a good choice.
  4. Keep the number of partitions manageable: Too many partitions can lead to poor performance and increased management overhead. So it's important to strike a balance between the granularity of the partitioning and the number of partitions created.
  5. Use bucketing: Bucketing is a technique for grouping data into more manageable chunks. It can be used in conjunction with partitioning to improve query performance on specific columns by hashing those columns into a fixed number of files (buckets) within each partition.
  6. Keep the data fresh: Partitioning makes it easy to delete old partitions, which is important for keeping the data fresh and the storage space used by the table small.

It's also important to keep in mind that partitioning strategy may change over time based on the usage pattern of the data and the performance of the queries.
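
For point 3 above, a common pattern with stock data is to derive the daily partition column from a finer-grained event timestamp at write time. A hedged sketch follows; the trades DataFrame and its trade_ts column are hypothetical names.

Python
from pyspark.sql import functions as F

# Derive a daily event_dt from a per-trade timestamp so the partition
# granularity matches daily queries (trades / trade_ts are hypothetical).
trades_daily = trades.withColumn("event_dt", F.date_format("trade_ts", "yyyy-MM-dd"))

(trades_daily.write
    .format("delta")
    .partitionBy("event_dt", "symbol")
    .mode("append")
    .save("/path/to/table_name"))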

How to create a partition with bucketing

Bucketing is a technique for grouping data into more manageable chunks, and can be used in conjunction with partitioning to improve query performance on specific columns.

Here is an example of how to create a Delta Lake table that is partitioned by "event_dt" and bucketed by "symbol" using Spark SQL:

SQL
spark.sql("""
CREATE TABLE table_name (
  ...,
  event_dt STRING,
  symbol STRING
)
USING DELTA
PARTITIONED BY (event_dt)
CLUSTERED BY (symbol) INTO 32 BUCKETS
""")

This creates a table that is partitioned by "event_dt" and bucketed by "symbol" into 32 buckets. The "CLUSTERED BY" clause tells Spark to group data by the "symbol" column, and "INTO 32 BUCKETS" tells it to hash rows into 32 buckets.

You can also use data.write.format("delta").partitionBy("event_dt").bucketBy(32, "symbol").saveAsTable("table_name") to create the table and insert data at the same time (Spark requires saveAsTable() rather than a path-based save() whenever bucketBy() is used).

When you query the table, the query engine will first filter on the partitions based on the "event_dt" column, and then it will further filter on the buckets based on the "symbol" column. This can significantly reduce the amount of data that needs to be scanned, as the query engine only needs to look at a subset of the data within each partition.

It's important to note that bucketing works best when the distribution of the values in the bucketing column is relatively even. If the distribution is skewed, the bucketing will not be as effective. Also, the number of buckets should be selected based on the data distribution and the query pattern.

It's also important to keep in mind that bucketing strategy may change over time based on the usage pattern of the data and the performance of the queries.
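
From the DataFrame API, Spark's bucketBy() writer must be paired with saveAsTable(). Below is a minimal sketch of that API; it is shown with Parquet because support for bucketed tables varies by table format and version, so check that your Delta setup accepts a CLUSTERED BY / bucketBy table before relying on this layout. The table_name_bucketed name is a hypothetical placeholder.

Python
# Spark's bucketBy requires saveAsTable; a path-based save() is rejected.
(data.write
    .partitionBy("event_dt")
    .bucketBy(32, "symbol")
    .sortBy("symbol")          # optional: keeps rows sorted within each bucket
    .format("parquet")         # assumption: a format known to accept bucketing
    .mode("append")
    .saveAsTable("table_name_bucketed"))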

How to append data with partitioning and bucketing

To append data to a Delta Lake table that is partitioned by "event_dt" and bucketed by "symbol", you can use the following method:

data.write.format("delta").partitionBy("event_dt").bucketBy(32, "symbol").mode("append").saveAsTable("table_name")

This call uses the bucketBy() method to specify the bucketing column ("symbol") and the number of buckets (32), and the partitionBy() method to specify the partition column ("event_dt"). Note that Spark requires saveAsTable() rather than a path-based save() whenever bucketBy() is used.

This is similar to the method used to write data to the table initially. The difference is that the mode("append") method is used to tell Delta Lake to append the data to the existing table, rather than overwriting it.

It's important to note that when you append data to a bucketed table, each row is written to a bucket determined by hashing the value of the bucketing column into the fixed number of buckets (32 here); new symbol values simply hash into one of the existing buckets rather than creating new ones.

Also, when appending to an existing table you can simply use data.write.format("delta").mode("append").save("/path/to/table_name") without repeating the partitioning options: Delta stores the table's layout in its metadata and writes the new rows into the existing partitions. If you do repeat partitionBy, it has to match the existing layout or the write will fail.

It's important to keep in mind that data should be appended with the same bucketing strategy as the existing table, otherwise, it may lead to poor performance and increased management overhead.
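
To make the append shape concrete, here is a hedged sketch of adding one new day of rows to the existing partitioned table (toy values; close_price is the same illustrative placeholder column used earlier):

Python
# One new day of toy rows.
new_day = spark.createDataFrame(
    [("2022-01-02", "AAPL", 179.70)],
    ["event_dt", "symbol", "close_price"],
)

# Delta reads the partition layout from the table's metadata, so partitionBy
# does not need to be repeated here; if it is, it must match the existing one.
(new_day.write
    .format("delta")
    .mode("append")
    .save("/path/to/table_name"))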

How to delete old partitions

To delete old partitions in a Delta Lake table that is partitioned by "event_dt" and bucketed by "symbol", you can use the delete method.

Here is an example of how to delete all partitions older than a certain date:

Python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/path/to/table_name")
# Logically removes every row whose event_dt is before 2022-01-01.
deltaTable.delete("event_dt < '2022-01-01'")

This deletes every row whose event_dt is earlier than '2022-01-01'; once all rows in a partition are removed, that partition effectively disappears from the table.

Delta Lake does not ship a dedicated "delete expired" method, but you can express a retention threshold directly in the delete condition, for example keeping only the most recent 180 days (assuming event_dt is stored as a 'yyyy-MM-dd' string or a date):

Python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/path/to/table_name")
# Remove every row more than 180 days older than the current date.
deltaTable.delete("event_dt < date_sub(current_date(), 180)")

This will delete all the partitions that are older than 180 days from the current date.

Note that deleting old partitions in a bucketed table also deletes the associated buckets and their data, so make sure you have a backup of the data before deleting it.

Also, when you delete old partitions in a table that is being continuously written to, it's a good idea to do it during a period of low write activity to avoid any performance impact on the write operations.

It's also important to keep in mind that the retention policy and the way to delete the old partitions may change over time based on the usage pattern of the data and the storage constraints.
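
Note also that delete only removes the rows logically; the underlying data files stay on storage (and remain available for time travel) until they are vacuumed. A hedged sketch of reclaiming the space afterwards:

Python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/path/to/table_name")

# Physically remove data files that are no longer referenced by the table and
# are older than the retention threshold (7 days by default).
deltaTable.vacuum()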

Which one is the better option: symbol as a partition column or a bucketing column?

Whether to use "symbol" as a partition or bucketing column depends on the usage pattern of the data and the performance of the queries.

If the data is frequently queried and filtered based on the "symbol" column, it would be more efficient to partition the data on that column. This allows the query engine to quickly identify the partitions that contain the relevant data and only scan those partitions. However, partitioning on symbol may lead to a large number of small partitions if the number of symbols is large, which can lead to poor performance and increased management overhead.

On the other hand, if the data is queried and filtered based on the time-based column like "event_dt" and "symbol" is used for grouping the data, bucketing is a better option. Bucketing allows you to group similar data together, and reduce the number of files that need to be read by the query engine. Additionally, bucketing works best when the distribution of the values in the bucketing column is relatively even.

In summary, if the data is frequently queried based on the "symbol" column, partitioning on that column would be more efficient. But if the data is queried based on the time-based column and "symbol" is used for grouping the data, bucketing is more appropriate.

It's also important to keep in mind that partitioning and bucketing strategy may change over time based on the usage pattern of the data and the performance of the queries. It's a good idea to monitor the performance of the queries and make adjustments accordingly.