最近在尝试配置ELK给index配置生命周期策略,我创建了index templates, 然后创建了 index lifecycle policies, 我设想的是可以通过ILM的策略来清理 index templates里面的索引数据。
创建好了之后似乎一切看起来正常,但是对于index templates历史的数据似乎不能正常的映射到 新创建的 ILM上面。
Elasticsearch 的 Index Lifecycle Policies (ILM) 在关联 Index Templates 时,只会对未来创建的新索引生效,而不会自动追溯应用到已经存在的历史索引。
这是因为 ILM 的设计理念是管理索引的生命周期,它需要知道一个索引的初始状态,以便根据策略进行操作。对于历史索引,ILM 并不知道它们的初始状态,因此不会自动应用。
那么我们写一个脚本来刷新index的 lifecycle策略
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
import json
# Elasticsearch 配置
es_host = "192.168.1.100" # 替换为你的 Elasticsearch 主机地址
es_port = 9200 # 替换为你的 Elasticsearch 端口
es_user = "elastic" # 替换为你的 Elasticsearch 用户名
es_password = "changeme" # 替换为你的 Elasticsearch 密码
# ILM 策略配置
ilm_policies = {
"log-info-*": "APP-LOG-PROLICY",
"log-http-*": "HTTP-LOG-PROLICY",
"log-SYS-*": "SYS-LOG-PROLICY"
}
def get_ilm_setting(index_name):
return {
"index": {
"lifecycle": {
"name": ilm_policies[get_index_pattern(index_name)]
}
}
}
def get_index_pattern(index_name):
if index_name.startswith("log-center-"):
return "log-center-*"
elif index_name.startswith("log-http-"):
return "log-http-*"
elif index_name.startswith("log-jsf-"):
return "log-jsf-*"
return None
def update_ilm_policy(es_client, index_name, ilm_policy):
"""更新指定索引的 ILM 策略。"""
try:
print(f"[DEBUG] 准备更新索引 {index_name} 的 ILM 策略为 {ilm_policy}")
print(f"[DEBUG] 将要应用的设置: {json.dumps(get_ilm_setting(index_name), indent=2)}")
response = es_client.indices.put_settings(index=index_name, body=get_ilm_setting(index_name))
if response.body and response.body.get('acknowledged'):
print(f"Successfully updated ILM policy for index {index_name}")
print(json.dumps(response.body, indent=4))
else:
print(f"Failed to update ILM policy for index {index_name}")
except NotFoundError:
print(f"[DEBUG] 索引 {index_name} 不存在")
except Exception as e:
print(f"[DEBUG] 更新索引 {index_name} 的 ILM 策略时发生错误: {str(e)}")
def main():
"""主函数,连接 Elasticsearch 并更新 ILM 策略。"""
es = Elasticsearch(
f"http://{es_host}:{es_port}",
basic_auth=(es_user, es_password),
verify_certs=False # 生产环境请设置为 True
)
if not es.ping():
raise ValueError("Connection failed to Elasticsearch.")
print("Connected to Elasticsearch successfully!")
try:
print("[DEBUG] 开始获取所有索引...")
all_indices = es.indices.get_alias(index="*")
print(f"[DEBUG] 获取到的所有索引: {list(all_indices.keys())}")
for index_name in all_indices:
index_pattern = get_index_pattern(index_name)
print(f"[DEBUG] 处理索引 {index_name}, 匹配模式: {index_pattern}")
if index_pattern:
update_ilm_policy(es, index_name, ilm_policies[index_pattern])
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()
ilm_policies = {
“log-info-*”: “APP-LOG-PROLICY”,
“log-http-*”: “HTTP-LOG-PROLICY”,
“log-SYS-*”: “SYS-LOG-PROLICY”
} 配置的是哪一类index 使用什么策略。