Docker

Overview

Use docker-compose to start containers for file-chunk uploader(s) and downloader(s). Each container runs Connect-distributed as a single-node Kafka Connect cluster. The containers below are Release 2.8, based on OneCricketeer/apache-kafka-connect-docker rather than cp-connect, because these containers are much more lightweight, enabling edge operation as file-chunk uploaders.

Internet Connectivity

The docker compose below requires internet connectivity to download the following containers:

  • streamsend/uploader:latest
  • streamsend/downloader:latest
  • alpine:latest

The streamsend containers are self-contained and do not make any subsequent calls (for example, to install additional plugins). The alpine container makes one call to install "curl" using "apk".
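
If the target machine has only intermittent connectivity, the three images can be pulled ahead of time so that docker compose up does not need to reach the registry (a simple sketch using the standard docker CLI):

docker pull streamsend/uploader:latest
docker pull streamsend/downloader:latest
docker pull alpine:latest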

Quickstart

Start the uploader container and copy a file into ./streamsend/upload. Check the container logfile.

Each uploaded file will be chunked into 512000-byte events before streaming to file-chunk-topic.

On completion, the file is renamed with a __FINISHED postfix and is deleted once the finished-file retention period (finished.file.retention.mins, set to 5 minutes in the docker-compose below) has elapsed.

Start the downloader container. Each streamed file will be merged into the directory ./streamsend/download/merged.

Copy more files to the upload directory - they will be processed in order.

Files of any size can be streamed - larger files benefit from setting a larger binary.chunk.size.bytes.

The chunk size must be smaller than the maximum message size (message.max.bytes) configured for the Kafka cluster.
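
As a concrete end-to-end run (a minimal sketch that assumes the uploader and downloader docker-compose files from the sections below are already in place, and uses a hypothetical test file named sample.bin):

# start the stack, then queue a file for upload
docker compose up -d
cp sample.bin ./streamsend/upload/

# watch the chunks being produced
docker logs -f uploader

# on the downloader host, the merged file appears once all chunks have been consumed
ls -l ./streamsend/download/merged/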

Docker compose

Uploader Container

This docker-compose.yml starts a single-node connect distributed container on port 8083 and then submits an uploader job.

Volume Mapping

Create a directory on the docker host to place queued files for upload. This will be a mapped volume in the docker container.

mkdir -p ./streamsend/upload

Environment Variables

The following environment variables are required for Kafka authentication and for identification of the machine that is uploading files.

Export these environment variables
export BOOTSTRAP_SERVERS="your bootstrap servers here"
export API_KEY="your Kafka Auth API Key here"
export SECRET="your Kafka Auth Secret here"
export HOSTNAME=$(hostname)

uploader Docker Compose

Copy this to docker-compose.yml

version: '3'
services:
  uploader:
    image: streamsend/uploader:latest
    container_name: uploader
    volumes:
      - './streamsend/upload:/streamsend/upload'
    deploy:
      replicas: 1
    environment:
      CONNECT_GROUP_ID: "${HOSTNAME}-uploader-group"
      CONNECT_CLIENT_ID: "${HOSTNAME}-uploader-client"
      # Container location of the streamsend plugins
      CONNECT_PLUGIN_PATH: /app/libs
      # connect-distributed
      CONNECT_OFFSET_STORAGE_TOPIC: offset-topic-up
      CONNECT_CONFIG_STORAGE_TOPIC: config-topic-up
      CONNECT_STATUS_STORAGE_TOPIC: status-topic-up
      CONNECT_OFFSET_STORAGE_PARTITIONS: 1
      CONNECT_CONFIG_STORAGE_PARTITIONS: 1
      CONNECT_STATUS_STORAGE_PARTITIONS: 1
      #
      CONNECT_REST_ADVERTISED_HOST_NAME: 'uploader'
      CONNECT_REST_PORT: 8083
      # kafka connection
      CONNECT_BOOTSTRAP_SERVERS: ${BOOTSTRAP_SERVERS}
      CONNECT_PRODUCER_BOOTSTRAP_SERVERS: ${BOOTSTRAP_SERVERS}
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${API_KEY}\" password=\"${SECRET}\";"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${API_KEY}\" password=\"${SECRET}\";"
      # defaults for all connectors
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      #
      CONNECT_AUTO_INCLUDE_JMX_REPORTER: FALSE
    # command:
    # don't override the command - this image is not based on the Confluent Connect docker image

  # a lightweight container to (re)submit the uploader job. It exits after submitting.
  submit-uploader:
    image: alpine:latest
    hostname: submit-uploader
    depends_on:
      - uploader
    command:
      - sh
      - -c
      - |
        # install curl
        apk add --update curl
        echo "__________________________________________________ Uploader ______________________________________________________________"
        echo "..sleeping to let Connect start up..."
        sleep 20
        echo "... Unsubmit the uploader (in case one is running). Ignore if 'not found'"
        # the || true ensures the remaining commands run even if the Connect server returns "7" because the job does not exist
        curl -X DELETE "http://uploader:8083/connectors/uploader" || true
        echo "__________________________________________________ Uploader ______________________________________________________________"
        # note that the base image for this container only supports INFO logging.
        #curl -s -X PUT http://uploader:8083/admin/loggers/root -H "Content-Type:application/json" -d '{"level": "DEBUG"}'
        curl -X POST http://uploader:8083/connectors -H "Content-Type: application/json" -d '{
          "name": "uploader"
          , "config": {
            "topic": "file-chunk-topic"
            , "connector.class": "com.github.streamsend.file.chunk.source.ChunkSourceConnector"
            , "files.dir": "/streamsend/upload"
            , "input.file.pattern": ".*"
            , "file.minimum.age.ms": "1000"
            , "binary.chunk.size.bytes": "512000"
            , "cleanup.policy.maintain.relative.path": "true"
            , "input.path.walk.recursively": "true"
            , "finished.file.retention.mins": "5"
            , "topic.partitions": "1"
            , "tasks.max": "1"
            , "feature.enable": "none"
            , "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
            , "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
        }}'

Start an uploader Container

docker compose up
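
To confirm that the uploader job was accepted, the Connect REST API can be queried. The uploader service above does not publish port 8083 to the docker host, so this sketch assumes a ports mapping ("8083:8083") has been added to the uploader service:

# list connectors and check the uploader status on the single-node Connect cluster
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/uploader/status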

Single-task and multi-task

This release of the source connectors enforces single-task operation (irrespective of the "tasks.max" setting). This means that input files (and their subsequent chunks) are processed by one Kafka Connect task, and all file chunks are produced to a single Kafka topic.

Start additional uploader Containers

Multiple single-task uploaders can be configured to stream to the same Kafka cluster (and Kafka topic). There is no limitation (technical or licensing) on the number of single-task uploaders that are started. Uploaders should be started on separate machines (not multiple uploaders on one machine).

docker compose up
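
On each additional machine, the same docker-compose.yml can be reused unchanged; only the exported environment differs, and the per-host HOSTNAME gives each uploader its own Connect group and client id (a sketch with placeholder credentials):

# on a second upload machine
export BOOTSTRAP_SERVERS="your bootstrap servers here"
export API_KEY="your Kafka Auth API Key here"
export SECRET="your Kafka Auth Secret here"
export HOSTNAME=$(hostname)
mkdir -p ./streamsend/upload
docker compose up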

Downloader Container

This docker-compose.yml starts a single-node connect distributed container on port 8084 and then submits a downloader job.

Volume mapping

Create a directory on the docker host for consumed and merged files. This will be a mapped volume in the docker container.

mkdir -p ./streamsend/download

Environment Variables

The following environment variables are required for Kafka authentication.

Export these environment variables
export BOOTSTRAP_SERVERS="your bootstrap servers here"
export API_KEY="your Kafka Auth API Key here"
export SECRET="your Kafka Auth Secret here"
export HOSTNAME=$(hostname)

Docker Compose

Copy this to docker-compose.yml

version: '3'
services:
  downloader:
    image: streamsend/downloader:latest
    container_name: downloader
    volumes:
      - './streamsend/download:/streamsend/download'
    ports:
      - 8084:8084
    deploy:
      replicas: 1
    environment:
      CONNECT_GROUP_ID: "${HOSTNAME}-downloader-group"
      CONNECT_CLIENT_ID: "${HOSTNAME}-downloader-client"
      # Container location of the streamsend plugins
      CONNECT_PLUGIN_PATH: /app/libs
      # connect-distributed
      CONNECT_OFFSET_STORAGE_TOPIC: offset-topic-down
      CONNECT_CONFIG_STORAGE_TOPIC: config-topic-down
      CONNECT_STATUS_STORAGE_TOPIC: status-topic-down
      CONNECT_OFFSET_STORAGE_PARTITIONS: 1
      CONNECT_CONFIG_STORAGE_PARTITIONS: 1
      CONNECT_STATUS_STORAGE_PARTITIONS: 1
      #
      CONNECT_REST_ADVERTISED_HOST_NAME: 'downloader'
      CONNECT_NAME: downloader
      CONNECT_LISTENERS: "http://0.0.0.0:8084"
      CONNECT_REST_PORT: 8084
      CONNECT_REST_ADVERTISED_PORT: 8084
      # kafka connection
      CONNECT_BOOTSTRAP_SERVERS: ${BOOTSTRAP_SERVERS}
      CONNECT_CONSUMER_BOOTSTRAP_SERVERS: ${BOOTSTRAP_SERVERS}
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: https
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${API_KEY}\" password=\"${SECRET}\";"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${API_KEY}\" password=\"${SECRET}\";"
      # defaults for all connectors
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      # JMX
      CONNECT_AUTO_INCLUDE_JMX_REPORTER: FALSE
    # command:
    # don't override the command - this image is not based on the Confluent Connect docker image

  # a lightweight container to (re)submit the downloader job. It exits after submitting.
  submit-downloader:
    image: alpine:latest
    hostname: submit-downloader
    depends_on:
      - downloader
    command:
      - sh
      - -c
      - |
        # install curl
        apk add --update curl
        echo "++++++++++++++++++++++++++++++++++++++++++++++++++ Downloader ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
        sleep 20 # needed if the Uploader is not running in the same docker-compose
        echo "... Unsubmit the downloader (in case one is running). Ignore if 'not found'"
        # the || true ensures the remaining commands run even if the Connect server returns "7" because the job does not exist
        curl -X DELETE "http://downloader:8084/connectors/downloader" || true
        echo "++++++++++++++++++++++++++++++++++++++++++++++++++ Downloader ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
        # note that the base image for this container only supports INFO logging.
        #curl -s -X PUT http://downloader:8084/admin/loggers/root -H "Content-Type:application/json" -d '{"level": "INFO"}'
        curl -X POST http://downloader:8084/connectors -H "Content-Type: application/json" -d '{
          "name": "downloader"
          , "config": {
            "topics": "file-chunk-topic"
            , "connector.class": "com.github.streamsend.file.chunk.sink.ChunkSinkConnector"
            , "files.dir": "/streamsend/download"
            , "tasks.max": "1"
            , "auto.register.schemas": "false"
            , "schema.ignore": "true"
            , "schema.generation.enabled": "false"
        }}'

Start the Downloader Container

docker compose up
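
Since the downloader service publishes port 8084, the job can be checked directly from the docker host, and merged files can be watched as they arrive:

curl -s http://localhost:8084/connectors
curl -s http://localhost:8084/connectors/downloader/status

# merged files are written here as chunks are consumed and reassembled
ls -l ./streamsend/download/merged/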

Single-task and multi-task

This release of the sink connector enforces single-task operation (irrespective of the "tasks.max" setting). This means that consumed chunks are processed by one Kafka Connect task.

Start additional downloader Containers

Multiple single-task downloaders can be configured to consume from one Kafka cluster (using one Kafka topic). They must have differing "name", CONNECT_GROUP_ID and CONNECT_CLIENT_ID configuration settings. Each downloader operates independently of the others, consuming chunks and merging files to its own local filesystem.

Additional Downloader containers can be started by:

  • duplicating the docker-compose to start a second container on the same docker host, modifying the port and volume mappings so that the new container's "files.dir" does not overlap with the files of other downloader containers (see the sketch below).
  • running multiple downloader jobs on the same Kafka Connect server: copy/paste the submit command to submit a second job after modifying the "name" property. Modify "name", CONNECT_GROUP_ID and CONNECT_CLIENT_ID so that each downloader consumer operates independently of the other downloader consumers.
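
A minimal sketch of the first option (a second downloader container on the same docker host): only the service name, port, volume mapping and Connect ids need to change. The values below (downloader2, port 8085, ./streamsend/download2) are illustrative assumptions, not shipped defaults; the remaining settings are copied from the downloader service above.

  downloader2:
    image: streamsend/downloader:latest
    container_name: downloader2
    volumes:
      - './streamsend/download2:/streamsend/download'
    ports:
      - 8085:8085
    environment:
      CONNECT_GROUP_ID: "${HOSTNAME}-downloader2-group"
      CONNECT_CLIENT_ID: "${HOSTNAME}-downloader2-client"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'downloader2'
      CONNECT_LISTENERS: "http://0.0.0.0:8085"
      CONNECT_REST_PORT: 8085
      CONNECT_REST_ADVERTISED_PORT: 8085
      # distinct internal topics so this worker does not clash with the first downloader
      CONNECT_OFFSET_STORAGE_TOPIC: offset-topic-down2
      CONNECT_CONFIG_STORAGE_TOPIC: config-topic-down2
      CONNECT_STATUS_STORAGE_TOPIC: status-topic-down2
      # all other settings (plugin path, Kafka connection, converters) as in the downloader service above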