请求管道
描述:在连接上发送多个请求而不用等待前一个请求的响应从而优化延迟。
问题
在集群中用一个 Socket Channel 通信,如果这个请求需要等待上一个请求的响应会导致性能问题。为了实现更好的延时和高吞吐量,服务器的请求队列应该被填充满,来确保服务器资源得到充分的利用。例如,在服务器中使用Singular Update Queue,在它正在处理一个请求时,它会一直接收请求直到这个队列被填满。如果一次只发送一个请求,那么大多数服务器都会不必要的浪费。
解决方案
节点发送请求到其它节点,不需要等待上一个请求的响应返回。这是通过创建两个独立的线程来实现的,一个在网络通道上发送请求,另一个在网络通道上接收响应。
发送消息的节点在 socket 信道上发送请求,不比等待响应。
class SingleSocketChannel {
public void sendOneWay(RequestOrResponse request) throws IOException {
var dataStream = new DataOutputStream(socketOutputStream);
byte[] messageBytes = serialize(request);
dataStream.writeInt(messageBytes.length);
dataStream.write(messageBytes);
}
}
另一个独立的线程就开始读取响应
class ResponseThread extends Thread implements Logging {
private volatile boolean isRunning = false;
private SingleSocketChannel socketChannel;
public ResponseThread(SingleSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
isRunning = true;
logger.info("Starting responder thread = " + isRunning);
while (isRunning) {
doWork();
}
} catch (IOException e) {
getLogger().error(e); //thread exits if stopped or there is IO error
}
}
public void doWork() throws IOException {
RequestOrResponse response = socketChannel.read();
logger.info("Read Response = " + response);
processResponse(response);
}
响应处理程序能立马处理响应,或将其提交给Singular Update Queue。
在使用请求管道时这里有两个问题需要去处理。
如果请求是连续发送的且没有等待其响应,那么这个节点接收到这个请求会直到超出限制。为此,这里需要设置一个上限阈值来表明一次只能接收多少个请求。任何节点都可以向其它节点最多发送该阈值请求数量。一旦发送的请求超过了设置的这个阈值就不会再接收这个请求,并且会阻塞。一个最简单的策略就是保留一个阻塞队列来跟踪请求。这个队列在初始化的设置这个请求的数量。一旦收到请求的响应,它就会从队列中移除,以便继续接收后面的请求。下面的代码展示了。每个 socket 连接能接受的最大的请求消息数。
class RequestLimitingPipelinedConnection {
...
private final Map<InetAddressAndPort, ArrayBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
private int maxInflightRequests = 5;
public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
ArrayBlockingQueue<RequestOrResponse> requestsForAddress = inflightRequests.get(to);
if (requestsForAddress == null) {
requestsForAddress = new ArrayBlockingQueue<>(maxInflightRequests);
inflightRequests.put(to, requestsForAddress);
}
requestsForAddress.put(request);
...
}
}
一旦接收到响应,请求就会从队列中删除
class RequestLimitingPipelinedConnection {
...
private void consume(SocketRequestOrResponse response) {
Integer correlationId = response.getRequest().getCorrelationId();
Queue<RequestOrResponse> requestsForAddress = inflightRequests.get(response.getAddress());
RequestOrResponse first = requestsForAddress.peek();
if (correlationId != first.getCorrelationId()) {
throw new RuntimeException("First response should be for the first request");
}
requestsForAddress.remove(first);
responseConsumer.accept(response.getRequest());
}
}
保证处理故障和维护有序的实现就变得棘手。假设这里有两个未处理的请求。第一个请求失败并且重试,服务器可能会在第一个请求到达服务器之前处理第二个请求。服务器需要一些机制来确保无序的请求被拒绝。否则在故障和重试的情况下消息会有重排序的危险。例如,Raft 总是发送每个日志条目上的上一个日志索引。如果这个索引无法匹配,服务器就拒绝这个请求。Kafka 能够设置 max.in.flight.requests.per.connection
超过一个,使用幂等生成器实现,该实现为发送给代理(broker)的每个消息批分配唯一标识符。broker 然后就会检查这些过来的请求的序号,如果这个是无序的序号就会拒绝这个请求。