当通过程序向图数据库中写入大量数据的时候,你会希望它能够高效的处理。
下面这些方式不是十分有效:
将值直接写入到语句中,而不是通过参数的方式
每一个更新都通过一个Transaction
发送一个请求
通过一个Transaction
发送大量的单个请求
生成一个巨大复杂的语句(几百行),然后通过一个Transaction
进行提交
在一个Transaction
中,发送一个巨大的请求,会导致OOM错误
正确的方法
你需要构造尽可能小的请求,并且语句格式固定(这样可以利用缓存),然后通过参数方式进行使用。
每一个请求可以只修改一个属性,或者修改整个子图(上百个节点),但是它的语句结构必须是一致的,否则就不能使用缓存。
UNWIND
为了实现这个目标,你只需要在你单次请求的前面加上一个UNWIND
语句。UNWIND
会将大量的数据(高达10k或者50k条实体)分散成一行一行的,每一行都包含一次更新所需要的全部信息。
你添加一个{batch}
参数,并且将它的值设置成一个Map
列表,其中可以包含你的数据(10k或者50k条)。这些数据会被打包成一个完整的请求,并且符合语法结构,还用上了缓存(因为其结构一致)。
语法结构
1 2 3 {batch: [{row1},{row2},{row3},...10 k]} UNWIND {batch} as row / / now perform updates with the data in each "row" map
示例
1 2 3 4 {batch: [{name:"Alice",age:32 },{name:"Bob",age:42 }]} UNWIND {batch} as row CREATE (n:Label)SET n + = row
1 2 3 4 5 6 {batch: [ {id:"alice@example.com",properties:{name:"Alice",age:32 }},{id:"bob@example.com",properties:{name:"Bob",age:42 }}]} UNWIND {batch} as row MERGE (n:Label {row.id})(ON CREATE ) SET n + = row.properties
寻找节点,MERGE/CREATE关系,并写入属性
1 2 3 4 5 6 7 8 {batch: [ {from :"alice@example.com",to :"bob@example.com",properties:{since:2012 }},{from :"alice@example.com",to :"charlie@example.com",properties:{since:2016 }}]} UNWIND {batch} as row MATCH (from :Label {row.from})MATCH (to :Label {row.to})CREATE / MERGE (from )- [rel:KNOWS]- > (to )(ON CREATE ) SET rel + = row.properties
这对于多叉树很好用,在这里我们只传入了一个单独的属性created
。实际上你可以不传入任何属性,或者传入一个map
的属性来进行更新。
1 2 3 4 5 6 7 8 9 {batch: [ {from :123 ,to :[44 ,12 ,128 ],created:"2016-01-13"}, {from :34 ,to :[23 ,35 ,2983 ],created:"2016-01-15"},...] UNWIND {batch} as row MATCH (from ) WHERE id(from ) = row.fromMATCH (to ) WHERE id(from ) IN row.to / / list of idsCREATE / MERGE (from )- [rel:FOO]- > (to )SET rel.created = row.created
更快更高效
下面是一些更多的技巧。
你可以传入一个Map
,其中的key
是节点id或者关系id。这样以来,通过id查找会变得更高效。
1 2 3 4 5 6 7 8 9 10 11 12 13 { batch : [{"1":334 ,"2":222 ,3 :3840 , ... 100 k}]} WITH {batch} as data, [k in keys({batch}) | toInt(k)] as idsMATCH (n) WHERE id(n) IN ids/ / single property value SET n.count = data[toString(id(n))]/ / or override all propertiesSET n = data[toString(id(n))]/ / or add all propertiesSET n + = data[toString(id(n))]
1 2 3 4 5 6 { batch : [{"1":334 ,"2":222 ,3 :3840 , ... 100 k}]} WITH {batch} as data, [k in keys({batch}) | toInt(k)] as idsMATCH ()- [rel]- > () WHERE id(rel) IN idsSET rel.foo = data[toString(id(rel))] / / single propertySET rel= data[toString(id(rel))] / / all properties
有些时候,你希望根据输入动态的创建数据。但是Cypher
目前没有诸如WHEN
或者IF
的条件语句,CASE WHEN
也只是一个表达式,因此,你必须使用一个我多年前想出来的技巧。
Cypher
提供FOREACH
语句,用来遍历列表中的每一个元素并分别执行更新操作。于是,一个包含0个元素或者1个元素的列表则可以看成一个条件表达式。因为当0个元素的时候,就不会执行遍历,而当1个元素的时候,就只执行一次遍历。
大致思路如下:
1 2 3 4 ... FOREACH (_ IN CASE WHEN predicate THEN [true] ELSE [] END | ... update operations .... )
其中,列表中的true
值可以是其他任何值,42,"",null
等等。只要它是一个值,那么我们就可以得到一个非空的列表。
相似的,你也可以使用RANGE(1, CASE WHEN predicate THEN 1 ELSE 0 END)
。当predicate
的值为false的时候,就会范围一个空列表。或者,如果你喜欢使用filter
,那么也可以通过filter(_ IN [1] WHERE predicate)
来构造。
下面是一个完整的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 LOAD CSV FROM {url} AS row MATCH (o:Organization {name:row.org})FOREACH (_ IN case when row.type = 'Person' then [1 ] else [] end | MERGE (p:Person {name:row.name}) CREATE (p)- [:WORKS_FOR]- > (o) ) FOREACH (_ IN case when row.type = 'Agency' then [1 ] else [] end | MERGE (a:Agency {name:row.name}) CREATE (a)- [:WORKS_FOR]- > (o) )
需要注意的是,在FOREACH
内部创建的变量无法在外部访问。你需要再重新查询一次,或者你需要再FOREACH内
完成全部更新操作。
使用APOC库
APOC 库提供了很多有用的方法供你使用。在这里,我推荐下面3个方法:
创建节点和关系,并且可以动态设定标签和属性
批量提交和更新
动态创建或者操作Map
,并赋给属性
通过apoc.create.node
和apoc.create.relationship
你可以动态的计算节点标签,关系类型和任意的属性。
1 2 3 UNWIND {batch} as row CALL apoc.create.node(row.labels, row.properties) yield nodeRETURN count (* )
在apoc.create.* 方法中,也提供了设置/更新/删除属性和标签的功能。
1 2 3 4 5 UNWIND {batch} as row MATCH (from ) WHERE id(n) = row.fromMATCH (to :Label) where to.key = row.toCALL apoc.create.relationship(from , row.type, row.properties, to ) yield relRETURN count (* )
在一开始j就提到了,大量的提交Transaction
是有问题的。你可以用2G-4G的heap来更新百万条记录,但当量级更大了之后就会很困难了。在使用32G的heap下,我最大的Transaction
可以达到10M的节点。
这时,apoc.periodic.iterate
可以提供很大的帮助。
它的原理很简单:你有两个Cypher
语句,第一条语句能够提供可操纵的数据并产生巨大的数据流,第二条语句执行真正的更新操作,它对每一个数据都进行一次更新操作,但是它只在处理一定数量的数据后才创建一个新的Transaction
。
打个比方,假如你第一条语句返回了五百万个需要更新的节点,如果使用内部语句的话,那么每一个节点都会进行一次更新操作。但是如果你设置批处理大小为10k的话,那么每一个Transaction
会批量更新10k的节点。
如果你的更新操作是相互独立的话(创建节点,更新属性或者更新独立的子图),那么你可以添加parallel:true
来充分利用cpu
。
比方说,你想计算多个物品的评分,并通过批处理的方式来更新属性,你应该按下面这样操作
1 2 3 4 5 call apoc.periodic.iterate(' MATCH (n:User)-[r1:LIKES]->(thing)<-[r2:RATED]-(m:User) WHERE id(n)<id(m) RETURN thing, avg( r1.rating + r2.rating ) as score ' ,' SET thing.score = score ' , {batchSize:10000 , parallel:true , iterateList:true })
尽管Cypher为列表提供了相当遍历的操作,如range
, collect
, unwind
, reduce
, extract
, filter
, size
等,但Map
在有的时候也是需要进行创建和更改的。
apoc.map.* 提供了一系列的方法来简化这个过程。
通过其他数据创建Map:
1 2 3 4 5 6 7 8 9 10 11 12 RETURN apoc.map.fromPairs([["alice",38 ],["bob",42 ],...])/ / {alice:38 , bob: 42 , ...}RETURN apoc.map.fromLists(["alice","bob",...],[38 ,42 ])/ / {alice:38 , bob: 42 , ...}/ / groups nodes, relationships, maps by key, good for quick lookups by that keyRETURN apoc.map.groupBy([{name:"alice",gender:"female"},{name:"bob",gender:"male"}],"gender")/ / {female:{name:"alice",gender:"female"}, male:{name:"bob",gender:"male"}}RETURN apoc.map.groupByMulti([{name:"alice",gender:"female"},{name:"bob",gender:"male"},{name:"Jane",gender:"female"}],"gender")/ / {female:[{name:"alice",gender:"female"},{name:"jane",gender:"female"}], male:[{name:"bob",gender:"male"}]}
更新Map:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 RETURN apoc.map.merge({alice: 38 },{bob:42 })/ / {alice:38 , bob: 42 }RETURN apoc.map.setKey({alice:38 },"bob",42 )/ / {alice:38 , bob: 42 }RETURN apoc.map.removeKey({alice:38 , bob: 42 },"alice")/ / {bob: 42 }RETURN apoc.map.removeKey({alice:38 , bob: 42 },["alice","bob","charlie"])/ / {}/ / remove the given keys and values , good for data from load- csv/ json/ jdbc/ xmlRETURN apoc.map.clean({name: "Alice", ssn:2324434 , age:"n/a", location:""},["ssn"],["n/a",""])/ / {name:"Alice"}
结论
通过上面这些方式,我能够快速的执行更新操作。当然,你也可以组合这些方法,来实现更复杂的操作。
原文地址: https://medium.com/@mesirii/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher-73c7f693c8cc