In [None]:
spark = %adb_spark add \
    --spark-conf spark.adb.eni.vswitchId= <your_vswitch_id> \
    --spark-conf spark.adb.eni.securityGroupId= <your_security_group_id>\
    --spark-conf spark.adb.acuPerApp=16\
    --resource-group <your_job_resource_group_name>

Initializing ADB Spark Connector Server
Creating new Spark Connector Server...
Submitting Spark connector server:amv-wz99icf7v3831i7i with config: {"file": "local:///opt/spark/jars/offline-sql.jar", "name": "SparkConnectorServer", "className": "com.aliyun.adb.spark.connectServer.ADBSparkConnectServer", "conf": {"spark.adb.version": "3.5", "spark.adb.uiMeta.enabled": "false", "spark.adb.eni.enabled": "true", "spark.sql.hive.metastore.version": "adb", "spark.adb.eni.vswitchId": "vsw-wz926zcxi9g55ji6or73e", "spark.adb.eni.securityGroupId": "sg-wz909my4jibz0o2wr1kg", "spark.adb.acuPerApp": "16"}}
Submitted Spark connector server ID: s202511101617sz8f1a30a0003022
Waiting for Spark Connect Server to start... s202511101617sz8f1a30a0003022->SUBMITTED
Waiting for Spark Connect Server to start... s202511101617sz8f1a30a0003022->RUNNING
Spark Connect Server started at host: 172.25.219.78
ADB Spark status:RUNNING, AppId:s202511101617sz8f1a30a0003022, Web UI:https://adbsparkui-cn-shenzhen.aliyuncs.c

In [None]:
-- Create the bronze-layer order details table (Delta format).
-- INSERT INTO ... SELECT writes columns by position, so any loader must emit
-- columns in exactly this order.
-- NOTE: because of IF NOT EXISTS, these column comments only take effect when
-- the table is first created; use ALTER TABLE ... ALTER COLUMN ... COMMENT to
-- update an existing table.
CREATE TABLE IF NOT EXISTS db_lake.orders_bronze (
    user_id STRING COMMENT 'User ID',
    order_id BIGINT COMMENT 'Order ID',
    product_id STRING COMMENT 'Item ID',
    product_category STRING COMMENT 'Item category',
    product_name STRING COMMENT 'Item name',
    amount DOUBLE COMMENT 'Billing amount',
    transaction_date DATE COMMENT 'Transaction date',
    transaction_hour INT COMMENT 'Transaction hour of day',
    transaction_timestamp TIMESTAMP COMMENT 'Transaction timestamp',
    batch_date STRING COMMENT 'Batch date (yyyyMMdd)',
    is_valid BOOLEAN COMMENT 'Whether the order passed validity checks'
)
USING delta

DataFrame[]

In [None]:
-- Insert data into the bronze layer.
-- transaction_time is parsed once (parsed_ts, in the inner query) and reused
-- for the date, hour and timestamp columns, so all three come from one parse.
-- NOTE(review): INSERT INTO appends. Re-running this cell over the same
-- orders_raw rows duplicates them in orders_bronze. If orders_raw holds the
-- full history rather than a single batch, consider INSERT OVERWRITE or a
-- MERGE keyed on the order line — TODO confirm the batch semantics.
INSERT INTO TABLE db_lake.orders_bronze
SELECT
    user_id,
    order_id,
    product_id,
    product_category,
    product_name,
    amount,
    -- Calendar date of the transaction
    to_date(parsed_ts) AS transaction_date,
    -- Hour of day of the transaction
    hour(parsed_ts) AS transaction_hour,
    -- Full transaction timestamp
    parsed_ts AS transaction_timestamp,
    -- Batch date as yyyyMMdd; assumes batch_start_datetime starts with 'yyyy-MM-dd'
    regexp_replace(substr(batch_start_datetime, 1, 10), '-', '') AS batch_date,
    -- Valid = positive amount AND well-formed time string AND the time actually
    -- parses. The regex alone accepts impossible values such as
    -- '2024-13-45 10:00:00'; when to_timestamp yields NULL for such input the
    -- row would otherwise be flagged valid while carrying a NULL date/hour.
    CASE
        WHEN amount > 0
         AND transaction_time RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$'
         AND parsed_ts IS NOT NULL
        THEN true
        ELSE false
    END AS is_valid
FROM (
    SELECT
        user_id,
        order_id,
        product_id,
        product_category,
        product_name,
        amount,
        transaction_time,
        batch_start_datetime,
        to_timestamp(transaction_time, 'yyyy-MM-dd HH:mm:ss') AS parsed_ts
    FROM
        db_lake.orders_raw
    -- Filter out obviously invalid data
    WHERE
        user_id IS NOT NULL
        AND order_id IS NOT NULL
        AND transaction_time IS NOT NULL
) parsed

DataFrame[]

In [None]:
-- Create the silver-layer order summary table (Delta format).
-- Intended grain: one row per (stat_date, product_category).
-- NOTE(review): stat_date is STRING although the bronze transaction_date it is
-- filled from is DATE (the value is cast on insert); consider DATE for new tables.
-- NOTE: because of IF NOT EXISTS, these column comments only take effect when
-- the table is first created.
CREATE TABLE IF NOT EXISTS db_lake.orders_silver (
    stat_date STRING COMMENT 'Statistics date',
    product_category STRING COMMENT 'Product category',
    order_count BIGINT COMMENT 'Number of orders',
    total_amount DOUBLE COMMENT 'Total transaction amount',
    avg_amount DOUBLE COMMENT 'Average amount per order (total_amount / order_count)',
    user_count BIGINT COMMENT 'Number of distinct users',
    peak_hour INT COMMENT 'Hour of day with the most orders',
    peak_hour_order_count BIGINT COMMENT 'Number of orders in the peak hour'
)
USING delta

DataFrame[]

In [None]:
-- Rebuild the silver-layer summary from the valid bronze rows.
-- INSERT OVERWRITE rather than INSERT INTO: every run recomputes the
-- aggregates for ALL (date, category) pairs from the full bronze table, so
-- appending would duplicate every summary row on each re-run. Delta still
-- records each overwrite as a new table version.
INSERT OVERWRITE TABLE db_lake.orders_silver
SELECT
    base_stats.transaction_date AS stat_date,  -- DATE, cast to the STRING column on insert
    base_stats.product_category,
    base_stats.total_orders AS order_count,
    base_stats.total_amount,
    -- Average per order (not per user)
    base_stats.total_amount / base_stats.total_orders AS avg_amount,
    base_stats.user_count,
    peak.peak_hour,
    peak.peak_hour_order_count
FROM (
    -- Summarize basic indicators per (date, category)
    SELECT
        transaction_date,
        product_category,
        -- NOTE(review): counts bronze rows; if one order spans several rows
        -- this is a line count, not a distinct-order count
        COUNT(order_id) AS total_orders,
        SUM(amount) AS total_amount,
        COUNT(DISTINCT user_id) AS user_count
    FROM
        db_lake.orders_bronze
    WHERE
        is_valid = true
    GROUP BY
        transaction_date, product_category
) base_stats
JOIN (
    -- Find the peak hour per (date, category)
    SELECT
        transaction_date,
        product_category,
        transaction_hour AS peak_hour,
        hour_order_count AS peak_hour_order_count
    FROM (
        SELECT
            transaction_date,
            product_category,
            transaction_hour,
            COUNT(order_id) AS hour_order_count,
            -- Ties on the count are broken by the earliest hour so that
            -- peak_hour is deterministic across runs
            ROW_NUMBER() OVER (
                PARTITION BY transaction_date, product_category
                ORDER BY COUNT(order_id) DESC, transaction_hour ASC
            ) AS rn
        FROM
            db_lake.orders_bronze
        WHERE
            is_valid = true
        GROUP BY
            transaction_date, product_category, transaction_hour
    ) ranked
    WHERE
        rn = 1
) peak ON base_stats.transaction_date = peak.transaction_date
     AND base_stats.product_category = peak.product_category

DataFrame[]

In [None]:
-- Inspect the silver summary; ORDER BY gives a stable, readable row order.
SELECT * FROM db_lake.orders_silver ORDER BY stat_date, product_category

DataFrame[stat_date: string, product_category: string, order_count: bigint, total_amount: double, avg_amount: double, user_count: bigint, peak_hour: int, peak_hour_order_count: bigint]

In [None]:
-- Show the Delta commit history (version, operation, metrics) of the silver table.
DESCRIBE HISTORY db_lake.orders_silver

DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]