Optimising a Kubernetes MapReduce Function
Summary
Here is a quick method for creating distributed workloads in Kubernetes using MapReduce. The code below is written in Python; Node.js, C, and C# variants are available in my GitHub repo.
The Components
In a MapReduce job, there are two main stages:
- Map stage – Processes input data and transforms it into key-value pairs.
- Reduce stage – Aggregates the key-value pairs into a final result.
In Kubernetes, you can break these stages down into two sets of tasks, each running in different containers or pods. Kubernetes will orchestrate these pods.
Build a MapReduce Function
Create Docker Images for Mapper and Reducer
First, you need to create separate Docker images for the Mapper and Reducer processes. Each image should have your MapReduce code.
Mapper Code:
For example, a simple Python mapper.py:
# mapper.py - emit "<word>\t1" for every word read from stdin
import sys

for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print(f"{word}\t1")
Reducer Code:
For example, a simple Python reducer.py:
# reducer.py - sum the counts for each word read from stdin
import sys
from collections import defaultdict

counts = defaultdict(int)
for line in sys.stdin:
    word, count = line.split("\t")
    counts[word] += int(count)

for word, count in counts.items():
    print(f"{word}\t{count}")
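Before containerising anything, you can smoke-test the pair locally with a shell pipeline; the sort in the middle stands in for MapReduce's shuffle step (this assumes both scripts are in the current directory):
# Quick local test of the word-count pipeline
echo "the cat sat on the mat" | python mapper.py | sort | python reducer.py
Each word should come out with its count (the, for instance, appears twice).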
You then build Docker images for each:
# Dockerfile-mapper (Mapper image)
FROM python:3.9-slim
COPY mapper.py /mapper.py
CMD ["python", "/mapper.py"]
# Dockerfile-reducer (Reducer image)
FROM python:3.9-slim
COPY reducer.py /reducer.py
CMD ["python", "/reducer.py"]
# Build Docker images
docker build -t mapper-image -f Dockerfile-mapper .
docker build -t reducer-image -f Dockerfile-reducer .
Set Up Kubernetes Cluster
Ensure you have a Kubernetes cluster running. If not, you can start a local cluster with minikube or use a managed service such as Google Kubernetes Engine (GKE).
# Start minikube (for local)
minikube start
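If you built the images on your local Docker daemon, the cluster also needs to be able to find them. With minikube, one option (assuming a minikube version that includes the image subcommand) is to load them straight into the cluster node:
# Make the locally built images visible inside the minikube node
minikube image load mapper-image
minikube image load reducer-image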
Deploy Mapper Pods
Create a Kubernetes Job for the Mapper phase. A Kubernetes Job can handle parallelisation and run multiple mapper instances simultaneously.
Create a mapper-job.yaml:
apiVersion: batch/v1
kind: Job
metadata:
  name: mapreduce-mapper
spec:
  template:
    spec:
      containers:
      - name: mapper
        image: mapper-image
        imagePullPolicy: IfNotPresent   # use the locally built image instead of pulling
        command: ["python", "/mapper.py"]
        volumeMounts:
        - name: input-data
          mountPath: /data
      restartPolicy: Never
      volumes:
      - name: input-data
        hostPath:
          path: /path/on/host/to/input-data
  backoffLimit: 4
This Job will run the mappers on the input data you provide via the /data
directory. Kubernetes can scale the mapper instances across different nodes.
Collect Intermediate Output
You’ll need a way to collect intermediate key-value pairs from the mappers, which you can store in a shared volume or a distributed file system like NFS or HDFS.
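If you go the shared-volume route, one way to model it is a PersistentVolumeClaim that both Jobs mount. This is only a sketch; it assumes your cluster offers a storage class (the name nfs-shared below is a placeholder) that supports ReadWriteMany access:
# intermediate-pvc.yaml - shared claim that both mapper and reducer pods mount
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: intermediate-data
spec:
  accessModes:
    - ReadWriteMany              # several pods read and write the claim at once
  storageClassName: nfs-shared   # placeholder: any RWX-capable storage class
  resources:
    requests:
      storage: 1Gi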
Deploy Reducer Pods
Once the Map phase completes, the intermediate data needs to be processed by the Reducers. You can also create a Kubernetes Job for the Reducer phase.
Create a reducer-job.yaml:
apiVersion: batch/v1
kind: Job
metadata:
  name: mapreduce-reducer
spec:
  template:
    spec:
      containers:
      - name: reducer
        image: reducer-image
        imagePullPolicy: IfNotPresent   # use the locally built image instead of pulling
        command: ["python", "/reducer.py"]
        volumeMounts:
        - name: intermediate-data
          mountPath: /intermediate
      restartPolicy: Never
      volumes:
      - name: intermediate-data
        hostPath:
          path: /path/on/host/to/intermediate-data
  backoffLimit: 4
The Reducer job will aggregate the output from the Mapper phase to produce the final result.
Orchestrate the Jobs
In Kubernetes, you can use ConfigMaps or environment variables to pass information between the Mapper and Reducer stages, or trigger the Reducer Job once the Mapper job has completed. You can also use Kubernetes CronJobs if you want this MapReduce process to be scheduled periodically.
Alternatively, drive the sequencing with a small script around the Kubernetes Job API, or use workflow tools such as Airflow or Argo Workflows to manage the execution flow between stages.
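A minimal way to sequence the two Jobs from the command line, assuming both manifests are in the current directory, is to wait for the mapper Job to report completion before applying the reducer Job:
# Run the Map phase, wait for it to finish, then start the Reduce phase
kubectl apply -f mapper-job.yaml
kubectl wait --for=condition=complete job/mapreduce-mapper --timeout=600s
kubectl apply -f reducer-job.yaml
kubectl wait --for=condition=complete job/mapreduce-reducer --timeout=600s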
Monitor and Manage the Cluster
Kubernetes provides built-in tools for monitoring the job statuses and scaling the containers:
- kubectl get jobs: To see the status of Jobs.
- kubectl logs <pod-name>: To see the logs of any running pod (e.g., mappers or reducers).
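Because the Job controller labels its pods with job-name, you can also list or tail every mapper pod at once:
# List and stream logs from all pods created by the mapper Job
kubectl get pods -l job-name=mapreduce-mapper
kubectl logs -l job-name=mapreduce-mapper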
Scaling
You can scale the Map and Reduce phases by adjusting the parallelism for each Kubernetes Job. For instance, in the Mapper Job spec, you can set:
parallelism: 10 # Run 10 mapper pods simultaneously
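If each mapper should process its own slice of the input rather than the same data, one option (assuming a Kubernetes version recent enough to support Indexed Jobs) is to combine parallelism with completions and the Indexed completion mode; each pod then receives a unique JOB_COMPLETION_INDEX environment variable it can use to pick its input shard:
# Fragment of the mapper Job spec: ten pods, each with a unique index 0..9
spec:
  completions: 10
  parallelism: 10
  completionMode: Indexed   # sets JOB_COMPLETION_INDEX in each pod
Inside mapper.py you could then read os.environ["JOB_COMPLETION_INDEX"] and open the matching input file under /data.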
Summary
- Mapper pods: Process input data and output intermediate key-value pairs.
- Reducer pods: Process intermediate key-value pairs and generate the final output.
- Use Kubernetes Jobs to run both the Mapper and Reducer stages, and manage parallelisation.