EmbeddedNodeFactoryBean.java
/*
* Copyright 2020 Global Crop Diversity Trust
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.genesys.custom.elasticsearch;
import static java.util.Arrays.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.transport.TransportService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.client.NodeClientFactoryBean;
/**
 * Spring {@link FactoryBean} that creates, starts and shuts down an embedded
 * Elasticsearch {@link Node}. Based on {@link NodeClientFactoryBean}.
 *
 * <p>The node is built and started in {@link #afterPropertiesSet()} and closed in
 * {@link #destroy()}. Configure the bean through its setters before initialization.
 */
public class EmbeddedNodeFactoryBean implements FactoryBean<EmbeddedNodeFactoryBean.EmbeddedNode>, InitializingBean, DisposableBean {

  private static final Logger LOG = LoggerFactory.getLogger(EmbeddedNodeFactoryBean.class);

  /** Value used for the {@code cluster.name} setting. */
  private String clusterName;
  /** The running node; {@code null} before initialization and after {@link #destroy()}. */
  private EmbeddedNode node;
  /** Value used for the {@code path.data} setting. */
  private String pathData;
  /** Value used for the {@code path.home} setting. */
  private String pathHome;
  /** Optional classpath resource holding additional node settings. */
  private String pathConfiguration;
  /** Value used for the {@code http.port} setting. */
  private String portRange = "9200-9300"; // default

  /**
   * A {@link Node} that can be constructed with plugins found on the classpath and that
   * exposes the HTTP port it is bound to.
   */
  public static class EmbeddedNode extends Node {

    /**
     * @param preparedSettings node settings, turned into an environment by {@link InternalSettingsPreparer}
     * @param classpathPlugins plugin classes to load into the node
     */
    public EmbeddedNode(Settings preparedSettings, Collection<Class<? extends Plugin>> classpathPlugins) {
      super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins, false);
    }

    /**
     * Registers the node name with the Elasticsearch log configurator. This is best-effort:
     * a failure is logged at debug level and otherwise ignored so it cannot prevent the node
     * from being created.
     *
     * @param nodeName the derived node name
     */
    protected void registerDerivedNodeNameWithLogger(String nodeName) {
      try {
        LogConfigurator.setNodeName(nodeName);
      } catch (Exception ignored) {
        // Deliberately not propagated; the node works without the logger knowing its name.
        LOG.debug("Could not register node name [{}] with the log configurator: {}", nodeName, ignored.getMessage());
      }
    }

    /**
     * Starts the node.
     *
     * @return this node, typed as {@link EmbeddedNode} to allow call chaining
     * @throws NodeValidationException if thrown by {@link Node#start()}
     */
    @Override
    public EmbeddedNode start() throws NodeValidationException {
      super.start();
      return this;
    }

    /**
     * Looks up the HTTP port this node is bound to.
     *
     * @return the port of the first bound HTTP address, or {@code 9200} when no address is bound
     */
    public int getPort() {
      var transportService = injector().getInstance(TransportService.class);
      for (var localAddr : transportService.getLocalAddresses()) {
        LOG.debug("Running ES Embedded node on: {}", localAddr);
      }

      var esHttpTransport = injector().getInstance(HttpServerTransport.class);
      for (var boundAddress : esHttpTransport.boundAddress().boundAddresses()) {
        // Only the first bound address is reported and used.
        LOG.warn("ES Embedded node listening on: {}", boundAddress.address().toString());
        return boundAddress.getPort();
      }
      return 9200; // stick to defaults
    }
  }

  public EmbeddedNodeFactoryBean() {}

  /**
   * @return the node started by {@link #afterPropertiesSet()}, or {@code null} if it has not
   *         been started or has already been destroyed
   */
  @Override
  public EmbeddedNode getObject() throws Exception {
    return node;
  }

  @Override
  public Class<? extends Node> getObjectType() {
    return EmbeddedNode.class;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }

  /**
   * Builds the node settings and starts the embedded node. Settings loaded from
   * {@code pathConfiguration} are added first; the explicit values below are put afterwards,
   * so they replace any loaded value stored under the same key.
   *
   * <p>If starting the node fails, the partially started node is closed before the failure is
   * rethrown, so that it does not linger without a reference that {@link #destroy()} could use.
   *
   * @throws Exception if the configuration cannot be read or the node fails to start
   */
  @Override
  public void afterPropertiesSet() throws Exception {
    LOG.info("Starting ES Embedded node on port: {}", this.portRange);

    /* @formatter:off */
    Settings settings = Settings.builder()
      .put(loadConfig())
      .put("transport.type", "netty4")
      .put("http.port", this.portRange) // change port
      .put("http.type", "netty4")
      .put("path.home", this.pathHome)
      .put("path.data", this.pathData)
      .put("cluster.name", this.clusterName)
      .put("node.max_local_storage_nodes", 100)
      .put("cluster.routing.allocation.disk.threshold_enabled", false)
      .build();
    /* @formatter:on */

    final EmbeddedNode created = new EmbeddedNode(settings, asList(Netty4Plugin.class, ReindexPlugin.class));
    try {
      node = created.start();
    } catch (final Exception startFailure) {
      // The field was never assigned, so nobody else can close this node: do it here.
      try {
        created.close();
      } catch (final Exception closeFailure) {
        startFailure.addSuppressed(closeFailure);
      }
      throw startFailure;
    }
  }

  /**
   * Loads node settings from the classpath resource named by {@code pathConfiguration}.
   *
   * @return the loaded settings, or empty settings when no resource is configured or the
   *         resource cannot be found (the latter is logged as an error)
   * @throws IOException if reading the resource fails
   */
  private Settings loadConfig() throws IOException {
    if (StringUtils.isBlank(pathConfiguration)) {
      return Settings.builder().build();
    }

    // Open the resource once and guarantee it is closed, whichever branch is taken.
    try (InputStream stream = getClass().getClassLoader().getResourceAsStream(pathConfiguration)) {
      if (stream == null) {
        LOG.error("Unable to read node configuration from file [{}]", pathConfiguration);
        return Settings.builder().build();
      }
      return Settings.builder().loadFromStream(pathConfiguration, stream, false).build();
    }
  }

  /** @param clusterName value for the {@code cluster.name} setting */
  public void setClusterName(String clusterName) {
    this.clusterName = clusterName;
  }

  /** @param pathData value for the {@code path.data} setting */
  public void setPathData(String pathData) {
    this.pathData = pathData;
  }

  /** @param pathHome value for the {@code path.home} setting */
  public void setPathHome(String pathHome) {
    this.pathHome = pathHome;
  }

  /** @param configuration classpath resource with additional node settings; may be blank */
  public void setPathConfiguration(String configuration) {
    this.pathConfiguration = configuration;
  }

  /** @param portRange value for the {@code http.port} setting, e.g. {@code "9200-9300"} */
  public void setPortRange(String portRange) {
    this.portRange = portRange;
  }

  /**
   * Closes the embedded node, if there is one. Errors raised while closing are logged and not
   * propagated. The reference to the node is dropped even when closing fails, so a node whose
   * shutdown was attempted is no longer handed out by {@link #getObject()}.
   */
  @Override
  public void destroy() {
    if (node == null) {
      LOG.info("Node is null");
      return;
    }

    try {
      LOG.info("Closing elasticSearch node");
      node.close();
    } catch (final Exception e) {
      LOG.error("Error closing ElasticSearch node: ", e);
    } finally {
      node = null;
    }
  }
}