# Install Python library

#### (No manual modification is required, just run the following cell)

In [1]:
!pip install azure-storage-blob
!pip install azure-storage-file-datalake

# Define the tool functions

#### 1. detect_azure_storage_account_type
##### Detect the type of the target Azure storage account (plain Blob storage or ADLS Gen2) and return the matching endpoint prefix (`blob` or `dfs`). The prefix is used to build Spark's account-key setting so the Delta table can be read directly from its Azure storage location.

#### 2. list_blobs_in_container
##### List the contents of your Azure storage container from Python to verify that the target Azure storage account is accessible from this Python environment.

#### 3. get_delta_table_metadata
##### Get the table's storage path from its Databricks table name. Alibaba Cloud ADB Spark can read the Delta table data directly through this path without integrating with Unity Catalog.

##### (No manual modification is required, just run the following cell)

In [3]:
#pip install azure-storage-blob
import requests
import logging
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from requests.exceptions import ConnectionError, HTTPError, Timeout
import requests
from requests.exceptions import HTTPError, ConnectionError, Timeout
import logging
from urllib.parse import urlparse
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError

def detect_azure_storage_account_type(azure_storage_account_name, account_key):
    """Detect whether an Azure storage account has a hierarchical namespace (ADLS Gen2).

    Queries the account through its Blob endpoint and inspects the
    ``is_hns_enabled`` entry of the account information.

    Args:
        azure_storage_account_name: Storage account name (the ``<name>`` in
            ``<name>.blob.core.windows.net``).
        account_key: Shared access key of the account.

    Returns:
        ``('adls', 'dfs')`` when the hierarchical namespace is enabled,
        ``('blob', 'blob')`` otherwise, and ``(None, None)`` if the lookup fails
        (the error is printed, not raised). The second element is the endpoint
        label used in ``<account>.<label>.core.windows.net``.
    """
    blob_url = f"https://{azure_storage_account_name}.blob.core.windows.net"
    try:
        # Use the client as a context manager so its HTTP transport is closed afterwards.
        with BlobServiceClient(account_url=blob_url, credential=account_key) as blob_service:
            properties = blob_service.get_account_information()
            if properties.get('is_hns_enabled', False):
                return 'adls', 'dfs'
            return 'blob', 'blob'
    except Exception as e:
        print(f"❌ Detection failed:{str(e)}")
        return None, None

def list_blobs_in_container(storage_account, access_key, container_name, max_results=5):
    """Check that a storage container is reachable by listing a few of its blobs.

    Args:
        storage_account: Azure storage account name.
        access_key: Shared access key of the account.
        container_name: Container to list.
        max_results: Maximum number of blob names to fetch and print (default 5).

    Returns:
        A list of up to ``max_results`` blob names on success, or ``None`` if
        listing failed. Errors are printed rather than raised so the notebook
        can continue (best-effort check).
    """
    from itertools import islice

    connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account};AccountKey={access_key};EndpointSuffix=core.windows.net"
    try:
        with BlobServiceClient.from_connection_string(connection_string) as blob_service_client:
            container_client = blob_service_client.get_container_client(container_name)
            # list_blobs() is lazy: no request is sent until the pager is iterated, so
            # pull a few entries to actually exercise the credentials and network path.
            blob_names = [blob.name for blob in islice(container_client.list_blobs(), max_results)]
    except Exception as ex:
        print(f"Failed to List objects in Azure container. Please check your Azure storage credential&network. Error details : {ex}")
        return None
    print(f"Successfully listed container '{container_name}' in storage account '{storage_account}'. First {len(blob_names)} blob(s):")
    for name in blob_names:
        print(f" - {name}")
    return blob_names

def get_delta_table_metadata(workspace_url, access_token, catalog, schema, table, timeout=100):
    """Look up a Unity Catalog table's storage location via the Databricks REST API.

    Calls ``GET {workspace_url}/api/2.1/unity-catalog/tables/{catalog}.{schema}.{table}``
    and splits the returned ``storage_location`` into its parts. The parsing assumes
    a location shaped like ``<scheme>://<container>@<account>.<endpoint>/<path>``
    (e.g. ``abfss://container@account.dfs.core.windows.net/...``).

    Args:
        workspace_url: Databricks workspace URL; trailing '/' characters are tolerated.
        access_token: Databricks access token, sent as a Bearer token.
        catalog: Unity Catalog catalog name.
        schema: Schema (database) name.
        table: Table name.
        timeout: Seconds to wait for the API response (default 100).

    Returns:
        Tuple ``(storage_location, storage_account_name, container_name)``.

    Raises:
        requests HTTPError / ConnectionError / Timeout: the API call failed.
        ValueError: the response contains no ``storage_location``.
        Every error is printed with a hint and then re-raised.
    """
    # Strip trailing slashes so "https://host/" does not produce "https://host//api/...".
    api_url = f"{workspace_url.rstrip('/')}/api/2.1/unity-catalog/tables/{catalog}.{schema}.{table}"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'}
    try:
        response = requests.get(api_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        table_metadata = response.json()
        storage_location = table_metadata.get('storage_location')
        if not storage_location:
            raise ValueError("The 'storage_location' field was not found in the table metadata obtained from the Databricks API.")
        # netloc is "<container>@<account>.<endpoint suffix>".
        netloc = urlparse(storage_location).netloc
        container_name = netloc.split('@')[0]
        storage_account_full_name = netloc.split('@')[-1]
        storage_account_name = storage_account_full_name.split('.')[0]
        print("Successfully obtained Databricks delta table metadata:")
        print(f" - Azure Storage Location: {storage_location}")
        print(f" - Storage Account Name: {storage_account_name}")
        print(f" - Container Name: {container_name}")
        return storage_location, storage_account_name, container_name
    except HTTPError as e:
        # e.response can be None when the error was not produced from a received response.
        details = e.response.text if e.response is not None else e
        print(f"HTTP Error: Unable to get metadata from the Databricks API. Please check the URL and access token. Error：{details}")
        raise
    except ConnectionError as e:
        print(f"Connection Error: Unable to connect to the Databricks workspace URL. Please check the URL and network. Error:{e}")
        raise
    except Timeout as e:
        print(f"Request timeout: Connection to Databricks API timed out. Error:{e}")
        raise
    except Exception as e:
        print(f"An unknown error occurred while getting metadata:{e}")
        raise


# Define Azure credentials and Alibaba cloud storage

## 1.Define your Azure Databricks account information and Azure storage account information
#### Define which catalog/schema/table you want to import from Databricks, and how to access your Azure storage account.
#### Please make sure your Azure Storage account can be accessed over the public network.
#### Make sure you have already downloaded the necessary JARs and uploaded them to your own OSS bucket.

## 2.Define the Alibaba Cloud OSS root path. A database with the same name as the Databricks schema will be created under it by default; also define the maximum number of rows imported per table
#### Cross-cloud ingestion may hit bandwidth bottlenecks depending on your EIP bandwidth. It is recommended to validate a single table with a small data volume first, and only then proceed with a large-scale import.

#### Please manually modify the variables in the cell below

In [None]:
import os

# Azure Databricks workspace and the Unity Catalog coordinates of the source table.
# Secrets are taken from environment variables when set, so real tokens/keys do not have to be
# pasted into (and saved with) the notebook; otherwise replace the placeholder literals.
DBRX_WORKSPACE_URL = os.environ.get("DBRX_WORKSPACE_URL", "https://adb-289****.9.azuredatabricks.net/")
DBRX_ACCESS_TOKEN = os.environ.get("DBRX_ACCESS_TOKEN", "dapi****")
DBRX_CATALOG = "markov****"
DBRX_SCHEMA = "db_test_migration"
DBRX_TABLE = "flat_tpch_1"
# NOTE(review): not referenced by the visible cells — the account actually used comes from the table metadata.
AZURE_STORAGE_ACCOUNT_NAME = "markov****"
AZURE_STORAGE_ACCESS_KEY = os.environ.get("AZURE_STORAGE_ACCESS_KEY", "lx8z****")

# Root OSS location for the imported Delta tables. The ingestion step creates the database at
# f"{OSS_TARGET_LOCATION}{DBRX_SCHEMA}/", so this root should NOT already contain the schema name.
OSS_TARGET_LOCATION = 'oss://testBucketName/'
# OSS folder holding the JARs; upload them before you start. Must end with '/' because the
# jar file names are appended to it directly.
OSS_JAR_ROOT_LOCATION = 'oss://testBucketName/jars/'
# Upper bound on the number of rows copied per table (applied with DataFrame.limit).
MAX_ROWS_COUNT = 10000


# Create a Spark app
### Make sure you have already uploaded the JAR libraries to your OSS bucket.
#### (No manual modification is required, just run the following cell)

In [None]:
# Resolve where the source table lives, check the container is reachable, and find the endpoint type.
storage_location, storage_account_name, container_name = get_delta_table_metadata(
    DBRX_WORKSPACE_URL, DBRX_ACCESS_TOKEN, DBRX_CATALOG, DBRX_SCHEMA, DBRX_TABLE)
list_blobs_in_container(storage_account_name, AZURE_STORAGE_ACCESS_KEY, container_name)
storage_account_type, fs_prefix = detect_azure_storage_account_type(storage_account_name, AZURE_STORAGE_ACCESS_KEY)
print(f'Storage account type: {storage_account_type}; Filesystem prefix :{fs_prefix}')

# Detection returns (None, None) on failure; continuing would silently register the account key
# under "<account>.None.core.windows.net" and the Delta read would fail later with an auth error.
if fs_prefix is None:
    raise RuntimeError(
        f"Could not detect the type of storage account '{storage_account_name}'. "
        "Check the account name, access key and network access before creating the Spark session.")

# JARs previously uploaded to OSS_JAR_ROOT_LOCATION. Joining with "," avoids the runs of
# whitespace that backslash-continued string literals embed between entries.
REQUIRED_JARS = [
    "jetty-util-ajax-9.4.51.v20230217.jar",
    "jetty-server-9.4.51.v20230217.jar",
    "jetty-io-9.4.51.v20230217.jar",
    "jetty-util-9.4.51.v20230217.jar",
    "azure-storage-8.6.0.jar",
    "hadoop-azure-3.3.0.jar",
    "hadoop-azure-datalake-3.3.0.jar",
]
spark_jars = ",".join(f"{OSS_JAR_ROOT_LOCATION}{jar}" for jar in REQUIRED_JARS)

spark = (
    SparkSession.builder
    .appName("AzureDatabricksToOSS")
    .config("spark.jars", spark_jars)
    # NOTE(review): "fs.azure" is not a standard Hadoop "fs.<scheme>.impl" key — confirm it has any effect.
    .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    # Shared-key auth for the table's storage account, e.g. fs.azure.account.key.<account>.dfs.core.windows.net.
    .config(f"spark.hadoop.fs.azure.account.key.{storage_account_name}.{fs_prefix}.core.windows.net", AZURE_STORAGE_ACCESS_KEY)
    .getOrCreate()
)

# Start the ingestion in append mode.
### If the Delta Lake table has not been created in ADB before importing, a table with the same schema will be created automatically; if it already exists in ADB, the data will be appended to it.

In [None]:
# Create the target database under the OSS root location (no-op if it already exists).
spark.sql(f"CREATE DATABASE IF NOT EXISTS {DBRX_SCHEMA} LOCATION '{OSS_TARGET_LOCATION}{DBRX_SCHEMA}/'")

# Read the source Delta table directly from its Azure storage location.
df = spark.read.format("delta").load(storage_location)
# logging.info(...) is dropped under Python's default WARNING log level, so print the header instead.
print("DataFrame Schema:")
df.printSchema()

# Append at most MAX_ROWS_COUNT rows (set MAX_ROWS_COUNT = None to copy the whole table).
# Append mode: re-running this cell appends the rows again.
rows_to_write = df.limit(MAX_ROWS_COUNT) if MAX_ROWS_COUNT is not None else df
rows_to_write.write.format("delta").mode("append").saveAsTable(f'{DBRX_SCHEMA}.{DBRX_TABLE}')

In [10]:
-- Sanity check: number of rows now present in the imported table.
SELECT COUNT(*)
FROM db_test_migration.flat_tpch_1;

# You can then modify the script as needed and import more tables on demand

In [11]:
# Additional Databricks tables (in DBRX_CATALOG.DBRX_SCHEMA) to import; add names to import several at once.
# The Spark session registers an account key only for the first table's storage account, so these
# tables are expected to live in that same account.
# Append mode: re-running this cell appends the rows again.
TABLES_TO_IMPORT = ['flat_tpch_2']

for DBRX_TABLE in TABLES_TO_IMPORT:
    storage_location, storage_account_name, container_name = get_delta_table_metadata(
        DBRX_WORKSPACE_URL, DBRX_ACCESS_TOKEN, DBRX_CATALOG, DBRX_SCHEMA, DBRX_TABLE)
    df = spark.read.format("delta").load(storage_location)
    # Copy at most MAX_ROWS_COUNT rows; MAX_ROWS_COUNT = None copies the whole table.
    rows_to_write = df.limit(MAX_ROWS_COUNT) if MAX_ROWS_COUNT is not None else df
    rows_to_write.write.format("delta").mode("append").saveAsTable(f'{DBRX_SCHEMA}.{DBRX_TABLE}')