
Analysis of transport request sending and handling on an Elasticsearch node

How transport requests are sent and processed

The previous article analyzed how NettyTransport starts up and connects to other nodes; this one focuses on how transport requests are sent and processed.

Nodes in a cluster constantly exchange messages: the master checks whether the other nodes are still there, the other nodes periodically check whether the master is still alive, the cluster state is published, search and data requests are forwarded, and so on. To frame these messages reliably, Elasticsearch defines a 19-byte message header, HEADER_SIZE = 2 + 4 + 8 + 1 + 4: the bytes 'E' and 'S', followed by a 4-byte int message length, an 8-byte long message id, a 1-byte status, and finally a 4-byte int version.

Every message exchanged between nodes begins with these 19 bytes. Elasticsearch also gives every inter-node action a name; for example, the periodic master check uses internal:discovery/zen/fd/master_ping, and each action maps to a corresponding message handler. The details are analyzed below.
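To make the layout concrete, here is a minimal sketch of the 19-byte header using a plain java.nio.ByteBuffer; it is an illustration only, not the actual NettyHeader code. The note about what the length field covers is inferred from the receiving code shown further below, which adds 6 to the length to account for the 'E'/'S' marker and the length field itself.

    import java.nio.ByteBuffer;

    // Illustration only: the 19-byte header layout described above, written with a plain ByteBuffer.
    public class HeaderLayoutSketch {
        static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4; // = 19

        static ByteBuffer writeHeader(int messageLength, long requestId, byte status, int versionId) {
            ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
            header.put((byte) 'E');       // marker byte 1
            header.put((byte) 'S');       // marker byte 2
            header.putInt(messageLength); // 4-byte length of everything after this field
            header.putLong(requestId);    // 8-byte message id
            header.put(status);           // 1-byte status flags
            header.putInt(versionId);     // 4-byte version id
            header.flip();
            return header;
        }
    }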

How a request is sent

The code, in NettyTransport, is as follows:


public void sendRequest( final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

         // parameters: node - the destination node; requestId - the request id; action - the action name; request - the request body; options - one of RECOVERY, BULK, REG, STATE, PING

     Channel targetChannel = nodeChannel(node, options); // get the channel for the target node; channels were initialized when the node was connected (see the previous article)

         if (compress) {

             options.withCompress( true );

         }

         byte status = 0 ;

      // set the status flags; possible flags: STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;

    status = TransportStatus.setRequest(status);

      ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); // create the output stream

         boolean addedReleaseListener = false ;

         try {

             bStream.skip(NettyHeader.HEADER_SIZE); // reserve space for the message header

             StreamOutput stream = bStream;

             // only compress if asked, and, the request is not bytes, since then only

             // the header part is compressed, and the "body" can't be extracted as compressed

             if (options.compress() && (!(request instanceof BytesTransportRequest))) {

                 status = TransportStatus.setCompress(status);

                 stream = CompressorFactory.defaultCompressor().streamOutput(stream);

             }

             stream = new HandlesStreamOutput(stream);

             // we pick the smallest of the 2, to support both backward and forward compatibility

             // note, this is the only place we need to do this, since from here on, we use the serialized version

             // as the version to use also when the node receiving this request will send the response with

             Version version = Version.smallest( this .version, node.version());

             stream.setVersion(version);

             stream.writeString(transportServiceAdapter.action(action, version));

             ReleasableBytesReference bytes;

             ChannelBuffer buffer;

             // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output

             // that create paged channel buffers, but its tricky to know when to do it (where this option is

             // more explicit).

             if (request instanceof BytesTransportRequest) {

                 BytesTransportRequest bRequest = (BytesTransportRequest) request;

                 assert node.version().equals(bRequest.version());

                 bRequest.writeThin(stream);

                 stream.close();

                 bytes = bStream.bytes();

                 ChannelBuffer headerBuffer = bytes.toChannelBuffer();

                 ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();

                 buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);

             } else {

                 request.writeTo(stream);

                 stream.close();

                 bytes = bStream.bytes();

                 buffer = bytes.toChannelBuffer();

             }

             NettyHeader.writeHeader(buffer, requestId, status, version); // write the message header

             ChannelFuture future = targetChannel.write(buffer); // write the buffer and get a future; this is where the message actually goes out

             ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);

             future.addListener(listener); // add the release listener

             addedReleaseListener = true ;

             transportServiceAdapter.onRequestSent(node, requestId, action, request, options);

         } finally {

             if (!addedReleaseListener) {

                 Releasables.close(bStream.bytes());

             }

         }

     }

That is the whole sending path: get the channel for the target node, serialize the request, prepend the message header, write it out, and attach a listener. TransportRequest here is an abstract class: it extends TransportMessage and implements the Streamable interface. The cluster contains a great many concrete implementations of it, one per feature, so they are not listed individually; they will come up regularly in later code analysis.
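As a sketch of what such an implementation looks like, here is a hypothetical EchoRequest (not a real Elasticsearch class) showing the Streamable contract: writeTo() is what sendRequest() serializes into the stream, and readFrom() is called on the receiving side in the same field order.

    import java.io.IOException;

    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.transport.TransportRequest;

    // Hypothetical request type, for illustration only.
    public class EchoRequest extends TransportRequest {

        private String message;

        public EchoRequest() {
            // no-arg constructor so the receiving side can create an empty instance
        }

        public EchoRequest(String message) {
            this.message = message;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            message = in.readString();   // must mirror the order used in writeTo()
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(message);    // serialized by sendRequest() after the action name
        }
    }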

How a request is received

Sending requests is only half of the transport's job; receiving them is the other half, and that is what we look at next. As briefly introduced in the previous article, Netty processes messages through handlers, so NettyTransport's receive logic lives in MessageChannelHandler.messageReceived(), shown below:


public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {

         Transports.assertTransportThread();

         Object m = e.getMessage();

         if (!(m instanceof ChannelBuffer)) { // not a ChannelBuffer: pass it upstream and return

             ctx.sendUpstream(e);

             return ;

         }

      // parse the message header

         ChannelBuffer buffer = (ChannelBuffer) m;

         int size = buffer.getInt(buffer.readerIndex() - 4 );

         transportServiceAdapter.received(size + 6 );

         // we have additional bytes to read, outside of the header

         boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6 )) != 0 ;

         int markedReaderIndex = buffer.readerIndex();

         int expectedIndexReader = markedReaderIndex + size;

         // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh

         // buffer, or in the cumlation buffer, which is cleaned each time

         StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);

        // read the key metadata from the header

         long requestId = buffer.readLong();

         byte status = buffer.readByte();

         Version version = Version.fromId(buffer.readInt());

         StreamInput wrappedStream;

      …………

         if (TransportStatus.isRequest(status)) { // handle a request

             String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);

             if (buffer.readerIndex() != expectedIndexReader) {

                 if (buffer.readerIndex() < expectedIndexReader) {

                     logger.warn( "Message not fully read (request) for [{}] and action [{}], resetting" , requestId, action);

                 } else {

                     logger.warn( "Message read past expected size (request) for [{}] and action [{}], resetting" , requestId, action);

                 }

                 buffer.readerIndex(expectedIndexReader);

             }

         } else { // handle a response

             TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);

             // ignore if its null, the adapter logs it

             if (handler != null ) {

                 if (TransportStatus.isError(status)) {

                     handlerResponseError(wrappedStream, handler);

                 } else {

                     handleResponse(ctx.getChannel(), wrappedStream, handler);

                 }

             } else {

                 // if its null, skip those bytes

                 buffer.readerIndex(markedReaderIndex + size);

             }

           …………

         wrappedStream.close();

     }

That is the receive logic. This method overrides the one in Netty's SimpleChannelUpstreamHandler. The MessageChannelHandler is added to the handler chain when the client and server sides are started, and when a message arrives Netty runs the chain automatically. That is Netty territory and is not covered in detail here; see the Netty documentation.
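For readers less familiar with Netty 3, the sketch below shows the general mechanism: an upstream handler is added to the pipeline via a ChannelPipelineFactory and Netty invokes its messageReceived() for every inbound message. The handler name and the body are illustrative; this is not the actual NettyTransport pipeline code, which also installs a frame decoder and other handlers in front of the dispatcher.

    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

    // Illustration of the Netty 3 handler-chain mechanism, not the real NettyTransport pipeline.
    public class SketchPipelineFactory implements ChannelPipelineFactory {

        @Override
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            // NettyTransport puts frame decoding (and optionally compression) in front of this
            pipeline.addLast("dispatcher", new SimpleChannelUpstreamHandler() {
                @Override
                public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                    // the real MessageChannelHandler parses the 19-byte header here
                    System.out.println("received: " + e.getMessage());
                }
            });
            return pipeline;
        }
    }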

How requests and responses are handled

Handling a request

The code is as follows:


protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {

         final String action = buffer.readString(); // read the action name

         transportServiceAdapter.onRequestReceived(requestId, action);

         final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);

         try {

             final TransportRequestHandler handler = transportServiceAdapter.handler(action, version); // look up the handler for this action

             if (handler == null ) {

                 throw new ActionNotFoundTransportException(action);

             }

             final TransportRequest request = handler.newInstance();

             request.remoteAddress( new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));

             request.readFrom(buffer);

             if (handler.executor() == ThreadPool.Names.SAME) {

                 //noinspection unchecked

                 handler.messageReceived(request, transportChannel); // process the message with this handler

             } else {

                 threadPool.executor(handler.executor()).execute( new RequestHandler(handler, request, transportChannel, action));

             }

         } catch (Throwable e) {

             try {

                 transportChannel.sendResponse(e);

             } catch (IOException e1) {

                 logger.warn( "Failed to send error message back to client for action [" + action + "]" , e);

                 logger.warn( "Actual Exception" , e1);

             }

         }

         return action;

     }

The key parts are annotated in the code above. Even here we still cannot see how a request is actually processed, because cluster requests come in many kinds (ping, discovery, index, and so on) and cannot all be handled the same way: the request is ultimately handed off to a handler, and every feature implements its own. A note on transportServiceAdapter: all message dispatch goes through it. The complete request flow is: messageReceived() receives the message, determines that it is a request, and calls into the transportServiceAdapter's handler lookup; the adapter finds the request handler registered for the action; and the message is then passed to that handler for processing. No concrete example is given here; the later discovery analysis will walk through how ping and other requests are handled.
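As a hedged sketch of what such a feature-specific handler looks like, the code below continues the hypothetical EchoRequest from earlier and follows the 1.x-era BaseTransportRequestHandler pattern; the action name is made up, and the registration call shown in the trailing comment is how 1.x-era modules typically wire an action to a handler.

    import org.elasticsearch.threadpool.ThreadPool;
    import org.elasticsearch.transport.BaseTransportRequestHandler;
    import org.elasticsearch.transport.TransportChannel;
    import org.elasticsearch.transport.TransportResponse;

    // Hypothetical handler for the hypothetical EchoRequest above; illustration only.
    public class EchoRequestHandler extends BaseTransportRequestHandler<EchoRequest> {

        public static final String ACTION_NAME = "internal:example/echo"; // made-up action name

        @Override
        public EchoRequest newInstance() {
            return new EchoRequest();           // handleRequest() calls this before request.readFrom()
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;       // run directly on the transport thread
        }

        @Override
        public void messageReceived(EchoRequest request, TransportChannel channel) throws Exception {
            channel.sendResponse(TransportResponse.Empty.INSTANCE);  // answer the sender
        }
    }

    // Registration, typically done when the owning module starts:
    // transportService.registerHandler(EchoRequestHandler.ACTION_NAME, new EchoRequestHandler());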

Handling a response

Responses are processed by the handleResponse() method, shown below:


protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {

         final TransportResponse response = handler.newInstance();

         response.remoteAddress( new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));

         response.remoteAddress();

         try {

             response.readFrom(buffer);

         } catch (Throwable e) {

             handleException(handler, new TransportSerializationException( "Failed to deserialize response of type [" + response.getClass().getName() + "]" , e));

             return ;

         }

         try {

             if (handler.executor() == ThreadPool.Names.SAME) {

                 //noinspection unchecked

                 handler.handleResponse(response); // hand the response to the corresponding handler

             } else {

                 threadPool.executor(handler.executor()).execute( new ResponseHandler(handler, response));

             }

         } catch (Throwable e) {

             handleException(handler, new ResponseHandlerFailureTransportException(e));

         }

     }

Handling a response is very similar to handling a request. Each request is associated with a request handler and a response handler, both registered with the TransportService beforehand (the request handler when the owning module starts, the response handler when the request is sent): when a request arrives, its handler is looked up by action name; when a response arrives, the response handler is looked up by requestId.
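A hedged sketch of the response side, continuing the hypothetical echo example: the response handler is supplied together with the request when it is sent, and MessageChannelHandler later looks it up by requestId. EchoResponse would be a made-up TransportResponse with the same readFrom()/writeTo() contract as EchoRequest; the sendRequest call in the trailing comment is likewise illustrative.

    import org.elasticsearch.threadpool.ThreadPool;
    import org.elasticsearch.transport.BaseTransportResponseHandler;
    import org.elasticsearch.transport.TransportException;

    // Hypothetical response handler, illustration only.
    public class EchoResponseHandler extends BaseTransportResponseHandler<EchoResponse> {

        @Override
        public EchoResponse newInstance() {
            return new EchoResponse();          // handleResponse() deserializes into this via readFrom()
        }

        @Override
        public void handleResponse(EchoResponse response) {
            // called once the response body has been read from the stream
        }

        @Override
        public void handleException(TransportException exp) {
            // called when the remote side replied with an error status
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    }

    // Typical usage (hypothetical names):
    // transportService.sendRequest(node, EchoRequestHandler.ACTION_NAME, new EchoRequest("hello"), new EchoResponseHandler());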

Summary

To recap NettyTransport's message flow: a message is sent to the target node via sendRequest(); the target node's MessageChannelHandler receives it, determines whether it is a request or a response, and forwards it to the TransportServiceAdapter; the TransportServiceAdapter looks up the corresponding handler, and the message finally reaches that handler to be processed and answered.

That concludes the analysis of message sending in NettyTransport. The next article, on cluster discovery, will show the sending and handling of concrete messages.

Original link: https://www.cnblogs.com/zziawanblog/p/6528616.html
