[AWS] Lambda를 이용하여 EMR 클러스터 스케쥴링 하기

0. INTRO

  • 클라우드상에서 데이터 분석시 이용되는 Pyspark EMR Cluster 07:30 - 18:00 까지 User들이 사용을 하고 이 이외 시간에는 사용률이 극히 낮다. 이런 상황에서 과금 발생을 줄이기 위해서는 클러스터에 대한 스케쥴링 작업이 필수적이다.
  • EventBridge 서비스를 통해 매일 특정 시각에 Lambda 함수를 Trigger하여 EMR 클러스터를 시작하고 종료시키는 방법에 대해서 알아보자! 간단한 아키텍쳐는 아래와 같다.

xff


1. EMR CLUSTER 생성

  1. EMR 클러스터를 생성하는 Lambda 함수
import boto3
from datetime import date, datetime, timezone, timedelta

# 일일 스케쥴링을 위한 date 생성
KST = timezone(timedelta(hours=9))
time_record = datetime.now(KST).strftime('%Y%m%d')
_today = time_record[2:]

# boto3를 통한 emr client 생성
client = boto3.client("emr")


def lambda_handler(event, context):

    # run_job_flow > EMR Cluster 생성 함수
    cluster_id = client.run_job_flow(
        Name=f"emr_cluster_{_today}",
        Instances={
            "InstanceGroups": [
                {
                    "Name": "Master",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "MASTER",
                    "InstanceType": "r5.xlarge",
                    "InstanceCount": 1,
                    "Configurations": [
                        {
                            "Classification": "spark",
                            "Properties": {"maximizeResourceAllocation": "false"},
                        },
                        {
                            "Classification": "spark-defaults",
                            "Properties": {"spark.dynamicAllocation.enabled": "false",},
                        },
                    ],
                },
                {
                    "Name": "Slave",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "CORE",
                    "InstanceType": "r5.xlarge",
                    "InstanceCount": 2,
                    "Configurations": [
                        {
                            "Classification": "spark",
                            "Properties": {"maximizeResourceAllocation": "false"},
                        },
                        {
                            "Classification": "spark-defaults",
                            "Properties": {"spark.dynamicAllocation.enabled": "false",},
                        },
                    ],
                },
            ],
            "Ec2KeyName": "SSH-Key Name",
            # "Placement" : {
            #     "AvailabilityZone" : "ap-northeast-2c"
            # },
            "KeepJobFlowAliveWhenNoSteps": True,
            "TerminationProtected": True,
            "Ec2SubnetId": "subnet-xxxxx",
        },
        LogUri="s3://custom/log/path/",
        ReleaseLabel="emr-6.3.0",
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        BootstrapActions=[
          { 
             "Name": "emr_custom_library",
             "ScriptBootstrapAction": { 
                "Path": "s3://custom/bootstrap/script_path/"
             }
          }
       ],
        # 만드는 클러스터에 설치되는 app들 정의
        Applications=[
            {"Name": "Spark"},
            {"Name": "Hive"},
            {"Name": "Hadoop"},
            {"Name": "JupyterHub"},
            {"Name": "JupyterEnterpriseGateway"},
        ],
    )
  1. EventBridge Rule를 통한 Trigger Scheduling
    • Amazon EventBridge > Rules > Create rule 메뉴를 클릭하여 Event Pattern에 따른 CRON을 설정하여 Rule 생성

    image

    • Lambda Trigger로 해당 Eventbridge Rule 등록

    image

  2. Lambda 함수 Save & Deploy
    • EventBridge Rule에 정해놓은 CRON 표현식의 시간에 해당 Lambda 함수가 작동되어 코드에 명시된 Spec과 동일한 EMR Cluster 생성이 시작된다.

2. EMR CLUSTER 종료

  • 클러스터 종료의 경우는 종료 시각을 EventBridge Rule에 등록해놓은 후 Lambda Trigger 설정을 해놓는 부분은 위와 동일하다.
  • EMR 클러스터를 종료하는 Lambda 함수
import boto3
from datetime import date, datetime, timezone, timedelta

# 일일 스케쥴링을 위한 date 생성
KST = timezone(timedelta(hours=9))
time_record = datetime.now(KST).strftime('%Y%m%d')
_today = time_record[2:]
cluster_name = (f"emr_cluster_{_today}",)

# boto3를 통한 emr client 생성
client = boto3.client("emr")


def lambda_handler(event, context):

    # 현재 상태가 WAITING 혹은, RUNNING인 클러스터들의 목록을 조회한다.
    page_iterator = client.get_paginator("list_clusters").paginate(
        ClusterStates=["WAITING", "RUNNING"]
    )

    # 목록을 돌며 이름이 위에 명시한 cluster_name과 동일한 클러스터를 찾아 ID를 추출한다.
    for page in page_iterator:
        for item in page["Clusters"]:
            if item["Name"] == cluster_name[0]:
                cluster_id = item["Id"]

    # 특정 ID를 가지고 있는 클러스터의 종료 보호를 해제시킨 후 클러스터를 종료한다.
    client.set_termination_protection(
        JobFlowIds=[cluster_id], TerminationProtected=False
    )
    client.terminate_job_flows(JobFlowIds=[cluster_id])

3. OUTRO

  • 기본적으로 SDK를 이용하여 특정 AWS 서비스의 client를 호출할 때는 자격증명을 위해 아래 세가지 정보들을 함께 명시 후 호출을 하여야 한다.
    • AWS_ACCESS_KEY_ID : 유저 개인의 자격증명 Access Key의 ID
    • AWS_SECRET_ACCESS_KEY : 유저 개인의 자격증명 Access Key의 비밀번호
    • REGION_NAME : 유저의 서비스 사용 리전
      client = boto3.client('서비스명'
                          , AWS_ACCESS_KEY_ID = ''
                          , AWS_SECRET_ACCESS_KEY = ''
                          , REGION_NAME = '')
    
  • Lambda 서비스를 이용한 client 호출은 보통의 경우 AWS 내에서 같은 Region에 위한 서비스를 사용하는 것이기 때문에 특별한 경우가 아니면 자격증명 관련된 3개 인자를 생략하고 서비스명만 넣어 호출해도 된다.

      client = boto3.client('서비스명')
    

Leave a comment