Wpis z mikrobloga

Tak to jest się uczyć z książki w której przykłady nie działają, bo są bez konfiguracji.
Próbuję zapodać przykładowy structured streaming sparkiem do kafki i same problemy. Większość już rozwiązałem, teraz mam na drodze ten:

org.apache.spark.AnalysisException: value attribute unsupported type bigint. value must be a(n) string or binary;
at org.apche.spark.sql.kafka010.KafkaWriter ...

kod:
val stream = spark.readStream.format("rate").option("rowsPerSecond" 1).load() // to streamuje liczby long co sekundę

val query = stream.writeStream.outputMode(OutputMode.Append()).format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092").option("topic", "myTopic")
.option("checkpointLocation", "/home/programista4k/tmp").option("failOnDataLoss", "false").start()

Streaming który zapodaję to long a nie bigint, ale niech będzie, rozumiem, że muszę to przekonwertować np. do string? Jak to zrobić? Jakiś stream.map(i => i.toString()) by się przydał, ale nie wiem jak to zrobić.

#programowanie #spark #kafka #scala
  • Odpowiedz
  • Otrzymuj powiadomienia
    o nowych komentarzach