Quick Start
Create a topic
kafka-topics --create --topic file-chunk-topic --partitions 1 --bootstrap-server localhost:9092
Create directories to upload and download files
cd /tmp
rm -rf upload download
mkdir upload download
Start Confluent Platform
confluent local services start
Install the plugins
confluent hub install streamsend-io/file-chunk-sink:latest
confluent hub install streamsend-io/file-chunk-source:latest
Create uploader.json
{
"name": "uploader"
,"config":{
"topic": "file-chunk-topic"
, "connector.class": "com.github.streamsend.file.chunk.source.ChunkSourceConnector"
, "files.dir": "/tmp/upload"
, "input.file.pattern": ".*"
, "tasks.max": "1"
, "file.minimum.age.ms" : "2000"
, "binary.chunk.size.bytes" : "300000"
, "cleanup.policy.maintain.relative.path": "true"
, "input.path.walk.recursively" : "true"
, "finished.file.retention.mins": "10"
, "topic.partitions": "1"
}}
Create downloader.json
{
"name": "downloader"
,"config":{
"topics": "file-chunk-topic"
, "connector.class": "com.github.streamsend.file.chunk.sink.ChunkSinkConnector"
, "tasks.max": "1"
, "files.dir": "/tmp/download"
, "binary.chunk.size.bytes" : "300000"
, "auto.register.schemas": "false"
, "schema.ignore": "true"
, "schema.generation.enabled": "false"
," key.converter.schemas.enable": "false"
,"value.converter.schemas.enable": "false"
, "schema.compatibility": "NONE"
, "topic.partitions": "1"
}}
Start the connectors
If restarting, clear the contents of the /tmp/upload directory first.
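Load the connectors in whichever way your deployment normally registers Kafka Connect connectors. A minimal sketch using the Kafka Connect REST interface (assuming a Connect worker listening on the default localhost:8083):
curl -s -X POST -H "Content-Type: application/json" --data @uploader.json http://localhost:8083/connectors
curl -s -X POST -H "Content-Type: application/json" --data @downloader.json http://localhost:8083/connectors
Confirm that both connectors and their tasks are RUNNING:
curl -s http://localhost:8083/connectors/uploader/status
curl -s http://localhost:8083/connectors/downloader/status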
Test a Pipeline
The uploader will chunk and stream any files matching "input.file.pattern": ".*" in /tmp/upload, using chunks of 300,000 bytes (binary.chunk.size.bytes).
Start with a small file:
cp /var/log/install.log /tmp/upload
Wait a few seconds for the file to complete processing.
Examine the contents of /tmp/download/merged to confirm that "install.log" has been consumed and merged.
Diff the file to confirm that it is identical to the uploaded file.
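For example (assuming install.log is the file copied above and that the merged copy is written directly under /tmp/download/merged):
ls -l /tmp/download/merged
diff /var/log/install.log /tmp/download/merged/install.log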
To test a pipeline without supplying your own files, set the source connector property "generate.test.files" to "true": this generates a test file ranging in size from 0 to 2,500 bytes every five seconds. Each test file is detected, chunked and streamed to the downloader. When generating test files, binary.chunk.size.bytes is automatically reset to "500".
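A sketch of enabling this test mode, assuming the property is simply added to the "config" block of uploader.json above (the property name comes from this section; the rest of the config is unchanged):
, "generate.test.files": "true"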