关于es routing的介绍,网上已经有很多介绍,这里不在赘述,只介绍使用过程中的一些注意事项及测试情况。
一、服务器概况
硬件平台:x86_64
机器硬件名:x86_64
系统处理器的体系结构:x86_64
内核版本:#1 SMP Fri May 8 10:59:10 UTC 2020
操作系统:GNU/Linux
操作系统的发行版号:4.18.0-193.el8.x86_64
Linux系统:CentOS Linux release 8.2.2004 (Core)
物理CPU个数:1
每个物理CPU核数:20
逻辑CPU个数:40
内存:32GB
彼时服务器资源占用情况:

二、导入数据
2.1 降低数据倾斜程度
注意这里说的是降低,在默认情况下 es 使用docid作为 routing key 将文档均匀分散在所有分片。
# 默认路由算法
shard = hash(routing) % number_of_primary_shards
而路由则是使用用户定义的routing key,具有相同routing key的文档,将被分配到同一个分片。
# 引入该参数以后路由算法
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
在实际开发中,可能存在某些routing key的文档数量很多,导致集群数据不平衡。
使用index.routing_partition_size可以将数据分配到分片的子集而不是单个分片。
# settings
{
"index": {
"number_of_shards": "30",
"routing_partition_size": 10,
"number_of_replicas": "0",
}
}
这样做虽然可以使数据更均匀,但是搜索的分片也更多,所以请根据业务设置子集的大小。
2.2 文档 id 不再全局唯一
es 的每个分片都是一个倒排索引,索引 id 是唯一的。而 es 默认使用docid作为routing key,所以 es 只需要根据docid计算就能得知数据在哪个分片上。这也是为什么索引定义之后,分片数不能修改的原因。
而使用自定义路由,则可能存在docid一样,但是routing key不一样。例如某个索引将文档类型作为routing key,在首次插入时,routing key等于 A,被分配到了分片 1。在第二次插入时,routing key被修改等于 B,被分配到了分片 2。
这时如果不指定routing key查询,将会查出具有相同docid的两条数据。同样的,增加删除修改数据,都可能出现问题。所以使用自定义路由,只能用户自己保证docid的全局唯一性。
一种方法是,强制所有的增删改查操作都必须带有**routing**参数
# mappings
{
"_routing": {
"required": True
}
}
携带routing查询:
curl -XGET "http://localhost:9200/aiip.route_test/_search?routing=scenic&pretty" -H "Content-Type: application/json" -d '{"track_total_hits":true, "query":{"bool":{"filter":[{"match":{"title":"国家公园"}}]}}, "size": 3}'
routing key可以是多个,用逗号隔开即可。
2.3 Python客户端导入数据
使用elasticsearch.helpers.bulk提供的批量操作接口。
因业务不同,这里只贴出简化后的核心代码:
class EsHelper:
def __init__(self, nodes:list=['http://127.0.0.1:9200'], pool=None, logger=None):
self.pool = pool
self.logger = logger
if not self.pool:
assert isinstance(nodes, list)
self.conn = Elasticsearch(nodes, timeout=10)
if not self.conn.ping():
raise Exception('无法与es集群建立连接')
def insert_init(self, size:int=500):
self.bucket_ = []
self.bulk_size = size
self.bulk_total = 0
def bulk(self, data):
try:
if self.pool:
conn = self.pool.get_conn()
else:
conn = self.conn
success, failed = helpers.bulk(
conn, data,
stats_only=True, chunk_size=500, request_timeout=30
)
self.bulk_total += len(data)
except:
print(format_exc())
if self.logger:
self.logger.error(format_exc())
finally:
if self.pool:
self.pool.put_conn(conn)
@property
def bucket(self):
if self.bucket_:
if len(self.bucket_) == self.bulk_size or self.bucket_[-1] == None:
self.bucket_.pop()
if self.bucket_:
self.bulk(self.bucket_)
self.bucket_ = []
return self.bucket_
es_conn = EsHelper(pool=esconn_pool)
es_conn.insert_init()
for msg in file_or_kafka:
_routing = msg['type'] # 根据业务设计 routing key
action = {
"_index": 'route_test'
"_id": md5(msg["website"].strip().encode()).hexdigest(), # 自定义 id
"_routing": _routing, # 多了这个字段
"_source": msg
}
try:
es_conn.bucket.append(action)
except:
from traceback import format_exc
logger.error(format_exc())
es_conn.bucket.append(None)
三、测试
| NO | 总时间 | ES 查询时间 | MongoDB 查询时间 | 查询效果 | 备注 |
|---|---|---|---|---|---|
| 1 | 0.3753 | 0.365214 | 0.010013 | 正常 | es查询100次,mongo查询一次取12条数据,单位:秒/次。下同。 |
| 2 | 0.122578 | 0.113337 | 0.009176 | 正常 | |
| 3 | 0.189266 | 0.180636 | 0.008565 | 正常 |
0 评论
大哥整点话呗~