File Chunks
The file-chunk connectors split files into chunks that fit inside a Kafka message.
The files can be of any content type and of any size (within reason). File sizes from tens to low-thousands of megabytes have been tested successfully.
The chunk size (configured using binary.chunk.size.bytes) must be smaller than the maximum message size configured for the Kafka cluster (the broker-level message.max.bytes, or the per-topic max.message.bytes).
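A sketch of the relationship using the stock kafka-configs.sh tool (the topic name is illustrative, and 1048588 bytes is the broker default limit):

```bash
# Give the chunk topic headroom above binary.chunk.size.bytes.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name file-chunk-events \
  --add-config max.message.bytes=1048588
# binary.chunk.size.bytes=500000 then fits comfortably below this limit.
```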
Files
The contents of the files are immaterial.
Stream any type of file:
- Contents can be encoded as character or binary.
- Character-encoded files can use any encoding scheme, including UTF-8, Big5, and Shift-JIS.
- Character-encoded files can contain any content type, including text, XML, JSON, YAML, and CDR.
- Binary contents can include audio, video, images, or closed formats such as database redo logs.
- Files can be uncompressed or compressed (using any compression algorithm).
- Files can be unencrypted or encrypted.
Queued files must be static: they must not be open by any process or thread.
Reading the file can take some time; if any process changes the file during chunking, a target-file MD5 mismatch is likely.
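One way to verify this before queueing a file, assuming lsof is available (the path is illustrative):

```bash
# lsof exits non-zero when no process has the file open, so the message
# prints only when the file is quiescent.
lsof /staging/payload.bin >/dev/null 2>&1 || echo "no open handles; safe to queue"
```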
File Sizes
The file must fit inside the JVM memory allocation for Kafka Connect workers that run the source connector and the sink connector.
For example, if the sink Kafka Connect worker JVM is configured with -Xmx2G, then no uploaded input file can exceed 2 GB in size.
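In a stock Apache Kafka distribution, one way to raise the worker heap is the KAFKA_HEAP_OPTS environment variable (the 4G values are illustrative):

```bash
# Size the Connect worker heap above the largest expected input file,
# on both the source and sink sides.
export KAFKA_HEAP_OPTS="-Xms4G -Xmx4G"
bin/connect-distributed.sh config/connect-distributed.properties
```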
Upload Ordering
The source connector scans for unprocessed files in the files.dir, selecting eligible files for upload using the input.file.pattern. Renaming a file (after copying it into the files.dir) can also be used (as an alternative to file.minimum.age.ms) to ensure that chunking begins only for fully copied files, as shown in the sketch below.
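A sketch of the copy-then-rename technique (paths are illustrative; mv within one filesystem is an atomic rename):

```bash
# Copy under a name that input.file.pattern will not match, then rename;
# the connector never sees a partially copied file.
cp /staging/payload.bin /data/files.dir/payload.bin.partial
mv /data/files.dir/payload.bin.partial /data/files.dir/payload.bin
```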
If multiple eligible files are found in files.dir, the first file is chunked and sent before chunking of the second file begins, and so on.
Upload File Age
As each new file is found, if file.minimum.age.ms is configured, the source connector task waits until the file reaches this age before it begins chunking. file.minimum.age.ms should be configured long enough for complete files to be copied into the files.dir (to avoid chunking partially copied files).
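A minimal source-connector properties sketch (connect-standalone style; the paths, pattern, and 60-second age are illustrative assumptions):

```bash
# Write a hypothetical source-connector config; 60000 ms assumes any copy
# into files.dir completes well within a minute.
cat > file-chunk-source.properties <<'EOF'
files.dir=/data/files.dir
input.file.pattern=.*\.bin
file.minimum.age.ms=60000
binary.chunk.size.bytes=500000
EOF
```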
Filesystem Considerations
File chunks are written to the same directory as each eligible input file (in the files.dir). This filesystem must be large enough to accommodate both an input file and its chunks, which in total are the same size as the input file. The filesystem must therefore be sized to at least twice the largest anticipated input file.
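A rough sizing check, assuming candidate files are staged elsewhere before queueing (GNU du; paths are illustrative):

```bash
# Largest staged file in bytes; files.dir needs about twice this free.
LARGEST=$(du -b /staging/* | sort -n | tail -1 | cut -f1)
echo "files.dir needs at least $((2 * LARGEST)) bytes free"
df -h /data/files.dir
```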
The sink connector appends chunks, from the first chunk to the final chunk, and renames the merged file after a successful MD5 check. Merged files are retained for finished.file.retention.mins (and, similarly, unsuccessfully merged files are retained for error.file.retention.mins). Automatic cleanup of merged files reduces the possibility of a full filesystem impacting sink connector operation. Merged files must be processed from the files.dir directory before the retention time elapses. Moving or deleting merged files does not cause any error.
Chunks
The property binary.chunk.size.bytes must be configured to the same value on the source and sink connectors.
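Using the hypothetical property files sketched earlier, a quick check that the two sides agree:

```bash
# Both files should report the same binary.chunk.size.bytes value.
grep binary.chunk.size.bytes file-chunk-source.properties file-chunk-sink.properties
```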
Files are chunked on byte boundaries.
For example, with binary.chunk.size.bytes=500000, the first message produced to the topic contains file bytes 1 to 500000.
The second message contains bytes 500001 to 1000000; and so on.
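The chunk count for a file is therefore ceil(file size / chunk size); a small shell sketch (GNU stat assumed, path illustrative):

```bash
# Number of chunk messages produced for one file.
FILE_SIZE=$(stat -c%s /data/files.dir/payload.bin)   # GNU stat; stat -f%z on macOS
CHUNK=500000
echo $(( (FILE_SIZE + CHUNK - 1) / CHUNK ))
```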