Season 1/기술 보안

[Active Directory] 4장. AD 정보수집 프로그램 제작

작성자 - S1ON

개요

이번 장에서는 파이썬을 이용하여 AD 도메인 서버에 연동된 모든 계정과 그룹 정보를 파싱하고, 불필요 또는 관리가 필요한 계정을 식별 및 표기하여 엑셀로 출력하는 프로그램을 만들어보자.

 

프로그램 요구사항

2.1. Python으로 AD 사용자, 그룹 정보를 파싱하여 엑셀에 활용 가능한 DataFrame으로 저장

2.2. 전체 AD 계정의 정보 {Key,Data} Set을 엑셀 시트('AD_Account_Info')에 기록

2.3. 전체 AD 그룹 별 소속된 유저 수와 활성 유저 정보를 엑셀 시트('AD_Group_Info')에 기록

2.4. 엑셀에 작성된 Data를 검토하여 조건에 부합하는 계정 및 그룹 Cell에 스타일 지정

        > Cell 배경(노란색): 암호 변경일 또는 최근 로그인 기준 90일이 초과하거나 이력이 없는 계정

        > Cell 배경(주황색): AD서버에서 기본 제공하는 주요 권한 그룹(로컬/글로벌)

2.5. 최종 작성된 엑셀을 파일로 저장

 

AD 정보 구조 파악

3.1. AD 사용자 목록 출력

- 하이픈(-)과 명령 실행 완료 구문 사이에 계정 정보가 위치하며, 각 계정은 공백으로 구분되어 출력된다.

net /domain user

 

3.2. AD 단일 사용자 정보 출력

- 도메인 컨트롤러 요청 처리 구문과 명령실행 완료 구문 사이에 AD 사용자 정보가 위치한다.

- 사용자 정보는 20개의 Key, Data로 구성되어 있고, Key와 Data는 공백으로 구분되어 출력된다.

net /domain user {user ID}

 

3.3. AD 그룹 목록 출력

- 하이픈(-)과 명령 실행 완료 구문 사이에 계정 정보가 위치하며, 각 계정은 행(\n)으로 구분되어 출력된다.

net /domain group

 

3.4. AD 그룹 소속 유저 목록 출력

- 하이픈(-)과 명령 실행 완료 구문 사이에 계정 정보가 위치하며, 각 계정은 공백으로 구분되어 출력된다.

net /domain group {group Name}

 

프로그램 코드 요약

함수 명 설명
run_cmd(cmd) 시스템 명령(cmd) 실행 후, 결과를 텍스트 형태로 반환하는 함수
parse_account() AD 사용자 계정 목록을 파싱하여 리스트 형태로 반환하는 함수
parse_user_data() AD 계정 리스트 활용, 전체 AD 사용자 정보를 파싱하고 Key, Data 형태로 반환하는 함수
fetch_user_data() parse_user_data ()로 출력된 AD 사용자 정보에서 불필요한 문자열을 제거하여 반환하는 함수
user_data_to_df() 쓰레드를 이용하여 전체 AD 사용자 정보를 파싱(fetch_user_data()) 하고, df 형태로 반환하는 함수
check_data() AD 사용자 정보에서 파싱한 시간(text)을 date 형으로 변환하고, 90일 기준으로 비교하는 함수
apply_style_if_needed() 요구사항에서 정의한 보안검토 필요 대상 AD 사용자/그룹 식별하여 엑셀 스타일 지정(색 표기)
user_df_to_excel() user_data_to_df()에서 반환한 DataFrame을 엑셀 시트에 작성하는 함수
parse_group() AD 그룹 목록을 파싱하고 불필요 문자열을 제거하여, 리스트 형태로 반환하는 함수
fetch_group_data() AD 그룹 리스트 활용, 전체 AD 그룹 정보를 파싱하고, 딕셔너리 형태로 반환하는 함수
group_data_to_df() 쓰레드를 이용하여 전체 AD 그룹 정보를 파싱(fetch_group_data()) 하고, df 형태로 반환하는 함수
group_df_to_excel() group_data_to_df()에서 반환한 DataFrame을 엑셀 시트에 작성하는 함수
get_ad_domain() 프로그램이 실행되는 환경의 AD 도메인 이름을 파싱하는 함수

 

프로그램 전체 코드

import re
import subprocess
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import PatternFill, Border, Side, Font
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.utils import get_column_letter
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_cmd(cmd):
    """외부 명령 실행 및 결과 반환"""
    result = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, shell=True)
    return result.stdout

def parse_account():
    cmd = "net /domain user"
    output = run_cmd(cmd)
    accounts_part = output.split('-------------------------------------------------------------------------------\n')[1]
    accounts_part = accounts_part.split('명령을 잘 실행했습니다.')[0]
    return [account for line in accounts_part.split('\n') for account in line.strip().split(' ') if account]

def parse_user_data(user_info):
    user_data = {}
    current_key = None
    status = 0
    
    for info in user_info:
        parts = re.split(r'\s{2,}', info)
        if len(parts) >= 2:
            if parts[0]:
                current_key = parts[0]
                user_data[current_key] = parts[1]
                status = 1
            else:
                user_data[current_key] += "; "+parts[1]
            
    return user_data

def fetch_user_data(user):
    cmd = f"net /domain user {user}"
    output = run_cmd(cmd)
    
    user_info = output.split('요청을 처리할 것입니다.')[1]
    user_info = user_info.split('명령을 잘 실행했습니다.')[0]
    user_info = user_info.split('\n')
    return parse_user_data(user_info)

def user_data_to_df():
    accounts = parse_account()
    total = len(accounts)
    users_data = []

    #병렬 처리(max=10)
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_user = {executor.submit(fetch_user_data, user): user for user in accounts}
        #future_to_user = {executor.submit(fetch_user_data, user): user for user in accounts[:3]}
        for count, future in enumerate(as_completed(future_to_user), 1):
            user_data = future.result()
            users_data.append(user_data)
            print('\r 사용자 정보 확인 중: ', count, "/", total, end='')
    print()
    return pd.DataFrame(users_data)
    
def check_date(date_str, ninety_days_ago):
    if '오전' in date_str:
        date_str = date_str.replace('오전', 'AM')
    elif '오후' in date_str:
        date_str = date_str.replace('오후', 'PM')
        
    if date_str == '아님':
        return True
    
    try:
        date = datetime.strptime(date_str, "%Y-%m-%d %p %I:%M:%S")
        return date < ninety_days_ago
    except ValueError:
         return False
        
def apply_style_if_needed(df, ws, r_index, c_idx, column_name, value, ninety_days_ago):
    yellow_fill = PatternFill(start_color='FFFF00', end_color='FFFF00', fill_type='solid')

    if column_name in ['마지막으로 암호 설정한 날짜', '최근 로그온'] and df.iloc[r_index-2]['활성 계정'] == '예':
        if column_name == '마지막으로 암호 설정한 날짜' and check_date(value, ninety_days_ago):
            ws.cell(row=r_index, column=c_idx).fill = yellow_fill
            ws.cell(row=r_index, column=1).fill = yellow_fill
        elif column_name == '최근 로그온' and (value == '아님' or check_date(value, ninety_days_ago)):
            ws.cell(row=r_index, column=c_idx).fill = yellow_fill
            ws.cell(row=r_index, column=1).fill = yellow_fill
                                 
def user_df_to_excel(df, wb):
    ws = wb["AD_Account_Info"]
    thin_border = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin'))
    today = datetime.now()
    ninety_days_ago = today - timedelta(days=90)
    
    # 엑셀 헤더 쓰기
    for c_idx, column_name in enumerate(df.columns, start=1):
        cell = ws.cell(row=1, column=c_idx)
        cell.value = column_name
        cell.font = Font(bold=True)
        cell.border = thin_border
        cell.fill = PatternFill(start_color='D3D3D3', end_color='D3D3D3', fill_type='solid')
        
    for r_idx, row in df.iterrows():
        r_index = r_idx + 2
        for c_idx, value in enumerate(row, start=1):
            column_name = df.columns[c_idx-1]
            cell = ws.cell(row=r_index, column=c_idx)
            cell.value = value
            cell.border = thin_border
            apply_style_if_needed(df, ws, r_index, c_idx, column_name, value, ninety_days_ago)

    local_group_name_idx = df.columns.get_loc('로컬 그룹 구성원') + 1
    global_group_name_idx = df.columns.get_loc('글로벌 그룹 구성원') + 1

    for r_idx, value in enumerate(df['로컬 그룹 구성원'], start=1):
        if 'admin' in str(value).lower() or 'account operator' in str(value).lower() or 'remote' in str(value).lower() or 'backup' in str(value).lower():
            ws.cell(row = r_idx + 1, column = local_group_name_idx).fill = PatternFill(start_color='FFA500', end_color='FFA500', fill_type='solid')

    for r_idx, value in enumerate(df['글로벌 그룹 구성원'], start=1):
        if 'admin' in str(value).lower() or 'owner' in str(value).lower():
            ws.cell(row = r_idx + 1, column = global_group_name_idx).fill = PatternFill(start_color='FFA500', end_color='FFA500', fill_type='solid')

    ws.column_dimensions['A'].width = 13
    ws.column_dimensions['B'].width = 13
    ws.column_dimensions['C'].width = 13
    ws.column_dimensions['H'].width = 25
    ws.column_dimensions['Q'].width = 25
    ws.column_dimensions['S'].width = 18
    ws.column_dimensions['T'].width = 18

    return wb

def parse_group():
    cmd = "net /domain group"
    output = run_cmd(cmd)
    groups_part = output.split('-------------------------------------------------------------------------------\n')[1]
    groups_part = groups_part.split('명령을 잘 실행했습니다.')[0]
    groups = groups_part.split('\n')
    return groups   

def fetch_group_data(group_name, inactive_users):
    cmd = f"net /domain group \"{group_name.replace('*','')}\""
    output = run_cmd(cmd)
    group_describe = output.split('\n')[3].split('          ')[1]
    groups_part = output.split('-------------------------------------------------------------------------------\n')[1].split('명령을 잘 실행했습니다.')[0]

    group_users = [group for line in groups_part.split('\n') for group in line.strip().split(' ') if group]
    updated_users = [user for user in group_users if user not in inactive_users]

    return {
        "group_name": group_name,
        "description": group_describe,
        "total_users_count": len(group_users),
        "active_users_count": len(updated_users),
        "active_users": updated_users[:10]
    }


def group_data_to_df(users_df):
    groups = parse_group()[:-1]
    total = len(groups)
    groups_data = []
    inactive_users = users_df[users_df["활성 계정"] == "아니요"]["사용자 이름"].tolist()

    #병렬 처리(max=10)
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_group = [executor.submit(fetch_group_data, group, inactive_users) for group in groups]
        for count, future in enumerate(as_completed(future_to_group), 1):
            group_data = future.result()
            #print(group_data["group_name"],group_data["active_users_count"])
            if group_data["active_users_count"] > 0:
                groups_data.append(group_data)
            print('\r 그룹 정보 확인 중: ', count, "/", total, end='')

    rows = []
    for group in groups_data:
        if group["active_users_count"] > 0:
            row = [group["group_name"],group["description"],group["total_users_count"],group["active_users_count"]] + group["active_users"] + [None] * (10-len(group["active_users"]))
            rows.append(row)            

    columns = ["그룹 명", "그룹 설명", "전체 유저(수)", "활성 유저(수)"] + [f"활성 유저{i+1}" for i in range(10)]
    return pd.DataFrame(rows, columns=columns)

def group_df_to_excel(df, wb):
    ws = wb["AD_Group_Info"]
    thin_border = Border(left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin'))
    
    # 엑셀 헤더 쓰기
    for c_idx, column_name in enumerate(df.columns, start=1):
        cell = ws.cell(row=1, column=c_idx)
        cell.value = column_name
        cell.font = Font(bold=True)
        cell.border = thin_border
        cell.fill = PatternFill(start_color='D3D3D3', end_color='D3D3D3', fill_type='solid')

    for r_idx, row in df.iterrows():
        r_index = r_idx + 2
        for c_idx, value in enumerate(row, start=1):
            column_name = df.columns[c_idx-1]
            cell = ws.cell(row=r_index, column=c_idx)
            cell.value = value
            cell.border = thin_border

    group_name_idx = df.columns.get_loc('그룹 명') + 1

    for r_idx, value in enumerate(df['그룹 명'], start=1):
        if 'admin' in str(value).lower() or 'owner' in str(value).lower():
            ws.cell(row = r_idx + 1, column = group_name_idx).fill = PatternFill(start_color='FFA500', end_color='FFA500', fill_type='solid')

    ws.column_dimensions['A'].width = 30
    ws.column_dimensions['B'].width = 60

    for col in range(3,14):
        col_letter = get_column_letter(col)
        ws.column_dimensions[col_letter].width = 13
            
    return wb

def get_ad_domain():
    result = subprocess.check_output(['nltest', '/dsgetdc:'], text=True)
    lines = result.split('\n')
    for line in lines:
        if "Dom 이름" in line:
            _, domain_name = line.split(': ')
            return domain_name.strip()

start_time = datetime.now()

wb = Workbook()
ws1 = wb.active
ws1.title = "AD_Account_Info"
ws2 = wb.create_sheet(title="AD_Group_Info")

df1 = user_data_to_df()
user_df_to_excel(df1, wb)

df2 = group_data_to_df(df1)
group_df_to_excel(df2, wb)

ad_domain = get_ad_domain()

file_name = "AD(%s)_Info_%s.xlsx" % (ad_domain, start_time.strftime("%y%m%d_%H%M"))
wb.save(file_name)
printf("프로그램 실행 완료")

 

프로그램 변환(Python to EXE)

Pyinstaller 모듈을 이용하여 코드 상에서 import된 라이브러리를 포함하는 EXE 파일로 변환

pyinstaller -F ad_v1.0.py

 

프로그램 실행 결과

- AD_INFO_TO_EXCEL_v1.1.exe 프로그램 실행

 

- 엑셀 결과 확인

 

프로그램 활용 방안

- 조직 AD 계정 및 그룹 현황 파악
- 관리자 권한 부여된 AD 계정 식별

- 관리 미흡(암호 미변경, 미사용) AD 계정 식별

Contents

이 글이 도움이 되었다면, 응원의 댓글 부탁드립니다.