0%

Neo4j CDC 至 Kafka

本文介绍如何监听 Neo4j 变更数据并发送到 kafka 。

背景

使用 Neo4j 构建商品知识图谱,同时通过 spaCy 导入图谱中的实体词对用户搜索词汇进行 NER(命名实体识别)。

问题

初始化时还可以直接读取全量的实体词,但是增量就比较麻烦了,毕竟不知道什么时候图谱里数据变动了。当然可以通过更新 Neo4j 的程序同时更新 spaCy 里的实体词,但是遇到复杂场景还是束手无策了。

目前在关系型数据库场景下,像 MySQL 这种 CDC 之前都是用 Canal 监听 binlog 然后发送到 Kafka 来解决,近期也可以通过 Flink CDC 来处理数据。

所以在 Neo4j 场景下,更倾向于使用 CDC 技术来解决这个问题,后期的扩展性也较高。

解决

官方提供了neo4j-streams项目,提供了两种解决方案:

  1. Neo4j 插件
  2. Kafka 插件

本文介绍如何使用方案1来解决上述问题。


软件环境(截止2022年05月10日):

1
2
neo4j on docker, version 4.4.5
neo4j-streams, version 4.1.2

因为使用 Docker 版,里面会隐藏一些坑。

Docker 启动脚本

1
2
3
4
5
6
7
8
9
10
11
docker run \
--name testneo4j \
-p7474:7474 -p7687:7687 \
-d \
-v $HOME/neo4j/data:/data \
-v $HOME/neo4j/logs:/logs \
-v $HOME/neo4j/import:/var/lib/neo4j/import \
-v $HOME/neo4j/plugins:/plugins \
-v $HOME/neo4j/conf:/conf \
--env NEO4J_AUTH=neo4j/yourpasswd \
neo4j:4.4.5

挂载目录默认为$HOME/neo4j,有需要可以自己修改。
这里重点注意把容器里/conf目录挂载出来,待会要加自定义配置。

此外因为 docker 默认的网络模式为 bridge,这时候如果在局域网内配置了 kafka,从容器内是无法访问类似192.168.x.x 的服务地址的。这里我选择改变容器运行模式:

1
2
3
4
5
6
7
8
9
10
11
docker run \
--name testneo4j \
--network="host" \
-d \
-v $HOME/neo4j/data:/data \
-v $HOME/neo4j/logs:/logs \
-v $HOME/neo4j/import:/var/lib/neo4j/import \
-v $HOME/neo4j/plugins:/plugins \
-v $HOME/neo4j/conf:/conf \
--env NEO4J_AUTH=neo4j/yourpasswd \
neo4j:4.4.5

这时候容器以 host 模式运行,可以访问局域网内的 kafka 服务。

该问题可以参见:https://stackoverflow.com/a/24326540/5425709

配置 streams 插件

启动容器后,在目录$HOME/neo4j下会生成对应的挂载目录。
github 上下载neo4j-streams-4.1.2.jar,放置在$HOME/neo4j/plugins目录下。

同时在 $HOME/neo4j/conf 目录下新建streams.conf文件,并加入如下内容:

1
2
3
4
kafka.bootstrap.servers=192.168.3.111:9092,192.168.3.112:9092,192.168.3.113:9092
streams.source.topic.nodes.neo4j-label-brand=Brand{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000

上述配置文件可以做到如果 Brand 实体发生变化,将发送 CDC 数据至 Kafka 名为neo4j-label-brand 的 topic。
实体和关系的监听语法如下,更多配置可参见文档

1
2
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>

然后重启容器,docker restart testneo4j

监听变更

Create

1
CREATE (b:Brand {name: '123'}) RETURN b;

此时Kafka收到的 消息内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
"meta": {
"timestamp": 1652172619302,
"username": "neo4j",
"txId": 438650,
"txEventId": 0,
"txEventsCount": 1,
"operation": "created",
"source": {
"hostname": "test01.xxx.cn"
}
},
"payload": {
"id": "63330",
"before": null,
"after": {
"properties": {
"name": "123"
},
"labels": [
"Brand"
]
},
"type": "node"
},
"schema": {
"properties": {
"name": "String"
},
"constraints": [
{
"label": "Brand",
"properties": [
"name"
],
"type": "UNIQUE"
}
]
}
}

Update

1
MATCH (n:Brand) WHERE n.name = '123' SET n.tableId = 1 RETURN n;

此时Kafka收到的 消息内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
{
"meta": {
"timestamp": 1652172703813,
"username": "neo4j",
"txId": 438651,
"txEventId": 0,
"txEventsCount": 1,
"operation": "updated",
"source": {
"hostname": "test01.xxx.cn"
}
},
"payload": {
"id": "63330",
"before": {
"properties": {
"name": "123"
},
"labels": [
"Brand"
]
},
"after": {
"properties": {
"name": "123",
"tableId": 1
},
"labels": [
"Brand"
]
},
"type": "node"
},
"schema": {
"properties": {
"name": "String",
"tableId": "Long"
},
"constraints": [
{
"label": "Brand",
"properties": [
"name"
],
"type": "UNIQUE"
}
]
}
}

Delete

1
MATCH (n:Brand) WHERE n.name = '123' DELETE n;

此时Kafka收到的 消息内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
{
"meta": {
"timestamp": 1652172749584,
"username": "neo4j",
"txId": 438652,
"txEventId": 0,
"txEventsCount": 1,
"operation": "deleted",
"source": {
"hostname": "test01.xxx.cn"
}
},
"payload": {
"id": "63330",
"before": {
"properties": {
"name": "123",
"tableId": 1
},
"labels": [
"Brand"
]
},
"after": null,
"type": "node"
},
"schema": {
"properties": {
"name": "String",
"tableId": "Long"
},
"constraints": [
{
"label": "Brand",
"properties": [
"name"
],
"type": "UNIQUE"
}
]
}
}

接下来再写个 kafka 消费者即可,至此数据流程打通。

Welcome to my other publishing channels