Hi Everyone
I've been testing streaming data in MS Fabric, but I'm running into issues consuming a Kafka topic, and the Spark logs aren't giving me much information.
If I consume from the Kafka topic in Python with the confluent_kafka library using the following settings, it works perfectly fine and I can consume the stream as expected:
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': kafka_broker,
    'group.id': "%s-consumer" % kafka_username,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': kafka_username,
    'sasl.password': kafka_password
}

c = Consumer(**conf)
c.subscribe(topics)
However, similar settings in a Fabric lakehouse do not seem to work:
# Read the Kafka stream
df = spark \
    .readStream \
    .format("kafka") \
    .option("group.id", "kafka_username-consumer") \
    .option("security.protocol", "SASL_SSL") \
    .option("sasl.mechanism", "SCRAM-SHA-512") \
    .option("subscribe", "kafka_topic") \
    .option("sasl.username", "kafka_username") \
    .option("sasl.password", "kafka_password") \
    .load()
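From the Spark Structured Streaming Kafka integration guide, I get the impression that the Kafka client settings may need a kafka. prefix, that the source also wants kafka.bootstrap.servers set explicitly, and that the credentials may have to go through kafka.sasl.jaas.config rather than sasl.username / sasl.password. So maybe it should look more like the sketch below (kafka_broker and kafka_topic stand in for my actual broker string and topic), but I haven't been able to confirm this:

sasl_jaas = (
    'org.apache.kafka.common.security.scram.ScramLoginModule required '
    f'username="{kafka_username}" password="{kafka_password}";'
)

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("kafka.sasl.jaas.config", sasl_jaas) \
    .option("startingOffsets", "earliest") \
    .load()

The startingOffsets of earliest is meant to match the auto.offset.reset of smallest in the working Python consumer.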
I tried writing the stream out as JSON and to a Delta table, but neither works, and I don't get any errors either, so I suspect my options for connecting to Kafka are probably off somewhere.
I tried
df.writeStream \
    .format("json") \
    .option("checkpointLocation", checkpointLocation) \
    .option("path", "Files/test.json") \
    .start() \
    .awaitTermination()
and also tried
df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpointLocation) \
    .outputMode("append") \
    .toTable(table_delta_file_location) \
    .awaitTermination()
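Since awaitTermination() just sits there without surfacing anything, I'm assuming the next step is to hold on to the StreamingQuery handle and inspect it directly, roughly like this (the sleep is just to give the first micro-batch time to run):

import time

query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpointLocation) \
    .outputMode("append") \
    .toTable(table_delta_file_location)

time.sleep(30)               # give the first micro-batch time to run
print(query.status)          # is the query actually triggering batches?
print(query.lastProgress)    # offsets and row counts for the last batch
print(query.exception())     # any error the stream has hit, or None

And once data does show up, I assume I'll need something like df.selectExpr("CAST(value AS STRING)") before writing it out, since the Kafka source returns binary key and value columns.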
Does anyone have any ideas? Thanks in advance for the help!