▪ Synopsis
Airflow를 Kubernetes 기반으로 구성한 뒤, DAG의 Task들을 KubernetesPodOperator로 실행하고 있다.
코드 리팩터링 과정에서 resource 설정을 MIN, MID, MAX 세 단계로 분류해 사용하기 위해 enum을 정의해서 적용했다.
def get_k8s_operator(
    task_id,
    image,
    dag,
    cmds=None,
    arguments=None,
    secrets=None,
    volumes=None,
    volume_mounts=None,
    resources=min_resources,
):
    """Create a KubernetesPodOperator pre-filled with the shared settings.

    The pod name is ``task_id`` with every underscore replaced by a hyphen
    (presumably because Kubernetes object names may not contain underscores).
    The namespace, service account, image pull policy and the ``PHASE``
    environment variable come from module-level values; the remaining
    arguments are forwarded to the operator unchanged.

    Note: the ``resources`` default is bound when this function is defined,
    so ``min_resources`` must already exist at that point, and every call that
    relies on the default receives that same object.

    Args:
        task_id: Airflow task id; also the source of the pod name.
        image: Container image to run.
        dag: DAG the task is attached to.
        cmds: Optional container entrypoint.
        arguments: Optional arguments for the entrypoint.
        secrets: Optional secrets passed to the operator.
        volumes: Optional volumes passed to the operator.
        volume_mounts: Optional volume mounts passed to the operator.
        resources: Resource requests/limits; defaults to ``min_resources``.

    Returns:
        The configured ``KubernetesPodOperator`` instance.
    """
    pod_name = task_id.replace("_", "-")

    # Keys are listed in the same order the keyword arguments are evaluated,
    # so helper calls happen in an unchanged sequence.
    operator_kwargs = {
        "name": pod_name,
        "task_id": task_id,
        "image": image,
        "namespace": variable_namespace,
        "image_pull_policy": get_image_pull_policy(),
        "service_account_name": variable_service_account,
        "env_vars": [k8s.V1EnvVar(name="PHASE", value=variable_phase)],
        "in_cluster": True,
        "get_logs": True,
        "is_delete_operator_pod": True,
        "cmds": cmds,
        "arguments": arguments,
        "dag": dag,
        "secrets": secrets,
        "volumes": volumes,
        "volume_mounts": volume_mounts,
        "resources": resources,
    }
    return KubernetesPodOperator(**operator_kwargs)
class ResourceEnum(Enum):
    """CPU/memory request and limit strings for the MIN, MID and MAX tiers.

    The ``get_*_resources`` helpers read these members through ``.value``.
    """

    # NOTE(review): assuming ``Enum`` is the standard-library ``enum.Enum``,
    # members that share a value (e.g. REQUEST_CPU_MIN and LIMIT_CPU_MIN, both
    # "200m") are aliases of a single member. ``.value`` lookups still return
    # the right string, but iterating the enum or reading ``.name`` will not
    # show every name separately.

    # --- MIN tier ---
    REQUEST_CPU_MIN = "200m"
    # NOTE(review): in a Kubernetes quantity a lowercase "m" is the milli
    # suffix, so as a memory amount this is a fraction of a byte rather than
    # megabytes. This is the misconfiguration the article goes on to correct
    # ("300Mi").
    REQUEST_MEMORY_MIN = "300m"
    LIMIT_CPU_MIN = "200m"
    # NOTE(review): same unit problem as REQUEST_MEMORY_MIN ("500Mi" intended).
    LIMIT_MEMORY_MIN = "500m"
    # --- MID tier ---
    REQUEST_CPU_MID = "400m"
    REQUEST_MEMORY_MID = "1Gi"
    LIMIT_CPU_MID = "400m"
    LIMIT_MEMORY_MID = "2Gi"
    # --- MAX tier ---
    REQUEST_CPU_MAX = "1000m"
    REQUEST_MEMORY_MAX = "2Gi"
    LIMIT_CPU_MAX = "1000m"
    LIMIT_MEMORY_MAX = "4Gi"
def get_min_resources(
    request_cpu=ResourceEnum.REQUEST_CPU_MIN.value,
    request_memory=ResourceEnum.REQUEST_MEMORY_MIN.value,
    limit_cpu=ResourceEnum.LIMIT_CPU_MIN.value,
    limit_memory=ResourceEnum.LIMIT_MEMORY_MIN.value,
):
    """Return a new ``Resources`` object filled with the MIN-tier settings.

    Every argument defaults to the matching ``*_MIN`` member of
    ``ResourceEnum``; pass a value to override an individual field.

    Args:
        request_cpu: CPU request string.
        request_memory: Memory request string.
        limit_cpu: CPU limit string.
        limit_memory: Memory limit string.

    Returns:
        A freshly created ``Resources`` instance with the four fields set.
    """
    spec = Resources()
    # Assign the fields in a fixed order: requests first, then limits.
    field_values = (
        ("request_cpu", request_cpu),
        ("request_memory", request_memory),
        ("limit_cpu", limit_cpu),
        ("limit_memory", limit_memory),
    )
    for field_name, quantity in field_values:
        setattr(spec, field_name, quantity)
    return spec
def get_mid_resources(
    request_cpu=ResourceEnum.REQUEST_CPU_MID.value,
    request_memory=ResourceEnum.REQUEST_MEMORY_MID.value,
    limit_cpu=ResourceEnum.LIMIT_CPU_MID.value,
    limit_memory=ResourceEnum.LIMIT_MEMORY_MID.value,
):
    """Return a new ``Resources`` object filled with the MID-tier settings.

    Every argument defaults to the matching ``*_MID`` member of
    ``ResourceEnum``; pass a value to override an individual field.

    Args:
        request_cpu: CPU request string.
        request_memory: Memory request string.
        limit_cpu: CPU limit string.
        limit_memory: Memory limit string.

    Returns:
        A freshly created ``Resources`` instance with the four fields set.
    """
    spec = Resources()
    # Assign the fields in a fixed order: requests first, then limits.
    field_values = (
        ("request_cpu", request_cpu),
        ("request_memory", request_memory),
        ("limit_cpu", limit_cpu),
        ("limit_memory", limit_memory),
    )
    for field_name, quantity in field_values:
        setattr(spec, field_name, quantity)
    return spec
def get_max_resources(
    request_cpu=ResourceEnum.REQUEST_CPU_MAX.value,
    request_memory=ResourceEnum.REQUEST_MEMORY_MAX.value,
    limit_cpu=ResourceEnum.LIMIT_CPU_MAX.value,
    limit_memory=ResourceEnum.LIMIT_MEMORY_MAX.value,
):
    """Return a new ``Resources`` object filled with the MAX-tier settings.

    Every argument defaults to the matching ``*_MAX`` member of
    ``ResourceEnum``; pass a value to override an individual field.

    Args:
        request_cpu: CPU request string.
        request_memory: Memory request string.
        limit_cpu: CPU limit string.
        limit_memory: Memory limit string.

    Returns:
        A freshly created ``Resources`` instance with the four fields set.
    """
    spec = Resources()
    # Assign the fields in a fixed order: requests first, then limits.
    field_values = (
        ("request_cpu", request_cpu),
        ("request_memory", request_memory),
        ("limit_cpu", limit_cpu),
        ("limit_memory", limit_memory),
    )
    for field_name, quantity in field_values:
        setattr(spec, field_name, quantity)
    return spec
# Build one preset per tier at module load time, each with the helper's
# default values. `min_resources` is the default `resources` argument of
# get_k8s_operator. Each name refers to a single object, so callers that use
# a preset all share the same instance.
min_resources = get_min_resources()
mid_resources = get_mid_resources()
max_resources = get_max_resources()
처음 세팅을 적용하고 나니 아래와 같이 오류가 발생했다.
[2022-09-20 07:37:55,044] {pod_launcher.py:198} INFO - Event: [__task_name__].cca85e4486ee43cfa141f51550549b85 had an event of type Pending
[2022-09-20 07:37:55,044] {pod_launcher.py:128} WARNING - Pod not yet started: [__task_name__].cca85e4486ee43cfa141f51550549b85
[2022-09-20 07:37:56,053] {pod_launcher.py:198} INFO - Event: [__task_name__].cca85e4486ee43cfa141f51550549b85 had an event of type Pending
[2022-09-20 07:37:56,053] {pod_launcher.py:128} WARNING - Pod not yet started: [__task_name__].cca85e4486ee43cfa141f51550549b85
[2022-09-20 07:37:57,063] {pod_launcher.py:198} INFO - Event: [__task_name__].cca85e4486ee43cfa141f51550549b85 had an event of type Pending
[2022-09-20 07:37:57,064] {pod_launcher.py:128} WARNING - Pod not yet started: [__task_name__].cca85e4486ee43cfa141f51550549b85
[2022-09-20 07:37:58,071] {pod_launcher.py:198} INFO - Event: [__task_name__].cca85e4486ee43cfa141f51550549b85 had an event of type Pending
[2022-09-20 07:37:58,071] {pod_launcher.py:128} WARNING - Pod not yet started: [__task_name__].cca85e4486ee43cfa141f51550549b85
[2022-09-20 07:37:58,099] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 367, in execute
final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 520, in create_new_pod_for_operator
launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
File "/home/airflow/.local/lib/python3.6/site-packages/tenacity/__init__.py", line 329, in wrapped_f
return self.call(f, *args, **kw)
File "/home/airflow/.local/lib/python3.6/site-packages/tenacity/__init__.py", line 409, in call
do = self.iter(retry_state=retry_state)
File "/home/airflow/.local/lib/python3.6/site-packages/tenacity/__init__.py", line 356, in iter
return fut.result()
File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.6/site-packages/tenacity/__init__.py", line 412, in call
result = fn(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 131, in start_pod
raise AirflowException("Pod took too long to start")
airflow.exceptions.AirflowException: Pod took too long to start
확인해 보니 메모리 설정의 단위가 잘못되어 있었고, 그 결과 파드를 구동하는 데 필요한 메모리가 부족해서 발생한 문제였다.
파드가 생성되지 못하고 지연되면서 "had an event of type Pending" 로그와 "Pod not yet started" WARNING이 반복되었고,
결국 파드 생성 지연으로 "Pod took too long to start" exception이 발생했던 것이다.
아래와 같이 메모리 단위를 변경(REQUEST_MEMORY_MIN = "300m" -> "300Mi", LIMIT_MEMORY_MIN = "500m" -> "500Mi")한 후 정상 구동 완료 !
class ResourceEnum(Enum):
    """CPU/memory request and limit strings for the MIN, MID and MAX tiers.

    CPU values use the milli suffix ("m"); memory values use the binary
    suffixes "Mi" and "Gi".
    """

    # NOTE(review): assuming ``Enum`` is the standard-library ``enum.Enum``,
    # members that share a value (e.g. REQUEST_CPU_MIN and LIMIT_CPU_MIN, both
    # "200m") are aliases of a single member. ``.value`` lookups still return
    # the right string, but iterating the enum or reading ``.name`` will not
    # show every name separately.

    # --- MIN tier ---
    REQUEST_CPU_MIN = "200m"
    REQUEST_MEMORY_MIN = "300Mi"
    LIMIT_CPU_MIN = "200m"
    LIMIT_MEMORY_MIN = "500Mi"
    # --- MID tier ---
    REQUEST_CPU_MID = "400m"
    REQUEST_MEMORY_MID = "1Gi"
    LIMIT_CPU_MID = "400m"
    LIMIT_MEMORY_MID = "2Gi"
    # --- MAX tier ---
    REQUEST_CPU_MAX = "1000m"
    REQUEST_MEMORY_MAX = "2Gi"
    LIMIT_CPU_MAX = "1000m"
    LIMIT_MEMORY_MAX = "4Gi"
쿠버네티스의 리소스 단위를 공식 페이지에서 확인해 보면, 소문자 m은 10^-3을 뜻하는 밀리(milli) 접미사라서 메모리에 "300m"을 지정하면 0.3바이트로 해석된다. 의도했던 메가(메비)바이트는 Mi(mebibytes, 2^20 bytes) 혹은 M(megabytes, 1000^2 bytes)로 표현하여 사용해야 한다.

참고 :
https://kubernetes.io/ko/docs/concepts/configuration/manage-resources-containers/
파드 및 컨테이너 리소스 관리
파드를 지정할 때, 컨테이너에 필요한 각 리소스의 양을 선택적으로 지정할 수 있다. 지정할 가장 일반적인 리소스는 CPU와 메모리(RAM) 그리고 다른 것들이 있다. 파드에서 컨테이너에 대한 리소
kubernetes.io