• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

ES源码添加自己的API 流程梳理

武飞扬头像
水的精神
帮助1

从现象到本质:我们的一个请求通常是这样子的

学新通

es的请求是restFull风格的请求,由请求方式和请求的URL组成。如果想要自定义添加一个API应该从哪里开始呢?

1.第一步需要定义一个类来继承BaseRestHandler类。这里为了方便查看,以 DELET index这个为例子

package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;

public class RestDeleteIndexAction extends BaseRestHandler {
    public RestDeleteIndexAction(Settings settings, RestController controller) {
        super(settings);
        // 在这里定义需要的 url路径,定义了请求方式。
        controller.registerHandler(RestRequest.Method.DELETE, "/", this);
        controller.registerHandler(RestRequest.Method.DELETE, "/{index}", this);
    }

    @Override
    public String getName() {
        return "delete_index_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        // 这里定义了请求,将rest请求转义成自己特定功能的请求。所以需要模仿DeleteIndexRequest类来定义一个自己的去请求类。
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
        deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout()));
        deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
        deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions()));
        // 在这里去关联了处理该请求的类
        return channel -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel));
    }
}

学新通

RestDeleteIndexAction绑定了 url,和预处理请求,将RestRequest请求构造成 es内部请求。并关联了处理类

channel -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel));

我们点进client.admin().indices().delete() 这是一个接口IndicesAdminClient,向下跟进它的实现方法,到了AbstractClient类,看到了实现了delete()方法,在方法中可以看到,实际上这里也只是做关联,它把处理这个url请求关联到了DeleteIndexAction上,指定该请求去找***Action这个类。

    @Override
        public void delete(final DeleteIndexRequest request, final ActionListener<AcknowledgedResponse> listener) {
            execute(DeleteIndexAction.INSTANCE, request, listener);
        }

如果想添加方法,可以再添加类似于delete()的方法。

看一下这个DeleteIndexAction类的代码

这个类中关联了相应的

import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.action.support.master.AcknowledgedResponse;

public class DeleteIndexAction extends Action<DeleteIndexRequest, AcknowledgedResponse, DeleteIndexRequestBuilder> {

    public static final DeleteIndexAction INSTANCE = new DeleteIndexAction();
    public static final String NAME = "indices:admin/delete";

    private DeleteIndexAction() {
        super(NAME);
    }

    @Override
    public AcknowledgedResponse newResponse() {
        return new AcknowledgedResponse();
    }

    @Override
    public DeleteIndexRequestBuilder newRequestBuilder(ElasticsearchClient client) {
        return new DeleteIndexRequestBuilder(client, this);
    }
}

学新通

ActionMode类中,关联了这个actoin和处理这个action 的类 (package org.elasticsearch.action)

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
        // Subclass NamedRegistry for easy registration
        class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
            ActionRegistry() {
                super("action");
            }

            public void register(ActionHandler<?, ?> handler) {
                register(handler.getAction().name(), handler);
            }

            public <Request extends ActionRequest, Response extends ActionResponse> void register(
                    GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
                    Class<?>... supportTransportActions) {
                register(new ActionHandler<>(action, transportAction, supportTransportActions));
            }
        }
        ActionRegistry actions = new ActionRegistry();
		// 删减出来的下边的代码。action都是在这里进行绑定的。
         actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
        return unmodifiableMap(actions.getRegistry());
    }


学新通

ActionMode类中 注册restAction 到handler里边

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
        List<AbstractCatAction> catActions = new ArrayList<>();
        Consumer<RestHandler> registerHandler = a -> {
            if (a instanceof AbstractCatAction) {
                catActions.add((AbstractCatAction) a);
            }
        };
        // 有删减代码,删减部分都和下行一样。都是用来注册restAction的。
        registerHandler.accept(new RestDeleteIndexAction(settings, restController);
    }

再看一下 restAction是什么(上边有提到)

public class RestDeleteIndexAction extends BaseRestHandler {
    public RestDeleteIndexAction(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(RestRequest.Method.DELETE, "/", this);
        controller.registerHandler(RestRequest.Method.DELETE, "/{index}", this);
    }

    @Override
    public String getName() {
        return "delete_index_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
        IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
        indicesStatsRequest.setAddFreezingIndices(true);
        indicesStatsRequest.indices(indices);
        final IndicesOptions strictExpandIndicesOptions = IndicesOptions.strictExpand();
        indicesStatsRequest.indicesOptions(strictExpandIndicesOptions);
        indicesStatsRequest.all();
        return channel -> client.admin().indices().stats(indicesStatsRequest, new RestActionListener<IndicesStatsResponse>(channel) {
            @Override
            public void processResponse(IndicesStatsResponse indicesStatsResponse) throws Exception {
                ImmutableOpenMap.Builder<String, IndexStats> builder = ImmutableOpenMap.builder();
                for(String index : indices) {
                    IndexStats indexStats = indicesStatsResponse.getIndices().get(index);
                    builder.put(index, indexStats);
                }

                DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indices);
                deleteIndexRequest.indicesStats(builder.build());
                deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout()));
                deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
                deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions()));
                client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel));
            }
        });
    }
}

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbggjg
系列文章
更多 icon
同类精品
更多 icon
继续加载