Skip to the content.

版本值(Versioned Value)

用一个新的版本号来记录每次值的更新,允许读取历史值。

问题

在分布式系统中,节点需要能够知道键的哪个值是最新的。有时他们需要知道过去的值,以便能够正确地对值的变化作出反应。

解决方案

为每个值都存储一个版本值。这个版本编号在每次更新的时候自增。它允许每次更新都转换为新的写操作,而不会阻塞读操作。客户端能通过指定的版本号读取历史值。

思考一个复制键值存储的简单示例。集群 leader 处理对键值存储的所有写操作。它把所有写操作都存进 WAL 中。在主从下 WAL 是有多个的。Leader 将来自高水位标记的 WAL 日志的条目应用到键值存储中。这种标准复制方法被称为状态机复制(state-machine-replication)。大多数由共识算法(如 Raft )支持的数据系统都是这样实现的。在这种情况下,键值存储保持一个整数版本计数器。每次从预写日志应用键值写命令时,它都会增加版本计数器。然后用自增后的版本计数器构造新键。这样就不会更新现有的值,但是每个写请求都会将新值添加到后备存储(backing store)中。

class ReplicatedKVStore...
  int version = 0;
  MVCCStore mvccStore = new MVCCStore();
  
  @ovveride
  public CompletableFuture<Response> put(String key, String value) {
      return server.propose(new SetValueCommand(key, value));
  }
  
  private Response applySetValueCommand(SetValueCommand setValueCommand) {
  	  getLogger().info("Setting key value " + setValueCommand);
  	  version = version + 1;
      mvccStore.put(new VersionedKey(setValueCommand.getKey(), version), setValueCommand.getValue());
      Response response = Response.success(version);
      return response;
  }

版本键的排序

因为快速导航到最佳匹配版本是一个重要的实现关注事项,所以版本控制的键的排列方式是,通过使用版本号作为键的后缀,形成自然的顺序。这维护的顺序能很好的与底层数据结构匹配。例如,如果一个键有两个版本,key1 和 key2,那么 key1 将被排序在 key2 之前。

为了存储版本化的键值,需要使用一个数据结构,例如调表,它允许快速导航到最近的匹配版本。在 Java 中,mvcc 存储可以构建如下:

class MVVCStore...
  public class MVCCStore {
      NavigableMap<VersionedKey, String> kv = new ConcurrentSkipListMap<>();
      
      public void put(VersionedKey key, String value) {
          kv.put(key, value);
      }
  }

通过导航哈希表来执行,VersionedKey 是通过下面这样实现的。它实现了一个比较器来让 keys 排序

class VersionedKey...
  public class VersionedKey implements Comparable<VersionedKey> {
  	  private String key;
      private int version;
      
      public VersionedKey(String key, int version) {
          this.key = key;
          this.version = version;
      }
      
      public String getKey() {
          return key;
      }
  
      public int getVersion() {
          return version;
      }
  
      @Override
      public int compareTo(VersionedKey other) {
          int keyCompare = this.key.compareTo(other.key);
          if (keyCompare != 0) {
              return keyCompare;
          }
          return Integer.compare(this.version, other.version);
      }
  }

这个实现允许使用可导航的 map API 获取特定版本的值。

class MVCCStore 
  public Optional<String> get(final String key, final int readAt) {
      Map.Entry<VersionedKey, String> entry = kv.floorEntry(new VersionedKey(key, readAt));
      return (entry == null)? Optional.empty(): Optional.of(entry.getValue());
  }

思考一个示例,其中一个键有四个版本,分别存储在版本号1、2、3 和 5 中。根据客户端读取值时使用的版本,返回最匹配的键版本。

将存储特定键值的版本返回给客户端。然后客户端拿这个版本来读取值。总体工作如下。

​ put 请求处理

​ 读取特定版本对应的值

多个版本读

有时客户端需要从给定的版本号获取所有版本。例如,在状态监控中,客户端需要从特定版本获取所有事件。

集群节点可以存储额外的索引结构来存储键的所有版本。

class IndexedMVCCStore 
  public class IndexedMVCCStore {
      NavigableMap<String, List<Integer>> keyVersionIndex = new TreeMap<>();
      NavigableMap<VersionedKey, String> kv = new TreeMap<>();
  
      ReadWriteLock rwLock = new ReentrantReadWriteLock();
      int version = 0;
      
      public int put(String key, String value) {
          rwLock.writeLock().lock();
          try {
              version = version + 1;
              kv.put(new VersionedKey(key, version), value);
  
              updateVersionIndex(key, version);
  
              return version;
          } finally {
              rwLock.writeLock().unlock();
          }
      }
      
      private void updateVersionIndex(String key, int newVersion) {
          List<Integer> versions = getVersions(key);
          versions.add(newVersion);
          keyVersionIndex.put(key, versions);
      }
      
      private List<Integer> getVersions(String key) {
          List<Integer> versions = keyVersionIndex.get(key);
          if (versions == null) {
              versions = new ArrayList<>();
              keyVersionIndex.put(key, versions);
          }
          return versions;
      }
  }

然后,可以提供一个客户端 API 来读取特定版本或版本范围的值。

class IndexedMVCCStore 
  public List<String> getRange(String key, final int fromRevision, int toRevision) {
  	  rwLock.readLock().lock();
      try {
          List<Integer> versions = keyVersionIndex.get(key);
          Integer maxRevisionForKey = versions.stream().max(Integer::compareTo).get();
          Integer revisionToRead = maxRevisionForKey > toRevision ? toRevision : maxRevisionForKey;
          SortedMap<VersionedKey, String> versionMap = kv.subMap(new VersionedKey(key, revisionToRead), new VersionedKey(key, toRevision));
          getLogger().info("Available version keys " + versionMap + ". Reading@" + fromRevision + ":" + toRevision);
          return new ArrayList<>(versionMap.values());
          
      } finally {
          rwLock.readLock().unlock();
      }
  }

从索引更新和读取数据时,必须注意使用适当的锁定。

还有令一种替代的实现,可以用键保存所有版本化值的列表,就像在 Gossip distribution 中使用的那样,以避免不必要的状态交换。

MVCC 和事务隔离

数据库使用 Versioned Value 实现 MVCC事务隔离

并发控制是关于当有多个并发请求访问相同数据时如何使用锁定。当锁用于同步访问时,所有其他请求都会被阻塞,直到持有锁的请求完成并释放锁为止。使用Versioned Value,每个写请求都会添加一条新记录(都有自己在那个时刻的数据版本)。这允许使用非阻塞数据结构来存储值。

事务隔离级别(如快照隔离)也可以自然实现。当客户端以特定版本开始读取时,即使存在多个读请求之间提交不同值的并发写事务,它保证每次从数据库读取时都获得相同的值。

​ 快照读

使用像 RocksDb 存储引擎

使用 rocksdb 或类似的嵌入式存储引擎作为数据存储的存储后端是非常常见的。例如,etcd 使用 boltdbcockroachdb 更早使用 rocksdb,现在使用的是 rocksdb的 go-lang 版本的克隆产品 pebble

这些存储引擎提供了适合存储 versioned 值的实现。与上一节描述的相同它们内部使用跳表的方式,并依赖于键的顺序。也可以提供自定义比较器为键排序。

class VersionedKeyComparator 
  public class VersionedKeyComparator extends Comparator {
  	  public VersionedKeyComparator() {
          super(new ComparatorOptions());
      }
  
      @Override
      public String name() {
          return "VersionedKeyComparator";
      }
  
      @Override
      public int compare(Slice s1, Slice s2) {
          VersionedKey key1 = VersionedKey.deserialize(ByteBuffer.wrap(s1.data()));
          VersionedKey key2 = VersionedKey.deserialize(ByteBuffer.wrap(s2.data()));
          return key1.compareTo(key2);
      }
  }

使用 rocksdb 可以实现如下操作:

class RocksDBMvccStore 
  private final RocksDB db;
  
  public RocksDBMvccStore(File cacheDir) throws RocksDBException {
      Options options = new Options();
      options.setKeepLogFileNum(30);
      options.setCreateIfMissing(true);
      options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
      options.setComparator(new VersionedKeyComparator());
      db = RocksDB.open(options, cacheDir.getPath());
  }

  public void put(String key, int version, String value) throws RocksDBException {
      VersionedKey versionKey = new VersionedKey(key, version);
      db.put(versionKey.serialize(), value.getBytes());
  }

  public String get(String key, int readAtVersion) {
      RocksIterator rocksIterator = db.newIterator();
      rocksIterator.seekForPrev(new VersionedKey(key, readAtVersion).serialize());
      byte[] valueBytes = rocksIterator.value();
      return new String(valueBytes);
  }

例子

etcd3 背后使用 MVCC 与一个单一的整数表示一个版本。

mongodbcockroachdb 背后使用 MVCC 混合逻辑时钟。