数据导入

定义数据源

数据格式

  • 时间列( timestamp )

每个数据集合都必须有时间列. 这个列是数据聚合的重要维度. 所有的查询都需要指定查询时间范围

  • 维度列( dimension)

用来标识一些事件 (event), 主要用于过滤或者切片数据. 通常为 字符串类型

  • 指标列( metric )

即用于聚合和计算的列. 通常为 数值类型. 如 Count, Sum, Mean 等

数据导入方式

  • 实时 (Kafka)
  • 批处理( HDFS, CSV, JSON 等)

数据查询

原生是使用 JSON 格式, 通过 HTTP 传送.

构架

  • 分布式, Lambda 架构.
  • 实时数据处理: 面向 写多读少 的优化
  • 批处理部分是: 面向 读多写少 的优化
  • 采用 Shared nothing 结构
  • 使用 zookeeper 协调
  • 使用 MySQL 提供元数据存储

自身节点组成

  • 实时节点 ( realtime node ) : 导入实时数据, 以及生成 Segment 数据文件
  • 历史节点 ( historical node ) : 加载已生成好的数据文件, 以供数据查询
  • 查询节点 ( broker node ) : 对外提供数据查询服务, 并同时从实时节点与历史节点查询数据, 合并后返回给调用方
  • 协调节点 ( coordinator node ) : 负责历史节点的数据负载均衡, 以及通过规则管理数据的生命周期

外部依赖

  • 元数据库 (Metastore) : 存储 Druid 集群的元数据信息. 比如 Segment , 一般用 MySQLPostgreSQL
  • 分布式协调服务 (Coordination) : 为Druid集群提供一致性协调服务的组件, 通常为 Zookeeper
  • 数据文件存储库 ( deep storage ) : 存放生成的 Segment 文件, 并供历史节点下载. 单节点集群可以是本地磁盘, 分布式集群一般是 HDFSNFS

Segement

结构

  • 横向切割 : 通过参数 segmentGranularity , Druid 将不同时间范围内的数据存储在不同的 segment 数据块中
  • 纵向切割 : 在 Segment 中也面向列进行数据压缩存储

制造与传播

  1. 实时节点生产出 Segment 数据文件, 并将其上传到 Deep Storage
  2. Segment 数据文件的元数据被存放在 MetaStore (即 MySQL 或 PostgreSQL )
  3. Master 节点 ( coordinator 节点) 从 metastore 里得知 Segment 数据文件的相关元信息后, 将其根据规则的设置分配给符合条件的历史节点
  4. 历史节点得到指令后会主动从 deep storage 中摘取指定的 segment 数据文件, 并通过 zookeeper 向集群声明负责提供该 segment 数据文件的查询服务
  5. 实时节点丢弃该 segment 数据文件, 并向集群声明其不再提供该 segment 数据文件的查询服务

扩展系统

下载扩展

java -classpath "/my/druid/lib/*" io.druid.cli.Main tools pull-deps --clean -c io.druid.extensions:mysql-metadata-storage:0.9.0

加载扩展

  • 将扩展加载到 DruidService 的 classpath 中
  • 或在 common.runtime.properties 文件中通过 druid.extensions.directory 指定扩展目录

索引服务 Indexing Service

它同样也可以制造 Segment 数据文件.

主从结构

  • 统治节点 (Overlord Node) : 主节点
  • 中间管理者 ( Middle Manager) : 从节点

image-20181225141230702

统治节点

  • 本地模式 : 不仅负责集群的任务协调分配工作, 也负责启动一些苦工 (peon) 来完成一部分具体任务

  • 远程模式 : 统治节点与中间管理者分别运行在不同的节点上, 仅负责协调分配工作, 不负责完成任务.

http://<overlord_ip>:<port>/druid/indexer/v1/task

http://<overlord_ip>:<port>/druid/indexer/v1/task/{taskId}/shutdown

http://<overlord_ip>:<port>/console.html

中间管理者与苦工

它是索引服务的工作节点, 负责接收统治节点分配的任务, 然后启动相关苦工即独立JVM来完成具体的任务

任务

父类 type 名称 描述
创建 segment index_hadoop hadoop 索引任务 利用 hadoop 集群执行 MapReduce 任务 以完成数据文件的创建, 适合体量比较大的数据文件的创建任务
创建 segment index 普通索引任务 利用 Druid 集群本身的系统资源来完成 segment 数据文件的创建, 适合体量较小的 segment 数据文件创建任务
合并 segment append 附加索引任务 将若干个 segment 数据文件首尾相连, 最终合成一个 segment 数据文件
合并 segment merge 合并索引任务 将若干个 segment 数据文件按照指定的聚合方法合并为一个 segment 数据文件
销毁 segment kill 销毁索引任务 彻底将指定的 segment 数据文件从 druid 集群包括 deep storage 上删除
混杂型 hadoop_convert_segment 版本转换任务 通常用来重定义 segment 数据文件的压缩方法等要素. 利用 hadoop 集群执行, 适合体量较大的 segment 数据文件的版本转换
混杂型 convert_segment 版本转换任务 利用 Druid 集群本身的系统资源完成 segment 数据文件的版本转换, 适合体量较小的 segment 数据文件的版本转换
混杂型 noop 无操作任务 将任务启动并沉睡一段时间, 并不完成具体的任何工作. 通常用来测试

服务启动与停止

以 imply 套件为例

nohup bin/supervise -c conf/supervise/quickstart.conf > quickstart.log &

bin/service --down
bin/service --restart {服务名}
服务 简介 访问地址
zookeeper
coordinator 协调节点, 管理集群状态 http://localhost:8081
broker 查询节点, 处理查询请求 http://localhost:8082/druid/v2
historical 历史节点, 管理历史数据 http://localhost:8083/druid/v2
overlord 统治节点, 管理数据写入任务 http://localhost:8090/console.html
middleManager 中间管理者, 负责写数据处理
pivot Web UI http://localhost:9095
tranquility-server 实时写入服务, HTTP 协议 http://localhost:8200/v1/post/datasource

功能划分

  • master : 管理节点. 包含协调节点统治节点. 负责管理数据写入任务及容错相关处理
  • data : 数据节点, 包含 历史节点中间管理者, 负责数据写入处理, 历史数据的加载与查询
  • query : 查询节点, 包含查询节点pivot web 界面, 负责提供数据查询接口及web交互式查询功能

管理节点与查询节点配置

cp conf/supervise/master-no-zk.conf conf/supervise/master-with-query.conf
vim conf/supervise/master-with-query.conf

:verify bin/verify-java
:verify bin/verify-version-check
broker bin/run-druid broker conf
historical bin/run-druid historical conf
imply-ui bin/run-imply-ui conf
coordinator bin/run-druid coordinator conf
!p80 overlord bin/run-druid overlord conf

!pN 表示服务关闭顺序的权重. 默认50, 越大表示越先被关闭

运行

nohup bin/supervise -c conf/supervise/master-with-query.conf > master-with-query.log 2>&1 &

数据节点配置

vim conf/supervise/data.conf

:verify bin/verify-java
:verify bin/verify-version-check

historical bin/run-druid historical conf
middleManager bin/run-druid middleManager conf

运行

nohup bin/supervise -c conf/supervise/data.conf > data.log 2>&1 &

基础依赖配置

配置文件为 conf/druid/_common/common.runtime.properties

这里使用的是 imply-2.8.1 的示例代码. 具体请参考具体的版本. 书的版本比较旧

zookeeper

druid.zk.service.host=${zookeeper集群地址}
druid.zk.paths.base=/druid

MetadataStorage

书的版本比较旧, 指定了扩展配置

druid.extensions.loadList=[“mysql-metadata-storage”] , 新的好像已经默认加载了 mysql, postgresql 的 metadata-storage 扩展了, 所以就不用显式写了?

# For MySQL:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://db.example.com:3306/druid
druid.metadata.storage.connector.user=...
druid.metadata.storage.connector.password=...

# For PostgreSQL:
druid.metadata.storage.type=postgresql
druid.metadata.storage.connector.connectURI=jdbc:postgresql://db.example.com:5432/druid
druid.metadata.storage.connector.user=...
druid.metadata.storage.connector.password=...

Deep Storage

# For local disk (only viable in a cluster if this is a network mount):
druid.storage.type=local
druid.storage.storageDirectory=var/druid/segments

# For HDFS:
#druid.storage.type=hdfs
#druid.storage.storageDirectory=/druid/segments

# For S3:
#druid.storage.type=s3
#druid.storage.bucket=your-bucket
#druid.storage.baseKey=druid/segments
#druid.s3.accessKey=...
#druid.s3.secretKey=...

配置调优

Doc Configuration

数据节点

  • historical
  • middleManager
historical/
├── jvm.config
├── main.config
└── runtime.properties
middleManager/
├── jvm.config
├── main.config
└── runtime.properties

查询节点

  • broker
broker/
├── jvm.config
├── main.config
└── runtime.properties

实际例子

将所有 JVM 参数(find . -name "jvm.config")的 -Duser.timezone=UTC 设置为 -Duser.timezone=UTC+0800

全局配置

配置 metadata storage, zookeeper, extennsions 等

conf/druid/_common/common.runtime.properties

全局日志文件配置

conf/druid/_common/log4j2.xml

Master

  • coordinator : 协调节点
  • overlord : 统治节点
  • broker : 查询节点
  • historical : 历史节点

Data

负责处理数据

  • historical : 历史节点
  • middleManager: 中间管理者

数据导入

流式数据源

  • PUSH : 使用 Indexing Service 提供 HTTP 服务, 数据源通过调用这个 HTTP 服务推送到 Druid 系统
  • PULL : 通过 Firehose 导入不同的流式数据. KafkaFirehose, RabbitMQFirehose 等

静态数据源

如文件系统中的文件等

可通过实时节点导入, 也可通过 Indexing Service 启动任务来导入

流式数据 静态数据
PUSH(index task, hadoop index task) PUSH(index task, hadoop index task)
PULL(Kafka Firehose, LocalFirehose) PULL(Kafka Firehose, LocalFirehose)

PULL

这个要定义一种称为 Ingestion Spec 文件来拉取. 它由三部分组成

{
    "dataSchema" : {...}, # JSON 对象, 指明数据源格式, 数据解析, 维度等信息
    "ioConfig" : {...}, # JSON 对象, 指明数据如何在 Druid 中存储
    "tuningConfig": {} # JSON 对象, 指明存储优化配置
}

data schema

{
    "datasource": "数据源名字",
    "parser": {}, 
    "metricsSpec": [] , 
    "granularitySpec": {}
}

parser

  • JSON parser
{
    "parser": {
        "type": "...",
        "parseSpec": {
            "format": "json",
            "timestampSpec": {
            	"column": "时间戳列名",
                "format": "iso|millis|posix|auto|joda, 默认为 auto"
            },
            "dimensionsSpec": {
                "dimensions": ["维度名1", "维度名2等等"],
                "dimensionExclusions": ["剔除的维度名列表, 可选"],
                "spatialDimentions": ["空间维度名列表, 地理几何运算, 可选"]
            },
            "flattenSpec": {
                // json 对象
            }
        }
    }
}
  • CSV parser
{
    "parser": {
        "type": "...",
        "parseSpec": {
            "format": "csv",
            "columns": ["CSV数据列名"],
            "listDelimiter": "多值维度列, 数据分隔符, 可选",
            "timestampSpec": {
            	"column": "时间戳列名",
                "format": "iso|millis|posix|auto|joda, 默认为 auto"
            },
            "dimensionsSpec": {
                "dimensions": ["维度名1", "维度名2等等"],
                "dimensionExclusions": ["剔除的维度名列表, 可选"],
                "spatialDimentions": ["空间维度名列表, 地理几何运算, 可选"]
            },
            "flattenSpec": {
                // json 对象
            }
        }
    }
}
  • TSV parser
{
    "parser": {
        "type": "...",
        "parseSpec": {
            "format": "tsv",
            "columns": ["TSV数据列名"],
            "listDelimiter": "多值维度列, 数据分隔符, 可选",
            "delimiter": "数据分隔符, 默认为 \t, 可选",
            "timestampSpec": {
            	"column": "时间戳列名",
                "format": "iso|millis|posix|auto|joda, 默认为 auto"
            },
            "dimensionsSpec": {
                "dimensions": ["维度名1", "维度名2等等"],
                "dimensionExclusions": ["剔除的维度名列表, 可选"],
                "spatialDimentions": ["空间维度名列表, 地理几何运算, 可选"]
            },
            "flattenSpec": {
                // json 对象
            }
        }
    }
}

spatialDimentions

空间维度

{
    "spatialDimentions": [
        "dimName": "空间维度的名字. 如果一个空间维度已存在, 它必须是坐标值的数组. 必须",
        "dims": ["lat", "long, 即包含空间维度的名字列表. 可选"]
    ]
}

metricsSpect

指明所有的指标列和所使用的聚合函数

{
    "metricsSpec": [
        {
            "type": "count|longSum等聚合函数类型",
            "fieldName": "聚合函数运用的列名, 可选",
            "name": "聚合后指标列名"
        }
    ]
}

granularitySpec

指定 segment 存储粒度和查询粒度

{
    "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "存储粒度, HOUR, DAY 等",
        "queryGranularity": "最小查询粒度, MINUTE, HOUR 等",
        "intervals": ["导入数据的时间段, 可以有多个值. 可选, 对于流式 PULL 可忽略"]
    }
}

ioConfig

{
    "ioConfig": {
        "type": "realtime",
        "firehose": {},
        "plumber": "realtime"
    }
}

不同的 firehose 的格式不太一样

tuningConfig

{
    "tuningConfig": {
        "type": "realtime",
        "maxRowsInMemory": "在存盘之前中最大的存储行数, 指的是聚合后的行数",
        "windowPeriod": "最大容忍时间窗口, 超过则数据丢弃",
        "intermediatePersistPeriod": "多长时间刷盘一次",
        "basePersistDirectory": "临时存盘目录",
        "versioningPolicy": "如何为 segment 设置版本号",
        "rejectionPolicy": "数据丢弃策略",
        "maxPendingPersists": "最大同时存盘请求数, 达到上限, 将会暂停导入数据",
        "shardSpec": {
            //分片设置
        },
        "buildV9Directly": "是否直接构建 v9 版本索引",
        "persistThreadPriority": "存盘线程优先级",
        "mergeThreadPriority": "存盘合并线程优先级",
        "reportParseExceptions": "是否汇报数据解析错误"
    }
}

shardSpec

目前有两种分片方式, linear, numbered

{
    "shardSpec": {
        "type": "linear",
        "partitionNum": 0
    }
}
{
    "shardSpec": {
        "type": "numbered",
        "partitionNum":0,
        "partitions": 2
    }
}

PUSH

需要索引服务. 所以要启动 MiddleManager (中间管理者), Overlord Node (统治节点).

将 Ingestion Spec 定义文件, 假设名为 index-task.json 发送一个 HTTP 请求

curl -X POST -H 'Content-Type:application/json' -d @index-task.json http://<overlord_ip>:<port>/druid/indexer/v1/task

索引服务任务相关接口

  • 提交任务
curl -X POST -H 'Content-Type:application/json' -d @index-task.json http://<overlord_ip>:<port>/druid/indexer/v1/task
  • 查看任务状态
curl http://<overlord_ip>:<port>/druid/indexer/v1/task/{taskId}/status
  • 查看 segment 信息
http://<overlord_ip>:<port>/druid/indexer/v1/task/{segment_name之类}/segments
  • 关闭任务
curl -X POST http://<overlord_ip>:<port>/druid/indexer/v1/task/{taskId}/shutdown
  • overlord 控制台
http://<overlord_ip>:<port>/console.html

查询组件

  • filter, 类似 SQL 中的 where
    • selector filter
    • regex filter
    • logical expression filter
    • search filter
    • in filter
    • bound filter. >=, <=,=. 如果需要 <, > , 则要指定 lowerStrictupperStrict 的 值 true
    • javascript filter : 通过写 JS 来代来自行判断 true 或 false, 参数为维度的值.
  • aggregator
    • count aggregator
    • sum
    • min/max
    • cardinality
    • hyperUnique
    • javascript
  • post-aggregator
    • arithmetic
    • field accessor
    • constant
    • hyperUnique cardinality
  • query
    • contains
    • insensitive_contains
    • fragment
  • interval , >= && <=
    • 指定时间区间. 如 "intervals":["2016-08-28T00:00:00+08:00/2016-08-29T00:00:00+08:00"]
  • context

Druid 监控

druid.emitter 来设置发送开关

  • noop: 默认值. 不发送任何 metric 信息
  • logging: 往日志写 metric 信息
  • http: 通过 http 往外发送 metric 信息

性能相关收集

doc

建议

  • broker : 20G-30G 堆内存
  • historical : 250MB * (processing.numThreads) 堆内存
  • coordinator : 默认即可?

Druid 大概 使用多少直接内存

估算公式

druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)