04. InfluxDB Systematic Study: TSDBStore
Published: 2019-05-25


Overview

A previous article, 《》, covered the services loaded during InfluxDB startup. When the tsdb.NewStore object is created in cmd/influxdb/run/server.go, the following code runs:

// Initialize the storage structure -- NewServer(c *Config, buildInfo *BuildInfo)
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
s.TSDBStore.EngineOptions.Config = c.Data

// Open TSDB store. -- initializes the tsdb data files -- func (s *Server) Open()
if err := s.TSDBStore.Open(); err != nil {
    return fmt.Errorf("open tsdb store: %s", err)
}

From the official documentation:

The storage engine ties a number of components together and provides the external interface for storing and querying series data. It is composed of a number of components that each serve a particular role:

  • In-Memory Index - The in-memory index is a shared index across shards that provides quick access to measurements, tags, and series. The index is used by the engine, but is not specific to the storage engine itself.
  • WAL - The WAL is a write-optimized storage format that allows for writes to be durable, but not easily queryable. Writes to the WAL are appended to segments of a fixed size.
  • Cache - The Cache is an in-memory representation of the data stored in the WAL. It is queried at runtime and merged with the data stored in TSM files.
  • TSM Files - TSM files store compressed series data in a columnar format.
  • FileStore - The FileStore mediates access to all TSM files on disk. It ensures that TSM files are installed atomically when existing ones are replaced as well as removing TSM files that are no longer used.
  • Compactor - The Compactor is responsible for converting less optimized Cache and TSM data into more read-optimized formats. It does this by compressing series, removing deleted data, optimizing indices and combining smaller files into larger ones.
  • Compaction Planner - The Compaction Planner determines which TSM files are ready for a compaction and ensures that multiple concurrent compactions do not interfere with each other.
  • Compression - Compression is handled by various Encoders and Decoders for specific data types. Some encoders are fairly static and always encode the same type the same way; others switch their compression strategy based on the shape of the data.
  • Writers/Readers - Each file type (WAL segment, TSM files, tombstones, etc..) has Writers and Readers for working with the formats.

In brief, a TSM storage engine consists of:

In-Memory Index - shared across shards; indexes measurements, tags, and series.

WAL - like the binlog of other databases; once a WAL file reaches a fixed size, a new WAL file is started.

Cache - an in-memory copy of the WAL that speeds up queries.

TSM Files - the compressed series data.

FileStore - a wrapper around the TSM files on disk.

Compactor - compacts stored data into more read-optimized form.

Compaction Planner - determines which TSM files need compaction and keeps concurrent compactions from interfering with each other.

Compression - compresses the persisted files.

Writers/Readers - for accessing the various file formats.

The code above creates and initializes the tsdb-related structures; this article examines the tsdb in detail.

Overall Architecture

When a point is written, InfluxDB picks a shard for it based on its database, retention policy, and timestamp. Each shard is backed by its own TSM storage engine and stores data for a specific time range.

Shards are created through CreateShard, which first creates the required directories and then builds the Index and Shard data structures.

Database

The database name. A single InfluxDB instance can host multiple databases, and the data files of different databases are isolated in separate directories on disk.

Syntax:

CREATE DATABASE <database_name> [WITH [DURATION <duration>] [REPLICATION <n>] [SHARD DURATION <duration>] [NAME <retention-policy-name>]]

Retention Policy (RP)

A retention policy governs how long data is kept. Every new database automatically gets a default policy named autogen, which keeps data forever; users can create their own policies afterwards. If no policy is specified on write or query, the default policy is used, and which policy is the default can be changed. InfluxDB periodically purges expired data. An RP serves three purposes: it sets the data expiration time, the number of replicas, and the shard group duration.

Syntax:

CREATE RETENTION POLICY <retention_policy_name> ON <database_name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]

The DURATION clause determines how long InfluxDB keeps the data. <duration> is a duration string or INF (infinite). The minimum retention policy duration is one hour; the maximum is INF.

The REPLICATION clause determines how many independent copies of each point are stored in the cluster, where n is the number of data nodes. The clause does not apply to single-node instances.

The SHARD DURATION clause determines the time range covered by a shard group. <duration> is a duration string; INF is not supported here. The setting is optional; by default the shard group duration is derived from the retention policy's DURATION:

Retention Policy's DURATION    Shard Group Duration
< 2 days                       1 hour
>= 2 days and <= 6 months      1 day
> 6 months                     7 days

// shardGroupDuration returns the default duration for a shard group based on a policy duration.
func shardGroupDuration(d time.Duration) time.Duration {
    if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
        return 7 * 24 * time.Hour
    } else if d >= 2*24*time.Hour { // 2 days
        return 1 * 24 * time.Hour
    }
    return 1 * time.Hour
}

// normalisedShardDuration returns normalised shard duration based on a policy duration.
func normalisedShardDuration(sgd, d time.Duration) time.Duration {
    // If it is zero, it likely wasn't specified, so we default to the shard group duration
    if sgd == 0 {
        return shardGroupDuration(d)
    }
    // If it was specified, but it's less than the MinRetentionPolicyDuration, then normalize
    // to the MinRetentionPolicyDuration
    if sgd < MinRetentionPolicyDuration {
        return shardGroupDuration(MinRetentionPolicyDuration)
    }
    return sgd
}

The minimum allowed SHARD GROUP DURATION is one hour. If a CREATE RETENTION POLICY query tries to set the SHARD GROUP DURATION to less than one hour but greater than 0, InfluxDB automatically sets it to 1h. If the query tries to set it to 0, InfluxDB falls back to the defaults listed above.
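To see those defaults in action, here is a small standalone snippet exercising the shardGroupDuration logic shown above (the package wrapper is added only to make it runnable):

package main

import (
    "fmt"
    "time"
)

// shardGroupDuration mirrors the function shown above.
func shardGroupDuration(d time.Duration) time.Duration {
    if d >= 180*24*time.Hour || d == 0 { // 6 months or 0 (infinite)
        return 7 * 24 * time.Hour
    } else if d >= 2*24*time.Hour { // 2 days
        return 1 * 24 * time.Hour
    }
    return 1 * time.Hour
}

func main() {
    fmt.Println(shardGroupDuration(24 * time.Hour))       // 1h0m0s   (RP < 2 days)
    fmt.Println(shardGroupDuration(3 * 24 * time.Hour))   // 24h0m0s  (2 days .. 6 months)
    fmt.Println(shardGroupDuration(365 * 24 * time.Hour)) // 168h0m0s (> 6 months)
    fmt.Println(shardGroupDuration(0))                    // 168h0m0s (infinite retention)
}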

DEFAULT makes the new retention policy the default policy for the database. The setting is optional.

Characteristics:

1. An RP is a property of the database, not of individual measurements, which differs from many other databases.

2. Each database can have multiple retention policies, but only one default policy.

3. Different measurements can be written under different retention policies by specifying an RP at write time; the statement below writes with the six_mouth_rollup RP:

curl -X POST '' --data-binary 'disk_free,hostname=server01 value=442221834240i 1435362189575692182'

4. If no RP is specified, the default RP is used.

Shard Group

Shard Group is an important logical concept in InfluxDB. As the name suggests, a shard group contains multiple shards; each shard group stores data for a specific time range only, and the time ranges of different shard groups never overlap. How long each shard group spans is set by the SHARD DURATION field of the retention policy; if not specified, it is derived from the retention duration (the data expiration time).
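As a rough sketch of how a point's timestamp determines its shard group's window, assuming truncation semantics (this is not the literal meta-service code):

import "time"

// shardGroupTimeRange: the shard group owning a timestamp can be derived by
// truncating the timestamp to the shard group duration; the group then
// covers [start, start+sgd).
func shardGroupTimeRange(t time.Time, sgd time.Duration) (start, end time.Time) {
    start = t.Truncate(sgd).UTC()
    return start, start.Add(sgd)
}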

Why shard groups exist

1. Splitting data into time-bounded chunks makes expiry trivial: InfluxDB expires data at shard-group granularity, checking whether each whole shard group has expired rather than inspecting records one by one.

2. It partitions data by time. Time partitioning is an essential property of a time series database: almost every query carries a time filter (for example, the last hour or the last day), so partitioning lets the engine pick only the relevant partitions along the time dimension and skip the rest.

Shard

A shard group implements data partitioning, but it is only a logical concept; the shards inside it are what actually store data and serve reads and writes in InfluxDB, analogous to a Region in HBase or a Tablet in Kudu. InfluxDB creates different shards according to the time range a point's timestamp falls into, and every shard has its own cache, WAL, TSM files, and compactor. This lets queries quickly locate the resources holding the requested time range, and it also makes later bulk deletion of data simple and efficient.

Deletion: in an LSM tree, deleting data means inserting a delete marker for a key; the data is not actually removed until files are later compacted and merged, so deleting large volumes of data is very inefficient in an LSM tree. In InfluxDB, data lifetime is set through the retention policy; once the data in a shard is detected as expired, the shard's resources are simply released and its files removed, which makes expiring old data very cheap.
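A hedged sketch of why that is cheap: retention enforcement only compares each shard's time window against a cutoff and drops whole shards. The names here are illustrative, not the actual retention-service code:

import "time"

// shardInfo is a hypothetical stand-in for a shard's metadata.
type shardInfo struct {
    ID      uint64
    EndTime time.Time // end of the owning shard group's time window
}

// expiredShards returns the ids of shards whose entire time window is older
// than the retention cutoff; each can be dropped wholesale, no per-point work.
func expiredShards(shards []shardInfo, retention time.Duration, now time.Time) []uint64 {
    cutoff := now.Add(-retention)
    var ids []uint64
    for _, sh := range shards {
        if sh.EndTime.Before(cutoff) {
            ids = append(ids, sh.ID)
        }
    }
    return ids
}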

1. A shard is InfluxDB's storage engine implementation, specifically the TSM (Time-Structured Merge Tree) engine, responsible for encoding and storing data and serving reads and writes. TSM resembles LSM, so a shard, like an HBase Region, contains a Cache, a WAL, and data files, and performs operations such as flush and compaction.

2. InfluxDB further partitions the data landing in one shard group by hashing. Note in particular that InfluxDB maps time series to shards by hash(series), not by hashing the measurement. This guarantees that data of the same series always lives in the same shard, but it also means one shard holds data from multiple measurements, unlike HBase, where a region always belongs to a single table. A sketch of this mapping follows.
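A sketch of that mapping, assuming the same xxhash the series-file code uses; the modulo placement is illustrative (open-source single-node InfluxDB keeps one shard per group, so this matters mostly in clustered setups):

import "github.com/cespare/xxhash"

// pickShard hashes the full series key (measurement + sorted tag set) onto
// one of the shards in a shard group. Same series key, same shard, always.
func pickShard(seriesKey []byte, shardIDs []uint64) uint64 {
    return shardIDs[xxhash.Sum64(seriesKey)%uint64(len(shardIDs))]
}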

Cache

WAL

TSM File

Compactor

Code Analysis

Description

Store is the top-level abstraction of the storage layer. It holds the indexes of all databases in InfluxDB along with the Shard objects that actually store the data; every other service in InfluxDB operates on the underlying data through the Store object. The store is InfluxDB's storage module, and there is exactly one instance globally. It is responsible for writing data to disk in a defined format and for maintaining InfluxDB's storage concepts: creating/deleting shards, creating/deleting retention policies, scheduling shard compactions, and most importantly WriteToShard, among others. Internally the store contains two abstractions, index and engine: the index is a shard's index, and the engine is a shard's storage implementation; different engines use different storage formats and strategies. tsdb is essentially one engine implementation. On startup, InfluxDB creates a Store instance and Opens it; during initialization it loads the existing shards and starts a shard monitor task, which schedules each shard's compactions and, for shards using the inmem index, computes the cardinality of values per tag (related to the max-values-per-tag config option).

Entry point: creating the Store

tsdb.NewStore is called in cmd/influxdb/run/server.go:

// Initialize the storage structure -- NewServer(c *Config, buildInfo *BuildInfo)
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
s.TSDBStore.EngineOptions.Config = c.Data

The concrete object is built in tsdb/store.go:

// Holds the indexes of all databases in InfluxDB and the Shard objects that store the data.
// The store is InfluxDB's storage module; there is exactly one instance globally.
// It writes data to disk in a defined format and maintains InfluxDB's storage concepts.
// Store manages shards and indexes for databases.
type Store struct {
    mu sync.RWMutex

    shards map[uint64]*Shard // index of all shards, keyed by shard ID
    databases map[string]struct{}
    sfiles map[string]*SeriesFile
    SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
    path string // on-disk path of the database files

    // shared per-database indexes, only if using "inmem".
    indexes map[string]interface{}

    // Maintains a set of shards that are in the process of deletion.
    // This prevents new shards from being created while old ones are being deleted.
    pendingShardDeletes map[uint64]struct{}

    EngineOptions EngineOptions

    baseLogger *zap.Logger
    Logger *zap.Logger

    closing chan struct{}
    wg sync.WaitGroup
    opened bool
}

// The store contains two abstractions, index and engine:
// the index is a shard's index, the engine is a shard's storage implementation;
// different engines use different storage formats and strategies.
// NewStore returns a new store with the given path and a default configuration.
// The returned store must be initialized by calling Open before using it.
func NewStore(path string) *Store {
    logger := zap.NewNop()
    return &Store{
        databases: make(map[string]struct{}),
        path: path,
        sfiles: make(map[string]*SeriesFile),
        indexes: make(map[string]interface{}),
        pendingShardDeletes: make(map[uint64]struct{}),
        EngineOptions: NewEngineOptions(),
        Logger: logger,
        baseLogger: logger,
    }
}

The Store object defines the following key members:

shards

Each Shard object has its own underlying storage engine; the engine is what talks to the files on disk. A Shard also keeps a pointer to the index of the database it belongs to, so the shard's metadata can be looked up quickly. The storage engine is an abstract, pluggable interface, currently implemented by tsm1. Index is likewise an abstract, pluggable interface, currently implemented by inmem and tsi1. Code path: tsdb/shard.go

// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
    path string // on-disk path of the shard
    walPath string // directory of the corresponding wal files
    id uint64 // shard ID, which is also its file name on disk

    database string // name of the owning database
    retentionPolicy string // name of the owning retention policy

    sfile *SeriesFile
    options EngineOptions

    mu sync.RWMutex
    _engine Engine // storage engine; abstract, pluggable interface, currently tsm1
    index Index

    enabled bool

    // expvar-based stats.
    stats *ShardStatistics
    defaultTags models.StatisticTags

    baseLogger *zap.Logger
    logger *zap.Logger

    EnableOnOpen bool

    // CompactionDisabled specifies the shard should not schedule compactions.
    // This option is intended for offline tooling.
    CompactionDisabled bool
}

Engine

Engine is an abstract, pluggable interface, so InfluxDB can conveniently swap out the underlying storage engine; the current implementation is tsm1. Code path: tsdb/engine.go

// Engine represents a swappable storage engine for the shard.
type Engine interface {
    Open() error
    Close() error
    SetEnabled(enabled bool)
    SetCompactionsEnabled(enabled bool)
    ScheduleFullCompaction() error

    WithLogger(*zap.Logger)

    LoadMetadataIndex(shardID uint64, index Index) error

    CreateSnapshot() (string, error)
    Backup(w io.Writer, basePath string, since time.Time) error
    Export(w io.Writer, basePath string, start time.Time, end time.Time) error
    Restore(r io.Reader, basePath string) error
    Import(r io.Reader, basePath string) error
    Digest() (io.ReadCloser, int64, error)

    CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
    CreateCursorIterator(ctx context.Context) (CursorIterator, error)
    IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
    WritePoints(points []models.Point) error

    CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
    CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
    DeleteSeriesRange(itr SeriesIterator, min, max int64) error
    DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error

    MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
    SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
    SeriesN() int64

    MeasurementExists(name []byte) (bool, error)

    MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
    MeasurementFieldSet() *MeasurementFieldSet
    MeasurementFields(measurement []byte) *MeasurementFields
    ForEachMeasurementName(fn func(name []byte) error) error
    DeleteMeasurement(name []byte) error

    HasTagKey(name, key []byte) (bool, error)
    MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
    TagKeyCardinality(name, key []byte) int

    // Statistics will return statistics relevant to this engine.
    Statistics(tags map[string]string) []models.Statistic
    LastModified() time.Time
    DiskSize() int64
    IsIdle() bool
    Free() error

    io.WriterTo
}

Index

Code path: tsdb/index.go

type Index interface {
    Open() error
    Close() error
    WithLogger(*zap.Logger)

    Database() string
    MeasurementExists(name []byte) (bool, error)
    MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
    DropMeasurement(name []byte) error
    ForEachMeasurementName(fn func(name []byte) error) error

    InitializeSeries(keys, names [][]byte, tags []models.Tags) error
    CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
    CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
    DropSeries(seriesID uint64, key []byte, cascade bool) error
    DropMeasurementIfSeriesNotExist(name []byte) error

    // Used to clean up series in inmem index that were dropped with a shard.
    DropSeriesGlobal(key []byte) error

    MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
    SeriesN() int64
    SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
    SeriesIDSet() *SeriesIDSet

    HasTagKey(name, key []byte) (bool, error)
    HasTagValue(name, key, value []byte) (bool, error)

    MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)

    TagKeyCardinality(name, key []byte) int

    // InfluxQL system iterators
    MeasurementIterator() (MeasurementIterator, error)
    TagKeyIterator(name []byte) (TagKeyIterator, error)
    TagValueIterator(name, key []byte) (TagValueIterator, error)
    MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
    TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
    TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

    // Sets a shared fieldset from the engine.
    FieldSet() *MeasurementFieldSet
    SetFieldSet(fs *MeasurementFieldSet)

    // Size of the index on disk, if applicable.
    DiskSizeBytes() int64

    // Bytes estimates the memory footprint of this Index, in bytes.
    Bytes() int

    // To be removed w/ tsi1.
    SetFieldName(measurement []byte, name string)

    Type() string

    // Returns a unique reference ID to the index instance.
    // For inmem, returns a reference to the backing Index, not ShardIndex.
    UniqueReferenceID() uintptr

    Rebuild()
}

SeriesFile

SeriesFile would really be better named SeriesKeyFile: it stores all series keys of the current DB, where series key = (measurement + tag set); a sketch of the key's shape follows the path below. Its file path is:

/var/lib/influxdb/data/${dbname}/_series
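To make that layout concrete, a small sketch of the series-key shape (the real encoding lives in models.MakeKey; this hypothetical helper only shows the textual form):

import (
    "sort"
    "strings"
)

// seriesKey renders measurement plus sorted tag pairs, e.g.
// seriesKey("cpu", map[string]string{"host": "server01", "region": "uswest"})
// yields "cpu,host=server01,region=uswest".
func seriesKey(measurement string, tags map[string]string) string {
    keys := make([]string, 0, len(tags))
    for k := range tags {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    parts := []string{measurement}
    for _, k := range keys {
        parts = append(parts, k+"="+tags[k])
    }
    return strings.Join(parts, ",")
}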

The series file of each DB is split into at most 8 partitions, each partition into multiple segments, and each partition also has its own in-memory index. The rules are summarized below, with a small sketch of the arithmetic after the comment block.

/**
    1. InfluxDB fixes the number of partitions at 8, i.e. all series keys are distributed across these 8 buckets.

    2. The partition is picked by hashing the series key and taking it modulo the partition count: int(xxhash.Sum64(key) % SeriesFilePartitionN).

    3. The partition ids run from 0 to 7. Each partition has a sequence number seq, initialized to partition id + 1; this sequence number is the series id assigned to the next series key stored in the partition, and it advances by 8 each time. For partition 1, the first series id inserted is 2, the second is 10.

    4. Given these rules, the owning partition is easily recovered from a series id: int((id - 1) % SeriesFilePartitionN).
**/
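The partition arithmetic above, spelled out in a compact sketch (xxhash is the hash the series file actually uses; the function names are mine):

import "github.com/cespare/xxhash"

const SeriesFilePartitionN = 8

// partitionForKey: which of the 8 partitions a series key lands in.
func partitionForKey(key []byte) int {
    return int(xxhash.Sum64(key) % SeriesFilePartitionN)
}

// partitionForID: recover the partition from a series id. Ids in partition p
// start at p+1 and advance by 8, so (id-1) mod 8 gives p back:
// partitionForID(2) == 1, partitionForID(10) == 1.
func partitionForID(id uint64) int {
    return int((id - 1) % SeriesFilePartitionN)
}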

Code path: tsdb/series_file.go

// SeriesFile represents the section of the index that holds series data.
// Manages all SeriesPartitions of the current db and offers the public API for
// operating on series, hiding SeriesPartition and SeriesSegment from callers;
// queries essentially locate the right partition first and then delegate to it,
// and the partition uses its index and segments to do the work.
type SeriesFile struct {
    path string
    partitions []*SeriesPartition

    refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.

    Logger *zap.Logger
}

Code path: tsdb/series_partition.go

// SeriesPartition represents a subset of series file data.
type SeriesPartition struct {
    mu sync.RWMutex
    wg sync.WaitGroup
    id int
    path string
    closed bool

    closing chan struct{}
    once sync.Once

    segments []*SeriesSegment
    index *SeriesIndex
    seq uint64 // series id sequence

    compacting bool
    compactionsDisabled int

    CompactThreshold int

    Logger *zap.Logger
}

SeriesIndex

SeriesIndex is the in-memory index over all segment files of a partition; its core is two maps, one from series key to series id and one from series id to offset.

When the number of in-memory index entries exceeds a threshold, they are compacted to the on-disk file during CreateSeriesListIfNotExists. When a SeriesIndex object is initialized, it reads the index back from the disk file; the on-disk layout is hash-addressed and accessed via mmap. Lookups consult the in-memory maps first and then fall back to the disk file.
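A simplified sketch of that lookup order (the real code is SeriesIndex.FindIDBySeriesKey; both the map type and the disk probe are stand-ins here):

// lookupSeriesID checks the in-memory map (entries added since the last index
// compaction) first, then falls back to the hash-addressed, mmap'd index file.
func lookupSeriesID(inMem map[string]uint64, onDisk func(key string) uint64, key string) uint64 {
    if id, ok := inMem[key]; ok {
        return id
    }
    return onDisk(key)
}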

// SeriesIndex represents an index of key-to-id & id-to-offset mappings.
type SeriesIndex struct {
    path string

    count uint64
    capacity int64
    mask int64

    maxSeriesID uint64
    maxOffset int64

    // The following three slices mmap the on-disk index into memory.
    data []byte // mmap data
    keyIDData []byte // key/id mmap data
    idOffsetData []byte // id/offset mmap data

    // In-memory data since rebuild.
    // For the serieskey -> seriesid hash map, the key is the series key and the value carries the offset and id.
    keyIDMap *rhh.HashMap // series key to series id map
    // For the seriesid -> offset hash map, the key is the series id and the value carries the id and offset.
    idOffsetMap map[uint64]int64 // series id to offset map
    tombstones map[uint64]struct{}
}

func NewSeriesIndex(path string) *SeriesIndex {
    return &SeriesIndex{
        path: path,
    }
}

EngineOptions

Code path: tsdb/engine.go

// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
    EngineVersion string
    IndexVersion string
    ShardID uint64
    InmemIndex interface{} // shared in-memory index

    // Limits the concurrent number of TSM files that can be loaded at once.
    OpenLimiter limiter.Fixed

    // CompactionDisabled specifies shards should not schedule compactions.
    // This option is intended for offline tooling.
    CompactionDisabled bool
    CompactionPlannerCreator CompactionPlannerCreator
    CompactionLimiter limiter.Fixed
    CompactionThroughputLimiter limiter.Rate
    WALEnabled bool
    MonitorDisabled bool

    // DatabaseFilter is a predicate controlling which databases may be opened.
    // If no function is set, all databases will be opened.
    DatabaseFilter func(database string) bool

    // RetentionPolicyFilter is a predicate controlling which combination of database and retention policy may be opened.
    // nil will allow all combinations to pass.
    RetentionPolicyFilter func(database, rp string) bool

    // ShardFilter is a predicate controlling which combination of database, retention policy and shard group may be opened.
    // nil will allow all combinations to pass.
    ShardFilter func(database, rp string, id uint64) bool

    Config Config
    SeriesIDSets SeriesIDSets
    FieldValidator FieldValidator

    OnNewEngine func(Engine)

    FileStoreObserver FileStoreObserver
}

// NewEngineOptions constructs an EngineOptions object with safe default values.
// This should only be used in tests; production environments should read from a config file.
func NewEngineOptions() EngineOptions {
    return EngineOptions{
        EngineVersion: DefaultEngine,
        IndexVersion: DefaultIndex,
        Config: NewConfig(),
        WALEnabled: true,
        OpenLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
    }
}

Loading the Store

// Open TSDB store. -- initializes the tsdb data files -- func (s *Server) Open()
if err := s.TSDBStore.Open(); err != nil {
    return fmt.Errorf("open tsdb store: %s", err)
}

Open()

Code path: tsdb/store.go

// Loads the existing shards and starts a shard monitor task, which schedules
// each shard's compactions and, for shards using the inmem index, computes the
// cardinality of values per tag (related to the max-values-per-tag config option).
// Open initializes the store, creating all necessary directories, loading all
// shards as well as initializing periodic maintenance of them.
func (s *Store) Open() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if s.opened {
        // Already open
        return nil
    }

    s.closing = make(chan struct{})
    s.shards = map[uint64]*Shard{}

    s.Logger.Info("Using data dir", zap.String("path", s.Path()))

    // Create directory.
    if err := os.MkdirAll(s.path, 0777); err != nil {
        return err
    }

    // Load all shards.
    if err := s.loadShards(); err != nil {
        return err
    }

    s.opened = true

    if !s.EngineOptions.MonitorDisabled {
        s.wg.Add(1)
        go func() {
            defer s.wg.Done()
            s.monitorShards()
        }()
    }

    return nil
}

loadShards()

Code path: tsdb/store.go

func (s *Store) loadShards() error {
    // res holds the result from opening each shard in a goroutine
    type res struct {
        s *Shard
        err error
    }

    // Limit the number of concurrent TSM files to be opened to the number of cores.
    s.EngineOptions.OpenLimiter = limiter.NewFixed(runtime.GOMAXPROCS(0))

    // Setup a shared limiter for compactions
    lim := s.EngineOptions.Config.MaxConcurrentCompactions
    if lim == 0 {
        lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
        if lim < 1 {
            lim = 1
        }
    }

    // Don't allow more compactions to run than cores.
    if lim > runtime.GOMAXPROCS(0) {
        lim = runtime.GOMAXPROCS(0)
    }

    s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim)

    compactionSettings := []zapcore.Field{zap.Int("max_concurrent_compactions", lim)}
    throughput := int(s.EngineOptions.Config.CompactThroughput)
    throughputBurst := int(s.EngineOptions.Config.CompactThroughputBurst)
    if throughput > 0 {
        if throughputBurst < throughput {
            throughputBurst = throughput
        }
        compactionSettings = append(
            compactionSettings,
            zap.Int("throughput_bytes_per_second", throughput),
            zap.Int("throughput_bytes_per_second_burst", throughputBurst),
        )
        s.EngineOptions.CompactionThroughputLimiter = limiter.NewRate(throughput, throughputBurst)
    } else {
        compactionSettings = append(
            compactionSettings,
            zap.String("throughput_bytes_per_second", "unlimited"),
            zap.String("throughput_bytes_per_second_burst", "unlimited"),
        )
    }

    s.Logger.Info("Compaction settings", compactionSettings...)

    log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb_open")
    defer logEnd()

    t := limiter.NewFixed(runtime.GOMAXPROCS(0))
    resC := make(chan *res)
    var n int

    // Determine how many shards we need to open by checking the store path.
    dbDirs, err := ioutil.ReadDir(s.path)
    if err != nil {
        return err
    }

    for _, db := range dbDirs {
        // Build the storage path from the database name.
        // Default data path: dir = "/var/lib/influxdb/data/${dbname}"
        dbPath := filepath.Join(s.path, db.Name())
        if !db.IsDir() {
            log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory"))
            continue
        }

        if s.EngineOptions.DatabaseFilter != nil && !s.EngineOptions.DatabaseFilter(db.Name()) {
            log.Info("Skipping database dir", logger.Database(db.Name()), zap.String("reason", "failed database filter"))
            continue
        }

        // Load series file.
        sfile, err := s.openSeriesFile(db.Name())
        if err != nil {
            return err
        }

        // Retrieve database index.
        idx, err := s.createIndexIfNotExists(db.Name())
        if err != nil {
            return err
        }

        // Load each retention policy within the database directory.
        rpDirs, err := ioutil.ReadDir(dbPath)
        if err != nil {
            return err
        }

        // Walk all retention policy directories.
        for _, rp := range rpDirs {
            // Policy directory derived from database name + rp name, e.g. /var/lib/influxdb/data/telegraf/7d_7h_rp
            rpPath := filepath.Join(s.path, db.Name(), rp.Name())
            if !rp.IsDir() {
                log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory"))
                continue
            }

            // The .series directory is not a retention policy.
            if rp.Name() == SeriesFileDirectory { // skip the _series directory
                continue
            }

            // Filter out unwanted rp directories.
            if s.EngineOptions.RetentionPolicyFilter != nil && !s.EngineOptions.RetentionPolicyFilter(db.Name(), rp.Name()) {
                log.Info("Skipping retention policy dir", logger.RetentionPolicy(rp.Name()), zap.String("reason", "failed retention policy filter"))
                continue
            }

            // Read rpPath, e.g. /var/lib/influxdb/data/telegraf/7d_7h_rp
            shardDirs, err := ioutil.ReadDir(rpPath)
            if err != nil {
                return err
            }

            // Walk all shard directories.
            for _, sh := range shardDirs {
                n++
                go func(db, rp, sh string) {
                    t.Take()
                    defer t.Release()

                    start := time.Now()

                    // Full shard path, e.g. /var/lib/influxdb/data/telegraf/7d_7h_rp/14
                    path := filepath.Join(s.path, db, rp, sh)
                    // Full wal path, e.g. /var/lib/influxdb/wal/telegraf/7d_7h_rp/14
                    walPath := filepath.Join(s.EngineOptions.Config.WALDir, db, rp, sh)

                    // Shard file names are numeric shardIDs
                    shardID, err := strconv.ParseUint(sh, 10, 64)
                    if err != nil {
                        log.Info("invalid shard ID found at path", zap.String("path", path))
                        resC <- &res{err: fmt.Errorf("%s is not a valid ID. Skipping shard.", sh)}
                        return
                    }

                    if s.EngineOptions.ShardFilter != nil && !s.EngineOptions.ShardFilter(db, rp, shardID) {
                        log.Info("skipping shard", zap.String("path", path), logger.Shard(shardID))
                        resC <- &res{}
                        return
                    }

                    // Copy options and assign shared index.
                    opt := s.EngineOptions
                    opt.InmemIndex = idx

                    // Provide an implementation of the ShardIDSets
                    opt.SeriesIDSets = shardSet{store: s, db: db}

                    // Existing shards should continue to use inmem index.
                    // If no index directory exists under path, the inmem index is assumed.
                    if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) {
                        opt.IndexVersion = InmemIndexName
                    }

                    // Open engine.
                    shard := NewShard(shardID, path, walPath, sfile, opt)

                    // Disable compactions, writes and queries until all shards are loaded
                    shard.EnableOnOpen = false
                    shard.CompactionDisabled = s.EngineOptions.CompactionDisabled
                    shard.WithLogger(s.baseLogger)

                    err = shard.Open()
                    if err != nil {
                        log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err))
                        resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)}
                        return
                    }

                    resC <- &res{s: shard}
                    log.Info("Opened shard", zap.String("index_version", shard.IndexType()), zap.String("path", path), zap.Duration("duration", time.Since(start)))
                }(db.Name(), rp.Name(), sh.Name())
            }
        }
    }

    // indexVersions tracks counts of the number of different types of index
    // being used within each database.
    indexVersions := make(map[string]map[string]int)

    // Gather results of opening shards concurrently, keeping track of how
    // many databases we are managing.
    for i := 0; i < n; i++ {
        res := <-resC
        if res.s == nil || res.err != nil {
            continue
        }
        s.shards[res.s.id] = res.s
        s.databases[res.s.database] = struct{}{}

        if _, ok := indexVersions[res.s.database]; !ok {
            indexVersions[res.s.database] = make(map[string]int, 2)
        }
        indexVersions[res.s.database][res.s.IndexType()]++
    }
    close(resC)

    // Check if any databases are running multiple index types.
    for db, idxVersions := range indexVersions {
        if len(idxVersions) > 1 {
            var fields []zapcore.Field
            for idx, cnt := range idxVersions {
                fields = append(fields, zap.Int(fmt.Sprintf("%s_count", idx), cnt))
            }
            s.Logger.Warn("Mixed shard index types", append(fields, logger.Database(db))...)
        }
    }

    // Enable all shards
    for _, sh := range s.shards {
        sh.SetEnabled(true)
        if sh.IsIdle() {
            if err := sh.Free(); err != nil {
                return err
            }
        }
    }

    return nil
}

openSeriesFile(database string)

Code path: tsdb/store.go

// openSeriesFile either returns or creates a series file for the provided
// database. It must be called under a full lock.
func (s *Store) openSeriesFile(database string) (*SeriesFile, error) {
    if sfile := s.sfiles[database]; sfile != nil {
        return sfile, nil
    }

    // Instantiate the SeriesFile object.
    sfile := NewSeriesFile(filepath.Join(s.path, database, SeriesFileDirectory))
    sfile.Logger = s.baseLogger
    if err := sfile.Open(); err != nil {
        return nil, err
    }
    s.sfiles[database] = sfile
    return sfile, nil
}

Code path: tsdb/series_file.go

// NewSeriesFile returns a new instance of SeriesFile.
func NewSeriesFile(path string) *SeriesFile {
    return &SeriesFile{
        path: path,
        Logger: zap.NewNop(),
    }
}

// Open memory maps the data file at the file's path.
func (f *SeriesFile) Open() error {
    // Wait for all references to be released and prevent new ones from being acquired.
    f.refs.Lock()
    defer f.refs.Unlock()

    // Create path if it doesn't exist.
    // Creates the directory with permissions, e.g. /var/lib/influxdb/data/${dbname}/_series
    if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil {
        return err
    }

    // Open partitions. (SeriesFilePartitionN = 8): 00~07
    f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
    for i := 0; i < SeriesFilePartitionN; i++ {
        p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
        p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
        if err := p.Open(); err != nil {
            f.Close()
            return err
        }
        f.partitions = append(f.partitions, p)
    }

    return nil
}

Code path: tsdb/series_partition.go

// NewSeriesPartition returns a new instance of SeriesPartition.
func NewSeriesPartition(id int, path string) *SeriesPartition {
    return &SeriesPartition{
        id: id,
        path: path,
        closing: make(chan struct{}),
        CompactThreshold: DefaultSeriesPartitionCompactThreshold,
        Logger: zap.NewNop(),
        seq: uint64(id) + 1,
    }
}

// Open memory maps the data file at the partition's path.
// Walks every segment file in the directory, initializes the newest segment
// for writing, and builds the in-memory index.
func (p *SeriesPartition) Open() error {
    if p.closed {
        return errors.New("tsdb: cannot reopen series partition")
    }

    // Create path if it doesn't exist.
    // If missing, the directory is created with permissions,
    // e.g. /var/lib/influxdb/data/${dbname}/_series/00
    if err := os.MkdirAll(filepath.Join(p.path), 0777); err != nil {
        return err
    }

    // Open components.
    if err := func() (err error) {
        // Walk all segments.
        if err := p.openSegments(); err != nil {
            return err
        }

        // Init last segment for writes.
        // Reads the current segment from the start, computing its size as it
        // goes and doing simple validity checks, then opens the file and
        // positions the write cursor at the end.
        if err := p.activeSegment().InitForWrite(); err != nil {
            return err
        }

        // /var/lib/influxdb/data/${dbname}/_series/00/index
        // Build the in-memory index.
        p.index = NewSeriesIndex(p.IndexPath())
        if err := p.index.Open(); err != nil {
            return err
        } else if err = p.index.Recover(p.segments); err != nil {
            return err
        }

        return nil
    }(); err != nil {
        p.Close()
        return err
    }

    return nil
}

func (p *SeriesPartition) openSegments() error {
    // Path: /var/lib/influxdb/data/${dbname}/_series/00
    fis, err := ioutil.ReadDir(p.path)
    if err != nil {
        return err
    }

    // Read the files under the SeriesPartition path, e.g. 0000
    for _, fi := range fis {
        // Parse the segment id (hexadecimal) from the file name.
        segmentID, err := ParseSeriesSegmentFilename(fi.Name())
        if err != nil {
            continue
        }

        // Instantiate a NewSeriesSegment object: tsdb/series_segment.go
        segment := NewSeriesSegment(segmentID, filepath.Join(p.path, fi.Name()))
        // Open the segment and map its contents into memory.
        if err := segment.Open(); err != nil {
            return err
        }
        p.segments = append(p.segments, segment)
    }

    // Find max series id by searching segments in reverse order.
    for i := len(p.segments) - 1; i >= 0; i-- {
        // MaxSeriesID returns the highest series id in the segment.
        if seq := p.segments[i].MaxSeriesID(); seq >= p.seq {
            // Reset our sequence num to the next one to assign
            p.seq = seq + SeriesFilePartitionN
            break
        }
    }

    // Create initial segment if none exist.
    if len(p.segments) == 0 {
        segment, err := CreateSeriesSegment(0, filepath.Join(p.path, "0000"))
        if err != nil {
            return err
        }
        p.segments = append(p.segments, segment)
    }

    return nil
}

Handling the SeriesSegment

Code path: tsdb/series_segment.go

Instantiating a SeriesSegment

// SeriesSegment represents a log of series entries.
// The log of series entries forms a disk file; this type reads and writes that file.
type SeriesSegment struct {
    id uint16
    path string

    data []byte // mmap file
    file *os.File // write file handle
    w *bufio.Writer // bufferred file handle
    size uint32 // current file size
}

// NewSeriesSegment returns a new instance of SeriesSegment.
func NewSeriesSegment(id uint16, path string) *SeriesSegment {
    return &SeriesSegment{
        id: id,
        path: path,
    }
}

SeriesSegment on-disk format: a header (the 4-byte magic "SSEG" plus a 1-byte version, per the constants in the code below) followed by a log of series entries, each carrying a flag, a series id, and the series key.

The flag has two possible values:

SeriesEntryInsertFlag: the series key written in this entry is valid;

SeriesEntryTombstoneFlag: a tombstone marker.

Creating a SeriesSegment, then opening it by memory-mapping the file into memory:

// CreateSeriesSegment generates an empty segment at path.
func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) {
    // Generate segment in temp location.
    // First create a temporary file ending in .initializing
    f, err := os.Create(path + ".initializing")
    if err != nil {
        return nil, err
    }
    defer f.Close()

    // Write header to file and close.
    // The header holds the Magic and Version:
    // const (
    //  SeriesSegmentVersion = 1
    //  SeriesSegmentMagic = "SSEG"
    //  SeriesSegmentHeaderSize = 4 + 1 // magic + version
    // )
    hdr := NewSeriesSegmentHeader()
    if _, err := hdr.WriteTo(f); err != nil {
        return nil, err
        // A segment file is preallocated: at least 4M, at most 256M.
    } else if err := f.Truncate(int64(SeriesSegmentSize(id))); err != nil {
        return nil, err
    } else if err := f.Close(); err != nil {
        return nil, err
    }

    // Swap with target path.
    if err := os.Rename(f.Name(), path); err != nil {
        return nil, err
    }

    // Open segment at new location.
    segment := NewSeriesSegment(id, path)
    if err := segment.Open(); err != nil {
        return nil, err
    }
    return segment, nil
}

// Open memory maps the data file at the file's path.
func (s *SeriesSegment) Open() error {
    if err := func() (err error) {
        // Memory map file data.
        if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil {
            return err
        }

        // Read header and validate the version.
        hdr, err := ReadSeriesSegmentHeader(s.data)
        if err != nil {
            return err
        } else if hdr.Version != SeriesSegmentVersion {
            return ErrInvalidSeriesSegmentVersion
        }

        return nil
    }(); err != nil {
        s.Close()
        return err
    }

    return nil
}

// InitForWrite: reads the current segment from the start, computing the size at
// the end of the data while doing simple validity checks, then opens the file
// and positions the write cursor at the end.
// InitForWrite initializes a write handle for the segment.
// This is only used for the last segment in the series file.
func (s *SeriesSegment) InitForWrite() (err error) {
    // Only calculate segment data size if writing.
    for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); {
        flag, _, _, sz := ReadSeriesEntry(s.data[s.size:])
        if !IsValidSeriesEntryFlag(flag) {
            break
        }
        s.size += uint32(sz)
    }

    // Open file handler for writing & seek to end of data.
    if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil {
        return err
    } else if _, err := s.file.Seek(int64(s.size), io.SeekStart); err != nil {
        return err
    }
    s.w = bufio.NewWriterSize(s.file, 32*1024)

    return nil
}

Handling the SeriesIndex

Code path: tsdb/series_index.go

// Open memory-maps the index file.
func (idx *SeriesIndex) Open() (err error) {
    // Map data file, if it exists.
    if err := func() error {
        if _, err := os.Stat(idx.path); err != nil && !os.IsNotExist(err) {
            return err
        } else if err == nil {
            // Memory-map the on-disk index file into idx.data:
            // /var/lib/influxdb/data/${dbname}/_series/00/index
            if idx.data, err = mmap.Map(idx.path, 0); err != nil {
                return err
            }

            // Read the file to build the header.
            hdr, err := ReadSeriesIndexHeader(idx.data)
            if err != nil {
                return err
            }
            idx.count, idx.capacity, idx.mask = hdr.Count, hdr.Capacity, hdr.Capacity-1
            idx.maxSeriesID, idx.maxOffset = hdr.MaxSeriesID, hdr.MaxOffset

            // Use the header to slice out the two on-disk maps.
            idx.keyIDData = idx.data[hdr.KeyIDMap.Offset : hdr.KeyIDMap.Offset+hdr.KeyIDMap.Size]
            idx.idOffsetData = idx.data[hdr.IDOffsetMap.Offset : hdr.IDOffsetMap.Offset+hdr.IDOffsetMap.Size]
        }
        return nil
    }(); err != nil {
        idx.Close()
        return err
    }

    idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
    idx.idOffsetMap = make(map[uint64]int64)
    idx.tombstones = make(map[uint64]struct{})
    return nil
}

// Recover rebuilds the in-memory index for all new entries.
func (idx *SeriesIndex) Recover(segments []*SeriesSegment) error {
    // Allocate new in-memory maps.
    idx.keyIDMap = rhh.NewHashMap(rhh.DefaultOptions)
    idx.idOffsetMap = make(map[uint64]int64)
    idx.tombstones = make(map[uint64]struct{})

    // Process all entries since the maximum offset in the on-disk index.
    minSegmentID, _ := SplitSeriesOffset(idx.maxOffset)
    // Walk every segment.
    for _, segment := range segments {
        if segment.ID() < minSegmentID {
            continue
        }
        // Walk every SeriesEntry in the segment.
        if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
            if offset <= idx.maxOffset {
                return nil
            }
            // Every SeriesEntry is handled by idx.execEntry.
            idx.execEntry(flag, id, offset, key)
            return nil
        }); err != nil {
            return err
        }
    }
    return nil
}

// Apply a single entry.
func (idx *SeriesIndex) execEntry(flag uint8, id uint64, offset int64, key []byte) {
    switch flag {
    // Update the two maps.
    case SeriesEntryInsertFlag:
        idx.keyIDMap.Put(key, id)
        idx.idOffsetMap[id] = offset

        if id > idx.maxSeriesID {
            idx.maxSeriesID = id
        }
        if offset > idx.maxOffset {
            idx.maxOffset = offset
        }

    case SeriesEntryTombstoneFlag:
        idx.tombstones[id] = struct{}{}

    default:
        panic("unreachable")
    }
}

createIndexIfNotExists(db.Name()) 

// createIndexIfNotExists returns a shared index for a database, if the inmem
// index is being used. If the TSI index is being used, then this method is
// basically a no-op.
func (s *Store) createIndexIfNotExists(name string) (interface{}, error) {
    if idx := s.indexes[name]; idx != nil {
        return idx, nil
    }

    sfile, err := s.openSeriesFile(name)
    if err != nil {
        return nil, err
    }

    idx, err := NewInmemIndex(name, sfile)
    if err != nil {
        return nil, err
    }

    s.indexes[name] = idx
    return idx, nil
}

Handling the Shard

Code path: tsdb/shard.go

// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, path string, walPath string, sfile *SeriesFile, opt EngineOptions) *Shard {
    db, rp := decodeStorePath(path)
    logger := zap.NewNop()
    if opt.FieldValidator == nil {
        opt.FieldValidator = defaultFieldValidator{}
    }

    s := &Shard{
        id: id,
        path: path,
        walPath: walPath,
        sfile: sfile,
        options: opt,

        stats: &ShardStatistics{},
        defaultTags: models.StatisticTags{
            "path": path,
            "walPath": walPath,
            "id": fmt.Sprintf("%d", id),
            "database": db,
            "retentionPolicy": rp,
            "engine": opt.EngineVersion,
        },

        database: db,
        retentionPolicy: rp,

        logger: logger,
        baseLogger: logger,
        EnableOnOpen: true,
    }
    return s
}

// Open initializes and opens the shard's store.
func (s *Shard) Open() error {
    if err := func() error {
        s.mu.Lock()
        defer s.mu.Unlock()

        // Return if the shard is already open
        if s._engine != nil {
            return nil
        }

        seriesIDSet := NewSeriesIDSet()

        // Initialize underlying index.
        ipath := filepath.Join(s.path, "index")
        // Build the Index.
        idx, err := NewIndex(s.id, s.database, ipath, seriesIDSet, s.sfile, s.options)
        if err != nil {
            return err
        }

        idx.WithLogger(s.baseLogger)

        // Open index.
        if err := idx.Open(); err != nil {
            return err
        }
        s.index = idx

        // Initialize underlying engine.
        e, err := NewEngine(s.id, idx, s.path, s.walPath, s.sfile, s.options)
        if err != nil {
            return err
        }

        // Set log output on the engine.
        e.WithLogger(s.baseLogger)

        // Disable compactions while loading the index
        e.SetEnabled(false)

        // Open engine.
        if err := e.Open(); err != nil {
            return err
        }

        // Load metadata index for the inmem index only.
        if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
            return err
        }

        s._engine = e

        return nil
    }(); err != nil {
        s.close()
        return NewShardError(s.id, err)
    }

    if s.EnableOnOpen {
        // enable writes, queries and compactions
        s.SetEnabled(true)
    }

    return nil
}

