kafkaload

Synopsis

starlake kafkaload [options]

Description

Load data to or offload data from Apache Kafka topics. Supports both producing and consuming messages.

Two modes are available: batch mode and streaming mode.

Batch mode

In batch mode, you run the Kafka (off)loader regularly, and the last consumed offset is stored in the comet_offsets topic (see reference-kafka.conf for an example configuration).
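As a rough illustration, the relevant part of a Kafka configuration might look like the fragment below. The key names and values here are assumptions modeled on the shape of a typical Starlake Kafka config; consult the reference-kafka.conf shipped with Starlake for the authoritative settings.

```hocon
# Illustrative sketch only -- check reference-kafka.conf for the real key names.
kafka {
  server-options {
    "bootstrap.servers": "localhost:9092"
  }
  topics {
    # In batch mode, the offset of the last consumed record
    # is committed to this topic between runs.
    comet_offsets {
      topic-name: "comet_offsets"
    }
  }
}
```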

When offloading data from Kafka to a file, you may ask to coalesce the result into a specific number of files / partitions. If you coalesce to a single partition, the offloader stores the data under the exact filename you provided in the path argument.
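A single-file batch offload could then be invoked along these lines. The topic name `sales` is hypothetical, and whether `--write-coalesce` expects a boolean or a partition count is an assumption here; adjust to your own topic configuration.

```shell
# Sketch: offload the (hypothetical) "sales" topic to one CSV file.
# With a single output partition, the data lands at the exact --path given.
starlake kafkaload \
  --config sales \
  --path /tmp/sales.csv \
  --format csv \
  --write-coalesce true
```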

The figure below describes the batch offloading process.

[Figure: Starlake Kafka batch offloading process, from topics to files]

The figure below describes the batch offloading process with comet-offsets-mode = "FILE".

[Figure: Starlake Kafka batch offloading with FILE offset mode for single-partition output]

Streaming mode

In this mode, the program keeps running and the comet_offsets topic is not used. The (off)loader uses the consumer group id you specify in the access options of the topic configuration you are dealing with.
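For instance, the consumer group id would sit in the topic's access options as a standard Kafka consumer property. The topic name and surrounding key names below are illustrative assumptions, not the exact schema; only `group.id` and `bootstrap.servers` are standard Kafka consumer properties.

```hocon
# Illustrative sketch: in streaming mode the group id comes from
# the topic's access options ("group.id" is the Kafka consumer property).
topics {
  my_topic {                      # hypothetical topic entry
    topic-name: "my_topic"
    access-options {
      "group.id": "starlake-streaming-consumer"
      "bootstrap.servers": "localhost:9092"
    }
  }
}
```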

Parameters

| Parameter | Cardinality | Description |
| --------- | ----------- | ----------- |
| `--config <value>` | Optional | Topic name declared in the reference.conf file |
| `--connectionRef <value>` | Optional | Connection to any specific sink |
| `--format <value>` | Optional | Read/write format, e.g. parquet, json, csv. Defaults to parquet. |
| `--path <value>` | Optional | Source file for load and target file for store |
| `--options <value>` | Optional | Options to pass to the Spark reader |
| `--write-config <value>` | Optional | Topic name declared in the reference.conf file |
| `--write-path <value>` | Optional | Source file for load and target file for store |
| `--write-mode <value>` | Optional | When offload is true, describes how data should be stored on disk. Ignored if offload is false. |
| `--write-options <value>` | Optional | Options to pass to the Spark writer |
| `--write-format <value>` | Optional | Streaming format, e.g. kafka, console |
| `--write-coalesce <value>` | Optional | Should we coalesce the resulting dataframe? |
| `--transform <value>` | Optional | Any transformation to apply to a message before loading / offloading it |
| `--stream <value>` | Optional | Should we use streaming mode? |
| `--streaming-trigger <value>` | Optional | Once / Continuous / ProcessingTime |
| `--streaming-trigger-option <value>` | Optional | e.g. "10 seconds"; see https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/Trigger.html#ProcessingTime-java.lang.String- |
| `--streaming-to-table <value>` | Optional | Table name to sink to |
| `--streaming-partition-by <value>` | Optional | List of columns to use for partitioning |
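Putting the streaming parameters together, a streaming offload run could look like the sketch below. The topic name `clickstream` is hypothetical; the flags are those listed in the parameter table, and the trigger values follow Spark's Trigger API.

```shell
# Sketch: continuously offload the (hypothetical) "clickstream" topic,
# firing a micro-batch every 10 seconds and printing results to the console.
starlake kafkaload \
  --config clickstream \
  --stream true \
  --streaming-trigger ProcessingTime \
  --streaming-trigger-option "10 seconds" \
  --write-format console
```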