A Java-based distributed systems project with three cooperating roles:
- Master: accepts jobs, splits them into tasks, schedules workers, and aggregates results.
- Worker: requests tasks, executes compute jobs, and reports progress/results.
- Producer: interactive CLI client to submit jobs, inspect states, and visualize metrics.
The system supports numeric jobs (SUM, SUMSQ) and distributed mini-batch training (TrainGdBatchJob) with sync and async modes.
- Distributed task scheduling over TCP sockets.
- Job types:
  - range sum (sum)
  - range sum of squares (sumsq)
  - gradient-based training (train)
- Fault tolerance:
  - lease-based liveness checks
  - retries with exponential backoff
  - max-attempt protection
  - progress-aware task reclaim
  - speculative execution for stragglers
- Metrics and artifacts:
  - per-epoch CSV logs
  - job summary and speedup logs
  - generated PNG charts
  - exported model weights (CSV and 28x28 image for MNIST-size vectors)
- State persistence for master, producer, and worker processes.
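The retry behaviour listed above (exponential backoff with a cap on attempts) can be illustrated with a small sketch. The class name, the base delay, the delay cap, and the attempt limit are all assumptions chosen for illustration; the project's actual constants and scheduling code may differ.

```java
/** Hypothetical retry policy; names and constants are illustrative, not taken from the project. */
final class RetryPolicySketch {
    static final long BASE_DELAY_MS = 500;    // assumed delay before the first retry
    static final long MAX_DELAY_MS = 30_000;  // assumed upper bound for a single delay
    static final int MAX_ATTEMPTS = 5;        // assumed attempt limit

    /** Delay before retry number {@code attempt} (1-based): base * 2^(attempt-1), capped. */
    static long delayMillis(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int shift = Math.min(attempt - 1, 30); // bound the shift so the product cannot overflow
        return Math.min(BASE_DELAY_MS << shift, MAX_DELAY_MS);
    }

    /** True while a task that has already been tried {@code attemptsSoFar} times may run again. */
    static boolean mayRetry(int attemptsSoFar) {
        return attemptsSoFar < MAX_ATTEMPTS;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            System.out.println("retry " + attempt + " -> wait " + delayMillis(attempt) + " ms");
        }
    }
}
```

Capping the delay keeps a repeatedly failing task from being postponed indefinitely, while the attempt limit turns a persistent failure into a terminal job state instead of an endless loop.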
- Java 24 (project classpath is set to JavaSE-24)
- Standard library only (no external runtime dependencies)
- Eclipse-style project layout (src/ -> bin/)
src/de/luh/vss/chat/
  common/       # message protocol and role types
  dtq/
    master/     # scheduler, leases, task lifecycle, aggregation
    worker/     # worker runtime and training task execution
    producer/   # producer CLI, result handling, plotting trigger
    job/        # job models (including TrainGdBatchJob)
    task/       # task state model
    metrics/    # CSV logging + chart generation
Important runtime output folders (created automatically):
- logs/ -> epoch logs, summaries, speedup CSV, generated charts
- state/ -> persistent state snapshots per component
- project root -> weights_<jobId>.csv and optionally weights_<jobId>.png
- JDK 24 installed and available on PATH
- Terminal access (run 3+ terminals for full demo: master, workers, producer)
Verify Java:
java -version
javac -version

Worker and Producer currently use a hardcoded master host:
- src/de/luh/vss/chat/dtq/worker/Worker.java
- src/de/luh/vss/chat/dtq/producer/Producer.java

By default they connect to:
"10.172.119.178", 44444

For local testing on one machine, change the host to 127.0.0.1 (or to your master machine's IP on the LAN).
From repository root:
mkdir -p bin
javac -d bin $(find src -name "*.java")

Start components in this order.
- Start master:
  java -cp bin de.luh.vss.chat.dtq.master.Master
- Start one or more workers (each with a unique worker ID):
  java -cp bin de.luh.vss.chat.dtq.worker.Worker 1 2
  java -cp bin de.luh.vss.chat.dtq.worker.Worker 2 2
  Worker args:
  - arg0: worker ID (int)
  - arg1: slots/concurrency (int)
  - arg2 (optional): task timeout in ms for training tasks
- Start producer:
  java -cp bin de.luh.vss.chat.dtq.producer.Producer 1
  Producer arg:
  - arg0: producer ID (int)
Producer console commands:
- help
- sum start|end
- sumsq start|end
- train key=value|key=value|...
- resume jobId|key=value|...
- jobs
- plot
- chat <message>
- exit

Master console commands:
- jobs -> list job states and producer IDs
- list -> list active workers/producers with lease timestamps
- info -> show master IP and port
- workers -> worker performance snapshot
- workers verbose -> include in-flight task ages
- exit -> persist completed jobs and shut down

Worker console commands:
- stats -> local worker stats (completed/failed/last task)
- REQUEST_TASK -> manual task request
- <text> -> sends chat text
- exit

Numeric job examples:
sum 1|1000000
sumsq 1|1000000
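To make the idea of splitting a numeric job into tasks concrete, here is a minimal sketch of how a range such as 1|1000000 could be cut into chunks, computed independently, and aggregated. The chunking strategy, the class and method names, and the assumption that both range bounds are inclusive are illustrative; they are not taken from the project's Master or Worker code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical range splitter; not the project's scheduler. */
final class RangeSplitSketch {
    /** Inclusive sub-range [start, end] (inclusive bounds are an assumption). */
    record Range(long start, long end) {}

    /** Splits [start, end] into at most {@code parts} contiguous chunks of near-equal size. */
    static List<Range> split(long start, long end, int parts) {
        if (end < start || parts < 1) {
            throw new IllegalArgumentException("invalid range or part count");
        }
        long total = end - start + 1;
        long chunk = (total + parts - 1) / parts; // ceiling division
        List<Range> out = new ArrayList<>();
        for (long lo = start; lo <= end; lo += chunk) {
            out.add(new Range(lo, Math.min(lo + chunk - 1, end)));
        }
        return out;
    }

    /** Work one task would do for a "sum" job. */
    static long sum(Range r) {
        long acc = 0;
        for (long i = r.start(); i <= r.end(); i++) acc += i;
        return acc;
    }

    /** Work one task would do for a "sumsq" job. */
    static long sumSquares(Range r) {
        long acc = 0;
        for (long i = r.start(); i <= r.end(); i++) acc += i * i;
        return acc;
    }

    public static void main(String[] args) {
        List<Range> tasks = split(1, 1_000_000, 4);
        // Aggregation is a plain sum of the partial results.
        long total = tasks.stream().mapToLong(RangeSplitSketch::sum).sum();
        System.out.println(tasks.size() + " tasks, total = " + total);
    }
}
```

Because addition is associative, the partial results can arrive in any order, which is what allows retried or duplicated tasks to be merged safely as long as each sub-range is counted once.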
Training example:
train type=TrainGdBatchJob|model=logreg|task=binary_is_zero|samples=2000|features=784|epochs=1|batchSize=128|lr=0.1|seed=42|dataRef=file:./data/train.csv|testRef=file:./data/mnist_test.csv|hasHeader=true|csvLabel=first|normalize=255|init=zeros|mode=sync
Resume from previous model weights:
resume <jobId>|jobId=<newJobId>|epochs=1|lr=0.01|batchSize=128
Prebuilt demo commands are available in:
src/de/luh/vss/chat/dtq/producer/jobs.txt
Required fields for train:
- model (linreg or logreg)
- task (for example binary_is_zero, binary_is_one, ...)
- samples
- features
- epochs
- batchSize
- lr
- seed
- dataRef
Common optional fields:
- jobId
- testRef
- hasHeader (true/false)
- csvLabel (first or last)
- normalize (for example 255)
- mode (sync or async)
- init (zeros or weights)
- w (comma-separated weight vector)
- wFile (path to a local weights file, producer-side convenience)
- patience, minDelta (early stopping behavior)
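The key=value|key=value syntax used by train and resume is simple enough to parse by hand. The following sketch shows one way to split such a specification and report missing required fields. It is an illustration only: the class and method names are hypothetical, and the producer's real parser may validate differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser for the pipe-separated train arguments. */
final class TrainSpecSketch {
    /** Required keys as listed in this README. */
    static final List<String> REQUIRED = List.of(
            "model", "task", "samples", "features", "epochs",
            "batchSize", "lr", "seed", "dataRef");

    /** Parses "k1=v1|k2=v2|..." into an ordered map; later duplicates overwrite earlier ones. */
    static Map<String, String> parse(String spec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : spec.split("\\|")) {
            int eq = pair.indexOf('='); // split on the first '=' only
            if (eq <= 0) {
                throw new IllegalArgumentException("expected key=value, got: " + pair);
            }
            fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return fields;
    }

    /** Returns the required keys that are absent from the parsed fields. */
    static List<String> missingRequired(Map<String, String> fields) {
        return REQUIRED.stream().filter(k -> !fields.containsKey(k)).toList();
    }

    public static void main(String[] args) {
        Map<String, String> fields =
                parse("model=logreg|task=binary_is_zero|epochs=1|lr=0.1|dataRef=file:./data/train.csv");
        System.out.println("parsed:  " + fields);
        System.out.println("missing: " + missingRequired(fields));
    }
}
```

Splitting on the first = only means values such as file:./data/train.csv pass through unchanged; a value containing | would still need an escaping rule, which this sketch does not attempt.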
Datasets used by training jobs are included under:
- src/de/luh/vss/chat/dtq/worker/data
- src/de/luh/vss/chat/dtq/producer/data
dataRef/testRef resolution supports:
- direct path
- file: prefixed path
- fallback to worker data folders when relative paths are provided
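The three resolution rules above can be sketched as a small lookup routine. This is an assumption-laden illustration: the class name, the order in which candidates are tried, and the single fallback directory parameter are hypothetical and may not match the worker's actual logic.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical dataRef resolver; the lookup order is an assumption. */
final class DataRefSketch {
    /**
     * Tries the reference as given (with an optional "file:" prefix removed),
     * then, for relative paths only, looks inside {@code fallbackDir}.
     */
    static Optional<Path> resolve(String ref, Path fallbackDir) {
        String raw = ref.startsWith("file:") ? ref.substring("file:".length()) : ref;
        Path direct = Path.of(raw);
        if (isReadableFile(direct)) {
            return Optional.of(direct);
        }
        if (!direct.isAbsolute()) {
            Path fallback = fallbackDir.resolve(raw).normalize();
            if (isReadableFile(fallback)) {
                return Optional.of(fallback);
            }
        }
        return Optional.empty(); // caller would report a DATAREF-style failure
    }

    private static boolean isReadableFile(Path p) {
        return Files.isRegularFile(p) && Files.isReadable(p);
    }

    public static void main(String[] args) {
        Path workerData = Path.of("src/de/luh/vss/chat/dtq/worker/data");
        System.out.println(resolve("file:./data/train.csv", workerData));
    }
}
```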
During/after training, the system writes:
- logs/<jobId>_epochs.csv
- logs/job_summaries.csv
- logs/speedup.csv
- logs/speedup.png (via plot)
- logs/sync_async.png (via plot)
- weights_<jobId>.csv
- weights_<jobId>.png (when weights length is 784)
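The PNG export for 784-element weight vectors can be pictured with the sketch below, which maps a vector to a 28x28 grayscale image using only the standard library. The row-major pixel order and the min-max scaling to 0..255 are assumptions made for illustration; the project's exporter may scale or orient the image differently.

```java
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import javax.imageio.ImageIO;

/** Hypothetical weight-vector renderer; scaling and layout are assumptions. */
final class WeightsImageSketch {
    static final int SIDE = 28;

    /** Renders 784 weights as a 28x28 grayscale image (row-major, min-max scaled). */
    static BufferedImage toImage(double[] w) {
        if (w.length != SIDE * SIDE) {
            throw new IllegalArgumentException("expected 784 weights, got " + w.length);
        }
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        for (double v : w) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        double span = max - min;
        BufferedImage img = new BufferedImage(SIDE, SIDE, BufferedImage.TYPE_BYTE_GRAY);
        for (int i = 0; i < w.length; i++) {
            // A constant vector has no contrast, so it is drawn as mid-gray.
            int gray = span == 0 ? 128 : (int) Math.round(255 * (w[i] - min) / span);
            img.getRaster().setSample(i % SIDE, i / SIDE, 0, gray);
        }
        return img;
    }

    public static void main(String[] args) throws IOException {
        double[] w = new double[SIDE * SIDE];
        for (int i = 0; i < w.length; i++) w[i] = Math.sin(i / 20.0);
        File out = File.createTempFile("weights_demo", ".png");
        ImageIO.write(toImage(w), "png", out);
        System.out.println("wrote " + out.getAbsolutePath());
    }
}
```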
- Lease renewals every 5s; stale peers are removed by master lease checks.
- Running tasks have leases and progress heartbeats.
- Reclaim triggers include worker death, stale progress, or long-running tasks.
- Retries use exponential backoff and stop after max attempts.
- Speculative re-execution can launch duplicate work for slow tasks; first valid completion wins.
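The lease mechanism described above can be sketched as a table of last-renewal timestamps that is swept periodically. The class name and the time-to-live value are assumptions: the README only states that renewals happen every 5 s, so the 15 s TTL in the example (three missed renewals) is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical lease table; not the master's actual data structure. */
final class LeaseTableSketch {
    private final long ttlMillis;
    private final Map<String, Long> lastRenewal = new ConcurrentHashMap<>();

    LeaseTableSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Called whenever a peer renews its lease. */
    void renew(String peerId, long nowMillis) {
        lastRenewal.put(peerId, nowMillis);
    }

    /** Removes and returns every peer whose last renewal is older than the TTL. */
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        lastRenewal.forEach((peer, seen) -> {
            // remove(key, value) skips peers that renewed while the sweep was running
            if (nowMillis - seen > ttlMillis && lastRenewal.remove(peer, seen)) {
                evicted.add(peer);
            }
        });
        return evicted;
    }

    boolean isAlive(String peerId) {
        return lastRenewal.containsKey(peerId);
    }

    public static void main(String[] args) {
        LeaseTableSketch leases = new LeaseTableSketch(15_000); // assumed TTL
        leases.renew("worker-1", 0);
        leases.renew("worker-2", 10_000);
        System.out.println("evicted at t=16s: " + leases.evictExpired(16_000));
    }
}
```

Passing the clock in as a parameter keeps the sweep deterministic and easy to test; tasks held by an evicted worker would then be candidates for reclaim.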
State snapshots are persisted under state/:
- state/master_state.properties
- state/producer_<id>.properties
- state/worker_<id>.properties
Completed/failed jobs are reloaded on restart.
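Since the snapshots are .properties files, persistence and reload can be illustrated with java.util.Properties. The sketch below is a guess at the general shape, not the project's code: the key names, the class name, and the write-to-temp-then-move step are all assumptions.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

/** Hypothetical snapshot helper; key names below are invented for illustration. */
final class StateSnapshotSketch {
    /** Writes the snapshot to a temporary sibling first, then moves it into place. */
    static void save(Path file, Properties state) throws IOException {
        Files.createDirectories(file.toAbsolutePath().getParent());
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            state.store(out, "state snapshot");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Loads a snapshot, returning empty properties when no file exists yet. */
    static Properties load(Path file) throws IOException {
        Properties state = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                state.load(in);
            }
        }
        return state;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("state").resolve("master_state.properties");
        Properties state = new Properties();
        state.setProperty("job.42.status", "COMPLETED"); // hypothetical key
        save(file, state);
        System.out.println(load(file).getProperty("job.42.status"));
    }
}
```

Writing to a temporary file before the move reduces the chance that a crash mid-write leaves a truncated snapshot behind.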
- Worker/producer cannot connect:
  - verify master is running
  - verify host IP in Worker.java and Producer.java
  - verify port 44444 is reachable
- Job fails with FAILED|reason=DATAREF:
  - verify dataRef/testRef paths exist and are readable
- No charts generated:
  - run at least one training job first, then run plot in producer
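For the connectivity checks above, a quick way to test whether the master's port is reachable from a worker or producer machine is a plain TCP connect with a timeout. This helper is a hypothetical standalone sketch, not part of the project; the default host and the 2-second timeout are assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical reachability probe for the master's TCP port. */
final class PortProbeSketch {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "127.0.0.1"; // assumed default for local testing
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 44444;
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 2_000));
    }
}
```

A false result with the master running usually points to a wrong host IP or a firewall rule rather than a problem in the worker or producer itself.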