from pyspark.sql import SparkSession   
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from faker import Faker
import random
from geopy.distance import geodesic

# Start (or reuse) the Spark session that backs this demo.
spark = (
    SparkSession.builder
    .appName("PySparkThirdPartyLibsDemo")
    .getOrCreate()
)

# Faker (third-party) instance used to generate the mock data.
fake = Faker()
# Eiffel Tower coordinates as a (latitude, longitude) pair.
landmark = (48.8584, 2.2945)

# Mock-data factory
def generate_fake_data(num_records):
    """Return a list of ``num_records`` mock user tuples.

    Each tuple is ``(user_id, name, latitude, longitude)``. The ID and name
    come from the module-level ``fake`` generator; the coordinates are drawn
    uniformly from a +/-0.2 box around (48.85, 2.30), i.e. near Paris.
    """
    def _make_record():
        # Draw the coordinates first (latitude, then longitude), then ask
        # Faker for the identity fields.
        latitude = 48.85 + random.uniform(-0.2, 0.2)
        longitude = 2.30 + random.uniform(-0.2, 0.2)
        return (fake.uuid4(), fake.name(), latitude, longitude)

    return [_make_record() for _ in range(num_records)]

# Column names, in the same order as the fields of each generated tuple
columns = ["user_id", "name", "latitude", "longitude"]

# Generate 100 mock records
fake_data = generate_fake_data(100)

# Load the records into a Spark DataFrame
df = spark.createDataFrame(data=fake_data, schema=columns)

# Print a sample of the generated data
print("生成的样本数据:")
df.show(n=5)

# Distance computation backed by the third-party geopy library
def calculate_distance(lat, lon, landmark=landmark):
    """Return the geodesic distance in kilometers between a point and a landmark.

    Args:
        lat: Latitude of the point, or ``None``.
        lon: Longitude of the point, or ``None``.
        landmark: ``(latitude, longitude)`` pair to measure against. Defaults
            to the module-level ``landmark`` value bound when this function
            was defined.

    Returns:
        The distance in kilometers as a float, or ``None`` when either
        coordinate is missing.
    """
    # This function is registered as a Spark UDF, and Spark hands SQL NULLs to
    # Python UDFs as None. Propagate the NULL rather than letting geopy raise
    # and fail the whole job on a single incomplete row.
    if lat is None or lon is None:
        return None
    return geodesic((lat, lon), landmark).kilometers

# Wrap the distance function as a Spark UDF (user-defined function)
distance_udf = udf(calculate_distance, returnType=FloatType())

# Append the distance column
df_with_distance = df.withColumn("distance_km", distance_udf("latitude", "longitude"))

# Keep only the users within 10 km
nearby_users = df_with_distance.where("distance_km <= 10")

# Print the results
nearby_count = nearby_users.count()
print(f"\n找到 {nearby_count} 个在10公里范围内的用户:")
result_columns = ["name", "latitude", "longitude", "distance_km"]
nearby_users.select(*result_columns).show(10)
