콘텐츠로 건너뛰기

EventLoop 설계와 구현 – el Project (1)

들어가기에 앞서

이 글을 쓰기 시작하기 전에 여러가지를 살펴보고 있었다. 첫 번째는 Spring Webflux의 Reactive 시스템의 방식을 이해하기 위해서 Webflux가 사용하는 Netty를 살펴보는 것이었다. 직접적인 관련은 없지만 Netty를 이해하면 좀 더 확실하게 이해할 수 있다고 생각하였다.

두 번째는 NodeJS 엔진의 동작 방식에 대해서도 궁금증을 가지고 있었다. ‘Single thread 에서 동작하는 EventLoop가 있고 거기서(그 엔진에서) Javascript 코드가 런타임 때 비동기적으로 동작한다’라고 대충 알고 있었다. 이것에 대해 좀 더 디테일하게 알아보자라는 마음이 있었고 ‘Promise: 구현해보며 원리 살펴보기’ 라는 포스팅을 쓰고 나서는 다른 일들 때문에 리서치가 더뎌지고 있었다.

그러다 2021년에 Netty를 좀 더 깊게 이해해보자라는 목표를 가지고 코드를 살펴보던 도중 EventLoop를 발견하게 된다. 이것을 발견했을 때 이전에 잊혀졌던 NodeJS의 EventLoop에 대한 호기심이 되살아나며 ‘아 이걸 이해하면 NodeJS의 EventLoop도 같이 이해할 수 있을까?’라는 큰 착각을 하게 된다.

그리고 퇴근하고 짬짬이 간단한 버전의 EventLoop를 구현하게 된다: el Project. 그리고 NodeJS의 EventLoop와 Netty의 EventLoop가 다르다는 것을 만들면서 깨닫게 된다.

그래서 이 글은 Netty의 EventLoop에 기반하여 구현한el 프로젝트의 EventLoop 대해서 설명할 것이다. NodeJS의 EventLoop와 큰 원리는 동일하다. 어떤 것이 동일하고 다른지는 아래에서 설명하겠다.

해당 포스트의 다음 시리즈 글은 아래 링크에서 볼 수 있습니다.

Asynchronous computation result

보통 프로그램에서 어떤 함수를 통해 연산을 하면 그 결과값을 계산하고 그 값을 변수에 넣고 사용하면 된다. 그런데 만약 연산이 오래걸린다면 그 연산이 끝나기를 기다릴 수도 있지만 일단 넘어가고 나중에 연산이 끝나면 해당 값을 받아서 사용할 수도 있다.

방금 얼렁뚱땅 말로 푼 것을 살펴보면 별 문제 없어보이지만 그것을 코드로 풀어본다고 생각해보자. 고민이다. 먼저 어디서 결과값을 받아올지를 생각해야한다.

비동기 함수를 호출하는 Caller 입장에서 연산 결과물을 어떻게 받아올지에 대한 문제를 해결하기 위해 등장한 것이 Promise, Future 같은 것들이다. 핵심은 모두 비동기 작업 결과물을 담아줄 보관함, placeholder의 역할을 한다는 것이다.

Caller는 이제 Promise를 통해 비동기 연산의 결과물을 받아볼 수 있다. 해당 연산이 성공했는지, 실패했는지, 성공했다면 그 값은 무엇인지 실패했다면 에러는 어떤 것인지는 Promise 인터페이스를 통해 살펴볼 수 있다.

el 프로젝트 코드 작성할 때도 Promise 대신 Placeholder와 같은 이름으로 클래스를 작성하려고 했으나 ‘미래’에 대한 의미가 잘 안담기는 것 같아, 그리고 다른 사람들이 알아보기도 더 쉬울 것 같아 Promise로 하기로 했다.

Promise

핵심은 이전에 얘기했듯이,비동기 작업 결과물을 담아줄 보관함의 역할을 한다는 것이다.

Promise는 비동기 작업이 끝났을 때 혹은 실패했을 때 결과물과 에러를 담을 수 있는 인터페이스가 존재한다: setSuccess(V result), setFailure(Throwable cause). 또한 비동기 작업 결과물을 기다리는 클라이언트는 await() 와 같이 해당 작업이 끝났을 때 결과물이 담긴 Promise를 받아볼 수 있는 인터페이스도 존재한다. 이는 클라이언트가 동기적으로 blocking 하여 결과물을 받아보는 방식이고, 작업이 끝났을 때 해당 Promise를 listener를 통해서 받아볼 수 있는 방식도 존재한다: addListener(PromiseListener<? extends Promise<? super V>> listener). 흔히 이러한 방식을 우리는 event-driven 한 방식이라고 한다. 어떤 작업이 끝났을 때 listener를 통해 해당 이벤트를 받고 이후의 작업을 처리해준다.

한 가지 더 눈여겨 봐야할 것은 Promise 인터페이스가 상속한 인터페이스인데 하나가 Runnable 이다. Runnable 은 스레드에서 수행되는 작업을 나타내는 인터페이스 중 하나이다. 이것을 통해서 해당 Promise 구현체가 스레드에서 비동기 작업을 수행할 것이라는 것을 암시하고 있다.

public interface Promise<V> extends Future<V>, Runnable {
  boolean isSuccess();
  Promise<V> setSuccess(V result);
  Promise<V> addListener(PromiseListener<? extends Promise<? super V>> listener);
  Promise<V> await(long timeout, TimeUnit unit) throws InterruptedException;
  Promise<V> await() throws InterruptedException;
  Promise<V> setFailure(Throwable cause);
}

https://github.com/zerofruit/el/blob/main/core/src/main/java/io/el/concurrent/Promise.java

public interface PromiseListener<P extends Promise<?>> {
  void onComplete(P promise) throws Exception;
}

https://github.com/zerofruit/el/blob/main/core/src/main/java/io/el/concurrent/PromiseListener.java

Promise Implementation

Promise와 다른 컴포넌트, 액터와의 상호작용은 다음과 같이 이루어진다. 그림 1과 같이 먼저 비동기적으로 연산을 수행하는 곳에서 요청을 받으면, 요청을 한 클라이언트에게 Promise를 반환해준다.

PromiseListener가 Promise에 등록할 수도 있다. 클라이언트는 직접 Promise를 통하거나 PromiseListener를 통해 비동기 연산의 결과물을 받아볼 수 있다.

Interaction with Promise
그림 1. Promise와 다른 액터 간의 상호작용

Promise 인터페이스의 가장 기본이되는 DefaultPromise 구현체를 살펴보자. DefaultPromise는 자신을 생성한 EventLoop와 수행해야할 작업을 나타내는 Callable 객체를 가지고 있다. 그리고 Runnable의 run() 메소드를 구현한 것을 볼 수 있는데 해당 작업이 언제 수행될지는 Promise는 알지 못하고 자신을 생성한 EventLoop가 시작 시점을 결정한다. 이는 아래 EventLoop 설명할 때 살펴보겠다.

runTask() 에서 연산이 끝나면 setSuccess(result) 를 호출해 결과값을 저장한다. 만약 연산 도중 예외가 발생하면 setFailure(e) 를 호출하여 예외값을 저장한다.

public class DefaultPromise<V> implements Promise<V> {
  ...
  private final EventLoop eventLoop;
  private final Callable<V> task;
  ...
  @SuppressWarnings("unchecked")
  public DefaultPromise(EventLoop eventLoop, Runnable task) {
    this.eventLoop = eventLoop;
    this.task = (Callable<V>) Executors.callable(task);
  }
  public DefaultPromise(EventLoop eventLoop, Callable<V> task) {
    this.eventLoop = eventLoop;
    this.task = task;
  }
  @Override
  public void run() {
    if (!eventLoop().inEventLoop()) {
      return;
    }
    runTask();
  }
  private void runTask() {
    try {
      V result = task.call();
      setSuccess(result);
    } catch (Exception e) {
      setFailure(e);
    }
  }
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/DefaultPromise.java

setSuccess() 를 살펴보자. 우선 이미 한번 연산이 끝났는데 새로운 값을 할당하려고 한다면 예외를 던진다. 그리고 만약 인자로 넘어온 값이 null 이라면 이를 대신 EMPTY_RESULT 로 채워준다. 이는 현재 Promise가 연산을 끝났는지 확인하기 위한 메소드에서 resultnull 인지 아닌지로 구분하고 있기 때문이다. 별도의 플래그를 통해 구현해도 별 차이는 없을 것 같다. 그리고 AtomicReferenceFieldUpdater 을 이용해 atomic 하게 result 를 업데이트한다. setFailure() 도 비슷한 방식으로 동작한다.

마지막으로 notifyAll(), notifyListeners() 를 호출해주는데 notifyAll() 는 Java Object 클래스에서 제공해주는 메소드이다. 이는 다른 스레드에서 같은 객체에 대해 wait() 메소드를 통해 Thread를 blocking하고 있는 경우 이를 풀어주는 역할을 한다. wait() 메소드는 DefaultPromiseawait() 메소드에서 찾아볼 수 있다.

위의 다이어그램에서 클라이언트는 비동기 연산 결과물이 반환되기까지 기다릴 수 있다고 했다. 이 때 클라이언트는 await() 을 통해 기다리게 되며 비동기 작업이 끝나면 해당 blocking이 풀리고 다음 로직으로 넘어간다.

public class DefaultPromise<V> implements Promise<V> {
  private static final Object EMPTY_RESULT = new Object();
  private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> resultUpdater =
      AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
  private static final AtomicReferenceFieldUpdater<DefaultPromise, Throwable> causeUpdater =
      AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Throwable.class, "cause");
  private volatile V result;
  ...
  @Override
  public Promise<V> await() throws InterruptedException {
    if (isDone()) {
      return this;
    }
    if (Thread.interrupted()) {
      throw new InterruptedException(toString());
    }
    synchronized (this) {
      while (!isDone()) {
        wait();
      }
    }
    return this;
  }
  @Override
  public synchronized Promise<V> setSuccess(V result) {
    if (isDone()) {
      throw new IllegalStateException("Task already complete: " + this);
    }
    Object value = result == null ? EMPTY_RESULT : result;
    if (resultUpdater.compareAndSet(this, null, value)) {
      notifyAll();
      notifyListeners();
    }
    return this;
  }
  @Override
  public synchronized Promise<V> setFailure(Throwable cause) {
    if (isDone()) {
      throw new IllegalStateException("Task already complete: " + this);
    }
    if (causeUpdater.compareAndSet(this, null, cause)) {
      notifyAll();
      notifyListeners();
    }
    return this;
  }
  @Override
  public boolean isDone() {
    return result != null || cause != null;
  }
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/DefaultPromise.java

addListener() 를 먼저 살펴보면 thread-safe 하게 PromiseListener들을 List에 추가하고 있다. 만약 해당 Promise가 연산이 끝난 상태라면 listener를 등록하자마자 해당 결과값을 알려준다.

notifyListeners() 를 살펴보면 notify를 마친다면 해당 listener들을 삭제한다. 그리고 for 문 속 listener.onComplete(this) 를 통해서 자기 자신을 listener들에게 전달해준다. 마지막으로 다시 한번 더 listener List가 존재하는지 체크한다. 왜냐하면 잘 살펴보면 for 문은 thread-safe 하지 않다. listener들에게 notify를 해주는 순간 다른 Thread에서 해당 Promise에 listener를 등록할 수도 있다. 이런 상황을 체크해주기 위해서 마지막에 한번 더 listener들이 존재하지 않는지 확인한다.

public class DefaultPromise<V> implements Promise<V> {
  ...
  private List<PromiseListener> listeners = new ArrayList<>();
  @Override
  public Promise<V> addListener(PromiseListener<? extends Promise<? super V>> listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
      listeners.add(listener);
    }
    if (isDone()) {
      notifyListeners();
    }
    return this;
  }
  @SuppressWarnings("unchecked")
  private void notifyListeners() {
    if (!eventLoop.inEventLoop()) {
      return;
    }
    List<PromiseListener> listeners;
    synchronized (this) {
      if (this.listeners.isEmpty()) {
        return;
      }
      listeners = this.listeners;
      this.listeners = new ArrayList<>();
    }
    while (true) {
      for (PromiseListener listener : listeners) {
        try {
          listener.onComplete(this);
        } catch (Exception e) {
          LOGGER.error("A task terminated with unexpected exception. Exception: ", e);
        }
      }
      // At this point, listeners might be modified from other threads,
      // if more listeners added to list while executing this method, notify them also.
      // After notify them, initialize listeners to prevent double-notifying.
      synchronized (this) {
        if (this.listeners.isEmpty()) {
          return;
        }
        listeners = this.listeners;
        this.listeners = new ArrayList<>();
      }
    }
  }
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/DefaultPromise.java

EventLoop

EventLoop을 하이레벨에서 봤을 때 동작은 Netty, NodeJS 에서 둘다 동일하다. 아래 pseudocode와 같이 EventLoop가 종료되지 않는 이상 반복해서 EventLoop는 자신이 처리해야하는 이벤트를 가져오고 해당 이벤트를 처리하게 된다.

// Pseudocode of EventLoop
while (!terminated) {
  event = get the next event to execute
  event.run()
}

Netty

Netty는 글 서두에서도 얘기했지만 network application framework이다. 클라이언트와 서버는 아래 그림과 같이 connection을 맺는다. Netty에서는 한 connection 안에서 발생하는 inbound, outbound 데이터 흐름을 핸들링할 수 있는 Handler가 존재한다. 그리고 inbound, outbound 각각에 대해서 Handler는 체인 형식으로 구성된다. 한 Handler가 핸들링한 데이터는 다음 Handler로 넘어가는 형태이다.

이 때 한 connection에서 발생하는 다양한 이벤트들을 하나의 EventLoop, 즉 하나의 Thread에서 처리하게 된다. 여기서 말하는 이벤트란 connection 안에서 발생하는 Handler들의 액션들을 말한다. 예를 들어 어떤 Handler가 데이터를 넘겨받는 것도 이벤트이고, Handler 내에서 Buffer에 데이터를 쓰는 것도 이벤트이다. 그림 2 에서는 두 Handler에서 Server가 Client로 보내는 메세지를 변경하기 위해 Handler 내에서 write(msg) 를 호출한 것을 표현하고 있다. 이 두 번의 write(msg) 는 모두 EventLoop의 스레드에서 처리하게 된다.

왜 하나의 Thread에서 connection에서 발생하는 이벤트들을 처리하는 것일까?

그 이유는 복잡도를 낮추기 위함이다. 항상 각각의 Handler가 독립적인 데이터를 가지고 로직을 수행하면 좋겠지만, 이전 예시처럼 여러 Handler에서 하나의 데이터를 참조하거나 업데이트하는 경우 해당 데이터를 thread-safe하게 만들어야 할 것이다. EventLoop가 여러 Thread에서 Handler의 이벤트들을 처리하도록 한다면(간단한 모델링으로는 EventLoop가 이벤트를 받을 때마다 자기가 관리하고 있는 Thread들 중 하나를 round-robin 방식으로 골라 해당 Thread에 할당하는 방식이 있을 것 같다), 로직 순서 상으로는 이전에 있는 Handler의 로직이 나중에 있는 Handler 로직보다 먼저 처리되어야 하지만 이것이 이제 보장되지 않을 것이다.

하나의 Thread만 쓴다면 이제 하나의 connection 상에서 발생하는 이벤트들의 순서가 보장될 것이다. Handler의 로직은 이제 더 이상 thread-safe 하지 않아도 된다. Netty가 프레임워크라는 것을 생각해보면 프레임워크 사용자들은 이제 어떻게 하면 자신의 비즈니스 로직을 thead-safe 하게 작성해야하는지 고민하지 않아도 된다.

EventLoop, Netty Client-Server connection 다이어그램
그림 2. Netty EventLoop, Client-Server connection 다이어그램

SingleThreadEventLoop Implementation

el 컨텍스트에서EventLoop는 클라이언트들로부터 이벤트, 혹은 작업을 받는다. 그리고 EventLoop는 비동기적으로 해당 작업을 수행하고 클라이언트에게 결과물로 Promise를 넘겨준다. SingleThreadEventLoop는 하나의 스레드에서 동작하는 EventLoop이며, 그렇기 때문에 클라이언트가 SingleThreadEventLoop에 요청한 작업 처리 결과는 순서를 보장한다.

el 에서 SingleThreadEventLoop를 구현한 방식에 대해서 설명하겠다. el의SingleThreadEventLoop는 내부적으로 두 개의 큐를 관리한다. 하나는 RunnableQueue 다른 하나는 ScheduledTaskQueue 이다. RunnableQueue 의 경우는 클라이언트가 요청한 비동기 작업들이 해당 큐에 담기게 된다. 그리고SingleThreadEventLoop는 takeTask() 를 통해 클라이언트들이 요청한 비동기 작업을 가져와서 처리할 수 있다.

el의 EventLoop는 작업 ‘예약’을 지원한다. Javascript의 setTimeout() 기능과 비슷하다고 보면된다(그렇지만 다르게 동작한다). 클라이언트는 자신이 원하는 작업을 얼마 뒤에 비동기적으로 수행하고 싶다면 EventLoop의 schedule() 인터페이스를 사용하면 된다.

이렇게 schedule() 을 통해 들어온 작업들은 ScheduledTaskQueue 에서 관리된다. ScheduledTaskQueuePriorityQueue 이다. priority는 해당 작업이 실행되기까지 남은 시간 순으로 높아진다. SingleThreadEventLoop는ScheduledTaskQueue 를 살펴보며 실행 시간이 된 작업들을 RunnableQueue 로 옮겨담게 된다. 이렇게 스케쥴링된 작업들이 실행된다.

EventLoop internal structure on el
그림 3. el SingleThreadEventLoop 내부 구조 다이어그램

EventLoop 인터페이스는 JDK의 ExecutorService 를 상속받는다. ExecutorService 는 비동기 작업들을 클라이언트들로부터 받을 수 있는 인터페이스와 작업을 수행해주는 서비스의 상태를 확인할 수 있는 인터페이스를 제공해준다. EventLoop는 submit() 을 overriding 하여 Future 대신 직접 만든 Promise 를 반환해준다.

그리고 위에서 얘기했다시피 schedule() 인터페이스를 제공해주고 이 때는 ScheduledPromise 를 반환한다. ScheduledPromise 는 조금 특별한 Promise 로 해당 스케쥴링한 비동기 작업의 결과를 나타낸다. 해당 클래스에서는 스케쥴링 상태를 알 수 있는 메소드들을 제공해준다.

public interface EventLoop extends ExecutorService {
  boolean shutdownGracefully(long timeout, TimeUnit unit);
  @Override
  <V> Promise<V> submit(Callable<V> task);
  @Override
  Promise<?> submit(Runnable task);
  ScheduledPromise<?> schedule(Runnable command, long delay, TimeUnit unit);
  <V> ScheduledPromise<V> schedule(Callable<V> command, long delay, TimeUnit unit);
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/EventLoop.java

public class ScheduledPromise<V> extends DefaultPromise<V> implements PriorityQueueNode,
    Delayed {
  ...
  @Override
  public long getDelay(TimeUnit unit) {
    ...
  }
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/ScheduledPromise.java

SingleThreadEventLoop LifeCycle

el SingleThreadEventLoop가 어떻게 시작되고, 클라이언트로부터 작업을 받으며, 어떻게 실행되고, 종료되는지 라이프사이클에 대해서 살펴보자.

SingleThreadEventLoop start looping

el SingleThreadEventLoop는 다음과 같은 state를 가지고 있다.

  • NOT_STARTED: 아직 EventLoop가 시작하지 않은 상태
  • STARTED: EventLoop가 시작하여 클라이언트로부터 작업을 받아서 처리할 수 있는 상황
  • SHUTTING_DOWN: shutdownGracefully() 등을 통하여 EventLoop가 종료되고 있는 상태
  • SHUTDOWN: EventLoop가 완전히 종료된 상태

위와 같은 state에 따라 SingleThreadEventLoop는 클라이언트로부터 작업을 받을 수도 있고 그러지 못할 수도 있다. start() 를 하게되면 EventLoop는 사진의 state를 STARTED 로 바꾸고 스레드를 생성한 뒤 루프를 돌게된다. 그리고 외부에서 shutdownGracefully() 를 호출하게 되면 state는 SHUTTING_DOWN 으로 바꾸게 되고 EventLoop는 루프를 멈추게 된다. 그리고 graceful shutdown period가 지나면 SHUTDOWN 으로 상태를 바꾸고 완전히 종료한다.

SingleThreadEventLoop가 STARTED 상태가 되면, 클라이언트로부터 submit(), schedule() 을 통하여 비동기 작업 받아 수행할 수 있게 된다.

public abstract class SingleThreadEventLoop extends AbstractEventLoop {
  private static final AtomicReferenceFieldUpdater<SingleThreadEventLoop, State> stateUpdater =
      AtomicReferenceFieldUpdater.newUpdater(SingleThreadEventLoop.class, State.class, "state");
  private volatile Thread thread;
  ...
  private void start() {
    if (!state.equals(State.NOT_STARTED)) {
      return;
    }
    if (!stateUpdater.compareAndSet(this, State.NOT_STARTED, State.STARTED)) {
      return;
    }
    try {
      doStart();
    } finally {
      ...
    }
  }
  private void doStart() {
    if (thread != null) {
      return;
    }
    executor().execute(() -> {
      thread = Thread.currentThread();
      try {
        SingleThreadEventLoop.this.run();
      } catch (Throwable t) {
        LOGGER.error("An event loop terminated with unexpected exception. Exception:", t);
      } finally {
        while (true) {
          if (state.compareTo(State.SHUTTING_DOWN) >= 0 ||
              stateUpdater.compareAndSet(SingleThreadEventLoop.this, state, State.SHUTTING_DOWN)) {
            break;
          }
        }
        ...
    });
  }
}

https://github.com/zerofruit/el/blob/main/core/src/main/java/io/el/concurrent/SingleThreadEventLoop.java

public abstract class AbstractEventLoop implements EventLoop {
  enum State {
    NOT_STARTED(1),
    STARTED(2),
    SHUTTING_DOWN(3),
    SHUTDOWN(4),
    TERMINATED(5);
    int value;
    State(int value) {
      this.value = value;
    }
  }
}

https://github.com/zerofruit/el/blob/main/core/src/main/java/io/el/concurrent/AbstractEventLoop.java

SingleThreadEventLoop submit task

클라이언트가 작업을 submit() 할 때 어떻게 되는지 살펴보자.

el에서는 Promise에서 비동기 작업을 수행하게 된다. 그래서 submit() 하는 시점에 작업을 Promise에 넣고 execute() 를 호출한다. execute() 에서는 작업이 바로 스케쥴링 될 수 있도록 RunnableQueue 에 넣는다. 그리고 만약 SingleThreadEventLoop 가 시작이 안된 상태라면 start() 를 통해 lazy하게 실행된다. 그래서 이전 단계에서 살펴봤던 SingleThreadEventLoop가 시작하는 것은 최초로 클라이언트가 작업을 요청하였을 때 진행된다.

public abstract class SingleThreadEventLoop extends AbstractEventLoop {
  private volatile Thread thread;
  private final Queue<Runnable> taskQueue;
  private final PriorityQueue<ScheduledPromise<?>> scheduledPromiseQueue;
  @Override
  public void execute(Runnable task) {
    checkNotNull(task, "task");
    addTask(task);
    if (inEventLoop()) {
      return;
    }
    start();
  }
  private void addTask(Runnable task) {
    checkNotNull(task, "task");
    if (isShuttingDown()) {
      throw new RejectedExecutionException("Event loop is terminating");
    }
    if (task instanceof ScheduledPromise) {
      scheduledPromiseQueue.add((ScheduledPromise<?>) task);
      return;
    }
    if (!taskQueue.offer(task)) {
      throw new RejectedExecutionException("Event loop failed to add task");
    }
  }
  @Override
  public boolean inEventLoop() {
    return Thread.currentThread() == this.thread;
  }
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/SingleThreadEventLoop.java

public abstract class AbstractEventLoop implements EventLoop {
  private final Executor executor;
  public AbstractEventLoop(Executor executor) {
    this.executor = executor;
  }
  protected abstract void run();
  @Override
  public Promise<?> submit(Runnable task) {
    Promise<Void> promise = new DefaultPromise<>(this, task);
    execute(promise);
    return promise;
  }
  protected Executor executor() {
    return executor;
  }
}

https://github.com/zeroFruit/el/blob/main/core/src/main/java/io/el/concurrent/AbstractEventLoop.java

SingleThreadEventLoop shutdown gracefully

이제 어떻게SingleThreadEventLoop가 ‘graceful’하게 종료되는지 살펴보자. Graceful의 정의는 제각각 다르다. el에서 SingleThreadEventLoop가 ‘graceful하게 종료한다’의 의미는:

  • ‘주어진 시간동안 RunnableQueue 에 남아있는 작업들을 실행시키고, 시간이 지났을 때 남아있는 작업들은 모두 중단하고 버린다’를 나타낸다.
  • 예약된 작업에 대해서도 마찬가지다. graceful shutdown period 동안 시작할 수 있는 작업들은 모두 시작하지만, 예약 시간이 너무 길어 아직 시작하지 못한 작업들은 모두 버린다.

해당 정책과 관련된 부분을 코드로 이해하기 위해서는 doStart() 를 자세히 봐야한다. shutdownGracefully() 에서는 단순히 shutdown을 시작한 시간과 gracefulshutdown period를 저장하고 자신의 상태를 SHUTTING_DOWN 으로 바꾸는 것으로 끝나게 된다.

SingleThreadEventLoop는 루프를 돌다가 상태가 SHUTTING_DOWN 으로 바뀌게 되면 남은 작업을 처리하는 로직을 타게 된다. 위에서 얘기했듯이 큐에 남아있는 작업들을 모두 실행하는데 이는 graceful period 동안 수행하게 된다 (confirmShutdown()). 그리고 시간이 다되면 상태를 SHUTDOWN 으로 바꾸고 남은 작업들을 모두 drain 하는 과정을 거치게 된다.

여기까지 el에서SingleThreadEventLoop가 어떻게 시작하고 작업을 받으며 종료되는지를 확인해보았다.

한 가지 빠진 과정이 있는데 바로SingleThreadEventLoop가 받은 작업을 실행하는 과정이다. 이것은 아래 섹션에서 바로 살펴보겠다.

public abstract class SingleThreadEventLoop extends AbstractEventLoop {
  ...
  @Override
  public boolean shutdownGracefully(long timeout, TimeUnit unit) {
    checkNotNull(unit, "unit");
    checkPositiveOrZero(timeout, "timeout");
    shutdownStartNanos = Time.currentNanos();
    ...
    while (true) {
      ...
      if (stateUpdater.compareAndSet(this, state, State.SHUTTING_DOWN)) {
        break;
      }
    }
    shutdownTimeoutNanos = unit.toNanos(timeout);
    return true;
  }
  private void doStart() {
    if (thread != null) {
      return;
    }
    executor().execute(() -> {
      ...
      } finally {
        while (true) {
          if (state.compareTo(State.SHUTTING_DOWN) >= 0 ||
              stateUpdater.compareAndSet(SingleThreadEventLoop.this, state, State.SHUTTING_DOWN)) {
            break;
          }
        }
        try {
          // Run all remaining tasks
          while (true) {
            if (confirmShutdown()) {
              break;
            }
          }
          // Now we want to make sure no more tasks can be added from this point.
          while (true) {
            if (state.compareTo(State.SHUTDOWN) >= 0 ||
                stateUpdater.compareAndSet(SingleThreadEventLoop.this, state, State.SHUTDOWN)) {
              break;
            }
          }
        } finally {
          // drain tasks
          int numTasks = drainTasks();
          if (numTasks > 0) {
            LOGGER.info("An event loop terminated with " +
                "non-empty task queue ({})", numTasks);
          }
        }
      }
    });
  }
  protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
      return false;
    }
    if (!inEventLoop()) {
      throw new IllegalStateException("must be invoked from an event loop");
    }
    cancelScheduledTasks();
    runAllTasks();
    if (Time.currentNanos() - shutdownStartNanos > shutdownTimeoutNanos) {
      return true;
    }
    return isShutdown();
  }
}

https://github.com/zerofruit/el/blob/main/core/src/main/java/io/el/concurrent/SingleThreadEventLoop.java

SingleThreadEventLoop Example

작업을 실행하는 부분을 왜 Example 섹션에서 이야기하냐고 할 수도 있는데, 코드를 살펴보면 알겠지만 SingleThreadEventLoop에서 작업을 큐에서 가져와서 수행하는 코드가 없다. 이는 SingleThreadEventLoop의 추상 메소드 run() 에서 el 프레임워크 사용자가 직접 구현해야할 부분이다.

Example에서는 run() 을 어떻게 구현할 수 있는지 한 가지 예시에 대해서 설명하고 시연 영상을 첨부하려고 한다. 예제 코드는 example 패키지에서 확인할 수 있다.

TaskExecutor 클래스는 SingleThreadEventLoop 를 상속받고 있다. run() 을 살펴보면 SHUTTING_DOWN 상태가 아니면 계속해서 루프를 돌게 된다. 루프를 돌면서 takeTask() 를 호출하는데 이를 통해 RunnableQueue 에서 자신이 수행해야할 작업을 가져오게 된다. 그리고 작업을 수행하게 된다.

Example 클래스는 SingleThreadEventLoop 구현체에게 여러가지 조건으로 비동기 작업을 요청하는 클라이언트 역할을 하게 된다. 아래 예시에서는 각 작업은 500ms가 걸리고 작업을 요청하는 간격은 100ms이며 총 10개의 작업을 요청한다.

public class TaskExecutor extends SingleThreadEventLoop {
  public TaskExecutor(Executor executor) {
    super(executor);
  }
  @Override
  protected void run() {
    while (!confirmShutdown()) {
      Runnable task = takeTask();
      if (task != null) {
        task.run();
      }
    }
  }
  private static class Example extends AbstractSingleThreadEventLoopExample {
    public static void main(String[] args) throws InterruptedException {
      TaskExecutor scheduler = new TaskExecutor(
          new ThreadPerTaskExecutor(Executors.defaultThreadFactory()));
      new Example(scheduler)
          .taskDelay(500)
          .numOfTasks(10)
          .intervalBetweenTasks(100)
          .start();
    }
  }
}

https://github.com/zerofruit/el/blob/main/example/src/main/java/io/el/example/executor/TaskExecutor.java

public class SimpleTask implements Runnable {
  @Override
  public void run() {
    try {
      Thread.sleep(delayMillis);
    } catch (InterruptedException e) {
    }
    ...
  }
}

https://github.com/zerofruit/el/blob/main/example/src/main/java/io/el/example/executor/SimpleTask.java

SingleThreadEventLoop Demo

실제로 여러가지 인자를 바꿔 돌려보면서 어떻게 el의 SingleThreadEventLoop가 동작하는지 살펴보자.

Case 1

첫 번째 케이스는 100ms가 걸리는 (.taskDelay(100)) 100개의 작업을 (.numOfTasks(100)) 100ms 간격으로 (.intervalBetweenTasks(100))SingleThreadEventLoop에 요청하는 경우이다.

CASE 1: Submit tasks with interval
  • 먼저 살펴봐야하는 것은 스레드 ID이다. 100개의 작업이 실행되는 스레드는 모두 pool-2-thread-1 이다. 나머지 SingleThreadEventLoop를 실행하고 종료하는 부분은 main 스레드에서 실행된다.
  • 그리고 순차적으로 작업이 주어졌기 때문에 ID가 1씩 일정하게 올라간다.
  • Executed all tasks 라는 로그는 main 스레드에서 100개의 작업을 SingleThreadEventLoop에 요청하는데 걸린 시간을 나타낸다. 100개의 작업을 100ms 간격으로 요청했기 때문에 약 10000ms가 걸린 것을 확인할 수 있다.
  • 마지막에 Total: 로그는 SingleThreadEventLoop가 작업을 수행하고 종료되기까지 걸린 시간을 나타낸다. 100개의 작업을 100ms 동안 수행하고 완전히 shutdown하기까지 graceful period가 지난 시간을 더해서 약 10000ms가 걸리게 된다.

Case 2

두 번째 케이스는100ms가 걸리는 (.taskDelay(100)) 100개의 작업을 (.numOfTasks(100)) 0ms 간격으로 (.intervalBetweenTasks(0))SingleThreadEventLoop에 요청하는 경우이다. 첫 번째 케이스와 달라진 점은 작업을 요청하는 간격이 0ms로 줄어들었다는 것이다.

CASE 2: Submit tasks without interval
  • 이전과 마찬가지로 작업은 pool-2-thread-1에서 실행된다. 그런데 이번에는 Executed failed. 라는 메세지가 엄청 많이 보인다. 약 16개에서 17개의 수행만 성공했고 나머지는 모두 실패해버렸다. 이러한 현상의 원인은 현재 스펙에서 RunnableQueue가 16개로 고정되어있기 때문이다. 먼저 16개의 작업이 큐에 들어간다. 그런데 첫 번째 작업이 끝나기 전에 나머지 작업들이 큐에 들어오려고 하다보니 예외를 던지며 실패하게 된다.
    이것에 대해서는 여러가지 대안을 생각해볼 수 있다. 첫 번째로 사용자가 RunnableQueue의 사이즈를 설정할 수 있도록 한다. 두 번째로 다이나믹하게 큐에 n% 만큼의 작업이 들어왔을 때 큐 사이즈를 두 배로 늘리는 정책을 가져볼 수도 있다. 세 번째로는 SingleThreadEventLoop를 이용하는 사용자가 back-pressure 를 구현하게 하여 적당하게 큐 사이즈를 유지하도록 열어 줄 수도 있을 것이다.
  • 이전과 마찬가지로 순차적으로 작업이 주어졌기 때문에 ID가 1씩 일정하게 올라간다.
  • Executed all tasks 로그에서 걸린 시간을 보면 19ms 밖에 걸리지 않는데 이는 클라이언트가 인터벌을 가지지 않고 바로바로 요청했기 때문에 짧게 걸린 것이다.
  • 마지막에 Total: 로그에서는 약 1700ms가 걸린 것을 확인할 수 있는데 이는 각 작업이 100ms가 걸리고 17개가 수행되었기 때문이다.

Case 3

세 번째 케이스는 이전 두 케이스와는 다르게 작업을 예약하는 예제이다. submit() 이 아닌 schedule() 을 통해 작업을 예약한다.100ms가 걸리는 (.taskDelay(100)) 100개의 작업을 (.numOfTasks(100)) 0ms 간격으로 (.intervalBetweenTasks(0))SingleThreadEventLoop에 예약하는데, 0ms에서 500ms 사이에 랜덤하게 예약 시간이 결정된다 (minDelay(0), maxDelay(500)).

CASE 3: Schedule tasks
  • 이전과 마찬가지로 작업은 pool-2-thread-1에서 실행된다. 그런데 CASE 2와 다르게 Executed failed. 와 같은 에러 메세지가 보이지 않는다. 이는 schedule() 을 통해 예약된 작업은 별도의 priority queue에서 관리되기 때문이다. 이 때 이 큐는 고정된 사이즈가 아니라 50% 만큼 아이템이 들어오면 자신의 사이즈를 다이나믹하게 두 배로 늘리게 된다. 그래서 이전과 같이 공간이 부족하다는 에러 메세지가 뜨지 않은 것이다. 100개의 아이템은 별도의 priority queue에서 모두 들어가게 되고 예약 시간이 다 된 작업은 하나씩 SingleThreadEventLoop가 꺼내와서 실행시키게 된다.
  • 작업이 실행된 로그를 살펴보면 delay가 점점 증가하는 것을 살펴볼 수 있다.
  • 가장 첫 줄에 Executed all tasks: 2ms 로그가 찍혀 있다. priority queue에 막힘 없이 들어가고 가장 작은 delay를 가진 작업이 4ms 이기 때문이다.
  • 마지막에 Total: 10333ms, Scheduled: 26440ms 로그를 확인할 수 있다. Total의 경우는 총 SingleThreadEventLoop가 작업을 수행한 시간으로 약 10000ms가 나왔는데 이것은 100ms가 걸리는 작업을 100개를 처리했기 때문이다. 최대 예약 시간이 100ms보다 긴 것이 많은데 더 시간이 걸리지 않은 이유는 SingleThreadEventLoop가 예약 시간이 짧은 작업을 처리하는 동안 예약 시간이 긴 작업들이 모두 RunnableQueue로 옮겨갔기 때문이다. Scheduled의 경우는 100개의 총 예약 시간의 합이다. 최소 시간이 0ms이고 최대 시간이 500ms이니 평균적으로 250ms가 걸린다. 그래서 250ms x 100개의 작업을 해서 저런 숫자가 나왔다.

NodeJS, Chrome Engine

NodeJS에서도 EventLoop가 핵심적인 역할을 하고 있다. 하지만 위에서 말했듯이 하이 레벨에서 동작은 동일하지만 조금만 디테일하게 들어가도 완전히 다르다. 위의 예제를 보면서도 아마 NodeJS를 써본 사람이거나 웹 개발을 해봤으면 조금 이상하다고 느꼈을지도 모른다.

크롬 개발자 도구에서 네트워크 탭을 켜보면 하나씩 리소스(JS script, HTML document, image, …)를 다운로드 하는 것이 아닌, 동시에 파바박 하고 다운로드 받고 웹페이지를 보여준다. 위의 데모처럼 리소스를 하나 다 받고 그 다음 것을 받고 하는게 아니다. 리소스 다운로드에 대한 작업이 병렬로 이뤄진다.

NodeJS는 single thread의 EventLoop에서 동작한다고 한다. 하지만 정말 그렇다면 위의 데모처럼 작업을 하나씩 처리해야한다. NodeJS는 별도의 worker thread가 존재한다. 하나의 스레드에서 비동기 작업이 필요한 부분을 worker thread에게 처리하도록 요청한다.

JS 스크립트를 V8을 통해 컴파일하고 이를 libuv를 통해 별도의 스레드에서 비동기 작업을 수행할 수 있도록 하고 그 결과물을 전달 받을 수 있도록 한다.

(NodeJS 엔진까지 정리하면 너무 길어져서 다른 포스팅에 정리하고 여기에 링크를 걸도록 해야겠다.)

EventLoop abstract diagram on NodeJS
그림 4. NodeJS engine abstract diagram

앞으로 남은 일은?

el 프로젝트의 목표는 reactive한 방식의 네트워크 어플리케이션을 만들 수 있는 프레임워크를 만들어보는 것이다. 프로젝트 README에도 써놓긴 했지만 앞으로 해야할 작업은 크게 두 가지이다.

첫 번째로는 지금까지 만들어놓은 EventLoop 위에 네트워크 레이어를 쌓는 것이다. 좀 더 구체적 이야기하면 HTTP Server/Client 기능을 쓸 수 있도록 해야한다. Non-blocking 방식으로 connection을 맺고 끊을 수 있어야 한다.

두 번째로는 reactive 한 방식으로 connection의 상태를 관리할 수 있도록 만드는 것이다. 이 때는 Project Reactor를 사용해볼 수도 있고 아닐 수도 있을 것 같다.

이렇게 크게 두 가지 작업이 남았고 이왕이면 이번에는 다른 사람들과 같이 할 수 있다면 공부하면서 같이 만들어보고 싶다.

Leave a comment