291 lines
10 KiB
Java
Executable File
291 lines
10 KiB
Java
Executable File
/*
|
|
* Copyright (C) 2020 Graylog, Inc.
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the Server Side Public License, version 1,
|
|
* as published by MongoDB, Inc.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* Server Side Public License for more details.
|
|
*
|
|
* You should have received a copy of the Server Side Public License
|
|
* along with this program. If not, see
|
|
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
|
*/
|
|
package in.nosd.redis.dataadapters;
|
|
|
|
import com.codahale.metrics.Meter;
|
|
import com.codahale.metrics.MetricRegistry;
|
|
import com.codahale.metrics.Timer;
|
|
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
import com.fasterxml.jackson.annotation.JsonTypeName;
|
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
|
import com.google.auto.value.AutoValue;
|
|
import org.graylog2.lookup.dto.DataAdapterDto;
|
|
import com.google.common.collect.ArrayListMultimap;
|
|
import com.google.common.collect.Multimap;
|
|
import com.google.inject.assistedinject.Assisted;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import io.lettuce.core.RedisClient;
|
|
import io.lettuce.core.RedisURI;
|
|
import io.lettuce.core.api.StatefulRedisConnection;
|
|
import io.lettuce.core.api.sync.RedisCommands;
|
|
/*
|
|
//To delete after clean
|
|
import okhttp3.HttpUrl;
|
|
import okhttp3.OkHttpClient;
|
|
import okhttp3.Request;
|
|
import okhttp3.Response;
|
|
import okhttp3.ResponseBody;
|
|
//END To delete after clean
|
|
*/
|
|
import in.nosd.redis.dataadapters.AutoValue_RedisLookupDataAdapter_Config;
|
|
import org.graylog2.plugin.lookup.LookupCachePurge;
|
|
import org.graylog2.plugin.lookup.LookupDataAdapter;
|
|
import org.graylog2.plugin.lookup.LookupDataAdapterConfiguration;
|
|
import org.graylog2.plugin.lookup.LookupResult;
|
|
import org.joda.time.Duration;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import javax.annotation.Nullable;
|
|
import com.google.inject.Inject;
|
|
import com.google.inject.assistedinject.Assisted;
|
|
import javax.validation.constraints.Min;
|
|
import javax.validation.constraints.NotEmpty;
|
|
|
|
import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Optional;
|
|
import java.util.StringJoiner;
|
|
|
|
public class RedisLookupDataAdapter extends LookupDataAdapter {
|
|
private static final Logger LOG = LoggerFactory.getLogger(RedisLookupDataAdapter.class);
|
|
|
|
public static final String NAME = "RedisLookup";
|
|
|
|
private static final Duration REFRESH_INTERVAL_DURATION = Duration.ZERO;
|
|
|
|
private final Config config;
|
|
private final RedisClient client;
|
|
private RedisCommands<String, String> commands;
|
|
private final Timer redisGetRequestTimer;
|
|
private final Meter redisGetRequestErrors;
|
|
private final Timer redisSetRequestTimer;
|
|
private final Meter redisSetRequestErrors;
|
|
|
|
|
|
@Inject
|
|
public RedisLookupDataAdapter(@Assisted("dto") DataAdapterDto dto,
|
|
MetricRegistry metricRegistry) {
|
|
super(dto, metricRegistry);
|
|
|
|
this.config = (Config) dto.config();
|
|
RedisURI redisUri = RedisURI.Builder.redis(this.config.redisHost(),this.config.redisPort())
|
|
.withPort(this.config.redisPort())
|
|
//.withAuthentication(this.config.redisUsername(), this.config.redisPassword())
|
|
.withDatabase(this.config.redisDB())
|
|
.build();
|
|
this.client = RedisClient.create(redisUri);
|
|
/*this.client = RedisClient.create(RedisURI.Builder.redis(this.config.redisHost(),this.config.redisPort())
|
|
.withPort(this.config.redisPort())
|
|
.withAuthentication(this.config.redisUsername(), this.config.redisPassword())
|
|
.withDatabase(this.config.redisDB())
|
|
.build());
|
|
*/
|
|
this.redisGetRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisGetRequestTime"));
|
|
this.redisGetRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisGetRequestErrors"));
|
|
this.redisSetRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisSetRequestTime"));
|
|
this.redisSetRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisSetRequestErrors"));
|
|
}
|
|
|
|
// Add code to initialise Redis connection
|
|
@Override
|
|
protected void doStart() throws Exception {
|
|
StatefulRedisConnection<String, String> connection = this.client.connect();
|
|
this.commands = connection.sync();
|
|
}
|
|
|
|
// Add code to close Redis connection
|
|
@Override
|
|
protected void doStop() throws Exception {
|
|
|
|
}
|
|
|
|
@Override
|
|
public Duration refreshInterval() {
|
|
return REFRESH_INTERVAL_DURATION;
|
|
}
|
|
|
|
@Override
|
|
protected void doRefresh(LookupCachePurge cachePurge) throws Exception {
|
|
doStart();
|
|
cachePurge.purgeAll();
|
|
}
|
|
|
|
@Override
|
|
protected LookupResult doGet(Object key) {
|
|
final Timer.Context time = redisGetRequestTimer.time();
|
|
final String trimmedKey = StringUtils.trimToNull(key.toString());
|
|
if (trimmedKey == null) {
|
|
LOG.debug("A blank key was supplied");
|
|
return getEmptyResult();
|
|
}
|
|
try {
|
|
final String value = this.commands.get(trimmedKey);
|
|
if (value == null) {
|
|
LOG.warn("Redis GET request for key <{}> returned null, key do not exists.", trimmedKey);
|
|
redisGetRequestErrors.mark();
|
|
return LookupResult.empty();
|
|
}
|
|
return LookupResult.single(value);
|
|
} catch (Exception e) {
|
|
LOG.error("Redis GET request error for key <{}>", trimmedKey, e);
|
|
redisGetRequestErrors.mark();
|
|
return LookupResult.empty();
|
|
} finally {
|
|
time.stop();
|
|
}
|
|
}
|
|
|
|
// This is deprecated, see setValue
|
|
@Override
|
|
public void set(Object key, Object value) {
|
|
return;
|
|
}
|
|
|
|
@Override
|
|
public LookupResult setValue(Object key, Object value) {
|
|
final Timer.Context time = redisSetRequestTimer.time();
|
|
try {
|
|
final String result = this.commands.set(key.toString(), value.toString());
|
|
if (!result.equals("OK")) {
|
|
LOG.warn("Redis SET key <{}> to value <{}> returned {}", key, value, result);
|
|
redisSetRequestErrors.mark();
|
|
return LookupResult.empty();
|
|
}
|
|
return LookupResult.single(value.toString());
|
|
} catch (Exception e) {
|
|
LOG.error("Redis SET key <{}> to value <{}> returned an exception: {}", key, value, e);
|
|
redisSetRequestErrors.mark();
|
|
return LookupResult.empty();
|
|
} finally {
|
|
time.stop();
|
|
}
|
|
}
|
|
|
|
public interface Factory extends LookupDataAdapter.Factory2<RedisLookupDataAdapter> {
|
|
@Override
|
|
RedisLookupDataAdapter create(@Assisted("dto") DataAdapterDto dto);
|
|
|
|
@Override
|
|
Descriptor getDescriptor();
|
|
}
|
|
|
|
public static class Descriptor extends LookupDataAdapter.Descriptor<Config> {
|
|
public Descriptor() {
|
|
super(NAME, Config.class);
|
|
}
|
|
|
|
@Override
|
|
public Config defaultConfiguration() {
|
|
return Config.builder()
|
|
.type(NAME)
|
|
.redisHost("127.0.0.1")
|
|
.redisPort(6379)
|
|
.redisDB(0)
|
|
/*.redisUsername("")
|
|
.redisPassword("")*/
|
|
.build();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@AutoValue
|
|
@JsonAutoDetect
|
|
@JsonDeserialize(builder = RedisLookupDataAdapter.Config.Builder.class)
|
|
@JsonTypeName(NAME)
|
|
@JsonInclude(JsonInclude.Include.NON_EMPTY)
|
|
public abstract static class Config implements LookupDataAdapterConfiguration {
|
|
|
|
@Override
|
|
@JsonProperty(TYPE_FIELD)
|
|
public abstract String type();
|
|
|
|
@JsonProperty("redis_host")
|
|
@NotEmpty
|
|
public abstract String redisHost();
|
|
|
|
@JsonProperty("redis_port")
|
|
@Min(1)
|
|
public abstract int redisPort();
|
|
|
|
@JsonProperty("redis_database")
|
|
@Min(0)
|
|
public abstract int redisDB();
|
|
|
|
/*@JsonProperty("redis_username")
|
|
@Nullable
|
|
public abstract String redisUsername();
|
|
|
|
@JsonProperty("redis_password")
|
|
@Nullable
|
|
public abstract String redisPassword();*/
|
|
|
|
public static Builder builder() {
|
|
return new AutoValue_RedisLookupDataAdapter_Config.Builder();
|
|
}
|
|
|
|
/* @Override
|
|
public Optional<Multimap<String, String>> validate() {
|
|
final ArrayListMultimap<String, String> errors = ArrayListMultimap.create();
|
|
|
|
if (redisPort() < 1 || redisPort() > 65535) {
|
|
errors.put("redis_port", "Value cannot neither be smaller than 1 nor greater than 65535");
|
|
}
|
|
if (redisDB() < 0) {
|
|
errors.put("redis_database", "Value cannot be smaller than 0");
|
|
}
|
|
|
|
return errors.isEmpty() ? Optional.empty() : Optional.of(errors);
|
|
}
|
|
*/
|
|
@AutoValue.Builder
|
|
public abstract static class Builder {
|
|
@JsonCreator
|
|
public static Builder create() {
|
|
return Config.builder();
|
|
}
|
|
|
|
@JsonProperty(TYPE_FIELD)
|
|
public abstract Builder type(String type);
|
|
|
|
@JsonProperty("redis_host")
|
|
public abstract Builder redisHost(String redisHost);
|
|
|
|
@JsonProperty("redis_port")
|
|
public abstract Builder redisPort(int redisPort);
|
|
|
|
@JsonProperty("redis_database")
|
|
public abstract Builder redisDB(int redisDB);
|
|
|
|
/*@JsonProperty("redis_username")
|
|
public abstract Builder redisUsername(String redisUsername);
|
|
|
|
@JsonProperty("redis_password")
|
|
public abstract Builder redisPassword(String redisPassword);*/
|
|
|
|
public abstract Config build();
|
|
}
|
|
}
|
|
}
|