
Channel concept & implementation: el Project (2)

Intro

This is the second post in the series. It covers the concept and implementation of Channel, a key part of transporting data. The first post is written in Korean and can be read here; it covers the concepts of Promise and EventLoop. I started this project, named ‘el’, to understand the Spring Webflux reactive system.

Implementing Channel is milestone#1, the first milestone I achieved together with my teammates (I implemented Promise and EventLoop by myself). I’m proud that we are making steady progress despite busy schedules, since we all have full-time jobs.

https://github.com/zeroFruit/el

el project milestones

This post is about the Channel concept, which is the basis for transporting data. A Channel interacts with many sub-components, and we’re going to see how they work together.

Channel

I think the concept of Channel is well explained in the Netty javadoc.

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

So we can think of a Channel as a connection between the client and the server. On the client side, we need to connect to the remote address. On the server side, we need to bind to a local address.

A ChannelPipeline handles events that happen in the Channel, such as reading and writing data, with the ChannelHandlers attached to the ChannelPipeline.

All actual tasks like bind, connect, and read/write run in the EventLoop, specifically a single-thread EventLoop, to keep the order of the operations.
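To make the ordering point concrete, here is a minimal sketch that uses the JDK’s single-thread executor as a stand-in for a single-thread EventLoop. The class and method names are hypothetical and are not part of the el project.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the JDK executor only stands in for el's EventLoop.
class SingleThreadOrderingSketch {

  /** Queues the operations on one worker thread and returns the order they ran in. */
  static List<String> runInOrder(String... operations) {
    List<String> executed = new CopyOnWriteArrayList<>();
    ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    for (String operation : operations) {
      // Tasks are queued FIFO and run one at a time on the same thread.
      eventLoop.execute(() -> executed.add(operation));
    }
    eventLoop.shutdown(); // stop accepting tasks; already queued tasks still run
    try {
      eventLoop.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return executed;
  }

  public static void main(String[] args) {
    // prints [bind, connect, read, write]
    System.out.println(runInOrder("bind", "connect", "read", "write"));
  }
}
```

Because only one thread drains the queue, no locking is needed to keep bind ahead of connect; with a multi-thread pool the same submissions could run in any order.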

We’ll see each component in detail in this post.

Abstract view of Channel with its related component
public interface Channel {
  /** Returns the assigned {@link ChannelPipeline} */
  ChannelPipeline pipeline();
  /** Returns {@code true} if the {@link Channel} is open and may get active later */
  boolean isOpen();
  /** Returns {@code true} if the {@link Channel} is registered with an {@link ChannelEventLoop} */
  boolean isRegistered();
  /** Returns {@code true} if the {@link Channel} is active and so connected. */
  boolean isActive();
  ChannelEventLoop channelEventLoop();
  /**
   * Returns the local address where this channel is bound to. {@link SocketAddress} is supposed to
   * be down-cast into more concrete type such as {@link java.net.InetSocketAddress} to retrieve the
   * detailed info
   */
  SocketAddress localAddress();
  /**
   * Returns the remote address where this channel is connected to. The returned {@link
   * SocketAddress} is supposed to be down-cast into more concrete type such as {@link
   * java.net.InetSocketAddress} to retrieve the detailed info
   */
  SocketAddress remoteAddress();
  ChannelPromise bind(SocketAddress localAddress);
  ChannelPromise connect(SocketAddress remoteAddress);
  /** Returns an internal-use-only object that provides unsafe operations */
  Internal internal();
  interface Internal {
    /** Return the {@link SocketAddress} to which is bound local or {@code null} if none. */
    SocketAddress localAddress();
    /**
     * Return the {@link SocketAddress} to which is bound remote or {@code null} if none is bound
     * yet.
     */
    SocketAddress remoteAddress();
    /**
     * Register the {@link Channel} of the {@link ChannelPromise} and notify the {@link
     * ChannelPromise} once the registration was complete.
     */
    void register(ChannelEventLoop eventLoop, ChannelPromise promise);
    /**
     * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and
     * notify it once its done
     */
    void bind(SocketAddress localAddress, ChannelPromise promise);
    /**
     * Connect the {@link Channel} of the given {@link ChannelPromise} with the given remote {@link
     * SocketAddress}.
     */
    void connect(SocketAddress remoteAddress, ChannelPromise promise);
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/Channel.java

ChannelPipeline

The interface of our ChannelPipeline is very straightforward. As you can expect from the method names, ChannelPipeline manages ChannelHandlers: jobs like adding a ChannelHandler to the pipeline and removing a ChannelHandler from it all belong to the ChannelPipeline. You can picture it as below. Then let’s take a look at the implementation of the ChannelPipeline.

ChannelPipeline conceptual view
public interface ChannelPipeline
    extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
  /** Add handlers to its handler chain at the end. */
  ChannelPipeline addLast(ChannelHandler... handlers);
  /** Remove handler */
  ChannelPipeline remove(ChannelHandler handler);
  /** Returns channel it binds to. */
  Channel channel();
  /** Returns first handler context */
  ChannelHandlerContext firstContext();
  /** Returns the context of the handler */
  ChannelHandlerContext context(ChannelHandler handler);
  /**
   * A {@link Channel} was registered to its {@link ChannelEventLoop}.
   *
   * <p>This will result in having the {@link
   * ChannelInboundHandler#channelRegistered(ChannelHandlerContext)} method called of the next
   * {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the {@link Channel}.
   */
  @Override
  ChannelPipeline fireChannelRegistered();
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/ChannelPipeline.java

DefaultChannelPipeline

From the constructor, we can see that the ChannelPipeline knows about its Channel. The contexts of the ChannelHandlers are managed as a linked list: when a ChannelHandler is added, a context for that handler is created and appended to the list. The ChannelPipeline does not manage ChannelHandlers directly but through their contexts. As you can see, a context can reference its handler. Note that the excerpt omits where name and eventLoop come from.

DefaultChannelPipeline implementation – 1
public class DefaultChannelPipeline implements ChannelPipeline {
  private final Channel channel;
  private final HeadContext headContext;
  private final TailContext tailContext;
  public DefaultChannelPipeline(Channel channel) {
    this.channel = channel;
    this.tailContext = new TailContext(this);
    this.headContext = new HeadContext(this);
    this.tailContext.prev = this.headContext;
    this.headContext.next = this.tailContext;
  }
    
  @Override
  public ChannelPipeline addLast(ChannelHandler... handlers) {
    for (ChannelHandler handler : handlers) {
      this.addLast(handler);
    }
    return this;
  }
  private ChannelPipeline addLast(ChannelHandler handler) {
    final AbstractChannelHandlerContext prev = this.tailContext.prev;
    final AbstractChannelHandlerContext newHandlerContext =
        new DefaultChannelHandlerContext(name, this, eventLoop, handler);
    this.tailContext.prev = newHandlerContext;
    newHandlerContext.next = this.tailContext;
    prev.next = newHandlerContext;
    newHandlerContext.prev = prev;
    
    return this;
  }
  
  ...
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/DefaultChannelPipeline.java
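The excerpt above shows addLast but not remove. As a rough sketch of how both could work on a doubly linked list with head and tail sentinels, consider the following. It is a simplified, hypothetical model: nodes carry a plain name instead of a handler, and it does not claim to match the project’s actual remove implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the pipeline's linked list of contexts.
class ContextListSketch {
  /** A node of the list; "name" stands in for the wrapped handler. */
  static final class Node {
    final String name;
    Node prev;
    Node next;

    Node(String name) {
      this.name = name;
    }
  }

  private final Node head = new Node("head");
  private final Node tail = new Node("tail");

  ContextListSketch() {
    head.next = tail;
    tail.prev = head;
  }

  /** Links a new node right before the tail sentinel. */
  ContextListSketch addLast(String name) {
    Node node = new Node(name);
    Node prev = tail.prev;
    node.prev = prev;
    node.next = tail;
    prev.next = node;
    tail.prev = node;
    return this;
  }

  /** Unlinks the first node with the given name; the sentinels are never removed. */
  ContextListSketch remove(String name) {
    for (Node n = head.next; n != tail; n = n.next) {
      if (n.name.equals(name)) {
        n.prev.next = n.next;
        n.next.prev = n.prev;
        break;
      }
    }
    return this;
  }

  /** Returns the names between head and tail, in list order. */
  List<String> names() {
    List<String> names = new ArrayList<>();
    for (Node n = head.next; n != tail; n = n.next) {
      names.add(n.name);
    }
    return names;
  }

  public static void main(String[] args) {
    ContextListSketch list = new ContextListSketch().addLast("a").addLast("b").addLast("c");
    System.out.println(list.remove("b").names()); // prints [a, c]
  }
}
```

The sentinels mean that addLast and remove never have to special-case an empty list, which is one reason to create the head and tail contexts in the constructor.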

Bind/Connect

DefaultChannelPipeline delegates bind/connect requests to the linked list of ChannelHandlerContexts, starting at the tail context. The ChannelHandlerContext section explains how the list handles these requests.

DefaultChannelPipeline implementation – 2
public class DefaultChannelPipeline implements ChannelPipeline {
  @Override
  public ChannelPromise bind(SocketAddress localAddress) {
    return this.tailContext.bind(localAddress);
  }
  @Override
  public ChannelPromise bind(SocketAddress localAddress, ChannelPromise promise) {
    return this.tailContext.bind(localAddress, promise);
  }
  @Override
  public ChannelPromise connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return this.tailContext.connect(remoteAddress, promise);
  }
  
  private static final class TailContext extends AbstractChannelHandlerContext {
    private final TailContextHandler handler;
    public TailContext(ChannelPipeline pipeline) {
      super(HEAD_NAME, pipeline, null, TailContextHandler.class);
      this.handler = new TailContextHandler();
    }
    @Override
    public ChannelHandler handler() {
      return this.handler;
    }
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/DefaultChannelPipeline.java

ChannelHandlerContext

This section covers how the ChannelHandler behind a ChannelHandlerContext is selected and triggered when a bind/connect request arrives.

When a ChannelHandlerContext receives such a request, the first “outbound” ChannelHandlerContext it finds handles the request on the EventLoop that the context’s Channel is registered with.

Then what is “outbound” and “inbound”?

Inbound/Outbound ChannelHandler

You can think of an “inbound” event as an event that happens in the Channel. For example, in the el project, we defined the ChannelInboundHandler interface as follows:

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user to hook in to
 * state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {
  /**
   * Called when {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link
   * ChannelEventLoop}
   */
  void channelRegistered(ChannelHandlerContext ctx) throws Exception;
  /** Gets called if a {@link Throwable} was thrown. */
  void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/ChannelInboundHandler.java

The channelRegistered event fires when the Channel is registered with the EventLoop. We can think of it as an inbound event because it is handled inside the Channel. The “inbound” event may become clearer once we define the “outbound” event.

We can think of an “outbound” event as an event that happens on the socket. For example, bind and connect can be thought of as outbound events because both are related to the socket. In the el project, we defined the ChannelOutboundHandler interface as follows:

public interface ChannelOutboundHandler extends ChannelHandler {
  /**
   * Called once a bind operation is made.
   */
  void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
      throws Exception;
  /**
   * Called once a connect operation is made.
   */
  void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, ChannelPromise promise)
      throws Exception;
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/ChannelOutboundHandler.java

How inbound/outbound ChannelHandlerContext is selected and triggered (Cont’d)

As we discussed, DefaultChannelPipeline adds ChannelHandlerContexts to the linked list, not ChannelHandlers. When a ChannelHandlerContext is created, it determines its execution flag from the ChannelHandler class given to the constructor. For example, if the ChannelHandler implements only the ChannelInboundHandler interface, the ChannelHandlerContext sets its execution flag to FLAG_INBOUND.

Event chaining by ChannelHandlers in the ChannelPipeline
public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
  AbstractChannelHandlerContext(
      String name,
      ChannelPipeline pipeline,
      EventLoop eventLoop,
      Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executionFlag = ChannelHandlerFlag.flag(handlerClass);
    this.eventLoop = eventLoop;
  }
  ...
}
final class ChannelHandlerFlag {
  static final int FLAG_INBOUND = 0;
  static final int FLAG_OUTBOUND = 1;
  static final int FLAG_INOUTBOUND = 2;
  static int flag(Class<? extends ChannelHandler> clazz) {
    if (ChannelInboundHandler.class.isAssignableFrom(clazz)
        && ChannelOutboundHandler.class.isAssignableFrom(clazz)) {
      return FLAG_INOUTBOUND;
    }
    if (ChannelInboundHandler.class.isAssignableFrom(clazz)) {
      return FLAG_INBOUND;
    }
    return FLAG_OUTBOUND;
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/AbstractChannelHandlerContext.java
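To see which flag each kind of handler ends up with, here is a small self-contained sketch that follows the same decision order as ChannelHandlerFlag.flag(...). The marker interfaces and the example handler classes are hypothetical stand-ins, not the project’s types.

```java
// Hypothetical marker interfaces standing in for el's ChannelHandler types.
interface SketchHandler {}

interface SketchInboundHandler extends SketchHandler {}

interface SketchOutboundHandler extends SketchHandler {}

class HandlerFlagSketch {
  static final int FLAG_INBOUND = 0;
  static final int FLAG_OUTBOUND = 1;
  static final int FLAG_INOUTBOUND = 2;

  /** Same decision order as the flag(...) method shown above. */
  static int flag(Class<? extends SketchHandler> clazz) {
    boolean inbound = SketchInboundHandler.class.isAssignableFrom(clazz);
    boolean outbound = SketchOutboundHandler.class.isAssignableFrom(clazz);
    if (inbound && outbound) {
      return FLAG_INOUTBOUND;
    }
    if (inbound) {
      return FLAG_INBOUND;
    }
    return FLAG_OUTBOUND; // also reached by handlers that implement neither interface
  }

  // Example handlers, invented for this sketch.
  static final class LoggingHandler implements SketchInboundHandler {}

  static final class ConnectHandler implements SketchOutboundHandler {}

  static final class DuplexHandler implements SketchInboundHandler, SketchOutboundHandler {}

  static final class PlainHandler implements SketchHandler {}

  public static void main(String[] args) {
    System.out.println(flag(LoggingHandler.class)); // 0 (inbound)
    System.out.println(flag(ConnectHandler.class)); // 1 (outbound)
    System.out.println(flag(DuplexHandler.class)); // 2 (both)
    System.out.println(flag(PlainHandler.class)); // 1 (falls through to outbound)
  }
}
```

One thing the sketch makes visible: with this decision order, a handler that implements neither sub-interface is classified as outbound, so it may be worth guarding against that case if it can occur.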

The execution flag is used to find the ChannelHandler that handles a request the DefaultChannelPipeline receives. On the ChannelHandler side, we can propagate the event to the next ChannelHandlerContext in the linked list. We will see how it works in the next section.

The code below handles the exceptionCaught inbound event. When an exception occurs on the Channel, it publishes an exceptionCaught event with the fireExceptionCaught() method, which uses the execution flag to find the next inbound ChannelHandlerContext. That context then runs its handler’s exceptionCaught callback on its EventLoop. The ChannelHandler is responsible for propagating the event to the next ChannelHandlerContext, so that code does not appear in the ChannelHandlerContext class.


public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
  
  static void invokeExceptionCaught(
      final AbstractChannelHandlerContext next, final Throwable cause) {
    EventLoop eventLoop = next.eventLoop();
    if (eventLoop.inEventLoop()) {
      next.invokeExceptionCaught(cause);
    } else {
      eventLoop.execute(() -> next.invokeExceptionCaught(cause));
    }
  }
    
  @Override
  public ChannelHandlerContext fireExceptionCaught(Throwable t) {
    invokeExceptionCaught(findContextInbound(), t);
    return this;
  }
    
  private void invokeExceptionCaught(Throwable t) {
    try {
      ((ChannelInboundHandler) handler()).exceptionCaught(this, t);
    } catch (Throwable err) {
      ...
    }
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/AbstractChannelHandlerContext.java
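The excerpt calls findContextInbound() without showing it. A plausible sketch of such a lookup, assuming it walks forward through the list and skips contexts that do not accept inbound events, could look like this. The Context class and the boolean inbound field are hypothetical simplifications of the execution flag.

```java
// Hypothetical, simplified model of the context list; not the el project's classes.
class InboundLookupSketch {
  static final class Context {
    final String name;
    final boolean inbound; // stands in for the execution flag check
    Context next;

    Context(String name, boolean inbound) {
      this.name = name;
      this.inbound = inbound;
    }
  }

  /** Links the contexts in the given order and returns the first one. */
  static Context link(Context... contexts) {
    for (int i = 0; i + 1 < contexts.length; i++) {
      contexts[i].next = contexts[i + 1];
    }
    return contexts[0];
  }

  /** Returns the next context after "from" that accepts inbound events, or null if none. */
  static Context findContextInbound(Context from) {
    Context ctx = from.next;
    while (ctx != null && !ctx.inbound) {
      ctx = ctx.next;
    }
    return ctx;
  }

  public static void main(String[] args) {
    Context head =
        link(
            new Context("head", true),
            new Context("encoder", false), // outbound only, so it is skipped
            new Context("logger", true),
            new Context("tail", true));
    System.out.println(findContextInbound(head).name); // prints logger
  }
}
```

Each handler that wants the event to continue would call the fire method on its own context, which repeats the same lookup starting from that context.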

What ChannelHandler does internally when calling bind/connect

We’ve seen how a ChannelHandler is selected by event type and run by a ChannelHandlerContext, using the exceptionCaught inbound event as the example. What about the outbound events, bind and connect? This matters because we eventually want to transfer data to the remote.

The way a ChannelHandler is invoked is the same. What remains is how a ChannelOutboundHandler should be implemented.

One really simple implementation of ChannelOutboundHandler is HeadContextHandler, the handler of the pipeline’s head context. It just delegates the request to the Channel.

So the concrete logic of bind and connect is up to the concrete Channel class. We will see it soon.

ChannelHandler invokes bind/connect
private static final class HeadContextHandler
    implements ChannelOutboundHandler, ChannelInboundHandler {
  @Override
  public void bind(
      ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    ctx.channel().internal().bind(localAddress, promise);
  }
  @Override
  public void connect(
      ChannelHandlerContext ctx, SocketAddress remoteAddress, ChannelPromise promise)
      throws Exception {
    ctx.channel().internal().connect(remoteAddress, promise);
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/DefaultChannelPipeline.java#L162

Channel <> EventLoop

As discussed above and in the previous blog post (KOR), all operations like bind, connect, read, and write happen in the EventLoop so that tasks run asynchronously, and the result of each task is delivered through the Promise.

For now, the operations in the el project that use the EventLoop in the Channel are bind and connect.

Let’s look at the bind method. When a Channel executes bind(...), the ChannelPipeline calls the bind(...) method below on its tail context, and the request eventually reaches the head context’s handler. The ChannelHandlerContext schedules the bind task on the (single-thread) EventLoop and returns the promise object to the caller.

When the EventLoop takes the task from the queue and executes it, it runs the invokeBind(...) method, which in turn calls ChannelHandler#bind(...). If that fails, the exception is set on the promise.

In this way, unless the caller is already on the EventLoop thread, an operation is not executed right away: it is scheduled on the EventLoop, and the EventLoop executes the task and sets the result on the promise.

public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
  ...
  @Override
  public ChannelPromise bind(SocketAddress localAddress, ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise)) {
      // canceled
      return promise;
    }
    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventLoop eventLoop = next.eventLoop();
    if (eventLoop.inEventLoop()) {
      next.invokeBind(localAddress, promise);
    } else {
      safeExecute(eventLoop, () -> next.invokeBind(localAddress, promise), promise);
    }
    return promise;
  }
  private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    try {
      ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
      promise.setFailure(t);
    }
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/AbstractChannelHandlerContext.java
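The “run inline if already on the loop thread, otherwise queue” pattern can be tried out in isolation. Below is a minimal sketch built on JDK classes only: a single-thread executor plays the EventLoop and CompletableFuture plays the promise. MiniEventLoop and submit are hypothetical names, and this is not how the el project implements its EventLoop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for an event loop; CompletableFuture plays the promise.
class MiniEventLoop {
  private volatile Thread loopThread;
  private final ExecutorService executor;

  MiniEventLoop() {
    executor =
        Executors.newSingleThreadExecutor(
            runnable -> {
              Thread thread = new Thread(runnable, "mini-event-loop");
              thread.setDaemon(true); // let the JVM exit without an explicit shutdown
              loopThread = thread;
              return thread;
            });
  }

  /** Returns true when the caller is already running on the loop thread. */
  boolean inEventLoop() {
    return Thread.currentThread() == loopThread;
  }

  /** Runs the operation inline on the loop thread, otherwise queues it. */
  <T> CompletableFuture<T> submit(Supplier<T> operation) {
    CompletableFuture<T> promise = new CompletableFuture<>();
    Runnable task =
        () -> {
          try {
            promise.complete(operation.get());
          } catch (Throwable t) {
            promise.completeExceptionally(t); // failures travel through the promise
          }
        };
    if (inEventLoop()) {
      task.run();
    } else {
      executor.execute(task);
    }
    return promise;
  }

  public static void main(String[] args) {
    MiniEventLoop loop = new MiniEventLoop();
    // Called from the main thread, so the task is queued; prints mini-event-loop
    System.out.println(loop.submit(() -> Thread.currentThread().getName()).join());
  }
}
```

The inline branch matters for handlers that trigger further operations from inside a callback: they are already on the loop thread, so queueing again would only delay the work without changing which thread runs it.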

Then how does a caller receive the result from the Channel? Let’s look at one of the callers, Transport. Transport is a component that abstracts how to make connections with the remote and transfer data. End users do not use Channel directly; instead, they use a Transport.

You can see that the Transport, the caller, calls the Channel#bind() method. It adds a PromiseListener to the returned promise to get notified of the result of the bind, and then passes that outcome on to the Promise called result. In this way, a caller handles the returned Promise and takes action depending on the result of the asynchronous task.

public abstract class AbstractTransport<T extends AbstractTransport<T, C>, C extends Channel> {
  public ChannelPromise bind(SocketAddress localAddress) {
    doBind(...);
  }
  
  private void doBind(
    final ChannelPromise registered,
    final Channel channel,
    final SocketAddress localAddress,     
    final ChannelPromise result) {
    channel
        .channelEventLoop()
        .execute(
            () -> {
              channel
                  .bind(localAddress)
                  .addListener(
                      promise -> {
                        if (promise.isSuccess()) {
                          result.setSuccess(null);
                          return;
                        }
                        result.setFailure(promise.cause());
                      });
              return;
            });
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/transport/AbstractTransport.java
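The forwarding step in doBind, where the listener copies the outcome of one promise into another, can be sketched with the JDK’s CompletableFuture. This is only an analogy: whenComplete stands in for addListener, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; CompletableFuture stands in for el's ChannelPromise.
class PromiseForwardingSketch {

  /** Copies the outcome of the bind promise into the caller-facing result promise. */
  static void forward(CompletableFuture<Void> bindPromise, CompletableFuture<Void> result) {
    bindPromise.whenComplete(
        (ignored, cause) -> {
          if (cause == null) {
            result.complete(null);
            return;
          }
          result.completeExceptionally(cause);
        });
  }

  public static void main(String[] args) {
    CompletableFuture<Void> bindPromise = new CompletableFuture<>();
    CompletableFuture<Void> result = new CompletableFuture<>();
    forward(bindPromise, result);
    System.out.println(result.isDone()); // false: bind has not finished yet
    bindPromise.complete(null);
    System.out.println(result.isDone()); // true: the listener forwarded the success
  }
}
```

Keeping a separate result promise lets the transport hand something to the caller before the channel’s own promise exists or completes.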

LocalChannel, LocalServerChannel

The last things we implemented in milestone#1 are LocalChannel (client) and LocalServerChannel (server). They are the simplest implementations of Channel. As the names suggest, they do not work with sockets; they work only in memory.

Below is a test case that checks that a LocalChannel connects to a LocalServerChannel.

  • Both the LocalChannel and the LocalServerChannel register with their own EventLoopGroup, which manages EventLoops.
  • The LocalServerChannel first binds to the address SERVER_ADDRESS. Because it is a local channel, the address does not follow the URL format.
  • The LocalChannel then connects to SERVER_ADDRESS.
  • Finally, the test checks that both the LocalChannel and the LocalServerChannel are in an ‘ACTIVE’ state.
public class LocalChannelTransportTests {
  private static final String SERVER_ADDRESS = "server:addr";
  private DefaultChannelEventLoopGroup group1, group2;
  @BeforeEach
  public void setUp() {
    group1 = new DefaultChannelEventLoopGroup(1);
    group2 = new DefaultChannelEventLoopGroup(1);
  }
  @Test
  @DisplayName("When client connect to server, both server and client is in active state")
  public void localChannel() throws InterruptedException {
    ServerTransport st = new ServerTransport();
    ClientTransport ct = new ClientTransport();
    st.group(group1)
        .channel(LocalServerChannel.class)
        .handler(
            new ChannelInitializer<LocalChannel>() {
              @Override
              protected void initChannel(LocalChannel ch) throws Exception {
                ch.pipeline().addLast(new TestHandler());
              }
            });
    ct.group(group2).channel(LocalChannel.class).handler(new TestHandler());
    Channel sc = st.bind(new LocalAddress(SERVER_ADDRESS)).await().channel();
    Channel cc = ct.connect(sc.localAddress()).await().channel();
    assertTrue(sc.isActive());
    assertTrue(cc.isActive());
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/test/java/io/el/channel/local/LocalChannelTransportTests.java

LocalServerChannel#bind()

First, let’s take a look at how LocalServerChannel#bind() works. It is very simple.

As we discussed, the head context of the channel pipeline ends up calling LocalServerChannel.LocalServerInternal#doBind(). The doBind(...) method registers the channel’s address in the LocalChannelRegistry and updates its state to ‘BOUND’.

LocalChannelRegistry is a simple key-value store with the address as the key and the channel as the value. When a client tries to connect to an address, it looks up the Channel for that address in the LocalChannelRegistry and connects to it.


public class LocalServerChannel extends AbstractServerChannel {
  ...
  private class LocalServerInternal extends AbstractInternal {
    @Override
    public void doBind(SocketAddress localAddress) {
      ObjectUtil.checkNotNull(localAddress, "localAddress");
      LocalServerChannel.this.localAddress =
          LocalChannelRegistry.register(LocalServerChannel.this, localAddress);
      state = State.BOUND;
    }
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/local/LocalServerChannel.java
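LocalChannelRegistry itself is not shown in this post. As a rough idea of what an address-to-channel store can look like, here is a sketch on top of ConcurrentHashMap. The class name, the use of String addresses, and the choice to reject an address that is already taken are all assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry sketch; C is whatever channel type gets registered.
class RegistrySketch<C> {
  private final ConcurrentMap<String, C> boundChannels = new ConcurrentHashMap<>();

  /** Registers the channel under the address; fails if the address is already taken. */
  String register(C channel, String address) {
    // putIfAbsent is atomic, so two servers cannot both claim the same address.
    C existing = boundChannels.putIfAbsent(address, channel);
    if (existing != null) {
      throw new IllegalStateException("address already in use: " + address);
    }
    return address;
  }

  /** Returns the channel bound to the address, or null if nothing is bound. */
  C get(String address) {
    return boundChannels.get(address);
  }

  /** Frees the address so that another channel can bind to it. */
  void unregister(String address) {
    boundChannels.remove(address);
  }

  public static void main(String[] args) {
    RegistrySketch<String> registry = new RegistrySketch<>();
    registry.register("server-channel", "server:addr");
    System.out.println(registry.get("server:addr")); // prints server-channel
    System.out.println(registry.get("unknown:addr")); // prints null
  }
}
```

A concurrent map is used here because server and client channels may run on different EventLoop threads, as in the test above where each side has its own group.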

LocalChannel#connect()

When a client tries to connect to the remoteAddress, it looks up the server Channel in the LocalChannelRegistry.

It then registers itself with the server using server.connectFrom(...) and updates its state to ‘CONNECTED’. When server.connectFrom(...) is called, the server Channel also updates its state to ‘CONNECTED’. Finally, the promise is marked as success. If no LocalServerChannel is bound to the address, the promise fails with a ConnectException.

public class LocalChannel extends AbstractChannel {
  ...
  protected void doConnect(LocalServerChannel server, ChannelPromise result) {
    this.server = server;
    server.connectFrom(this);
    state = State.CONNECTED;
    remoteAddress = this.server.localAddress();
    result.setSuccess(null);
  }
  ...
  private class LocalInternal extends AbstractInternal {
    @Override
    public void connect(SocketAddress remoteAddress, ChannelPromise promise) {
      ...
      Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
      if (!(boundChannel instanceof LocalServerChannel)) {
        Exception cause = new ConnectException("Connection refused: " + remoteAddress);
        promise.setFailure(cause);
        return;
      }
      doConnect((LocalServerChannel) boundChannel, promise);
    }
  }
}

https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/local/LocalChannel.java
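To follow the state changes end to end, here is a compact, hypothetical model of the bind and connect flow. It is deliberately synchronous and leaves out the EventLoop and pipeline; the state names mirror the ones mentioned in this post, while the classes, the OPEN state, and the String addresses are invented for the sketch.

```java
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory model of the local bind/connect flow.
class LocalConnectSketch {
  enum State { OPEN, BOUND, CONNECTED }

  static final class Server {
    State state = State.OPEN;

    void connectFrom(Client client) {
      state = State.CONNECTED; // the server side becomes connected as well
    }
  }

  static final class Client {
    State state = State.OPEN;
  }

  private final Map<String, Server> registry = new HashMap<>();

  /** Registers the server under the address and marks it as bound. */
  void bind(Server server, String address) {
    registry.put(address, server);
    server.state = State.BOUND;
  }

  /** Connects the client to the server bound at the address, or fails the promise. */
  CompletableFuture<Void> connect(Client client, String address) {
    CompletableFuture<Void> promise = new CompletableFuture<>();
    Server server = registry.get(address);
    if (server == null) {
      promise.completeExceptionally(new ConnectException("Connection refused: " + address));
      return promise;
    }
    server.connectFrom(client);
    client.state = State.CONNECTED;
    promise.complete(null);
    return promise;
  }

  public static void main(String[] args) {
    LocalConnectSketch local = new LocalConnectSketch();
    Server server = new Server();
    Client client = new Client();
    local.bind(server, "server:addr");
    local.connect(client, "server:addr").join();
    System.out.println(server.state + " " + client.state); // prints CONNECTED CONNECTED
  }
}
```

Connecting to an address nobody has bound completes the promise exceptionally instead of throwing, which matches the way the caller is expected to learn about failures through the promise.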

What’s Next?

Next, we need to implement read/write operations on the Channel, which is milestone#2. In milestone#1, we implemented the Channel concept and are now ready to build something on top of it.
