[AWS] Lambda에서 boto3를 이용하여 DynamoDB에 CSV, JSON data 삽입하기

0. INTRO

  • DynamoDB는 AWS의 대표적인 NoSQL 서비스이다. Key-Value 쌍으로 data가 저장되는데 RDB의 기준에서 본다면 Key가 Column이 되고 Value가 Row가 된다. 하지만 일반적인 RDB와는 다르게 기존에 없던 Key값이 들어오더라도 새롭게 추가되며 계속 늘어날 수 있는 유연함을 가지고 있다.

아래는 DynamoDB가 가지는 특성을 고려했을 때 기본적인 유의사항이다.

  1. DynamoDB 생성시 Partition Key를 지정하게 되어있는데 이게 소위 말하는 PK의 역할을 한다. 따라서 Input 하려는 data에는 반드시 해당 Key 값이 들어 있어야 한다.
  2. 일반적인 MongoDB와 같은 NoSQL에 data를 삽입할 때 처럼 list에 감싸진 JSON 파일 형태로 Input 시도를 하게 되면 삽입되지 않는다. DynamoDB는 삽입시 포맷이 정해져있는데 삽입 포맷에 맞게 data를 약간 변형 후 for문을 돌며 하나씩 넣어주어야 한다. (변형하지 않고 넣은 방법이 있는데 내가 모르는 것일 수도 있다.)
  • DynamoDB의 data type mapping table
Data Type DynamoDB 형식 Data 예시
Number Data Type N 10, 15.5, 0 …
String Data Type S “Hello”, “world”
Boolean Data Type BOOL True, False
Null Data Type NULL  
Binary Data Type B bXkgc3VwZXIgc2VjcmV0IHRleHQh==
List Data Type L [“Hello”, “World”, 100.54]
Map Data Type M {“name”:”Ryan”}
Number Set Data Type NS [42.2, -19, 7.5, 3.14]
String Set Data Type SS [“Black”, “Green”, “Red”]
Binary Set Data Type BS [“U3Vubnk=”, “UmFpbnk=”, “U25vd3k=”]

1. 본문

1. CSV DATA

  • Lambda로 S3에 있는 CSV data 읽어서 DynamoDB에 Input

    import boto3
    import csv
        
    def lambda_handler(event, context):
        s3_bucket = "data가 존재하는 S3 버킷 이름"
        s3_key = 'S3 버킷 하위 data 디렉토리 경로' # csv_data/titanic.csv
        dynamodb_table = 'DynamoDB 테이블 이름'
    
        # 각 서비스의 boto3 client 생성
        s3 = boto3.client('s3')
        dynamodb = boto3.client('dynamodb')
        
        # S3에 저장되어 있는 CSV 파일 read
        response = s3.get_object(Bucket=s3_bucket, Key=s3_key)
        csv_content = response['Body'].read().decode('utf-8')
        
        # CSV data 추출
        csv_data = csv.reader(csv_content.splitlines())
        header = next(csv_data)
            
        # CSV data의 컬럼 type을 미리 선언
        col_types = ['S', 'N', 'N', 'S', 'S', 'S', 'S', 'S', 'S', 'S', 'S', 'S']
            
        # Iterate through the CSV rows and insert into DynamoDB
        for row in csv_data:
            item = {}
                
            for i in range(len(header)):
                item[header[i]] = {str(col_type[i]): row[i]}
        
            response = dynamodb.put_item(
                TableName=dynamodb_table,
                Item=item
            )
        
        return 'CSV data inserted into DynamoDB'
    

2. JSON DATA

  • Local에 있는 JSON data 읽어서 DynamoDB로 Input

    import boto3
    import json
        
    # boto3 client 생성
    dynamodb = boto3.client('dynamodb',
        aws_access_key_id='',
        aws_secret_access_key='',
        region_name='ap-northeast-2'
    )
    dynamodb_table = 'DynamoDB 테이블 이름'
        
    with open('sample.json', 'r') as file:
        datas = json.load(file)
        for data in datas:
            item = {}
            for k, v in data.items():
                item[k] = {"S":v}
        
            response = dynamodb.put_item(
                TableName=dynamodb_table,
                Item=item
            )
            print(response)
    

3. DynamoDB Insert 형식의 JSON data로 바꿔주는 함수 적용 후 사용

  • 변환 함수

    import json
        
    def get_dynamodb_data_type(value):
        type_mapping = {
            int: "N",
            str: "S",
            float: "N",
            bool: "BOOL",
            list: "L",
            dict: "M",
            type(None): "NULL"
        }
        
        data_type = type_mapping.get(type(value))
        if data_type is None:
            raise ValueError(f"Unsupported data type: {type(value)}")
        
        return data_type
    
  • 변환 함수 적용 코드

    def convert_to_dynamodb_json(json_data, is_file:bool=False):
        '''
        1) json_data가 dict list일때는 해당 data를 바로 넣어준다.
        2) json_data가 json file 경로일때는 경로를 쓰고 is_file자리를 True로 해준다.
        '''
        if is_file:
            json_path = json_data
            json_data = []
            with open(json_path, 'r') as file:
                datas = json.load(file)
                [json_data.append(i) for i in datas]
        else: pass
            
        dynamodb_json = []
        for item in json_data:
            dynamodb_item = {}
        
            for key, value in item.items():
                dynamodb_item[key] = {get_dynamodb_data_type(value): value}
        
            dynamodb_json.append(dynamodb_item)
        
        return dynamodb_json
    

Leave a comment