
Intro
This is the second post in the series. It covers the Channel concept and its implementation, which is a key part of transporting data. The first post is written in Korean and can be read here; it covers the concepts of Promise and EventLoop. I started this project, named ‘el’, to understand the Spring WebFlux reactive system.
Implementing Channel is milestone#1, the first milestone we achieved as a team (I implemented Promise and EventLoop by myself). I’m proud that we keep making steady progress despite everyone’s busy schedule, since we all have full-time jobs.
https://github.com/zeroFruit/el




This post is about the Channel concept, which is the basis for transporting data. A Channel interacts with many sub-components, and we’re going to see how they work together.
Channel
I think the concept of Channel is well explained in the Netty javadoc.
A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
So we can think of the Channel as a connection between the client and the server. If we are on the client side, we need to connect to the remote address. If we are on the server side, we need to bind to a local address.
A ChannelPipeline handles the events that happen in the Channel, such as reading and writing data, using the ChannelHandlers attached to the ChannelPipeline.
All actual tasks such as bind, connect, and read/write run in the EventLoop, specifically a single-thread EventLoop, to keep the operations in order.
We’ll see each component in detail in this post.
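To make the ordering guarantee concrete, here is a minimal sketch that uses a plain `java.util.concurrent` single-thread executor as a stand-in for a single-thread EventLoop. The class name `SingleThreadOrderingSketch` and its method are hypothetical and not part of the el project; the only point is that tasks submitted to one worker thread run in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; a stand-in for a single-thread EventLoop, not el code. */
final class SingleThreadOrderingSketch {

  /** Submits one task per operation name and returns the order in which they ran. */
  static List<String> runInOrder(String... operations) {
    // One worker thread drains one queue, so tasks run in submission order.
    ExecutorService loop = Executors.newSingleThreadExecutor();
    List<String> executed = Collections.synchronizedList(new ArrayList<>());
    for (String op : operations) {
      loop.execute(() -> executed.add(op));
    }
    loop.shutdown(); // tasks that are already queued still run
    try {
      if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return executed;
  }

  public static void main(String[] args) {
    System.out.println(runInOrder("bind", "connect", "write", "read"));
  }
}
```

With more than one worker thread the recorded order could differ from the submission order, which is why a single-thread loop is the simpler choice when operations on one connection must not be reordered.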


public interface Channel {
/** Returns the assigned {@link ChannelPipeline} */
ChannelPipeline pipeline();
/** Returns {@code true} if the {@link Channel} is open and may get active later */
boolean isOpen();
/** Returns {@code true} if the {@link Channel} is registered with an {@link ChannelEventLoop} */
boolean isRegistered();
/** Returns {@code true} if the {@link Channel} is active and so connected. */
boolean isActive();
ChannelEventLoop channelEventLoop();
/**
* Returns the local address where this channel is bound to. {@link SocketAddress} is supposed to
* be down-cast into more concrete type such as {@link java.net.InetSocketAddress} to retrieve the
* detailed info
*/
SocketAddress localAddress();
/**
* Returns the remote address where this channel is connected to. The returned {@link
* SocketAddress} is supposed to be down-cast into more concrete type such as {@link
* java.net.InetSocketAddress} to retrieve the detailed info
*/
SocketAddress remoteAddress();
ChannelPromise bind(SocketAddress localAddress);
ChannelPromise connect(SocketAddress remoteAddress);
/** Returns an internal-use-only object that provides unsafe operations */
Internal internal();
interface Internal {
/** Return the {@link SocketAddress} to which is bound local or {@code null} if none. */
SocketAddress localAddress();
/**
* Return the {@link SocketAddress} to which is bound remote or {@code null} if none is bound
* yet.
*/
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify the {@link
* ChannelPromise} once the registration was complete.
*/
void register(ChannelEventLoop eventLoop, ChannelPromise promise);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and
* notify it once its done
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* Connect the {@link Channel} of the given {@link ChannelPromise} with the given remote {@link
* SocketAddress}.
*/
void connect(SocketAddress remoteAddress, ChannelPromise promise);
}
}
https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/Channel.java
ChannelPipeline
The interface of our ChannelPipeline is very straightforward. As you can expect from the method names, a ChannelPipeline manages ChannelHandlers. Jobs such as adding a ChannelHandler to the pipeline and removing a ChannelHandler from it are the responsibility of the ChannelPipeline. Let’s take a look at the ChannelPipeline interface and then its implementation.


public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
/** Add handlers to its handler chain at the end. */
ChannelPipeline addLast(ChannelHandler... handlers);
/** Remove handler */
ChannelPipeline remove(ChannelHandler handler);
/** Returns channel it binds to. */
Channel channel();
/** Returns first handler context */
ChannelHandlerContext firstContext();
/** Returns the context of the handler */
ChannelHandlerContext context(ChannelHandler handler);
/**
* A {@link Channel} was registered to its {@link ChannelEventLoop}.
*
* <p>This will result in having the {@link
* ChannelInboundHandler#channelRegistered(ChannelHandlerContext)} method called of the next
* {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the {@link Channel}.
*/
@Override
ChannelPipeline fireChannelRegistered();
}
https://github.com/zeroFruit/el/blob/main/transport/src/main/java/io/el/channel/ChannelPipeline.java
DefaultChannelPipeline
From the constructor, we can see that a ChannelPipeline knows about its Channel, and that the contexts of the ChannelHandlers are managed as a linked list. When a ChannelHandler is added, a context for that handler is created and appended to the linked list of contexts. The ChannelPipeline does not manage ChannelHandlers directly, but their contexts. As you can see, a context can reference its handler.




public class DefaultChannelPipeline implements ChannelPipeline {
private final Channel channel;
private final HeadContext headContext;
private final TailContext tailContext;
public DefaultChannelPipeline(Channel channel) {
this.channel = channel;
this.tailContext = new TailContext(this);
this.headContext = new HeadContext(this);
this.tailContext.prev = this.headContext;
this.headContext.next = this.tailContext;
}
@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
for (ChannelHandler handler : handlers) {
this.addLast(handler);
}
return this;
}
private ChannelPipeline addLast(ChannelHandler handler) {
final AbstractChannelHandlerContext prev = this.tailContext.prev;
final AbstractChannelHandlerContext newHandlerContext =
new DefaultChannelHandlerContext(name, this, eventLoop, handler);
this.tailContext.prev = newHandlerContext;
newHandlerContext.next = this.tailContext;
prev.next = newHandlerContext;
newHandlerContext.prev = prev;
return this;
}
...
}
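The linking in addLast can be tried out in isolation. Below is a minimal sketch of a doubly linked list with fixed head and tail sentinels, which is the structure the constructor and addLast above appear to build. The class `ContextListSketch`, its `Node` type, and the `remove` and `names` helpers are hypothetical simplifications (nodes carry only a name), not the el implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a pipeline's list of handler contexts. */
final class ContextListSketch {

  /** One node of the list; in el this role is played by a handler context. */
  static final class Node {
    final String name;
    Node prev;
    Node next;

    Node(String name) {
      this.name = name;
    }
  }

  private final Node head = new Node("head");
  private final Node tail = new Node("tail");

  ContextListSketch() {
    // The two sentinels are linked to each other, so the list is never empty.
    head.next = tail;
    tail.prev = head;
  }

  /** Links a new node just before the tail sentinel. */
  ContextListSketch addLast(String name) {
    Node node = new Node(name);
    Node prev = tail.prev;
    prev.next = node;
    node.prev = prev;
    node.next = tail;
    tail.prev = node;
    return this;
  }

  /** Unlinks the first node with the given name; returns false if there is none. */
  boolean remove(String name) {
    for (Node n = head.next; n != tail; n = n.next) {
      if (n.name.equals(name)) {
        n.prev.next = n.next;
        n.next.prev = n.prev;
        return true;
      }
    }
    return false;
  }

  /** Names of the nodes between the sentinels, in head-to-tail order. */
  List<String> names() {
    List<String> out = new ArrayList<>();
    for (Node n = head.next; n != tail; n = n.next) {
      out.add(n.name);
    }
    return out;
  }

  public static void main(String[] args) {
    ContextListSketch list = new ContextListSketch().addLast("decoder").addLast("logger");
    System.out.println(list.names());
    list.remove("decoder");
    System.out.println(list.names());
  }
}
```

Because of the sentinels, neither addLast nor remove needs a special case for an empty list or for the first and last user-added node.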
Bind/Connect
A DefaultChannelPipeline delegates bind/connect requests to the linked list of ChannelHandlerContexts. You can find the details of how the list handles a bind/connect request in the ChannelHandlerContext section.




public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelPromise bind(SocketAddress localAddress) {
return this.tailContext.bind(localAddress);
}
@Override
public ChannelPromise bind(SocketAddress localAddress, ChannelPromise promise) {
return this.tailContext.bind(localAddress, promise);
}
@Override
public ChannelPromise connect(SocketAddress remoteAddress, ChannelPromise promise) {
return this.tailContext.connect(remoteAddress, promise);
}
private static final class TailContext extends AbstractChannelHandlerContext {
private final TailContextHandler handler;
public TailContext(ChannelPipeline pipeline) {
super(HEAD_NAME, pipeline, null, TailContextHandler.class);
this.handler = new TailContextHandler();
}
@Override
public ChannelHandler handler() {
return this.handler;
}
}
}
ChannelHandlerContext
How the ChannelHandler related to a ChannelHandlerContext is selected and triggered when receiving a bind/connect request
When one of the ChannelHandlerContexts receives a request, the first “outbound” ChannelHandlerContext handles that request on the EventLoop that the context’s Channel is registered with.
Then what is “outbound” and “inbound”?
ChannelHandler Inbound/Outbound
You can think of an “inbound” event as an event that happens in the Channel. For example, in the el project, we defined the ChannelInboundHandler interface as follows:
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user to hook in to
* state changes easily.
*/
public interface ChannelInboundHandler extends ChannelHandler {
/**
* Called when {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link
* ChannelEventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/** Gets called if a {@link Throwable} was thrown. */
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
The channelRegistered event fires when the Channel is registered to the EventLoop. We can think of it as an inbound event because it should be handled inside the Channel. The “inbound” event may become clearer after we define the “outbound” event.
We can think of an “outbound” event as an event that happens on the socket. For example, bind and connect can be thought of as outbound events because both are related to the socket. In the el project, we defined the ChannelOutboundHandler interface as follows:
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* Called once a bind operation is made.
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception;
/**
* Called once a connect operation is made.
*/
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, ChannelPromise promise)
throws Exception;
}
How inbound/outbound ChannelHandlerContext is selected and triggered (Cont’d)
As we discussed before, DefaultChannelPipeline adds the ChannelHandlerContext to the linked list, not the ChannelHandler. When a ChannelHandlerContext is created, it determines its execution flag from the ChannelHandler class given to the constructor. For example, if the ChannelHandler implements only the ChannelInboundHandler interface, the ChannelHandlerContext sets its execution flag to FLAG_INBOUND.




public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
AbstractChannelHandlerContext(
String name,
ChannelPipeline pipeline,
EventLoop eventLoop,
Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executionFlag = ChannelHandlerFlag.flag(handlerClass);
this.eventLoop = eventLoop;
}
...
}
final class ChannelHandlerFlag {
static final int FLAG_INBOUND = 0;
static final int FLAG_OUTBOUND = 1;
static final int FLAG_INOUTBOUND = 2;
static int flag(Class<? extends ChannelHandler> clazz) {
if (ChannelInboundHandler.class.isAssignableFrom(clazz)
&& ChannelOutboundHandler.class.isAssignableFrom(clazz)) {
return FLAG_INOUTBOUND;
}
if (ChannelInboundHandler.class.isAssignableFrom(clazz)) {
return FLAG_INBOUND;
}
return FLAG_OUTBOUND;
}
}
The execution flag is used to find the ChannelHandler that handles a request the DefaultChannelPipeline receives. On the ChannelHandler side, we can propagate the event to the next ChannelHandlerContext in the linked list. We will see how it works in the next section.
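As a rough illustration of how a flag can drive the lookup, here is a minimal sketch. It reuses the three flag values from ChannelHandlerFlag, but everything else is an assumption: the `FlagLookupSketch` and `Ctx` types are hypothetical, and so are the traversal directions (inbound lookups walk toward the tail, outbound lookups walk toward the head, as in Netty). The bodies of the real findContextInbound() and findContextOutbound() are not shown in this post.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of flag-based context lookup; not the el source. */
final class FlagLookupSketch {
  static final int FLAG_INBOUND = 0;
  static final int FLAG_OUTBOUND = 1;
  static final int FLAG_INOUTBOUND = 2;

  static final class Ctx {
    final String name;
    final int flag;
    Ctx prev;
    Ctx next;

    Ctx(String name, int flag) {
      this.name = name;
      this.flag = flag;
    }
  }

  /** Links the contexts in the given order and returns them as a list. */
  static List<Ctx> link(Ctx... ctxs) {
    for (int i = 1; i < ctxs.length; i++) {
      ctxs[i - 1].next = ctxs[i];
      ctxs[i].prev = ctxs[i - 1];
    }
    return Arrays.asList(ctxs);
  }

  /** Assumed direction: inbound events travel from the head toward the tail. */
  static Ctx findContextInbound(Ctx from) {
    Ctx c = from.next;
    while (c != null && c.flag == FLAG_OUTBOUND) {
      c = c.next; // skip contexts whose handler is outbound-only
    }
    return c;
  }

  /** Assumed direction: outbound requests travel from the tail toward the head. */
  static Ctx findContextOutbound(Ctx from) {
    Ctx c = from.prev;
    while (c != null && c.flag == FLAG_INBOUND) {
      c = c.prev; // skip contexts whose handler is inbound-only
    }
    return c;
  }

  public static void main(String[] args) {
    List<Ctx> pipeline = link(
        new Ctx("head", FLAG_INOUTBOUND),
        new Ctx("decoder", FLAG_INBOUND),
        new Ctx("encoder", FLAG_OUTBOUND),
        new Ctx("tail", FLAG_INBOUND));
    // An inbound lookup starting at "decoder" skips the outbound-only "encoder".
    System.out.println(findContextInbound(pipeline.get(1)).name);
    // An outbound lookup starting at "tail" stops at "encoder".
    System.out.println(findContextOutbound(pipeline.get(3)).name);
  }
}
```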
The code below handles the exceptionCaught inbound event. When an exception occurs on the Channel, it publishes an exceptionCaught event with the fireExceptionCaught() method. That method finds the next inbound ChannelHandlerContext by its flag, and the context runs its handler’s callback for the exception. The ChannelHandler is responsible for propagating the event to the next ChannelHandlerContext, so that code does not appear in the ChannelHandlerContext class.
public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
static void invokeExceptionCaught(
final AbstractChannelHandlerContext next, final Throwable cause) {
EventLoop eventLoop = next.eventLoop();
if (eventLoop.inEventLoop()) {
next.invokeExceptionCaught(cause);
} else {
eventLoop.execute(() -> next.invokeExceptionCaught(cause));
}
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable t) {
invokeExceptionCaught(findContextInbound(), t);
return this;
}
private void invokeExceptionCaught(Throwable t) {
try {
((ChannelInboundHandler) handler()).exceptionCaught(this, t);
} catch (Throwable err) {
...
}
}
}
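To show what "the handler is responsible for propagating the event" means in practice, here is a minimal sketch with two handlers in a row. The `PropagationSketch`, `Handler`, and `Ctx` types are hypothetical and much simpler than the el interfaces; the sketch only demonstrates that the second handler sees the exception if and only if the first one calls fireExceptionCaught on its context.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of event propagation between contexts. */
final class PropagationSketch {

  interface Handler {
    void exceptionCaught(Ctx ctx, Throwable cause);
  }

  static final class Ctx {
    final Handler handler;
    Ctx next;

    Ctx(Handler handler) {
      this.handler = handler;
    }

    /** Hands the event to the handler of the next context, if there is one. */
    void fireExceptionCaught(Throwable cause) {
      if (next != null) {
        next.handler.exceptionCaught(next, cause);
      }
    }
  }

  /** Fires one exception into a two-handler chain and returns what each handler saw. */
  static List<String> run(boolean firstPropagates) {
    List<String> seen = new ArrayList<>();
    Ctx start = new Ctx(null); // where the error surfaced; its own handler is never invoked
    Ctx first = new Ctx((ctx, cause) -> {
      seen.add("first:" + cause.getMessage());
      if (firstPropagates) {
        ctx.fireExceptionCaught(cause); // the handler decides whether to pass it on
      }
    });
    Ctx second = new Ctx((ctx, cause) -> seen.add("second:" + cause.getMessage()));
    start.next = first;
    first.next = second;
    start.fireExceptionCaught(new IllegalStateException("boom"));
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(run(true));
    System.out.println(run(false));
  }
}
```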
What ChannelHandler does internally when calling bind/connect
We’ve seen how a ChannelHandler is selected by event type and run by its ChannelHandlerContext, using the exceptionCaught inbound event as an example. What about the outbound events, bind and connect? This is important because we eventually want to transfer data to the remote.
The way a ChannelHandler is invoked is the same, so we need to see how a ChannelOutboundHandler should be implemented.
One really simple implementation of ChannelOutboundHandler is HeadContextHandler, the handler of the head context, which handles the outbound events. It just delegates the request to the Channel.
So the concrete logic of how to bind and connect is up to the concrete Channel class. We will see it soon.




private static final class HeadContextHandler
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
ctx.channel().internal().bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress, ChannelPromise promise)
throws Exception {
ctx.channel().internal().connect(remoteAddress, promise);
}
}
Channel <> EventLoop
As we’ve discussed here and in the previous blog post (KOR), all operations such as bind, connect, read, and write happen in the EventLoop so that tasks run asynchronously, and the result of a task is received through the Promise.
For now, the el project uses the EventLoop in the Channel for bind and connect.
Let’s look at the bind method. When a Channel executes bind(...), the ChannelPipeline passes the request to its tail context, which runs the bind(...) method shown below. The ChannelHandlerContext schedules the bind task on the (single-thread) EventLoop and returns the promise object to the caller.
When the EventLoop takes the task from the queue and executes it, it runs the invokeBind(...) method, which in turn calls the ChannelHandler#bind(...) method. If that fails, it sets an exception on the promise.
In this way, operations are not executed right away unless the caller is already on the EventLoop thread; they are scheduled on the EventLoop, which executes the task and sets the result on the promise.
public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
...
@Override
public ChannelPromise bind(SocketAddress localAddress, ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise)) {
// canceled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
final EventLoop eventLoop = next.eventLoop();
if (eventLoop.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(eventLoop, () -> next.invokeBind(localAddress, promise), promise);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
promise.setFailure(t);
}
}
}
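The "run directly if already on the loop thread, otherwise queue it" pattern can be sketched with standard library types. In the sketch below, `CompletableFuture` stands in for ChannelPromise and a single-thread `ExecutorService` stands in for the EventLoop; the class `InEventLoopSketch`, the thread name, and the string-based address are all hypothetical choices for illustration, not el APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of the inEventLoop dispatch pattern; not el code. */
final class InEventLoopSketch {
  private volatile Thread loopThread;

  // The thread factory remembers the worker thread so inEventLoop() can compare against it.
  private final ExecutorService executor =
      Executors.newSingleThreadExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "sketch-loop");
            thread.setDaemon(true); // lets the JVM exit without an explicit shutdown
            loopThread = thread;
            return thread;
          });

  boolean inEventLoop() {
    return Thread.currentThread() == loopThread;
  }

  /** Runs the bind task directly when already on the loop thread, otherwise queues it. */
  CompletableFuture<String> bind(String address) {
    CompletableFuture<String> promise = new CompletableFuture<>();
    if (inEventLoop()) {
      invokeBind(address, promise);
    } else {
      executor.execute(() -> invokeBind(address, promise));
    }
    return promise;
  }

  private void invokeBind(String address, CompletableFuture<String> promise) {
    try {
      if (address.isEmpty()) {
        throw new IllegalArgumentException("empty address");
      }
      promise.complete(address + " bound on " + Thread.currentThread().getName());
    } catch (Throwable t) {
      promise.completeExceptionally(t); // failures reach the caller through the promise
    }
  }

  /** Asks the loop thread itself whether it is the loop thread. */
  boolean probeFromLoop() {
    return CompletableFuture.supplyAsync(this::inEventLoop, executor).join();
  }

  public static void main(String[] args) {
    InEventLoopSketch loop = new InEventLoopSketch();
    System.out.println(loop.bind("server:addr").join());
    System.out.println(loop.inEventLoop() + " " + loop.probeFromLoop());
  }
}
```

The caller never sees an exception thrown from bind itself; it only observes success or failure by inspecting the returned promise.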
Then how does a caller receive the result from the Channel? Let’s look at one of the callers, Transport. Transport is a component that abstracts how to make connections with the remote and transfer data. End clients do not use a Channel directly; instead, they use a Transport.
You can see that the Transport, the caller, calls the Channel#bind() method. It adds a PromiseListener to the returned promise and gets notified of the result of the bind. Depending on that result, it completes the Promise called result with success or failure. In this way, a caller handles the returned Promise and takes action depending on the result of the asynchronous task.
public abstract class AbstractTransport<T extends AbstractTransport<T, C>, C extends Channel> {
public ChannelPromise bind(SocketAddress localAddress) {
doBind(...);
}
private void doBind(
final ChannelPromise registered,
final Channel channel,
final SocketAddress localAddress,
final ChannelPromise result) {
channel
.channelEventLoop()
.execute(
() -> {
channel
.bind(localAddress)
.addListener(
promise -> {
if (promise.isSuccess()) {
result.setSuccess(null);
return;
}
result.setFailure(promise.cause());
});
return;
});
}
}
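The listener-based relay in doBind can be approximated with `CompletableFuture`, used here as a stand-in for ChannelPromise (its `whenComplete` callback plays the role of addListener). The `RelayResultSketch` class and its `relay` method are hypothetical names for this illustration only.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: relaying the outcome of one promise to another. */
final class RelayResultSketch {

  /** Returns a promise that succeeds or fails once the given bind promise finishes. */
  static CompletableFuture<Void> relay(CompletableFuture<?> bindPromise) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    bindPromise.whenComplete(
        (value, cause) -> {
          if (cause == null) {
            result.complete(null); // success carries no value, like setSuccess(null)
            return;
          }
          result.completeExceptionally(cause);
        });
    return result;
  }

  public static void main(String[] args) {
    CompletableFuture<String> bind = new CompletableFuture<>();
    CompletableFuture<Void> result = relay(bind);
    System.out.println("done before bind completes: " + result.isDone());
    bind.complete("bound");
    System.out.println("done after bind completes: " + result.isDone());
  }
}
```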
LocalChannel, LocalServerChannel
The last thing we implemented in milestone#1 is the LocalChannel (client) and the LocalServerChannel (server). They are implementations of the Channel, and the simplest ones. However, as the name suggests, they do not work with a socket; they work only in memory.
Below is a test case that checks that a LocalChannel connects to a LocalServerChannel.
- Both the LocalChannel and the LocalServerChannel register their own EventLoopGroup, which manages the EventLoop.
- The LocalServerChannel first binds to the address SERVER_ADDRESS. Because it is a local channel, the address does not follow the URL format.
- The LocalChannel then connects to the address SERVER_ADDRESS.
- Check that both the LocalChannel and the LocalServerChannel are in an ‘ACTIVE’ state.
public class LocalChannelTransportTests {
private static final String SERVER_ADDRESS = "server:addr";
private DefaultChannelEventLoopGroup group1, group2;
@BeforeEach
public void setUp() {
group1 = new DefaultChannelEventLoopGroup(1);
group2 = new DefaultChannelEventLoopGroup(1);
}
@Test
@DisplayName("When client connect to server, both server and client is in active state")
public void localChannel() throws InterruptedException {
ServerTransport st = new ServerTransport();
ClientTransport ct = new ClientTransport();
st.group(group1)
.channel(LocalServerChannel.class)
.handler(
new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(new TestHandler());
}
});
ct.group(group2).channel(LocalChannel.class).handler(new TestHandler());
Channel sc = st.bind(new LocalAddress(SERVER_ADDRESS)).await().channel();
Channel cc = ct.connect(sc.localAddress()).await().channel();
assertTrue(sc.isActive());
assertTrue(cc.isActive());
}
}
LocalServerChannel#bind()
First, let’s take a look at how LocalServerChannel#bind() works. It is very simple.
As we discussed before, the head context of the channel pipeline calls LocalServerChannel.LocalServerInternal#doBind(). The doBind(...) method registers the channel’s address with the LocalChannelRegistry and updates its state to ‘BOUND’.
LocalChannelRegistry is a simple key-value store, with the address as the key and the channel as the value. When a client tries to connect to an address, it looks up the Channel for that address in the LocalChannelRegistry and connects to it.
public class LocalServerChannel extends AbstractServerChannel {
...
private class LocalServerInternal extends AbstractInternal {
@Override
public void doBind(SocketAddress localAddress) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
LocalServerChannel.this.localAddress =
LocalChannelRegistry.register(LocalServerChannel.this, localAddress);
state = State.BOUND;
}
}
}
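A key-value registry of this kind fits in a few lines. The sketch below is an assumption-laden stand-in, not the real LocalChannelRegistry: addresses are plain strings, channels are plain objects, and rejecting a second bind to the same address is a design choice made for this example (the post does not say how el handles that case).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry: address as the key, bound channel as the value. */
final class LocalRegistrySketch {
  private final Map<String, Object> boundChannels = new ConcurrentHashMap<>();

  /** Registers the channel under the address; assumed to fail if the address is taken. */
  String register(Object channel, String address) {
    Object existing = boundChannels.putIfAbsent(address, channel);
    if (existing != null) {
      throw new IllegalStateException("address already in use: " + address);
    }
    return address;
  }

  /** Returns the channel bound to the address, or null if nothing is bound there. */
  Object get(String address) {
    return boundChannels.get(address);
  }

  /** Removes the binding and returns the channel that was bound, or null. */
  Object unregister(String address) {
    return boundChannels.remove(address);
  }

  public static void main(String[] args) {
    LocalRegistrySketch registry = new LocalRegistrySketch();
    Object server = new Object();
    registry.register(server, "server:addr");
    System.out.println(registry.get("server:addr") == server);
    System.out.println(registry.get("unknown"));
  }
}
```

Using putIfAbsent makes the check and the insert a single atomic step, so two threads binding the same address cannot both succeed.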
LocalChannel#connect()
When a client tries to connect to the remoteAddress, it looks up the server Channel in the LocalChannelRegistry.
It then registers itself with the server using server.connectFrom(...) and updates its state to ‘CONNECTED’. When server.connectFrom(...) is called, the server Channel also updates its state to ‘CONNECTED’. Finally, the promise is set as success.
public class LocalChannel extends AbstractChannel {
...
protected void doConnect(LocalServerChannel server, ChannelPromise result) {
this.server = server;
server.connectFrom(this);
state = State.CONNECTED;
remoteAddress = this.server.localAddress();
result.setSuccess(null);
}
...
private class LocalInternal extends AbstractInternal {
@Override
public void connect(SocketAddress remoteAddress, ChannelPromise promise) {
...
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ConnectException("Connection refused: " + remoteAddress);
promise.setFailure(cause);
return;
}
doConnect((LocalServerChannel) boundChannel, promise);
}
}
}
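The state changes on both sides of a connect can be followed in a small, self-contained model. Everything in the sketch below is hypothetical: `Server` and `Client` are bare state holders, the registry is a plain map, the `OPEN` state name is invented (only ‘BOUND’ and ‘CONNECTED’ appear in the post), and `CompletableFuture` stands in for ChannelPromise. It mirrors the flow described above: look up the server, refuse the connection if nothing is bound, otherwise mark both sides as connected and complete the promise.

```java
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the in-memory bind/connect flow; not el code. */
final class LocalConnectSketch {
  enum State { OPEN, BOUND, CONNECTED }

  static final class Server {
    State state = State.OPEN;
    String address;

    /** Called by the connecting client; the server side becomes connected too. */
    void connectFrom(Client client) {
      state = State.CONNECTED;
    }
  }

  static final class Client {
    State state = State.OPEN;
    String remoteAddress;
  }

  private final Map<String, Server> registry = new HashMap<>();

  void bind(Server server, String address) {
    registry.put(address, server);
    server.address = address;
    server.state = State.BOUND;
  }

  /** Completes the returned promise with success, or with ConnectException if unbound. */
  CompletableFuture<Void> connect(Client client, String address) {
    CompletableFuture<Void> promise = new CompletableFuture<>();
    Server server = registry.get(address);
    if (server == null) {
      promise.completeExceptionally(new ConnectException("Connection refused: " + address));
      return promise;
    }
    server.connectFrom(client);
    client.state = State.CONNECTED;
    client.remoteAddress = server.address;
    promise.complete(null);
    return promise;
  }

  public static void main(String[] args) {
    LocalConnectSketch net = new LocalConnectSketch();
    Server server = new Server();
    Client client = new Client();
    net.bind(server, "server:addr");
    net.connect(client, "server:addr").join();
    System.out.println(server.state + " " + client.state);
  }
}
```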
What’s Next?
Next, in milestone#2, we need to implement the read/write operations on the Channel. In milestone#1, we implemented the Channel concept, and we are now ready to build something on top of it.