Skip to the content.

从读(Follower Reads)

在 follower 提供读请求服务,以实现更好的吞吐量和更低的延迟。

问题

当使用 Leader 和 follower 模式时,如果向 Leader 发送了太多请求,可能会导致 Leader 负载过大。此外,设置了多数据中心,位于远程数据中心的客户端,向leader 的请求将受到额外的延迟。

解决方案

当写请求需要转到 leader 以保持一致性时,只读请求可以转到最近的 follower。当客户端大部分是只读时,这特别有用。

值得注意的是,从 follower 中读取的客户端可能会得到旧的值。leader 和 follower 之间总会有一个复制延迟(replication lag),即使是在像 raft 这样实现共识算法的系统。这是因为,即使 leader 知道提交了哪些值,它也需要另外一条消息来与 follower 交互(通知)。所以从 follower 服务器读取数据只在允许稍微老一点的值的情况下使用。

​ 从最近的从节点读取数据

查找最近复制点

关于这些节点的位置,集群节点维护了额外的元数据信息。

class ReplicaDescriptor 
  public class ReplicaDescriptor {
      public ReplicaDescriptor(InetAddressAndPort address, String region) {
          this.address = address;
          this.region = region;
      }
      InetAddressAndPort address;
      String region;
  
      public InetAddressAndPort getAddress() {
          return address;
      }
  
      public String getRegion() {
          return region;
      }
  }

集群客户端可以基于自己的区域选择本地副本。

class ClusterClient
  public List<String> get(String key) {
      List<ReplicaDescriptor> allReplicas = allFollowerReplicas(key);
      ReplicaDescriptor nearestFollower = findNearestFollowerBasedOnLocality(allReplicas, clientRegion);
      GetValueResponse getValueResponse = sendGetRequest(nearestFollower.getAddress(), new GetValueRequest(key));
      return getValueResponse.getValue();
  }

  ReplicaDescriptor findNearestFollowerBasedOnLocality(List<ReplicaDescriptor> followers, String clientRegion) {
      List<ReplicaDescriptor> sameRegionFollowers = matchLocality(followers, clientRegion);
      List<ReplicaDescriptor> finalList = sameRegionFollowers.isEmpty()?followers:sameRegionFollowers;
      return finalList.get(0);
  }

  private List<ReplicaDescriptor> matchLocality(List<ReplicaDescriptor> followers, String clientRegion) {
      return followers.stream().filter(rd -> clientRegion.equals(rd.region)).collect(Collectors.toList());
  }

例如,如果有两个 follower 副本,一个在美国西部地区,另一个在美国东部地区。来自西部区域的客户端,将连接到东部区域的副本。

class CausalKVStoreTest 
  public void getFollowersInSameRegion() {
      List<ReplicaDescriptor> followers = createReplicas("us-west", "us-east");
      ReplicaDescriptor nearestFollower = new ClusterClient(followers, "us-east").findNearestFollower(followers);
      assertEquals(nearestFollower.getRegion(), "us-east");
  }

集群客户端或协调的集群节点也可以跟踪观察集群节点的延迟。它可以定期发送 HeartBeat 来捕获延迟,并使用它来获取具有最小延迟的从节点。为了做出更公平的选择,mongodbcockroachdb 等产品将延迟计算为移动平均数。集群节点通常维护一个 Single Socket 通道来与其他集群节点通信。Single Socket 通道需要一个心跳来保持连接活跃。因此,捕获延迟和计算移动平均可以很容易地实现。

class WeightedAverage 
  public class WeightedAverage {
      long averageLatencyMs = 0;
      public void update(long heartbeatRequestLatency) {
          //Mongodb 使用加权平均实现的例子
          //运行时,心跳消息到目标节点的加权平均往返时间。
          // 加权80%为旧往返时间,20%为新往返时间。
          averageLatencyMs = averageLatencyMs == 0
                  ? heartbeatRequestLatency
                  : (averageLatencyMs * 4 + heartbeatRequestLatency) / 5;
      }
  
      public long getAverageLatency() {
          return averageLatencyMs;
      }
  }

class ClusterClient 
  private Map<InetAddressAndPort, WeightedAverage> latencyMap = new HashMap<>();
  private void sendHeartbeat(InetAddressAndPort clusterNodeAddress) {
      try {
          long startTimeNanos = System.nanoTime();
          sendHeartbeatRequest(clusterNodeAddress);
          long endTimeNanos = System.nanoTime();

          WeightedAverage heartbeatStats = latencyMap.get(clusterNodeAddress);
          if (heartbeatStats == null) {
              heartbeatStats = new WeightedAverage();
              latencyMap.put(clusterNodeAddress, new WeightedAverage());
          }
          heartbeatStats.update(endTimeNanos - startTimeNanos);

      } catch (NetworkException e) {
          logger.error(e);
      }
  }

这个延迟信息可以用来选择网络延迟最小的从节点。

class ClusterClient 
  ReplicaDescriptor findNearestFollower(List<ReplicaDescriptor> allFollowers) {
      List<ReplicaDescriptor> sameRegionFollowers = matchLocality(allFollowers, clientRegion);
      List<ReplicaDescriptor> finalList
              = sameRegionFollowers.isEmpty() ? allFollowers
                                                :sameRegionFollowers;
      return finalList.stream().sorted((r1, r2) -> {
          if (!latenciesAvailableFor(r1, r2)) {
              return 0;
          }
          return Long.compare(latencyMap.get(r1).getAverageLatency(),
                              latencyMap.get(r2).getAverageLatency());

      }).findFirst().get();
  }

  private boolean latenciesAvailableFor(ReplicaDescriptor r1, ReplicaDescriptor r2) {
      return latencyMap.containsKey(r1) && latencyMap.containsKey(r2);
  }

断开或慢的从节点

follower 可能会与 leader 断开连接,停止更新。在某些情况下,缓慢的磁盘可能会阻碍整个复制过程,从而导致慢盘落后于 leader。follower 可以跟踪是否在一段时间内没有收到 leader 的消息,并停止为用户请求服务。

例如,像 mongodb 这样的产品允许选择一个具有最大允许延迟时间的副本。如果副本滞后于 leader 的时间超过这个最大时间,它就不会被选择为请求服务。在kafka 中,如果 follower 检测到消费者请求的偏移量太大,它会响应 OFFSET_OUT_OF_RANGE 错误。然后消费者与 leader 通信。

读自己的写

从 follower 服务器读可能会有问题,因为在常见的场景中,客户端写了一些内容,然后立即尝试读取它,这可能会带来令人惊讶的结果。

假设有一个客户注意到一些图书数据错误地包含了 “title”:“Nitroservices”。它通过一个写一个 “title”: “Microservices” 来纠正这一点,这是给 leader 的。然后它立即读这个值,但是读请求会转到一个可能还没有来得及更新的 follower

​ 从 follower 中读取旧值

这是一个常见的问题。例如,直到最近,Amazon S3 都没有阻止这种情况的发生。

为了解决这个问题,每次写入时,服务器不仅存储新值,而且还存储单调增长的版本戳(version stamp)。这个值(stamp)可以是高水位标记混合时钟。服务器在对写请求的响应中返回这个存储值的版本戳。然后,如果客户端希望稍后读取该值,它将版本戳作为其读请求的一部分。如果读请求转到一个 follower,它会检查它的存储值,看它是否等于或高于请求的版本戳记。如果不是,它会等到有了最新版本后再返回。这样,客户端将始终读取与它们写入的值一致的值—这通常称为读自己的写一致性

请求的流程如下所示:为了纠正错误的写值,将 “title”:“Microservices“ 写给 leader。leader 在响应中将版本 2 返回给客户端。当客户端试图读取 “title” 的值时,它在请求中传递版本号 2。接收请求的 follower 服务器检查它自己的版本号是否是最新的。因为 follower 服务器上的版本号仍然是 1,所以它会等待,直到它从 leader 那里得到那个版本。一旦它有了匹配的(或更高的)版本,它就完成读请求,并返回值。

​ 从节点读取自己的写

简直存储的代码如下所示。值得注意的是 follower 可能会落后太多或与 leader 断开连接。所以它不会无限期地等待。会配置超时值。如果跟踪服务器不能在超时时间内获得更新,将返回一个错误响应给客户端。然后,客户端可以重试从其他从节点读取数据。

class ReplicatedKVStore 
  Map<Integer, CompletableFuture> waitingRequests = new ConcurrentHashMap<>();
  public CompletableFuture<Optional<String>> get(String key, int atVersion) {
      if(this.server.serverRole() == ServerRole.FOLLOWING) {
          //check if we have the version with us;
          if (!isVersionUptoDate(atVersion)) {
              //wait till we get the latest version.
              CompletableFuture<Optional<String>> future = new CompletableFuture<>();
              //Timeout if version does not progress to required version
              //before followerWaitTimeout ms.
              future.orTimeout(config.getFollowerWaitTimeoutMs(), TimeUnit.MILLISECONDS);
              waitingRequests.put(atVersion, future);
              return future;
          }
      }
      return CompletableFuture.completedFuture(mvccStore.get(key, atVersion));
  }

  private boolean isVersionUptoDate(int atVersion) {
      Optional<Integer> maxVersion = mvccStore.getMaxVersion();
      return maxVersion.map(v -> v >= atVersion).orElse(false);
  }

一旦键值存储传递给客户端请求的版本,它就可以向客户端发送响应。

class ReplicatedKVStore 
  private Response applyWalEntry(WALEntry walEntry) {
      Command command = deserialize(walEntry);
      if (command instanceof SetValueCommand) {
          return applySetValueCommandsAndCompleteClientRequests((SetValueCommand) command);
      }
      throw new IllegalArgumentException("Unknown command type " + command);
  }

  private Response applySetValueCommandsAndCompleteClientRequests(SetValueCommand setValueCommand) {
      getLogger().info("Setting key value " + setValueCommand);
      version = version + 1;
      mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
      completeWaitingFuturesIfFollower(version, setValueCommand.getValue());
      Response response = Response.success(version);
      return response;
  }

  private void completeWaitingFuturesIfFollower(int version, String value) {
      CompletableFuture completableFuture = waitingRequests.remove(version);
      if (completableFuture != null) {
          completableFuture.complete(Optional.of(value));
      }
  }

线性读

有时读请求需要获得最新的可用数据。复制延迟是不能容忍的。在这些情况下,需要将读请求转到 leader。这是一致性核心解决的一个常见设计问题。

例子

原文

https://martinfowler.com/articles/patterns-of-distributed-systems/follower-reads.html