4
4
5
5
<img src =" https://tva1.sinaimg.cn/large/008i3skNly1gyol111yspj31aa0jggob.jpg " alt =" image-20220124110420220 " width =" 700 " align =" left " />
6
6
7
+
8
+
7
9
##### Netty并不是只支持过NIO,但是不建议(depercate)阻塞I/O(BIO/OIO)
8
10
9
11
- 连接数高的情况下:阻塞 -> 消耗源、效率低
@@ -31,6 +33,8 @@ BIO 下是 Thread-Per-Connection
31
33
32
34
<img src =" https://tva1.sinaimg.cn/large/008i3skNly1gyolp1ody6j31fy0kggpn.jpg " alt =" image-20220124112504631 " width =" 700 " align =" left " />
33
35
36
+
37
+
34
38
*** Thread-Per-Connection:对应每个连接都有1个线程处理,1个线程同时处理:读取、解码、计算、编码、发送***
35
39
36
40
@@ -39,6 +43,8 @@ NIO 下是 Reactor
39
43
40
44
<img src =" https://tva1.sinaimg.cn/large/008i3skNly1gyolp5zqpaj316w0pmadn.jpg " alt =" image-20220124112753682 " width =" 700 " align =" left " >
41
45
46
+
47
+
42
48
*** Reactor 多线程模式,由多个线程负责:读取、发送,由线程池负责处理:解码、计算、编码***
43
49
44
50
*** Reactor 主从多线程模式,由单独mainReactor 单线程负责接收请求,subReactor和 Reactor 多线程模式一致***
@@ -89,6 +95,146 @@ serverBootStrap.group(bossGroup, workerGroup);
89
95
90
96
91
97
98
+ ##### Netty 支持主从 Reactor 源码分析
99
+
100
+ 1.初始化 Main EventLoopGroup
101
+
102
+ ``` java
103
+ public abstract class AbstractBootstrap <B extends AbstractBootstrap<B , C > , C extends Channel > implements Cloneable {
104
+
105
+ // main Event Loop Group
106
+ volatile EventLoopGroup group;
107
+
108
+ ....
109
+
110
+ // 初始化 mian Event Loop Group 方法
111
+ public B group (EventLoopGroup group ) {
112
+ ObjectUtil . checkNotNull(group, " group" );
113
+ if (this . group != null ) {
114
+ throw new IllegalStateException (" group set already" );
115
+ }
116
+ this . group = group;
117
+ return self();
118
+ }
119
+
120
+ ....
121
+ }
122
+ ```
123
+
124
+ 2 . 初始化 Worker EventLoopGroup
125
+
126
+ ``` java
127
+ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap , ServerChannel > {
128
+
129
+ // woker Events Loop Group
130
+ private volatile EventLoopGroup childGroup;
131
+ .....
132
+
133
+ public ServerBootstrap group (EventLoopGroup parentGroup , EventLoopGroup childGroup ) {
134
+ super . group(parentGroup);
135
+ ObjectUtil . checkNotNull(childGroup, " childGroup" );
136
+ if (this . childGroup != null ) {
137
+ throw new IllegalStateException (" childGroup set already" );
138
+ }
139
+ // 初始化 worker Event Loop Group 方法
140
+ this . childGroup = childGroup;
141
+ return this ;
142
+ }
143
+
144
+ ....
145
+ }
146
+ ```
147
+
148
+ 3 . MainEventLoopGroup 和 WorkerEventLoop 绑定# bind(),并实现新建和初始化 SocketChannel 绑定到 MainEventLoopGroup中
149
+
150
+ ``` java
151
+ // 绑定 地址:端口
152
+ public ChannelFuture bind(SocketAddress localAddress) {
153
+ validate();
154
+ return doBind(ObjectUtil . checkNotNull(localAddress, " localAddress" ));
155
+ }
156
+
157
+ // 绑定逻辑
158
+ private ChannelFuture doBind(final SocketAddress localAddress) {
159
+ // 初始化 & 注册 MainEventLoopGroup
160
+ final ChannelFuture regFuture = initAndRegister();
161
+ final Channel channel = regFuture. channel();
162
+ ....
163
+ }
164
+
165
+ // 初始化 & 注册 MainEventLoopGroup
166
+ final ChannelFuture initAndRegister() {
167
+ Channel channel = null ;
168
+ try {
169
+ // 创建新的 ServerSocketChannel
170
+ channel = channelFactory. newChannel();
171
+ // 初始化 ServerSocketChannel 中的 Handler
172
+ init(channel);
173
+ } catch (Throwable t) {
174
+ if (channel != null ) {
175
+ // channel can be null if newChannel crashed (eg SocketException("too many open files"))
176
+ channel. unsafe(). closeForcibly();
177
+ // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
178
+ return new DefaultChannelPromise (channel, GlobalEventExecutor . INSTANCE ). setFailure(t);
179
+ }
180
+ // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
181
+ return new DefaultChannelPromise (new FailedChannel (), GlobalEventExecutor . INSTANCE ). setFailure(t);
182
+ }
183
+
184
+ // 将 ServerSocketChannel 注册到 MainEventLoop 中
185
+ // 因为端口和地址 只有1个,channel只能被注册一次,所以 MainEventLoopGroup 是单线程的
186
+ ChannelFuture regFuture = config(). group(). register(channel);
187
+ if (regFuture. cause() != null ) {
188
+ if (channel. isRegistered()) {
189
+ channel. close();
190
+ } else {
191
+ channel. unsafe(). closeForcibly();
192
+ }
193
+ }
194
+ ...
195
+ }
196
+
197
+ ```
198
+
199
+ 4 . WorkerEventLoopGroup 和 SocketChannel 绑定关系
200
+
201
+ ``` java
202
+ private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
203
+ @Override
204
+ @SuppressWarnings (" unchecked" )
205
+ public void channelRead (ChannelHandlerContext ctx , Object msg ) {
206
+ // 每次读取都是一个 SocketChannel
207
+ final Channel child = (Channel ) msg;
208
+
209
+ child. pipeline(). addLast(childHandler);
210
+
211
+ setChannelOptions(child, childOptions, logger);
212
+
213
+ for (Entry<AttributeKey<?> , Object > e: childAttrs) {
214
+ child. attr((AttributeKey<Object > ) e. getKey()). set(e. getValue());
215
+ }
216
+
217
+ try {
218
+ // 将 SocketChannel 注册到 workerEventLoopGroup中
219
+ childGroup. register(child). addListener(new ChannelFutureListener () {
220
+ @Override
221
+ public void operationComplete (ChannelFuture future ) throws Exception {
222
+ if (! future. isSuccess()) {
223
+ forceClose(child, future. cause());
224
+ }
225
+ }
226
+ });
227
+ } catch (Throwable t) {
228
+ forceClose(child, t);
229
+ }
230
+ }
231
+ }
232
+ ```
233
+
234
+
235
+
236
+
237
+
92
238
#### 3.Netty 粘包/半包解决方案
93
239
94
240
关于半包的主要原因:
@@ -100,6 +246,8 @@ serverBootStrap.group(bossGroup, workerGroup);
100
246
101
247
102
248
249
+
250
+
103
251
关于粘包的主要原因:
104
252
105
253
- 发送方每次写入数据 > 套接字缓冲区大小
@@ -162,3 +310,5 @@ TCP 是流式协议,消息无边界
162
310
163
311
164
312
313
+
314
+
0 commit comments