
Design stock and option schemas and provide a FastAPI to access them using Spark and Delta Lake

Create a Delta Lake schema to record stock buys and sells

JSON
{
    "type": "record",
    "name": "stock_transaction",
    "fields": [
        {"name": "transaction_id", "type": "string"},
        {"name": "timestamp", "type": "string"},
        {"name": "transaction_type", "type": "string"},
        {"name": "stock_symbol", "type": "string"},
        {"name": "shares", "type": "int"},
        {"name": "price", "type": "double"}
    ]
}

The above schema defines a record for a stock transaction, which includes the transaction ID, timestamp, transaction type (buy or sell), stock symbol, number of shares, and price per share. This schema can be used with Delta Lake to record stock buy and sell transactions.
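
This schema can be registered as a Delta table before any transactions are written. A minimal sketch, assuming a SparkSession named spark is available; the table name stock_transactions and the LOCATION path are placeholders:

Python
# Minimal sketch: register an empty Delta table matching the schema above.
# The table name and LOCATION are placeholders; adjust them for your environment.
spark.sql("""
    CREATE TABLE IF NOT EXISTS stock_transactions (
        transaction_id   STRING,
        timestamp        STRING,
        transaction_type STRING,
        stock_symbol     STRING,
        shares           INT,
        price            DOUBLE
    )
    USING DELTA
    LOCATION 'path/to/delta/table'
""")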

Also define a schema to store buys and sells of options

JSON
{
    "type": "record",
    "name": "options_transaction",
    "fields": [
        {"name": "transaction_id", "type": "string"},
        {"name": "timestamp", "type": "string"},
        {"name": "transaction_type", "type": "string"},
        {"name": "stock_symbol", "type": "string"},
        {"name": "option_type", "type": "string"},
        {"name": "strike_price", "type": "double"},
        {"name": "expiration_date", "type": "string"},
        {"name": "shares", "type": "int"},
        {"name": "price", "type": "double"}
    ]
}

This schema defines a record for an options transaction, which includes the transaction ID, timestamp, transaction type (buy or sell), stock symbol, option type (call or put), strike price, expiration date, number of shares, and price per share. This schema can be used with Delta Lake to record buy and sell transactions for options.
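
For illustration only (all values below are made up), a call purchase and a later sale recorded against this schema could look like the following Python dictionaries:

Python
# Hypothetical records conforming to the options_transaction schema
buy_call = {
    "transaction_id": "opt-001",
    "timestamp": "2023-01-05T14:30:00Z",
    "transaction_type": "buy",
    "stock_symbol": "AAPL",
    "option_type": "call",
    "strike_price": 150.0,
    "expiration_date": "2023-03-17",
    "shares": 100,
    "price": 4.25
}

# Selling the same contract later: only the ID, timestamp, type, and price change
sell_call = {**buy_call, "transaction_id": "opt-002",
             "timestamp": "2023-02-10T16:00:00Z",
             "transaction_type": "sell", "price": 6.10}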

Create PySpark schemas from the previous schemas and provide code to read, write, and delete records

Here is the PySpark code to create the schema for the stock_transaction record:

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

stock_transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", StringType()),
    StructField("transaction_type", StringType()),
    StructField("stock_symbol", StringType()),
    StructField("shares", IntegerType()),
    StructField("price", DoubleType())
])

And here is the PySpark code to create the schema for the options_transaction record:

Python
options_transaction_schema = StructType([
    StructField("transaction_id", StringType()),
    StructField("timestamp", StringType()),
    StructField("transaction_type", StringType()),
    StructField("stock_symbol", StringType()),
    StructField("option_type", StringType()),
    StructField("strike_price", DoubleType()),
    StructField("expiration_date", StringType()),
    StructField("shares", IntegerType()),
    StructField("price", DoubleType())
])

To read a Delta Lake table into a PySpark DataFrame, you can use the read method:

df = spark.read.format("delta").load("path/to/delta/table")

To write a DataFrame to a Delta Lake table, you can use the write method:

df.write.format("delta").save("path/to/delta/table")

To delete all records from a Delta Lake table, you can run a SQL DELETE statement (note the backticks around the table path):

spark.sql("DELETE FROM delta.`path/to/delta/table`")

It is important to note that the above statement deletes every record in the table. If you would like to delete specific records instead, add a WHERE clause:

spark.sql("DELETE FROM delta.`path/to/delta/table` WHERE stock_symbol = 'AAPL'")

Generate FastAPI APIs to handle the add, update, and delete operations from PySpark

Here is an example of how you could use the FastAPI library to create a REST API that interacts with a Delta Lake table using PySpark:

Python
from fastapi import FastAPI, HTTPException
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from delta.tables import DeltaTable

app = FastAPI()

# Create a SparkSession
spark = SparkSession.builder.appName("DeltaLakeAPI").getOrCreate()

@app.post("/stock")
async def add_stock_transaction(transaction: dict):
    # Create a DataFrame from the incoming transaction data
    df = spark.createDataFrame([transaction], stock_transaction_schema)

    # Write the DataFrame to the Delta Lake table
    df.write.format("delta").mode("append").save("path/to/delta/table")

    return {"message": "Transaction added successfully"}

@app.put("/stock/{transaction_id}")
async def update_stock_transaction(transaction_id: str, transaction: dict):
    # Read the Delta Lake table into a DataFrame
    df = spark.read.format("delta").load("path/to/delta/table")

    # Filter the DataFrame for the specified transaction
    df = df.filter(df["transaction_id"] == transaction_id)

    # Update the values in the DataFrame with the incoming data
    for key, value in transaction.items():
        df = df.withColumn(key, value)

    # Overwrite the Delta Lake table with the updated DataFrame
    df.write.format("delta").mode("overwrite").save("path/to/delta/table")

    return {"message": "Transaction updated successfully"}

@app.delete("/stock/{transaction_id}")
async def delete_stock_transaction(transaction_id: str):
    # Delete the specified transaction from the Delta Lake table
    spark.sql(f"DELETE FROM delta.`path/to/delta/table` WHERE transaction_id='{transaction_id}'")

    return {"message": "Transaction deleted successfully"}

This is an example of handling the stock transactions; you can use the same approach for options transactions as well. In the above example, the add_stock_transaction endpoint accepts a JSON object containing the transaction data, creates a DataFrame from it, and appends it to the Delta Lake table. The update_stock_transaction endpoint accepts a transaction ID and a JSON object with the updated fields, loads the table through the DeltaTable API, and updates the matching record in place. The delete_stock_transaction endpoint accepts a transaction ID and deletes the specified transaction from the Delta Lake table.

It is important to note that the above code is an example and may require additional error handling and validation. Also, replace the placeholder Delta table path with the path to your own table.
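
To try these endpoints, here is a small client-side sketch using the requests library; the host, port, and sample values are assumptions, not part of the API above:

Python
import requests

BASE = "http://localhost:8000"  # wherever the FastAPI app is running

# Add a transaction
requests.post(f"{BASE}/stock", json={
    "transaction_id": "t-100",
    "timestamp": "2023-01-05T14:30:00Z",
    "transaction_type": "buy",
    "stock_symbol": "AAPL",
    "shares": 10,
    "price": 135.50
})

# Update a field of that transaction
requests.put(f"{BASE}/stock/t-100", json={"price": 136.00})

# Delete it
requests.delete(f"{BASE}/stock/t-100")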

Generate similar APIs for the options schema

Here's an example of how you could use the same approach to create REST APIs for the options_transaction schema:

Python
@app.post("/options")
async def add_options_transaction(transaction: dict):
    # Create a DataFrame from the incoming transaction data
    df = spark.createDataFrame([transaction], options_transaction_schema)

    # Write the DataFrame to the Delta Lake table
    df.write.format("delta").mode("append").save("path/to/options_delta/table")

    return {"message": "Transaction added successfully"}

@app.put("/options/{transaction_id}")
async def update_options_transaction(transaction_id: str, transaction: dict):
    # Read the Delta Lake table into a DataFrame
    df = spark.read.format("delta").load("path/to/options_delta/table")

    # Filter the DataFrame for the specified transaction
    df = df.filter(df["transaction_id"] == transaction_id)

    # Update the values in the DataFrame with the incoming data
    for key, value in transaction.items():
        df = df.withColumn(key, value)

    # Overwrite the Delta Lake table with the updated DataFrame
    df.write.format("delta").mode("overwrite").save("path/to/options_delta/table")

    return {"message": "Transaction updated successfully"}

@app.delete("/options/{transaction_id}")
async def delete_options_transaction(transaction_id: str):
    # Delete the specified transaction from the Delta Lake table
    spark.sql(f"DELETE FROM delta.`path/to/options_delta/table` WHERE transaction_id='{transaction_id}'")

    return {"message": "Transaction deleted successfully"}

The above code is similar to the previous code for handling stock transactions, but it uses the options_transaction schema and different endpoint paths and table paths. As with the previous example, replace the placeholder Delta table path with the path to your own table.
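
To serve both the stock and options endpoints locally, you can run the application with uvicorn. A minimal sketch, assuming all of the code above lives in a module named main.py:

Python
# Assumes the FastAPI app above is defined in main.py
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000)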

Provide an API for profit and loss for both stocks and options

Here's an example of how you could create a REST API endpoint for calculating profit and loss for both stock and options transactions using PySpark:

Python
@app.get("/profit_loss/{stock_symbol}")
async def get_profit_loss(stock_symbol: str):
    # Read the stock transaction table into a DataFrame
    stock_df = spark.read.format("delta").load("path/to/stock_delta/table").filter(col("stock_symbol") == stock_symbol)

    # Read the options transaction table into a DataFrame
    options_df = spark.read.format("delta").load("path/to/options_delta/table").filter(col("stock_symbol") == stock_symbol)

    # Calculate the profit and loss for the stock transactions
    # (assumes a `cost` column, such as the purchase price, has been added to the schema)
    stock_profit_loss = stock_df.withColumn("profit_loss", (col("price") - col("cost")) * col("shares"))
    stock_profit_loss = stock_profit_loss.groupBy().sum("profit_loss").collect()[0][0]

    # Calculate the profit and loss for the options transactions
    options_profit_loss = options_df.withColumn("profit_loss", (col("price") - col("cost")) * col("shares"))
    options_profit_loss = options_profit_loss.groupBy().sum("profit_loss").collect()[0][0]

    return {"stock_profit_loss": stock_profit_loss, "options_profit_loss": options_profit_loss}

In the above code, the get_profit_loss endpoint accepts a stock symbol, reads the stock and options transaction tables into DataFrames, filters them for the specified symbol, computes profit and loss with the formula (price - cost) * shares, and returns both totals in a JSON response. Note that cost is not part of the schemas defined earlier; you would need to add that column (for example, the original purchase price) before this formula can be used. Also, replace the placeholder Delta table paths with your own tables.
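
If you would rather not add a cost column, one rough alternative is to compute realized profit and loss from the buy and sell rows already in the schema, treating sells as positive cash flow and buys as negative. The sketch below makes that assumption and ignores open positions and option contract multipliers:

Python
from pyspark.sql.functions import col, when, sum as spark_sum

def realized_profit_loss(df):
    # Signed cash flow per row: sells add cash, buys subtract it
    signed = df.withColumn(
        "cash_flow",
        when(col("transaction_type") == "sell", col("price") * col("shares"))
        .otherwise(-col("price") * col("shares"))
    )
    # Total realized profit and loss (None if the DataFrame is empty)
    return signed.agg(spark_sum("cash_flow")).collect()[0][0]

# Example usage: realized_profit_loss(stock_df), realized_profit_loss(options_df)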

It is important to note that the above code is an example and may require additional error handling and validation.