ClickHouse 外部集成表引擎

目录

  1. HDFS
  2. MySQL
  3. JDBC
  4. Kafka

ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。

例如直接读取HDFS的文件或者MySQL数据库的表。这些表引擎只负责元数据管理和数据查询,而它们自身通常并不负责数据的写入,数据文件直接由外部系统提供。目前ClickHouse提供了下面的外部集成表引擎:

  • ODBC:通过指定odbc连接读取数据源
  • JDBC:通过指定jdbc连接读取数据源;
  • MySQL:将MySQL作为数据存储,直接查询其数据
  • HDFS:直接读取HDFS上的特定格式的数据文件;
  • Kafka:将Kafka数据导入ClickHouse
  • RabbitMQ:与Kafka类似

HDFS

使用方式

ENGINE = HDFS(URI, format)
  • URI:HDFS文件路径
  • format:文件格式,比如CSV、JSON、TSV等

使用示例

-- 建表
CREATE TABLE hdfs_engine_table(
emp_id UInt16 COMMENT '员工id',
name String COMMENT '员工姓名',
work_place String COMMENT '工作地点',
age UInt8 COMMENT '员工年龄',
depart String COMMENT '部门',
salary Decimal32(2) COMMENT '工资'
) ENGINE=HDFS('hdfs://cdh03:8020/user/hive/hdfs_engine_table', 'CSV');

-- 写入数据
INSERT INTO hdfs_engine_table
VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000);
-- 查询数据
cdh04 :) select * from hdfs_engine_table;

SELECT *
FROM hdfs_engine_table

┌─emp_id─┬─name─┬─work_place─┬─age─┬─depart─┬───salary─┐
1 │ tom │ 上海 │ 25 │ 技术部 │ 20000.00
2 │ jack │ 上海 │ 26 │ 人事部 │ 10000.00
└────────┴──────┴────────────┴─────┴────────┴──────────┘
--再在HDFS上其对应的文件,添加几条数据,再次查看
cdh04 :) select * from hdfs_engine_table;

SELECT *
FROM hdfs_engine_table

┌─emp_id─┬─name───┬─work_place─┬─age─┬─depart─┬───salary─┐
1 │ tom │ 上海 │ 25 │ 技术部 │ 20000.00
2 │ jack │ 上海 │ 26 │ 人事部 │ 10000.00
3 │ lili │ 北京 │ 28 │ 技术部 │ 20000.00
4 │ jasper │ 杭州 │ 27 │ 人事部 │ 8000.00
└────────┴────────┴────────────┴─────┴────────┴──────────┘

可以看出,这种方式与使用Hive类似,我们直接可以将HDFS对应的文件映射成ClickHouse中的一张表,这样就可以使用SQL操作HDFS上的文件了。

值得注意的是:ClickHouse并不能够删除HDFS上的数据,当我们在ClickHouse客户端中删除了对应的表,只是删除了表结构,HDFS上的文件并没有被删除,这一点跟Hive的外部表十分相似。

注意: 如果文件不存在则可写可读,如果文件存在则是只读

MySQL

在上一篇文章[篇一|ClickHouse快速入门]中介绍了MySQL数据库引擎,即ClickHouse可以创建一个MySQL数据引擎,这样就可以在ClickHouse中操作其对应的数据库中的数据。其实,ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。

使用方式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

使用示例

-- 连接MySQL中clickhouse数据库的test表
CREATE TABLE mysql_engine_table(
id Int32,
name String
) ENGINE = MySQL(
'192.168.200.241:3306',
'clickhouse',
'test',
'root',
'123qwe');
-- 查询数据
cdh04 :) SELECT * FROM mysql_engine_table;

SELECT *
FROM mysql_engine_table

┌─id─┬─name──┐
1 │ tom │
2 │ jack │
3 │ lihua │
└────┴───────┘
-- 插入数据,会将数据插入MySQL对应的表中
-- 所以当查询MySQL数据时,会发现新增了一条数据
INSERT INTO mysql_engine_table VALUES(4,'robin');
-- 再次查询
cdh04 :) select * from mysql_engine_table;

SELECT *
FROM mysql_engine_table

┌─id─┬─name──┐
1 │ tom │
2 │ jack │
3 │ lihua │
4 │ robin │
└────┴───────┘

注意:对于MySQL表引擎,不支持UPDATE和DELETE操作,比如执行下面命令时,会报错:

-- 执行更新
ALTER TABLE mysql_engine_table UPDATE name = 'hanmeimei' WHERE id = 1;

-- 执行删除
ALTER TABLE mysql_engine_table DELETE WHERE id = 1;

-- 报错
DB::Exception: Mutations are not supported by storage MySQL.

JDBC

使用方式

JDBC表引擎不仅可以对接MySQL数据库,还能够与PostgreSQL等数据库。为了实现JDBC连接,ClickHouse使用了clickhouse-jdbc-bridge的查询代理服务。

首先我们需要下载clickhouse-jdbc-bridge,然后按照ClickHouse的github中的步骤进行编译,编译完成之后会有一个clickhouse-jdbc-bridge-1.0.jar的jar文件,除了需要该文件之外,还需要JDBC的驱动文件,本文使用的是MySQL,所以还需要下载MySQL驱动包。将MySQL的驱动包和clickhouse-jdbc-bridge-1.0.jar文件放在了/opt/softwares路径下,执行如下命令:

[root@cdh04 softwares]# java -jar clickhouse-jdbc-bridge-1.0.jar  --driver-path .  --listen-host cdh04

其中--driver-path是MySQL驱动的jar所在的路径,listen-host是代理服务绑定的主机。默认情况下,绑定的端口是:9019。上述jar包的下载:

链接:https://pan.baidu.com/s/1ZcvF22GvnvAQpVTleNry7Q 提取码:la9n

然后我们再配置/etc/clickhouse-server/config.xml,在文件中添加如下配置,然后重启服务。

<jdbc_bridge>
<host>cdh04</host>
<port>9019</port>
</jdbc_bridge>

使用示例

  • 直接查询MySQL中对应的表

    SELECT * 
    FROM
    jdbc(
    'jdbc:mysql://192.168.200.241:3306/?user=root&password=123qwe',
    'clickhouse',
    'test');
  • 创建一张映射表

    -- 语法
    CREATE TABLE [IF NOT EXISTS] [db.]table_name
    (
    columns list...
    )
    ENGINE = JDBC(dbms_uri, external_database, external_table)

    -- MySQL建表
    CREATE TABLE jdbc_table_mysql (
    order_id INT NOT NULL AUTO_INCREMENT,
    amount FLOAT NOT NULL,
    PRIMARY KEY (order_id));
    INSERT INTO jdbc_table_mysql VALUES (1,200);

    -- 在ClickHouse中建表
    CREATE TABLE jdbc_table
    (
    order_id Int32,
    amount Float32
    )
    ENGINE JDBC(
    'jdbc:mysql://192.168.200.241:3306/?user=root&password=123qwe',
    'clickhouse',
    'jdbc_table_mysql');

    -- 查询数据
    cdh04 :) select * from jdbc_table;

    SELECT *
    FROM jdbc_table

    ┌─order_id─┬─amount─┐
    1200
    └──────────┴────────┘

Kafka

使用方式

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
  • kafka_broker_list :逗号分隔的brokers地址 (localhost:9092).
  • kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔.
  • kafka_group_name :消费者组.
  • kafka_format – Message format. 比如JSONEachRow、JSON、CSV等等

使用示例

在kafka中创建ck_topic主题,并向该主题写入数据

 CREATE TABLE kafka_table (
id UInt64,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'cdh04:9092',
kafka_topic_list = 'ck_topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow'
;
-- 查询
cdh04 :) select * from kafka_table ;

SELECT *
FROM kafka_table

┌─id─┬─name─┐
1 │ tom │
└────┴──────┘
┌─id─┬─name─┐
2 │ jack │
└────┴──────┘

注意点

当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。

  • 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
  • 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
  • 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中
--  创建Kafka引擎表
CREATE TABLE kafka_table_consumer (
id UInt64,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'cdh04:9092',
kafka_topic_list = 'ck_topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow'
;

-- 创建一张终端用户使用的表
CREATE TABLE kafka_table_mergetree (
id UInt64 ,
name String
)ENGINE=MergeTree()
ORDER BY id
;

-- 创建物化视图,同步数据
CREATE MATERIALIZED VIEW consumer TO kafka_table_mergetree
AS SELECT id,name FROM kafka_table_consumer ;
-- 查询,多次查询,已经被查询的数据依然会被输出
cdh04 :) select * from kafka_table_mergetree;

SELECT *
FROM kafka_table_mergetree

┌─id─┬─name─┐
2 │ jack │
└────┴──────┘
┌─id─┬─name─┐
1 │ tom │
└────┴──────┘
Author: Tunan
Link: http://yerias.github.io/2020/12/03/clickhouse/5/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.