Enhancing Your Matillion Python Component with a Custom Matillion Python Package Powered by Amazon Q Developer

At MRH Trowe, we've integrated Matillion with Snowflake and AWS to streamline our ELT and ETL data integrations (Extract, Load, Transform). Matillion offers a low-code/no-code solution for building these integrations, providing both transformation pipelines with SQL-based components (Transform) and orchestration pipelines for data integration and automation (Extract and Load).

While Matillion comes with many pre-built components to connect with various sources, there are situations where Python components are necessary for more complex tasks or to implement features that are not supported out of the box. This combination allows us to load data from different sources into Snowflake, leveraging its robust data platform capabilities.

Given Matillion's focus on low-code integration, I often wondered how to streamline Python development for use within Matillion. Considering that Matillion operates on Docker in AWS (e.g., with Fargate), I decided it was best to take ownership of the container by creating a custom Matillion Helper Package for Python to ease integration. For this, I utilized Amazon Q Developer.

Amazon Q can be used with VS Code and its server variant via a dedicated free extension. You can sign in with your AWS Builder ID or leverage SSO (AWS IAM Identity Center) to streamline IAM-related tasks for Amazon Q Developer. After installing this extension, you're provided with a chat interface that facilitates an agentic development lifecycle.

Creating a Custom PyPI Package with Amazon Q

I set out to create my first Python package as a companion to the Matillion Dockerfile with the following structure:

matillion-helper/
├── matillion_helper/
│   ├── __init__.py
│   └── helper.py
├── setup.py
├── pyproject.toml
├── README.md
├── Dockerfile
└── .dockerignore

To generate this structure, I prompted Amazon Q: “Create the needed minimal setup for PyPi package to be installed into a Docker container. Name the package matillion_helper.” With Claude 3.7 Sonnet, this took only a matter of seconds.
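
For reference, a minimal setup.py for such a package could look like the sketch below; the metadata values are illustrative placeholders, not the exact output Amazon Q generated.

# Minimal setup.py sketch; metadata values are illustrative placeholders.
from setuptools import setup, find_packages

setup(
    name="matillion_helper",
    version="0.1.0",
    description="Helper utilities for Matillion Python components",
    packages=find_packages(),
    python_requires=">=3.8",
    install_requires=[
        "boto3",     # AWS API interaction (STS AssumeRole)
        "msal",      # Azure AD authentication for Power BI
        "requests",  # Power BI REST API calls
    ],
)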

Use Case: Consolidated Logging

Once the package was set up, I focused on creating a module for consolidated logging. This module logs messages from any function up to the main execution, making logging consistent across the package and all Matillion projects. I used the prompt: "Create a logging_config.py file for my matillion_helper package with the following features:

  • Functions to configure logging with console and file output
  • Support for log rotation with configurable max size and backup count
  • Environment variable support for log directory and log level
  • A get_logger convenience function
  • Proper error handling for file permissions
  • Type hints for all functions"

As a result, I got the following code.


import logging
import logging.handlers
import os
import sys
from pathlib import Path
from typing import Any, Optional

def configure_logging(
    logger_name: Optional[str] = None,
    log_level: int = logging.INFO,
    log_file: Optional[str] = None,
    log_format: str = "[%(asctime)s] [%(levelname)s] [%(name)s] - %(message)s",
    date_format: str = "%Y-%m-%d %H:%M:%S",
    console_output: bool = True,
    max_bytes: int = 10485760,  # 10MB
    backup_count: int = 5,
    propagate: bool = False,
) -> logging.Logger:
    """Configure logging for a module.

    Args:
        logger_name: Name of the logger (defaults to the calling module name)
        log_level: Logging level (default: INFO)
        log_file: Log file name; file logging is enabled only when provided.
            The log directory is taken from MATILLION_LOG_DIR (default: current directory).
        log_format: Format string for log messages
        date_format: Format string for timestamps
        console_output: Whether to output logs to console
        max_bytes: Maximum size of log file before rotation
        backup_count: Number of backup log files to keep
        propagate: Whether to propagate logs to parent loggers

    Returns:
        logging.Logger: Configured logger instance
    """
    # If no logger name is provided, use the calling module's name
    if logger_name is None:
        import inspect

        frame = inspect.stack()[1]
        module = inspect.getmodule(frame[0])
        logger_name = module.__name__ if module else "matillion_helper"

    # Get or create the logger
    logger = logging.getLogger(logger_name)

    # Clear any existing handlers to avoid duplicate logs
    if logger.hasHandlers():
        logger.handlers.clear()

    # Set the log level
    logger.setLevel(log_level)
    logger.propagate = propagate

    # Create formatter
    formatter = logging.Formatter(log_format, date_format)

    # Add console handler if requested
    if console_output:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        console_handler.setLevel(log_level)
        logger.addHandler(console_handler)

    # Add a rotating file handler when a log file is requested
    if log_file:
        try:
            log_dir = Path(os.environ.get("MATILLION_LOG_DIR", "."))
            log_dir.mkdir(parents=True, exist_ok=True)
            file_handler = logging.handlers.RotatingFileHandler(
                log_dir / log_file, maxBytes=max_bytes, backupCount=backup_count
            )
            file_handler.setFormatter(formatter)
            file_handler.setLevel(log_level)
            logger.addHandler(file_handler)
        except (OSError, PermissionError) as exc:
            # Fall back to console-only logging if the log file is not writable
            logger.warning("Could not set up file logging: %s", exc)

    return logger

def get_logger(
    name: Optional[str] = None,
    log_level: Optional[int] = None,
    **kwargs: Any,
) -> logging.Logger:
    """Get a configured logger.

    This is a convenience function for getting a logger with the default configuration.

    Args:
        name: Logger name (defaults to the calling module name)
        log_level: Override default log level
        **kwargs: Additional arguments to pass to configure_logging

    Returns:
        logging.Logger: Configured logger instance
    """
    # Get environment variable for log level, defaulting to INFO
    env_log_level = os.environ.get("MATILLION_LOG_LEVEL", "INFO").upper()

    # Map string log levels to logging constants
    log_level_map = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    # Use provided log_level, or get from environment, or default to INFO
    effective_log_level = log_level or log_level_map.get(env_log_level, logging.INFO)

    return configure_logging(logger_name=name, log_level=effective_log_level, **kwargs)

This implementation ensured that the logging mechanism was centralized and not dependent on individual developers' preferences.
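
Inside a Matillion Python component, using the package then boils down to a few lines. A minimal usage sketch (process_data is an illustrative placeholder, not part of the package):

# Minimal usage sketch inside a Matillion Python component.
# process_data is an illustrative placeholder function.
from matillion_helper.logging_config import get_logger

logger = get_logger(__name__)

def process_data() -> None:
    logger.info("Starting data processing step")
    try:
        # ... the actual workload would go here ...
        logger.debug("Intermediate state reached")
    except Exception:
        logger.exception("Data processing failed")
        raise

process_data()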

Use Case: Interaction with AWS API

Another regular task we perform with Matillion involves IAM context switches, which allow us to eliminate static credentials. To achieve this, we implemented the ability to make STS (Security Token Service) AssumeRole calls. For cross-account access, the Matillion instance role needs permission to call sts:AssumeRole, and the target role's trust policy must allow the Matillion role to assume it.

Using a prompt like “Create a function to make STS AssumeRole possible in a generic way,” I developed a module that utilizes the existing logging_config.py for consistent logging throughout the implementation.

import boto3
from botocore.exceptions import ClientError, ParamValidationError
from typing import Dict, Optional, Any
from matillion_helper.logging_config import get_logger

# Get configured logger for this module
logger = get_logger(__name__)

def assume_role(
    role_arn: str,
    session_name: str = "AssumedRoleSession",
    duration_seconds: int = 3600,
    external_id: Optional[str] = None,
    region_name: Optional[str] = None,
    sts_client: Optional[Any] = None,
) -> Dict[str, Any]:
    """Assume an AWS IAM role and return temporary credentials.

    This function allows for assuming an IAM role and obtaining temporary credentials
    that can be used to make AWS API calls with the permissions of the assumed role.

    Args:
        role_arn (str): The Amazon Resource Name (ARN) of the role to assume.
        session_name (str, optional): An identifier for the assumed role session.
            Defaults to "AssumedRoleSession".
        duration_seconds (int, optional): The duration, in seconds, of the role session.
            Defaults to 3600 (1 hour).
        external_id (str, optional): A unique identifier that might be required when
            assuming a role in another account. Defaults to None.
        region_name (str, optional): The AWS region to connect to. Defaults to None,
            which uses the default region from the AWS configuration.
        sts_client (boto3.client, optional): An existing STS client to use.
            If not provided, a new client will be created.

    Returns:
        Dict[str, Any]: The temporary credentials from the AssumeRole response.
        The dictionary includes:
            - AccessKeyId: The access key ID for the temporary credentials
            - SecretAccessKey: The secret access key for the temporary credentials
            - SessionToken: The session token for the temporary credentials
            - Expiration: When the temporary credentials expire

    Raises:
        ClientError: If AWS returns an error during role assumption.
        ParamValidationError: If the parameters provided are invalid.
        Exception: Any other exception that occurs during role assumption.
    """
    try:
        # Create STS client if not provided
        if sts_client is None:
            sts_client = boto3.client("sts", region_name=region_name)

        # Prepare assume role parameters
        assume_role_params = {
            "RoleArn": role_arn,
            "RoleSessionName": session_name,
            "DurationSeconds": duration_seconds,
        }

        # Add external ID if provided
        if external_id:
            assume_role_params["ExternalId"] = external_id

        # Assume the role
        response = sts_client.assume_role(**assume_role_params)

        # Extract credentials from the response
        credentials = response["Credentials"]

        logger.info(f"Successfully assumed role: {role_arn}")

        return credentials

    except ClientError as e:
        logger.exception(f"Failed to assume role due to AWS error: {e}")
        raise
    except ParamValidationError as e:
        logger.exception(f"Invalid parameters for assume_role: {e}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error assuming role: {e}")
        raise
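
The returned credentials can then be turned into a scoped boto3 session for the target account. A short usage sketch, assuming the function lives in matillion_helper.aws_helper (the module name, role ARN, and bucket name are placeholders):

# Usage sketch: create a boto3 session from the temporary credentials.
# The import path, role ARN, and bucket name are illustrative placeholders.
import boto3
from matillion_helper.aws_helper import assume_role

credentials = assume_role(
    role_arn="arn:aws:iam::123456789012:role/MatillionCrossAccountRole",
    session_name="MatillionJob",
)

session = boto3.Session(
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)

# Any client created from this session acts with the assumed role's permissions
s3 = session.client("s3")
response = s3.list_objects_v2(Bucket="example-bucket", MaxKeys=10)
print(response.get("KeyCount"))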

Use Case: Refreshing Power BI Semantic Models

Another useful feature we developed is the ability to kick off dataset refreshes for Power BI from Matillion. This turns a scheduled dependency between Matillion and Power BI semantic models into an event-based approach, resulting in more consistent behavior and fewer sources of error. However, configuration in the Power BI admin settings is required to allow API interaction, and refreshes are restricted: on shared capacity they are limited to eight per day. If you meet the prerequisites, you can leverage event-based refreshes for all Power BI workspaces and semantic models by adding the service principal to the workspaces and assigning it Contributor rights.

Using a prompt like “Create a Python function to update Power BI datasets in a workspace”, I implemented a function that interacts with the Power BI REST API to refresh datasets, ensuring that we can maintain up-to-date reports directly from our data integration workflows.


import requests
import msal
from matillion_helper.logging_config import get_logger

# Get configured logger for this module
logger = get_logger(__name__)

def update_powerbi_datasets(workspace_id, client_id, client_secret, tenant_id):
    """
    Update Power BI datasets in a specified workspace.

    Args:
        workspace_id (str): The ID of the Power BI workspace
        client_id (str): Azure AD application client ID
        client_secret (str): Azure AD application client secret
        tenant_id (str): Azure AD tenant ID

    Returns:
        dict: Result of the refresh operation
    """
    # Get access token using MSAL
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    app = msal.ConfidentialClientApplication(
        client_id=client_id,
        client_credential=client_secret,
        authority=authority
    )

    # Acquire token for Power BI service
    scopes = ["https://analysis.windows.net/powerbi/api/.default"]
    result = app.acquire_token_for_client(scopes=scopes)

    if "access_token" not in result:
        logger.exception(f"Error getting token: {result.get('error')}")
        logger.exception (f"Error description: {result.get('error_description')}")
        return None

    access_token = result["access_token"]

    # Get datasets in the workspace
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    datasets_url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets"
    datasets_response = requests.get(datasets_url, headers=headers)

    if datasets_response.status_code != 200:
        logger.exception (f"Error getting datasets: {datasets_response.text}")
        return None

    datasets = datasets_response.json()["value"]
    refresh_results = {}

    # Refresh each dataset
    for dataset in datasets:
        dataset_id = dataset["id"]
        dataset_name = dataset["name"]

        refresh_url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes"
        refresh_response = requests.post(refresh_url, headers=headers)

        if refresh_response.status_code == 202:
            refresh_results[dataset_name] = "Refresh triggered successfully"
        else:
            refresh_results[dataset_name] = f"Error: {refresh_response.text}"

    return refresh_results

# Example usage
if __name__ == "__main__":
    # Replace these with your actual values
    client_id="client_id"
    client_secret="client_secret"
    tenant_id="tenant_id"
    workspace_id="workspace_id"

    results = update_powerbi_datasets(workspace_id, client_id, client_secret, tenant_id)
    logger.info(results)
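
Since the refresh call is asynchronous (the API returns 202 once the refresh is queued), it can be useful to check the refresh history afterwards via the Get Refresh History endpoint. A minimal sketch, assuming the same bearer-token headers as above; check_refresh_status is an illustrative helper name, not part of our package:

# Sketch: query a dataset's refresh history to see whether a triggered
# refresh completed. check_refresh_status is an illustrative helper name.
import requests
from matillion_helper.logging_config import get_logger

logger = get_logger(__name__)

def check_refresh_status(workspace_id, dataset_id, headers, top=1):
    url = (
        f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}"
        f"/datasets/{dataset_id}/refreshes?$top={top}"
    )
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        logger.error(f"Error getting refresh history: {response.text}")
        return None
    # Each entry carries a status such as "Completed", "Failed" or "Unknown"
    return [entry["status"] for entry in response.json()["value"]]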

Summary

By utilizing Amazon Q and creating a custom Matillion Python package, we gained much-needed flexibility in our development process. While this approach requires initial setup and containerization efforts, the benefits include a more integrated CI/CD process and the ability to manage Python scripts effectively within Matillion.

As of today, Matillion's Copilot (MAIA) does not help you create and maintain Python scripts. But by introducing Amazon Q Developer, you can overcome this high-code hurdle by providing meaningful building blocks and enabling your data engineers to leverage pure but predefined Python code.

Happy (vibed) Coding! :-)
