Source Connector

files.dir

The directory from which files are read for processing. This directory must exist and be writable by the user running Kafka Connect. Version 2.4: the configuration properties input.path, finished.path and error.path have been deprecated; use files.dir instead. If the source and sink connectors run on the same machine (for example, for testing), ensure that files.dir is set to a different directory for each connector.

  • Importance: HIGH
  • Type: STRING
finished.file.retention.mins

The duration (in minutes) before uploaded files are automatically deleted from the "files.dir" directory. The default is 60 minutes. Set it to -1 to never delete uploaded files.

  • Importance: HIGH
  • Default: 60
  • Type: STRING
error.file.retention.mins

The duration (in minutes) before files that failed to upload are automatically deleted from the "files.dir" directory. The default is -1 (never delete).

  • Importance: HIGH
  • Default: -1
  • Type: STRING
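The two retention properties follow the same rule, which can be sketched as below. This is only an illustration of the documented behaviour, not the connector's code: the function name is hypothetical, and treating a file whose age exactly equals the retention period as due for deletion is an assumption.

```python
def is_due_for_deletion(age_minutes: float, retention_mins: int) -> bool:
    """Return True when a file has outlived its retention period.

    A retention value of -1 disables deletion, as documented for both
    finished.file.retention.mins and error.file.retention.mins.
    """
    if retention_mins == -1:
        return False
    # Assumption: the boundary (age == retention) counts as expired.
    return age_minutes >= retention_mins
```

With the documented defaults, an uploaded file becomes eligible for deletion after 60 minutes, while a file that failed to upload is kept indefinitely.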
input.file.pattern

Regular expression to check input file names against. This expression must match the entire filename.

  • Importance: HIGH
  • Type: STRING
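The requirement that the expression match the entire filename can be illustrated with a small Python sketch. The connector itself runs on the JVM and uses Java regular-expression syntax, so this only demonstrates full-match semantics; the helper name and sample filenames are hypothetical.

```python
import re


def matches_input_pattern(filename: str, pattern: str) -> bool:
    """Return True only if the pattern matches the whole filename."""
    # re.fullmatch anchors the pattern at both ends of the string.
    return re.fullmatch(pattern, filename) is not None


print(matches_input_pattern("report.csv", r".*\.csv"))      # True
print(matches_input_pattern("report.csv.tmp", r".*\.csv"))  # False: trailing ".tmp"
print(matches_input_pattern("report.csv", r"report"))       # False: partial match only
```

A pattern that is meant to match a substring therefore needs leading and trailing ".*".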
generate.test.files

Release 2.8: automatically generate test files for chunking and upload.

Generates files to test end-to-end functionality; multiple uploaders can generate test files for a single downloader. The default is "false": files are not generated automatically, and the connector scans "files.dir" for filenames that match input.file.pattern. When set to "true", the connector generates a uniquely named test file in files.dir every ten seconds, ranging in size from 0 to 2500 bytes. The files contain randomized strings, and the file size appears in the filename; an example test filename is "streamsend_testfile_021601UTC00_877bytes.bin".

The default input.file.pattern (".*") will detect, chunk and upload the test files. Note: setting generate.test.files = true automatically overrides binary.chunk.size.bytes to 500. This is a deliberately low chunk size to exercise functionality; it is inadvisable to attempt chunking of very large files with such a low binary.chunk.size.bytes.

  • Importance: LOW
  • Type: STRING
  • Default Value: false
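Because the size of each test file is embedded in its name, a downloaded test file can be checked against the size its name declares. The sketch below assumes that every test filename ends in "_<size>bytes.bin", as in the single example above; the helper name is hypothetical.

```python
import re
from typing import Optional

# Assumption: the size suffix always has the form "_<digits>bytes.bin".
TEST_FILE_SIZE = re.compile(r"_(\d+)bytes\.bin$")


def declared_size(filename: str) -> Optional[int]:
    """Extract the byte count encoded in a generated test filename."""
    match = TEST_FILE_SIZE.search(filename)
    return int(match.group(1)) if match else None


print(declared_size("streamsend_testfile_021601UTC00_877bytes.bin"))  # 877
```

Comparing this value with the size of the merged file on the downloader side gives a quick end-to-end check.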
binary.chunk.size.bytes

The size of each data chunk that will be produced to the topic. The size must be smaller than the Kafka cluster's message.max.bytes. Matching files in files.dir that exceed this size will be split (chunked) into multiple smaller files of this size. Matching files smaller than this size are streamed as a single chunk. The binary.chunk.size.bytes setting applies to all files that are uploaded using this Kafka Connector.

Release 2.8: If generate.test.files = true then binary.chunk.size.bytes is automatically overridden to 500. Test files ranging in size from 0 to 2500 bytes are automatically generated, resulting in 1-5 chunks per test file.

Release 2.6: the downloader reads the binary.chunk.size.bytes used by the uploader of a chunk from the message header. It merges each file based on the chunk size used by the uploader. This allows multiple uploaders (which may use different chunk sizes) to stream to one downloader which will merge each file chunk using the binary.chunk.size.bytes from the message header.

  • Importance: HIGH
  • Type: INT
  • Default Value: 1048588
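The relationship between file size, chunk size and chunk count is a ceiling division. The sketch below is an illustration rather than the connector's implementation: the function name is hypothetical, and counting a zero-byte file as one chunk is an assumption made to agree with the "1-5 chunks" range stated for test files.

```python
def chunk_count(file_size_bytes: int, chunk_size_bytes: int) -> int:
    """Number of chunks produced for a file of the given size."""
    if chunk_size_bytes <= 0:
        raise ValueError("chunk_size_bytes must be positive")
    # Ceiling division using integer arithmetic.
    chunks = -(-file_size_bytes // chunk_size_bytes)
    # Assumption: an empty file is still sent as a single chunk.
    return max(1, chunks)


print(chunk_count(877, 500))   # 2
print(chunk_count(2500, 500))  # 5
print(chunk_count(500, 500))   # 1
```

With the test-mode chunk size of 500 bytes, the largest generated test file (2500 bytes) yields 5 chunks.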
file.minimum.age.ms

The amount of time in milliseconds after the file was last written to before the file can be processed. This should be set appropriately to allow large files to be copied into the files.dir directory completely before they are processed.

  • Importance: HIGH
  • Type: LONG
  • Default Value: 5000
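The minimum-age check can be pictured as a comparison between the current time and the file's last-modified time. The following is a sketch under that assumption; the function name and the optional now_ms parameter are hypothetical and exist only to make the behaviour easy to test.

```python
import os
import time
from typing import Optional


def is_old_enough(path: str, minimum_age_ms: int = 5000,
                  now_ms: Optional[float] = None) -> bool:
    """Return True once the file has been unmodified for minimum_age_ms."""
    if now_ms is None:
        now_ms = time.time() * 1000
    # os.path.getmtime returns seconds since the epoch as a float.
    last_modified_ms = os.path.getmtime(path) * 1000
    return now_ms - last_modified_ms >= minimum_age_ms
```

If large files are copied into the directory slowly, or with pauses between writes, a higher value than the 5000 ms default reduces the chance of picking up a file that is still being written.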
topic

The Kafka topic to write the data to.

  • Importance: HIGH
  • Type: STRING
tasks.max

Maximum number of tasks to use for this connector. For this release set tasks.max=1. If a higher value is configured, then it is automatically reset to 1.

  • Importance: HIGH
  • Type: INTEGER
  • Default: 1
halt.on.error

Stop all tasks if an error is encountered while processing input files.

  • Importance: HIGH
  • Type: BOOLEAN
  • Default: true
topic.partitions

The partition count of the topic specified by "topic". The file-chunk connectors can operate in single-task mode or multi-task mode. This property is needed for efficient operation in multi-task mode. If operating in single-task mode (which is the default mode) then this property is ignored. In multi-task mode, the source connector sets message keys to a range of distinct values that matches the value of "topic.partitions" to improve distribution of data to multiple partitions, while maintaining ordering for consumers.

  • Importance: HIGH
  • Type: STRING
  • Default: 1
warning

For optimal performance, topic.partitions must reflect the number of partitions in the topic. If the topic is recreated with a different partition count, all source and sink connectors should be restarted with updated configuration.

finished.file.retention.mins

The number of minutes to retain a file in the files.dir directory after it has been uploaded successfully. Set to -1 to disable deletion.

  • Importance: HIGH
  • Type: INTEGER
  • Default: 60
error.file.retention.mins

The number of minutes to retain a file in the files.dir directory after it has failed to upload. Set to -1 to disable deletion.

  • Importance: HIGH
  • Type: INTEGER
  • Default: -1
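As an illustration of how the properties above fit together, the sketch below assembles a source connector configuration in the JSON shape accepted by the Kafka Connect REST API (a "name" plus a "config" map of string values). The connector name, directory, topic and chunk size are hypothetical values, and the connector class is a placeholder that must be taken from the connector's installation documentation.

```python
import json


def build_source_config(files_dir: str, topic: str, connector_class: str) -> dict:
    """Assemble a source connector definition from the documented properties."""
    return {
        "name": "file-chunk-source",  # hypothetical connector name
        "config": {
            "connector.class": connector_class,  # placeholder, not documented here
            "tasks.max": "1",
            "files.dir": files_dir,
            "topic": topic,
            "input.file.pattern": ".*",
            "binary.chunk.size.bytes": "512000",  # hypothetical; must stay below message.max.bytes
            "file.minimum.age.ms": "5000",
            "finished.file.retention.mins": "60",
            "error.file.retention.mins": "-1",
            "halt.on.error": "true",
        },
    }


payload = build_source_config(
    files_dir="/var/streamsend/upload",          # hypothetical directory
    topic="file-chunk-topic",                    # hypothetical topic
    connector_class="<source connector class>",  # placeholder
)
print(json.dumps(payload, indent=2))
```

The printed JSON can be submitted to a Connect worker's /connectors endpoint once the placeholder values have been replaced.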