Skip to the content.

Gossip Dissemination

随机选择集群中的一个节点传递信息来确保这些信息能到达所有集群其它所有的节点,而不会在网络中丢失。

问题

在集群中,每个节点都需要将自己的元数据传递到集群中其它的节点,并且不是通过依赖状态共享的。在一个大集群中,如果所有的服务器都能互相之间通信,这样就会消耗大量的宽带。即使某些网络连接出现问题,信息也应该到达所有节点。

解决方案

集群节点使用传播风格(gossip style)的通信来传播状态更新。每个节点选择一个随机节点来传递它的信息。这是一个有规律的间隔,比如每1秒。每一次,都选择一个随机节点来传递信息。

在大集群中,下面列出的都是需要考虑的:

正如下面注释中所讨论的,gossip style 的通信满足了所有这些需求。

(流行病、谣言和计算机通信)Epidemics, Rumours and Computer Communication

信息交换与 gossip 协议本质上是一致的。即使它的 Gossip 状态收敛(converges)得非常快,在整个集群识别新节点或检测到节点故障之前也会有一定的延迟。用 gossip 协议实现信息交换,需要容忍最终一致性。

对于要求强一致性的操作,就需要用到一致性核心

在相同的节点中使用这两种方法是一种常见的做法。例如 consul 对于组成员和失败探测就是用了 gossip 协议,但是也使用了基于 raft 一致性存储一个强一致性的服务目录

每个集群节点将元数据存储为与每个节点关联的键值对列表,如下所示:

class Gossip
  Map<NodeId, NodeState> clusterMetadata = new HashMap<>();

class NodeState
  Map<String, VersionedValue> values = new HashMap<>();

在启动时,每个集群节点添加关于自身的元数据,需要将其传播到其他节点。例如元数据可以是节点监听的 IP 地址和端口、它负责的分区等。Gossip 实例需要知道至少一个其他节点才能启动 gossip 通信。众所周知,集群节点可以用来初始化 Gossip 实例,它被称为种子节点或是导入器。任何节点都能作为这样的引入器。

class Gossip 
  public Gossip(InetAddressAndPort listenAddress, 
  			   List<InetAddressAndPort> seedNodes, 
  			   String nodeId) throw IOException {
      this.listenAddress = listenAddress;
      // 过滤这个节点本身,以防它是种子节点的一部分
      this.seedNodes = removeSelfAddress(seedNodes);
      this.nodeId = new NodeId(nodeId);
      addLocalState(GossipKeys.ADDRESS, listenAddress.toString());
      
      this.socketServer = new NIOSocketListener(newGossipRequestConsumer(), listenAddress);
  }
  
  private void addLocalState(String key, String value) {
      NodeState nodeState = clusterMetadata.get(listenAddress);
      if (nodeState == null) {
          nodeState = new NodeState();
          clusterMetadata.put(nodeId, nodeState);
      }
      nodeState.add(key, new VersionedValue(value, incremenetVersion()));
  }

每个集群节点调度一个作业,以将其拥有的元数据定期传输到其他节点。

class Gossip 
private ScheduledThreadPoolExecutor gossipExecutor = new ScheduledThreadPoolExecutor(1);
  private long gossipIntervalMs = 1000;
  private ScheduledFuture<?> taskFuture;
  public void start() {
      socketServer.start();
      taskFuture = gossipExecutor.scheduleAtFixedRate(()-> doGossip(),
                  gossipIntervalMs,
                  gossipIntervalMs,
                  TimeUnit.MILLISECONDS);
}

当调度任务被调用时,它会从元数据映射的服务器列表中随机选取一小组节点。一个小的常量定义为 gossip fanout,它决定了有多少个节点可以选定为 gossip 目标。如果什么都没有,它就随机选择一个种子节点,并将它的元数据映射发送到该节点。

class Gossip
  public void doGossip() {
      List<InetAddressAndPort> knownClusterNodes = liveNodes();
      if (knownClusterNodes.isEmpty()) {
          sendGossip(seedNodes, gossipFanout);
      } else {
          sendGossip(knownClusterNodes, gossipFanout);
      }
  }
  
  private List<InetAddressAndPort> liveNodes() {
      Set<InetAddressAndPort> nodes
              = clusterMetadata.values()
              .stream()
              .map(n -> InetAddressAndPort.parse(n.get(GossipKeys.ADDRESS).getValue()))
              .collect(Collectors.toSet());
      return removeSelfAddress(nodes);
  }
  private void sendGossip(List<InetAddressAndPort> knownClusterNodes, int gossipFanout) {
      if (knownClusterNodes.isEmpty()) {
          return;
      }

      for (int i = 0; i < gossipFanout; i++) {
          InetAddressAndPort nodeAddress = pickRandomNode(knownClusterNodes);
          sendGossipTo(nodeAddress);
      }
  }
  private void sendGossipTo(InetAddressAndPort nodeAddress) {
      try {
          getLogger().info("Sending gossip state to " + nodeAddress);
          SocketClient<RequestOrResponse> socketClient = new SocketClient(nodeAddress);
          GossipStateMessage gossipStateMessage
                  = new GossipStateMessage(this.clusterMetadata);
          RequestOrResponse request
                  = createGossipStateRequest(gossipStateMessage);
          byte[] responseBytes = socketClient.blockingSend(request);
          GossipStateMessage responseState = deserialize(responseBytes);
          merge(responseState.getNodeStates());

      } catch (IOException e) {
          getLogger().error("IO error while sending gossip state to " + nodeAddress, e);
      }
  }
  private RequestOrResponse createGossipStateRequest(GossipStateMessage gossipStateMessage) {
      return new RequestOrResponse(RequestId.PushPullGossipState.getId(),
              JsonSerDes.serialize(gossipStateMessage), correlationId++);
  }

集群节点接收 gossip 消息并检查其元数据就会发现三件事。

然后将缺失的值添加到自己的状态映射中。传入消息中缺少的任何值都将作为响应返回。

集群节点将发送 gossip 信息并从 gossip 返回得到的值添加到自己状态中。

class Gossip 
  private void handleGossipRequest(org.distrib.patterns.common.Message<RequestOrResponse> request) {
      GossipStateMessage gossipStateMessage = deserialize(request.getRequest());
      Map<NodeId, NodeState> gossipedState = gossipStateMessage.getNodeStates();
      getLogger().info("Merging state from " + request.getClientSocket());
      merge(gossipedState);

      Map<NodeId, NodeState> diff = delta(this.clusterMetadata, gossipedState);
      GossipStateMessage diffResponse = new GossipStateMessage(diff);
      getLogger().info("Sending diff response " + diff);
      request.getClientSocket().write(new RequestOrResponse(RequestId.PushPullGossipState.getId(),
                      JsonSerDes.serialize(diffResponse),
                      request.getRequest().getCorrelationId()));
  }
public Map<NodeId, NodeState> delta(Map<NodeId, NodeState> fromMap, Map<NodeId, NodeState> toMap) {
    Map<NodeId, NodeState> delta = new HashMap<>();
    for (NodeId key : fromMap.keySet()) {
        if (!toMap.containsKey(key)) {
            delta.put(key, fromMap.get(key));
            continue;
        }
        NodeState fromStates = fromMap.get(key);
        NodeState toStates = toMap.get(key);
        NodeState diffStates = fromStates.diff(toStates);
        if (!diffStates.isEmpty()) {
            delta.put(key, diffStates);
        }
    }
    return delta;
}
public void merge(Map<NodeId, NodeState> otherState) {
    Map<NodeId, NodeState> diff = delta(otherState, this.clusterMetadata);
    for (NodeId diffKey : diff.keySet()) {
        if(!this.clusterMetadata.containsKey(diffKey)) {
            this.clusterMetadata.put(diffKey, diff.get(diffKey));
        } else {
            NodeState stateMap = this.clusterMetadata.get(diffKey);
            stateMap.putAll(diff.get(diffKey));
        }
    }
}

该过程每秒钟在每个集群节点上发生一次,每次选择一个不同的节点来交换状态。

避免不必要的状态交换

上面代码展示的是在 Gossip 消息中发送的节点的完整状态。这对于新加入的节点来说没有问题,但是一旦状态是最新的,就没有必要发送完整的状态。集群节点只需要发送自上次 gossip 的状态更改。为此,每个节点都维护了一个版本号,该版本号在每次新的元数据被添加到本地的时候自增。

class Gossip...
  private int gossipStateVersion = 1;
  
  private int incrementVersion() {
  		return gossipStateVersion++;
  }

在集群里的每个值都是用版本号来维护的。这是 Versioned Value 模式的一个例子。

class Gossip...
	int version;
	String value;
	
	public VersionedValue(String value, int version) {
			this.version = version;
			this.value = value;
	}
	
	public int getVersion() {
			return version;
	}
	
	public String getValue() {
			return value;
	}

然后每个 Gossip 周期内能从一个指定的版本交换状态。

class Gossip...
	private void sendKnownVersions(InetAddressAndPort gossipTo) throws IOException {
			Map<NoteId, Integer> maxKnownNodeVersions = getMaxKnownNodeVersions();
			RequestOrResponse knownVersionRequest = new RequestOrResponse(RequestId.GossipVersions.getId(),
							JsonSerDes.serialize(new GossipStateVersions(maxKnownNodeVersions)), 0);
			SocketClient<RequestOrResponse> socketClient = new SocketClient(gossipTo);
			byte[] knownVersionResponseBytes = socketClient.blockingSend(knownVersionRequest);
	}
	
	private Map<NodeId, Integer> getMaxKnownNodeVersions() {
			return clusterMetadata.entrySet()
							.stream()
							.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().maxVersion()));
	}

class NodeState...
  public int maxVersion() {
  		return values.values().stream().map(v -> v.getVersion()).max(Comparator.naturalOrder()).orElse(0);
}

只有接收节点的版本大于请求中的版本时,才能发送值。

class Gossip...
	Map<NodeId, NodeState> getMissingAndStatesHigherThan(Map<NodeId, Integer> nodeMaxVersions) {
			Map<NodeId, NodeState> delta = new HashMap<>();
			delta.putAll(higherVersionNodeStates(nodeMaxVersions));
			delta.putAll(missingNodeStates(nodeMaxVersions));
			return delta;
	}
	
	private Map<NodeId, NodeState> missingNodeStates(Map<NodeId, Integer> nodeMaxVersions) {
			Map<NodeId, NodeState> delta = new HashMap<>();
			List<NodeId> missingKeys = clusterMetadata.keySet().stream().filter(key -> !nodeMaxVersions.containsKey(key)).collect(Collectors.toList());
			for (NodeId missingKey : missingKeys) {
					delta.put(missingKey, clusterMetadata.get(missingKey));
			}
			return delta;
	}
	
	private Map<NodeId, NodeState> higherVersionNodeStates(Map<NodeId, Integer> nodeMaxVersions) {
			Map<NodeId, NodeState> delta = new HashMap<>();
      Set<NodeId> keySet = nodeMaxVersions.keySet();
      for (NodeId node : keySet) {
      		Integer maxVersion = nodeMaxVersions.get(node);
      		NodeState nodeState = clusterMetadata.get(node);
      		if (nodeState == null) {
      				continue;
      		}
      		NodeState deltaState = nodeState.statesGreaterThan(maxVersion);
      		if (!deltaState.isEmpty()) {
      				delta.put(node, deltaState);
      		}
      }
      return delta;
	}

cassandra 中的 Gossip 实现通过三次握手优化了状态交换,接收 gossip 消息的节点还会从发送者那里发送它(节点)需要的版本,以及它返回的元数据(where the node receiving the gossip message also sends the versions it needs from the sender, along with the metadata it returns)。发送者能立即响应请求的元数据。这能避免在其他情况下需要额外的消息。

gossip 协议使用在 cockroachdb 中为每个已连接的节点维护状态。对于每个连接,它维护发送到该节点的最后一个版本,以及从该节点接收到的版本。这样它就可以发送“自上个版本发送的状态”并请求“自上个接收版本的状态”。

还可以使用其他一些有效的替代方案,如发送整个 Map 的哈希,如果哈希相同,则不执行任何操作。

节点选择 gossip 的条件

集群节点随机选择节点发送 Gossip 消息。可以用 java 中的 java.util.Random 来实现:

class Gossip...
	private Random random = new Random();
	private InetAddressAndPort pickRandomNode(List<InetAddressAndPort> knownClusterNodes) {
			int randomNodeIndex = random.nextInt(knownClusterNodes.size());
			InetAddressAndPort gossipTo = knownClusterNodes.get(randomNodeIndex);
			return gossipTo;
	}

还可以考虑其他因素,比如最少接触的节点。例如 Gossip 协议在 Cockroachdb 就是采用的这种方式。

也有网络拓扑感知(network-topology-aware)的 gossip 目标选择方式存在。

这些都可以在 pickRandomNode() 方法中模块化地实现。

组成员和故障检测

在集群中维护这些可用的节点集合是 Gossip 协议中最通用的一种用法。里面用到以下两种方法。

节点重启处理

如果节点宕机或重启了,版本值就不能正常工作了,在内存中的所有状态都会丢失。更重要的是,对于相同的 key,这个节点会存在不同的值。例如,集群节点用了不同的 IP 和端口启动,或是用了不同的配置启动。 Generation Clock 可以用来标记生成的每个值,以便当元数据状态发送给一个随机的集群节点,接收端节点不仅可以通过版本号检测到变化,还可以通过生成来检测。

值得注意的是这个机制对于核心 gossip 协议不是必须的。但是在实践中要实现它是为了确保状态更新能够被正确追踪。

例子

cassandra 使用 Gossip 协议对集群节点进行组成员和故障检测。每个集群节点的元数据(例如分配给每个集群节点的令牌)也使用 Gossip协议进行传输。

consul 使用了 swim-gossip 协议对 consul 代理的组成员和故障检测。

cockroachdb 使用 Gossip 协议去传播节点元数据

Hyperledger Fabric 这样的区块链对于组成员和发送分类账(ledger)元数据的实现也是使用 Gossip 协议。

原文链接

https://martinfowler.com/articles/patterns-of-distributed-systems/gossip-dissemination.html