
David Espejo
Boost your ML pipeline performance with efficient parallelism

I'll show you different levels of parallelism you can achieve to improve the performance of complex Machine Learning workflows.

Parallelism in ML pipelines

Map is a higher-order function from functional programming that applies a function to each element of a collection (e.g., a list) and returns the outputs in a collection of the same type.
This can be very useful in ML pipelines, where multiple transformation steps are applied iteratively to the input dataset. Say you need to preprocess a text dataset represented as a list:

raw_texts = [
    "Hello! This is a SAMPLE text with UPPERCASE words.",
    "  Extra   spaces   AND 123 numbers...  ",
    "Another EXAMPLE with Punctuation!!!"
]

You may want to define a preprocessing function:

import re

def preprocess_text(text):
    # Lowercase
    text = text.lower()
    # Remove special characters and numbers
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    return text

Then you need to apply this function to every input text. Doing it sequentially typically means writing a for loop, which produces the correct output but can be time-consuming for large datasets; processing the elements in parallel may be preferable.

Enter the map pattern.

# Apply preprocessing to all elements in the inputs list using map
preprocessed = list(map(preprocess_text, raw_texts))

Python's built-in map still evaluates sequentially, but it expresses the computation as independent per-element function applications, which is exactly the shape a parallel executor needs. This matters in ML pipelines, where this kind of step may happen many times.
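To actually run the map in parallel on a single machine, you can keep the same map-style interface and hand the work to a pool of worker processes (processes rather than threads, so CPU-bound regex work sidesteps the GIL). This is a minimal sketch using the standard library's `concurrent.futures`; the worker count of 4 is an arbitrary illustrative choice:

```python
import re
from concurrent.futures import ProcessPoolExecutor

def preprocess_text(text):
    # Same cleanup steps as above: lowercase, strip non-letters, collapse whitespace
    text = text.lower()
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

raw_texts = [
    "Hello! This is a SAMPLE text with UPPERCASE words.",
    "  Extra   spaces   AND 123 numbers...  ",
    "Another EXAMPLE with Punctuation!!!",
]

if __name__ == "__main__":
    # Executor.map keeps the familiar map signature but fans the calls
    # out across worker processes
    with ProcessPoolExecutor(max_workers=4) as pool:
        preprocessed = list(pool.map(preprocess_text, raw_texts))
    print(preprocessed)
```

The `if __name__ == "__main__"` guard is required on platforms that start worker processes by re-importing the module.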

But how do you scale further and distribute the load across multiple compute nodes? Enter Flyte MapTasks.

Flyte MapTask

Flyte is a distributed computation framework that uses a Kubernetes Pod as the fundamental execution environment for each task in a pipeline. When you use MapTasks, Flyte automatically distributes the load among multiple Pods that run in parallel and limits each Pod to downloading and processing only a specific index from the inputs list, preventing inefficient duplicate data movement.

But how many Pods are created?

Let’s see.

In this simple example:

from flytekit import map_task, task, workflow

threshold = 11


@task
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold


@workflow
def map_workflow(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies)(data_point=data)

We can see it creates one Pod per element in the inputs list:

K8s Pods created in the namespace of this execution

ML pipelines are iterative, so the next time this workflow runs, MapTasks spin up 10 fresh Pods again. What if there are thousands or millions of inputs (as in many LLM input datasets)? What is the cost of booting one container per element in the inputs list?
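The cost intuition can be made concrete with a rough back-of-the-envelope model. This is purely illustrative (not anything Flyte computes): `boot_s` and `work_s` are assumed numbers for per-Pod boot overhead and per-item work, and `concurrency` caps how many Pods run at once:

```python
import math

def estimated_runtime(n_items, boot_s, work_s, concurrency):
    """Rough runtime model for one-Pod-per-item fan-out.

    Items are processed in waves of at most `concurrency` Pods,
    and every Pod in every wave pays the boot overhead again,
    so total time grows with the number of waves, not just the work.
    """
    waves = math.ceil(n_items / concurrency)
    return waves * (boot_s + work_s)

# 10 items, 10 Pods at once: a single wave, boot paid once (in parallel)
print(estimated_runtime(10, boot_s=15, work_s=1, concurrency=10))    # 16
# 1000 items at the same concurrency: 100 waves, boot overhead dominates
print(estimated_runtime(1000, boot_s=15, work_s=1, concurrency=10))  # 1600
```

Even with trivial per-item work, the repeated container boot cost dominates as the input list grows.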

With the example above, this is the time it takes to complete the execution:

Union UI showing execution time for the map task defined above

20 seconds.
If we double the list size, the time starts to pile up:

Union UI showing a new execution with more data taking more time
That is, a 100% increase in input dataset size produced a 50% increase in total execution time (from 20 to roughly 30 seconds).

What if you could reuse a specific number of Pods to mitigate this penalty?

Union Actors

This feature allows you to declare an execution environment and then reuse it through multiple executions to mitigate the impact of container boot-up times.

We define a slightly modified version of the previous example to run it on the Union platform, using an input dataset with 100 items.
As you may imagine by now, that would create 100 Pods.

To mitigate this workflow becoming a "noisy neighbour" in your cluster, you can specify the number of concurrent executions and, hence, the number of Pods that are created simultaneously at any point in time. In this example, we limit this number to 10:

from flytekit import map_task
import union
import random

threshold = 31

# Declare a container image
image = union.ImageSpec(
    packages=["union==0.1.168"],
    builder="union",
)

@union.task(container_image=image)
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold

@union.workflow
def map_workflow(data: list[int] = random.sample(range(1,101), k=100)) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies, concurrency=10)(data_point=data)

Execution takes 1 minute:

Union UI showing an execution that takes 1 minute

Now, if we define an Actors environment with 10 replicas, or 10 "reusable" Pods:

from flytekit import map_task
import union
import random

threshold = 101

image = union.ImageSpec(
    packages=["union==0.1.168"],
    builder="union",
)

# Here we define the Actor settings
actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=10,
    ttl_seconds=30,
    requests=union.Resources(
        cpu="125m",
        mem="256Mi",
    ),
    container_image=image,
)

@actor.task
def detect_anomalies(data_point: int) -> bool:
    return data_point > threshold


@union.workflow
def map_workflow(data: list[int] = random.sample(range(1,101), k=100)) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return map_task(detect_anomalies)(data_point=data)

Execution time is 22 seconds:

Union UI showing an execution that takes 22 seconds

That's a 63% decrease in execution time, enabling much higher iteration velocity and more efficient resource consumption.
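The 63% figure follows directly from the two measured times:

```python
# Measured wall-clock times from the two runs above, in seconds
before, after = 60.0, 22.0

decrease = (before - after) / before
print(f"{decrease:.0%}")  # 63%
```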

Conclusion

Modern ML workloads are typically designed to process big datasets in parallel, which carries a performance penalty from container boot-up latency. Flyte lets you achieve efficient parallel processing, and Union Actors take it further by removing the one-Pod-per-input-item limitation, enabling faster and more resource-efficient executions.


Signup for Union's free tier
Check out the repo
Questions? Join us on Slack!
