from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySparkOnKakfa") \
    .getOrCreate()

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import time
import sys

# Generate kafka data using tool:
# > cd /var/log/emr/taihao_exporter
# > tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics
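#
# Submit this script with the Kafka broker hostname as the first CLI argument
# (script name here is a placeholder), e.g.:
# > spark-submit <this_script>.py core-1-1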
kafka_options = {
    "kafka.bootstrap.servers": sys.argv[1] + ":9092", #TODO edit me
    "startingOffsets": "earliest", # Start from the beginning when we consume from kafka
    "subscribe": "taihaometrics"   # Our topic name
}

df = spark.readStream.format("kafka").options(**kafka_options).load()
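# The Kafka source yields rows with key, value (both binary), topic, partition,
# offset, timestamp and timestampType; the JSON payload we care about is in `value`.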

json_schema = StructType([
  StructField("userId", StringType()),
  StructField("clusterId", StringType()),
  StructField("hostname", StringType()),
  StructField("nodeGroup", StringType()),
  StructField("nodeGroupId", StringType()),
  StructField("service", StringType()),
  StructField("component", StringType()),
  StructField("tag_type", StringType()),
  StructField("type", StringType()),
  StructField("timestamp", TimestampType()),
  StructField("name", StringType()),
  StructField("value", StringType()),
])

# Parse the JSON payload of each message and register the flattened fields
# as a temp view so they can be queried with SQL.
json_df = df.select(from_json(col("value").cast("string"), json_schema).alias("content"))
json_df.select("content.*").createOrReplaceTempView("taihao_metrics")

# Run a streaming count over the parsed metrics; "complete" mode re-emits the
# full aggregate to the console after every micro-batch.
count = spark.sql("select count(*) from taihao_metrics")
query = count.writeStream.outputMode("complete").format("console").start()
time.sleep(6000)  # Let the query run for 6000 seconds before stopping it
query.stop()
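
# Optionally shut down the SparkSession to release resources once the query is stopped.
spark.stop()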
