Optimising Kubernetes MapReduce Function

Summary

Here is a quick method for creating distributed workloads in Kubernetes using MapReduce. The code below is written in Python, but Node.js, C, and C# variants are available in my GitHub repo.

The Components

In a MapReduce job, there are two main stages:

  • Map stage – Processes input data and transforms it into key-value pairs.
  • Reduce stage – Aggregates the key-value pairs into a final result.

In Kubernetes, you can break these stages down into two sets of tasks, each running in different containers or pods. Kubernetes will orchestrate these pods.

Build a MapReduce Function

Create Docker Images for Mapper and Reducer

First, you need to create separate Docker images for the Mapper and Reducer processes. Each image should have your MapReduce code.

Mapper Code:

For example, a simple Python mapper.py:

import sys

# Emit a "word<TAB>1" pair for every word read from standard input
for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print(f"{word}\t1")

Reducer Code:

For example, a simple Python reducer.py:

import sys
from collections import defaultdict

# Sum the counts emitted by the mappers, one "word<TAB>count" pair per line
counts = defaultdict(int)
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines
    word, count = line.split("\t", 1)
    counts[word] += int(count)

for word, count in counts.items():
    print(f"{word}\t{count}")

You then build Docker images for each:

# Dockerfile-mapper
FROM python:3.9-slim
COPY mapper.py /mapper.py
CMD ["python", "/mapper.py"]

# Dockerfile-reducer
FROM python:3.9-slim
COPY reducer.py /reducer.py
CMD ["python", "/reducer.py"]

# Build the Docker images
docker build -t mapper-image -f Dockerfile-mapper .
docker build -t reducer-image -f Dockerfile-reducer .

Set Up Kubernetes Cluster

Ensure you have a Kubernetes cluster running. If not, you can set up a local cluster using minikube or a cloud-based Kubernetes cluster like Google Kubernetes Engine (GKE).

# Start minikube (for local)
minikube start
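
If you build the images on your host and run a local minikube cluster, the cluster will not see them by default. One option, assuming a reasonably recent minikube, is to load them into the cluster's image cache and reference them with imagePullPolicy: IfNotPresent in the Job specs (as done below):

# Make the locally built images available inside minikube
minikube image load mapper-image
minikube image load reducer-image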

Deploy Mapper Pods

Create a Kubernetes Job for the Mapper phase. A Kubernetes Job can handle parallelisation and run multiple mapper instances simultaneously.

Create a mapper-job.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: mapreduce-mapper
spec:
  template:
    spec:
      containers:
      - name: mapper
        image: mapper-image
        imagePullPolicy: IfNotPresent
        command: ["python", "/mapper.py"]
        volumeMounts:
        - name: input-data
          mountPath: /data
      restartPolicy: Never
      volumes:
      - name: input-data
        hostPath:
          path: /path/on/host/to/input-data
  backoffLimit: 4

This Job mounts your input data at /data. Note that mapper.py reads from standard input, so you still need to feed it the mounted files (see the sketch below). Kubernetes can schedule the mapper instances across different nodes.
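One simple way to do that is to override the container command so it pipes a mounted input split through the mapper and writes the intermediate output back to a shared volume. A sketch, with hypothetical file names:

      containers:
      - name: mapper
        image: mapper-image
        # Pipe a mounted input split through the mapper; file names are hypothetical
        command: ["sh", "-c", "cat /data/input-split.txt | python /mapper.py > /data/intermediate.txt"]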

Collect Intermediate Output

You’ll need a way to collect intermediate key-value pairs from the mappers, which you can store in a shared volume or a distributed file system like NFS or HDFS.
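
For example, a minimal PersistentVolumeClaim that both Jobs could mount in place of hostPath – a sketch, assuming your cluster has a storage class that supports ReadWriteMany (for example, one backed by NFS):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: intermediate-data
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 1Gi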

Deploy Reducer Pods

Once the Map phase completes, the intermediate data needs to be processed by the Reducers. You can also create a Kubernetes Job for the Reducer phase.

Create a reducer-job.yaml:

apiVersion: batch/v1
kind: Job
metadata:
  name: mapreduce-reducer
spec:
  template:
    spec:
      containers:
      - name: reducer
        image: reducer-image
        imagePullPolicy: IfNotPresent
        command: ["python", "/reducer.py"]
        volumeMounts:
        - name: intermediate-data
          mountPath: /intermediate
      restartPolicy: Never
      volumes:
      - name: intermediate-data
        hostPath:
          path: /path/on/host/to/intermediate-data
  backoffLimit: 4

The Reducer job will aggregate the output from the Mapper phase to produce the final result.
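
As with the mapper, reducer.py reads from standard input, so one option is to concatenate the intermediate files from the mounted volume and pipe them in, writing the final result back to the same volume. A sketch, with hypothetical file names:

      containers:
      - name: reducer
        image: reducer-image
        # Feed all intermediate files into the reducer; file names are hypothetical
        command: ["sh", "-c", "cat /intermediate/*.txt | python /reducer.py > /intermediate/final-output.txt"]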

Orchestrate the Jobs

In Kubernetes, you can use ConfigMaps or environment variables to pass information between the Mapper and Reducer stages, or trigger the Reducer Job once the Mapper job has completed. You can also use Kubernetes CronJobs if you want this MapReduce process to be scheduled periodically.

Alternatively, use a workflow engine such as Argo Workflows or Apache Airflow to manage the execution flow between the stages.
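
The simplest form of sequencing, if you drive the pipeline from a shell script, is to wait for the mapper Job to complete before applying the reducer Job:

# Run the mapper Job, wait for it to finish, then start the reducer Job
kubectl apply -f mapper-job.yaml
kubectl wait --for=condition=complete job/mapreduce-mapper --timeout=600s
kubectl apply -f reducer-job.yaml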

Monitor and Manage the Cluster

Kubernetes provides built-in tools for monitoring the job statuses and scaling the containers:

  • kubectl get jobs: To see the status of Jobs.
  • kubectl logs <pod-name>: To see the logs of any running pod (e.g., mappers or reducers).

Scaling

You can scale the Map and Reduce phases by adjusting the parallelism for each Kubernetes Job. For instance, in the Mapper Job spec, you can set:

  parallelism: 10  # Run 10 mapper pods simultaneously
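
If each mapper should work on a different input split, an Indexed Job (batch/v1, Kubernetes 1.21 or later) gives every pod a completion index it can use to pick its shard. A sketch, assuming the input has been pre-split into numbered files:

spec:
  completions: 10
  parallelism: 10
  completionMode: Indexed
  template:
    spec:
      containers:
      - name: mapper
        image: mapper-image
        imagePullPolicy: IfNotPresent
        # Each pod processes the split matching its JOB_COMPLETION_INDEX; file names are hypothetical
        command: ["sh", "-c", "cat /data/input-split-$JOB_COMPLETION_INDEX.txt | python /mapper.py > /data/intermediate-$JOB_COMPLETION_INDEX.txt"]
        volumeMounts:
        - name: input-data
          mountPath: /data
      restartPolicy: Never
      volumes:
      - name: input-data
        hostPath:
          path: /path/on/host/to/input-data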

Summary

  • Mapper pods: Process input data and output intermediate key-value pairs.
  • Reducer pods: Process intermediate key-value pairs and generate the final output.
  • Use Kubernetes Jobs to run both the Mapper and Reducer stages, and manage parallelisation.