import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import io.hops.util.Hops
import org.apache.spark._
import org.apache.spark.streaming._

// Structured Streaming job: consume string messages from a Kafka topic over SSL
// (credentials provided by the Hopsworks `Hops` utility) and append them as
// text files under the project's Resources directory, with checkpointing for
// exactly-once recovery.

// Kafka topic to consume, plus HDFS paths for the text sink and its checkpoint.
val topicName = "test2"
val outputPath = "/Projects/" + Hops.getProjectName() + "/Resources/data2-txt"
val checkpointPath = "/Projects/" + Hops.getProjectName() + "/Resources/checkpoint2-txt"

// The keystore password is used for the truststore, keystore, and key
// passwords alike. Strip control characters (< 32) and '@' (codepoint 64)
// once here instead of repeating the filter for each option.
// NOTE(review): presumably these characters break Kafka's SSL config
// parsing — confirm against the Hopsworks Kafka integration docs.
val keystorePwd =
  Hops.getKeystorePwd().filterNot(_.toInt < 32).filterNot(_.toInt == 64)

// Streaming source: subscribe to the topic over SSL using the project's
// certificates from the Hopsworks certificate material service.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", Hops.getBrokerEndpoints())
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", Hops.getTrustStore())
  .option("kafka.ssl.truststore.password", keystorePwd)
  .option("kafka.ssl.keystore.location", Hops.getKeyStore())
  .option("kafka.ssl.keystore.password", keystorePwd)
  .option("kafka.ssl.key.password", keystorePwd)
  .option("subscribe", topicName)
  .load()

df.printSchema()

// Kafka delivers `value` as binary; cast to a UTF-8 string for the text sink.
val messages = df.selectExpr("CAST(value AS STRING)")

// Sink: append each message as a line of text, checkpointing offsets so the
// query can resume without reprocessing after a restart.
val query = messages.writeStream
  .format("text")
  .option("path", outputPath)
  .option("checkpointLocation", checkpointPath)
  .start()

// Block until the query terminates. `awaitTermination` rethrows any streaming
// exception, so stop the query in a `finally` — in the original, `stop()` was
// unreachable on the failure path.
try {
  query.awaitTermination()
} finally {
  query.stop()
}