目录
- HDFS
- MySQL
- JDBC
- Kafka
ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。
例如直接读取HDFS的文件或者MySQL数据库的表。这些表引擎只负责元数据管理和数据查询,而它们自身通常并不负责数据的写入,数据文件直接由外部系统提供。目前ClickHouse提供了下面的外部集成表引擎:
- ODBC:通过指定odbc连接读取数据源
- JDBC:通过指定jdbc连接读取数据源;
- MySQL:将MySQL作为数据存储,直接查询其数据
- HDFS:直接读取HDFS上的特定格式的数据文件;
- Kafka:将Kafka数据导入ClickHouse
- RabbitMQ:与Kafka类似
HDFS
使用方式
| ENGINE = HDFS(URI, format)
 | 
- URI:HDFS文件路径
- format:文件格式,比如CSV、JSON、TSV等
使用示例
| CREATE TABLE hdfs_engine_table(
 emp_id UInt16 COMMENT '员工id',
 name String COMMENT '员工姓名',
 work_place String COMMENT '工作地点',
 age UInt8 COMMENT '员工年龄',
 depart String COMMENT '部门',
 salary Decimal32(2) COMMENT '工资'
 ) ENGINE=HDFS('hdfs://cdh03:8020/user/hive/hdfs_engine_table', 'CSV');
 
 
 INSERT INTO hdfs_engine_table
 VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000);
 
 cdh04 :) select * from hdfs_engine_table;
 
 SELECT *
 FROM hdfs_engine_table
 
 ┌─emp_id─┬─name─┬─work_place─┬─age─┬─depart─┬───salary─┐
 │      1 │ tom  │ 上海       │  25 │ 技术部 │ 20000.00 │
 │      2 │ jack │ 上海       │  26 │ 人事部 │ 10000.00 │
 └────────┴──────┴────────────┴─────┴────────┴──────────┘
 
 cdh04 :) select * from hdfs_engine_table;
 
 SELECT *
 FROM hdfs_engine_table
 
 ┌─emp_id─┬─name───┬─work_place─┬─age─┬─depart─┬───salary─┐
 │      1 │ tom    │ 上海       │  25 │ 技术部 │ 20000.00 │
 │      2 │ jack   │ 上海       │  26 │ 人事部 │ 10000.00 │
 │      3 │ lili   │ 北京       │  28 │ 技术部 │ 20000.00 │
 │      4 │ jasper │ 杭州       │  27 │ 人事部 │  8000.00 │
 └────────┴────────┴────────────┴─────┴────────┴──────────┘
 
 | 
可以看出,这种方式与使用Hive类似,我们直接可以将HDFS对应的文件映射成ClickHouse中的一张表,这样就可以使用SQL操作HDFS上的文件了。
值得注意的是:ClickHouse并不能够删除HDFS上的数据,当我们在ClickHouse客户端中删除了对应的表,只是删除了表结构,HDFS上的文件并没有被删除,这一点跟Hive的外部表十分相似。
注意: 如果文件不存在则可写可读,如果文件存在则是只读
MySQL
在上一篇文章[篇一|ClickHouse快速入门]中介绍了MySQL数据库引擎,即ClickHouse可以创建一个MySQL数据引擎,这样就可以在ClickHouse中操作其对应的数据库中的数据。其实,ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。
使用方式
| CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster](
 name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
 name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
 ...
 ) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
 
 | 
使用示例
| CREATE TABLE mysql_engine_table(
 id Int32,
 name String
 ) ENGINE = MySQL(
 '192.168.200.241:3306',
 'clickhouse',
 'test',
 'root',
 '123qwe');
 
 cdh04 :) SELECT * FROM mysql_engine_table;
 
 SELECT *
 FROM mysql_engine_table
 
 ┌─id─┬─name──┐
 │  1 │ tom   │
 │  2 │ jack  │
 │  3 │ lihua │
 └────┴───────┘
 
 
 INSERT INTO mysql_engine_table VALUES(4,'robin');
 
 cdh04 :) select * from mysql_engine_table;
 
 SELECT *
 FROM mysql_engine_table
 
 ┌─id─┬─name──┐
 │  1 │ tom   │
 │  2 │ jack  │
 │  3 │ lihua │
 │  4 │ robin │
 └────┴───────┘
 
 | 
注意:对于MySQL表引擎,不支持UPDATE和DELETE操作,比如执行下面命令时,会报错:
| ALTER TABLE mysql_engine_table UPDATE name = 'hanmeimei' WHERE id = 1;
 
 
 ALTER TABLE mysql_engine_table DELETE WHERE id = 1;
 
 
 DB::Exception: Mutations are not supported by storage MySQL.
 
 | 
JDBC
使用方式
JDBC表引擎不仅可以对接MySQL数据库,还能够与PostgreSQL等数据库。为了实现JDBC连接,ClickHouse使用了clickhouse-jdbc-bridge的查询代理服务。
首先我们需要下载clickhouse-jdbc-bridge,然后按照ClickHouse的github中的步骤进行编译,编译完成之后会有一个clickhouse-jdbc-bridge-1.0.jar的jar文件,除了需要该文件之外,还需要JDBC的驱动文件,本文使用的是MySQL,所以还需要下载MySQL驱动包。将MySQL的驱动包和clickhouse-jdbc-bridge-1.0.jar文件放在了/opt/softwares路径下,执行如下命令:
| [root@cdh04 softwares]# java -jar clickhouse-jdbc-bridge-1.0.jar  --driver-path .  --listen-host cdh04
 | 
其中--driver-path是MySQL驱动的jar所在的路径,listen-host是代理服务绑定的主机。默认情况下,绑定的端口是:9019。上述jar包的下载:
链接:https://pan.baidu.com/s/1ZcvF22GvnvAQpVTleNry7Q 提取码:la9n
然后我们再配置/etc/clickhouse-server/config.xml,在文件中添加如下配置,然后重启服务。
| <jdbc_bridge><host>cdh04</host>
 <port>9019</port>
 </jdbc_bridge>
 
 | 
使用示例
- 直接查询MySQL中对应的表 | SELECT * FROM
 jdbc(
 'jdbc:mysql://192.168.200.241:3306/?user=root&password=123qwe',
 'clickhouse',
 'test');
 
 |  
 
- 创建一张映射表 | CREATE TABLE [IF NOT EXISTS] [db.]table_name
 (
 columns list...
 )
 ENGINE = JDBC(dbms_uri, external_database, external_table)
 
 
 CREATE TABLE jdbc_table_mysql (
 order_id INT NOT NULL AUTO_INCREMENT,
 amount FLOAT NOT NULL,
 PRIMARY KEY (order_id));
 INSERT INTO  jdbc_table_mysql VALUES (1,200);
 
 
 CREATE TABLE jdbc_table
 (
 order_id Int32,
 amount Float32
 )
 ENGINE JDBC(
 'jdbc:mysql://192.168.200.241:3306/?user=root&password=123qwe',
 'clickhouse',
 'jdbc_table_mysql');
 
 
 cdh04 :) select * from jdbc_table;
 
 SELECT *
 FROM jdbc_table
 
 ┌─order_id─┬─amount─┐
 │        1 │    200 │
 └──────────┴────────┘
 
 |  
 
Kafka
使用方式
| CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster](
 name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
 name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
 ...
 ) ENGINE = Kafka()
 SETTINGS
 kafka_broker_list = 'host:port',
 kafka_topic_list = 'topic1,topic2,...',
 kafka_group_name = 'group_name',
 kafka_format = 'data_format'[,]
 [kafka_row_delimiter = 'delimiter_symbol',]
 [kafka_schema = '',]
 [kafka_num_consumers = N,]
 [kafka_max_block_size = 0,]
 [kafka_skip_broken_messages = N,]
 [kafka_commit_every_batch = 0,]
 [kafka_thread_per_consumer = 0]
 
 | 
- kafka_broker_list:逗号分隔的brokers地址 (localhost:9092).
- kafka_topic_list:Kafka 主题列表,多个主题用逗号分隔.
- kafka_group_name:消费者组.
- kafka_format– Message format. 比如- JSONEachRow、JSON、CSV等等
使用示例
在kafka中创建ck_topic主题,并向该主题写入数据
|  CREATE TABLE kafka_table (id UInt64,
 name String
 ) ENGINE = Kafka()
 SETTINGS
 kafka_broker_list = 'cdh04:9092',
 kafka_topic_list = 'ck_topic',
 kafka_group_name = 'group1',
 kafka_format = 'JSONEachRow'
 ;
 
 cdh04 :) select * from kafka_table ;
 
 SELECT *
 FROM kafka_table
 
 ┌─id─┬─name─┐
 │  1 │ tom  │
 └────┴──────┘
 ┌─id─┬─name─┐
 │  2 │ jack │
 └────┴──────┘
 
 | 
注意点
当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。
- 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
- 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
- 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中
| CREATE TABLE kafka_table_consumer (
 id UInt64,
 name String
 ) ENGINE = Kafka()
 SETTINGS
 kafka_broker_list = 'cdh04:9092',
 kafka_topic_list = 'ck_topic',
 kafka_group_name = 'group1',
 kafka_format = 'JSONEachRow'
 ;
 
 
 CREATE TABLE kafka_table_mergetree (
 id UInt64 ,
 name String
 )ENGINE=MergeTree()
 ORDER BY id
 ;
 
 
 CREATE MATERIALIZED VIEW consumer TO kafka_table_mergetree
 AS SELECT id,name FROM kafka_table_consumer ;
 
 cdh04 :) select * from kafka_table_mergetree;
 
 SELECT *
 FROM kafka_table_mergetree
 
 ┌─id─┬─name─┐
 │  2 │ jack │
 └────┴──────┘
 ┌─id─┬─name─┐
 │  1 │ tom  │
 └────┴──────┘
 
 |