2025-02-09 18:41:57 +01:00

499 lines
20 KiB
Java
Executable File

/*
* Copyright (C) 2024 johan@nosd.in
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package in.nosd.redis.dataadapters;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.graylog.autovalue.WithBeanGetter;
import org.graylog2.lookup.dto.DataAdapterDto;
import org.graylog2.plugin.lookup.LookupCachePurge;
import org.graylog2.plugin.lookup.LookupDataAdapter;
import org.graylog2.plugin.lookup.LookupDataAdapterConfiguration;
import org.graylog2.plugin.lookup.LookupResult;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotEmpty;
import reactor.util.annotation.Nullable;
public class RedisLookupDataAdapter extends LookupDataAdapter {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(RedisLookupDataAdapter.class);
// Adapter type name. Should be the same as the lookupTableAdapters type in index.jsx.
public static final String NAME = "RedisLookup";
// Value returned by refreshInterval(); Duration.ZERO means refresh is disabled.
private static final Duration REFRESH_INTERVAL_DURATION = Duration.ZERO;
// Adapter configuration, taken from the DataAdapterDto passed to the constructor.
private final Config config;
// Lettuce client created in the constructor from the configured Redis URI.
private final RedisClient client;
// Synchronous command API and the connection it belongs to; both are assigned in doStart().
private RedisCommands<String, String> commands;
private StatefulRedisConnection<String, String> connection;
// One timer / error-meter pair per adapter operation, registered in the constructor.
// doGet
private final Timer redisGetRequestTimer;
private final Meter redisGetRequestErrors;
// setValueWithTtl (and therefore setValue)
private final Timer redisSetRequestTimer;
private final Meter redisSetRequestErrors;
// clearKey
private final Timer redisDelRequestTimer;
private final Meter redisDelRequestErrors;
// assignTtl
private final Timer redisAssignTtlRequestTimer;
private final Meter redisAssignTtlRequestErrors;
// addStringList
private final Timer redisAddStringListRequestTimer;
private final Meter redisAddStringListRequestErrors;
// setStringList
private final Timer redisSetStringListRequestTimer;
private final Meter redisSetStringListRequestErrors;
// setStringListWithTtl
private final Timer redisSetStringListWithTtlRequestTimer;
private final Meter redisSetStringListWithTtlRequestErrors;
// removeStringList
private final Timer redisRemoveStringListRequestTimer;
private final Meter redisRemoveStringListRequestErrors;
/**
 * Creates the adapter from its persisted definition.
 *
 * <p>Builds the Redis URI from the configured host, port and database, applies the password
 * and the username only when they are non-empty, creates the Lettuce client for that URI and
 * registers one timer and one error meter per adapter operation. The connection itself is
 * opened in {@link #doStart()}.
 *
 * @param dto            adapter definition; its configuration must be a {@link Config}
 * @param metricRegistry registry in which the request timers and error meters are registered
 */
@Inject
public RedisLookupDataAdapter(@Assisted("dto") DataAdapterDto dto,
        MetricRegistry metricRegistry) {
    super(dto, metricRegistry);
    this.config = (Config) dto.config();

    final RedisURI uri = RedisURI.Builder.redis(this.config.redisHost(), this.config.redisPort())
            .withPort(this.config.redisPort())
            .withDatabase(this.config.redisDB())
            .build();

    // Credentials are optional: only non-empty values are applied to the URI.
    final String password = this.config.redisPassword();
    if (StringUtils.isNotEmpty(password)) {
        uri.setPassword(password);
    }
    final String username = this.config.redisUsername();
    if (StringUtils.isNotEmpty(username)) {
        uri.setUsername(username);
    }
    this.client = RedisClient.create(uri);

    // Every metric name is prefixed with the runtime class of this adapter.
    final Class<?> owner = getClass();
    this.redisGetRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisGetRequestTime"));
    this.redisGetRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisGetRequestErrors"));
    this.redisSetRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisSetRequestTime"));
    this.redisSetRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisSetRequestErrors"));
    this.redisDelRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisDelRequestTime"));
    this.redisDelRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisDelRequestErrors"));
    this.redisAssignTtlRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisAssignTtlRequestTime"));
    this.redisAssignTtlRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisAssignTtlRequestErrors"));
    this.redisAddStringListRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisAddStringListRequestTime"));
    this.redisAddStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisAddStringListRequestErrors"));
    this.redisSetStringListRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisSetStringListRequestTime"));
    this.redisSetStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisSetStringListRequestErrors"));
    this.redisSetStringListWithTtlRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisSetStringListWithTtlRequestTime"));
    this.redisSetStringListWithTtlRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisSetStringListWithTtlRequestErrors"));
    this.redisRemoveStringListRequestTimer = metricRegistry.timer(MetricRegistry.name(owner, "redisRemoveStringListRequestTime"));
    this.redisRemoveStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(owner, "redisRemoveStringListRequestErrors"));
}
/**
 * Opens a Redis connection and exposes its synchronous command API.
 *
 * <p>{@code doRefresh} calls this method again on an already started adapter, so a connection
 * may already be held. The new connection is established and published first; only then is
 * the previous one closed, so it is not leaked.
 *
 * @throws Exception if the connection cannot be established
 */
@Override
protected void doStart() throws Exception {
    final StatefulRedisConnection<String, String> previous = this.connection;
    final StatefulRedisConnection<String, String> fresh = this.client.connect();
    this.commands = fresh.sync();
    this.connection = fresh;
    if (previous != null) {
        // Release the connection replaced by this (re)start.
        previous.close();
    }
}
/**
 * Closes the Redis connection (if one was opened) and then the client.
 *
 * <p>The connection is only assigned in {@code doStart()}, so it is still {@code null} when
 * start-up failed. The client is closed in a {@code finally} block so that it is released
 * even when closing the connection throws.
 *
 * @throws Exception if closing the connection or the client fails
 */
@Override
protected void doStop() throws Exception {
    try {
        if (connection != null) {
            connection.close();
        }
    } finally {
        client.close();
    }
}
/**
 * Returns the refresh interval for this data adapter.
 * {@link Duration#ZERO} is used when refresh should be disabled.
 *
 * @return the constant {@code REFRESH_INTERVAL_DURATION}, which is {@link Duration#ZERO}
 */
@Override
public Duration refreshInterval() {
return REFRESH_INTERVAL_DURATION;
}
/**
 * Refresh hook: re-runs {@link #doStart()} and then asks {@code cachePurge} to purge all
 * entries.
 *
 * @param cachePurge purge handle supplied by the caller
 * @throws Exception if {@code doStart()} throws
 */
@Override
protected void doRefresh(LookupCachePurge cachePurge) throws Exception {
// doStart() assigns a new connection and command API before the purge.
doStart();
cachePurge.purgeAll();
}
/**
 * Issues a Redis GET for {@code key}.
 *
 * @param key the Redis key to read
 * @return whatever the GET command returns for that key
 */
private String getSingleValue(String key) {
    return this.commands.get(key);
}
/**
 * Looks up {@code key} in Redis.
 *
 * <p>The key is converted with {@code toString()} and trimmed. The Redis TYPE of the key
 * decides how it is read: {@code none} yields an empty result, {@code list} yields the whole
 * list (LRANGE 0 -1) as a string-list result, and every other type is read with GET and
 * returned as a single value.
 *
 * @param key lookup key; {@code null} or blank keys yield {@code getEmptyResult()}
 * @return the lookup result, or an empty result when the key does not exist or the request
 *         fails
 */
@Override
protected LookupResult doGet(Object key) {
    // A null key is treated like a blank one instead of failing with a NullPointerException.
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    // The timer is started only once a request is actually going to be sent.
    final Timer.Context time = redisGetRequestTimer.time();
    try {
        // Get item type and existence
        final String type = this.commands.type(trimmedKey);
        switch (type) {
            case "none": {
                LOG.debug("Redis TYPE request for key <{}> returned null, key do not exists.", trimmedKey);
                redisGetRequestErrors.mark();
                return LookupResult.empty();
            }
            case "list": {
                final List<String> result = this.commands.lrange(trimmedKey, 0, -1);
                return LookupResult.withoutTTL().stringListValue(result).build();
            }
            default: {
                final String value = getSingleValue(trimmedKey);
                if (value == null) {
                    // The key vanished (expired or deleted) between TYPE and GET: same outcome as "none".
                    LOG.debug("Redis GET request for key <{}> returned null, key no longer exists.", trimmedKey);
                    redisGetRequestErrors.mark();
                    return LookupResult.empty();
                }
                return LookupResult.single(value);
            }
        }
    } catch (Exception e) {
        LOG.error("Exception: Redis GET request error for key <{}>", trimmedKey, e);
        redisGetRequestErrors.mark();
        return LookupResult.empty();
    } finally {
        time.stop();
    }
}
/**
 * Does nothing; the body is intentionally empty.
 *
 * @param key   ignored
 * @param value ignored
 * @deprecated see {@link #setValue(Object, Object)}
 */
@Deprecated
public void set(Object key, Object value) {
}
/**
 * Stores {@code value} under {@code key} using the TTL from the adapter configuration.
 *
 * @param key   key to write
 * @param value value to write
 * @return the result of {@link #setValueWithTtl(Object, Object, Long)} called with the
 *         configured {@code redisKeyTTL}
 */
@Override
public LookupResult setValue(Object key, Object value) {
    final Long configuredTtl = this.config.redisKeyTTL();
    return setValueWithTtl(key, value, configuredTtl);
}
/**
 * Stores {@code value.toString()} under {@code key}, optionally with an expiry.
 *
 * <p>SETEX is used when {@code ttlSec} is greater than zero; otherwise (including a
 * {@code null} TTL) a plain SET is issued and the key gets no expiry.
 *
 * @param key    key to write; {@code null} or blank keys yield {@code getEmptyResult()}
 * @param value  value to write; a {@code null} value yields an error result
 * @param ttlSec time to live in seconds, or a value not greater than zero for no expiry
 * @return a single-value result holding the stored string, an empty result when Redis did
 *         not answer {@code OK}, or an error result when the request failed
 */
@Override
public LookupResult setValueWithTtl(Object key, Object value, Long ttlSec) {
    // Same blank-key handling as the read and list operations of this adapter.
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    if (value == null) {
        LOG.warn("Redis SET(EX) key <{}> was called without a value", trimmedKey);
        redisSetRequestErrors.mark();
        return LookupResult.withError();
    }
    final Timer.Context time = redisSetRequestTimer.time();
    try {
        final String stringValue = value.toString();
        final String result;
        if (ttlSec != null && ttlSec > 0) {
            result = this.commands.setex(trimmedKey, ttlSec, stringValue);
        } else {
            result = this.commands.set(trimmedKey, stringValue);
        }
        if (!"OK".equals(result)) {
            LOG.warn("Redis SET(EX) key <{}> to value <{}> with TTL <{}> returned {}", trimmedKey, value, ttlSec, result);
            redisSetRequestErrors.mark();
            return LookupResult.empty();
        }
        return LookupResult.single(stringValue);
    } catch (Exception e) {
        // The exception is passed as the trailing argument without a placeholder so that
        // the logging backend records it as the throwable (with its stack trace).
        LOG.error("Exception: Redis SET(EX) key <{}> to value <{}> with TTL <{}> failed", trimmedKey, value, ttlSec, e);
        redisSetRequestErrors.mark();
        return LookupResult.withError();
    } finally {
        time.stop();
    }
}
/**
 * Deletes {@code key} from Redis (DEL).
 *
 * <p>A DEL reply other than 1 (for example because the key did not exist) is logged at
 * debug level and counted on the error meter, as is any exception.
 *
 * @param key key to delete; {@code null} or blank keys are ignored
 */
@Override
public void clearKey(Object key) {
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        // Nothing to delete; avoid sending a request with a null key.
        LOG.debug("A blank key was supplied");
        return;
    }
    final Timer.Context time = redisDelRequestTimer.time();
    try {
        final Long removed = this.commands.del(trimmedKey);
        if (removed == null || removed != 1) {
            LOG.debug("Redis DEL key <{}> returned {}", trimmedKey, removed);
            redisDelRequestErrors.mark();
        }
    } catch (Exception e) {
        // Trailing throwable without a placeholder, so it is logged with its stack trace.
        LOG.error("Exception: Redis DEL key <{}> failed", trimmedKey, e);
        redisDelRequestErrors.mark();
    } finally {
        time.stop();
    }
}
/**
 * Reads the value currently stored at {@code key} according to its Redis type.
 *
 * <p>GET is only valid for string keys; issuing it against a list key fails with a
 * WRONGTYPE error. The type is therefore checked first: lists are read with LRANGE 0 -1,
 * strings with GET. Missing keys and types this adapter does not read yield an empty result.
 *
 * @param key trimmed, non-null Redis key
 * @return the current value as a lookup result
 */
private LookupResult readCurrentValue(String key) {
    final String type = this.commands.type(key);
    if ("list".equals(type)) {
        return LookupResult.withoutTTL().stringListValue(this.commands.lrange(key, 0, -1)).build();
    }
    if ("string".equals(type)) {
        final String value = this.commands.get(key);
        return value == null ? LookupResult.empty() : LookupResult.single(value);
    }
    return LookupResult.empty();
}

/**
 * Sets an expiry on {@code key} (EXPIRE) and returns the key's current value.
 *
 * @param key trimmed, non-null Redis key
 * @param ttl time to live in seconds
 * @return the current value, or an error result when EXPIRE returned false or the request
 *         failed
 */
private LookupResult setExpire(String key, Long ttl) {
    try {
        if (!Boolean.TRUE.equals(this.commands.expire(key, ttl))) {
            LOG.warn("Redis EXPIRE key <{}> to <{}> returned false (key does not exist or the timeout could not be set)", key, ttl);
            return LookupResult.withError();
        }
        return readCurrentValue(key);
    } catch (Exception e) {
        // Trailing throwable without a placeholder, so it is logged with its stack trace.
        LOG.error("Exception: Redis EXPIRE key <{}> to <{}> failed", key, ttl, e);
        redisAssignTtlRequestErrors.mark();
        return LookupResult.withError(e.toString());
    }
}

/**
 * Removes the expiry from {@code key} (PERSIST) and returns the key's current value.
 *
 * @param key trimmed, non-null Redis key
 * @return the current value, or an error result when PERSIST returned false or the request
 *         failed
 */
private LookupResult setPersist(String key) {
    try {
        if (!Boolean.TRUE.equals(this.commands.persist(key))) {
            LOG.debug("Redis PERSIST key <{}> returned false (key does not exist or does not have an associated timeout)", key);
            return LookupResult.withError();
        }
        return readCurrentValue(key);
    } catch (Exception e) {
        LOG.error("Exception: Redis PERSIST key <{}> failed", key, e);
        redisAssignTtlRequestErrors.mark();
        return LookupResult.withError(e.toString());
    }
}

/**
 * Assigns a time to live to {@code key}.
 *
 * <p>A TTL greater than zero is applied with EXPIRE. Any other value means "never expire"
 * and removes an existing expiry with PERSIST; -1 is the conventional value for this, and a
 * {@code null} TTL is treated the same way. Works for both string and list keys.
 *
 * @param key    key to update; {@code null} or blank keys yield {@code getEmptyResult()}
 * @param ttlSec time to live in seconds, or a value not greater than zero for no expiry
 * @return the key's current value on success, otherwise an error result
 */
@Override
public LookupResult assignTtl(Object key, Long ttlSec) {
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    final Timer.Context time = redisAssignTtlRequestTimer.time();
    try {
        // Both helpers catch and report their own failures, so nothing escapes from here.
        if (ttlSec != null && ttlSec > 0) {
            return setExpire(trimmedKey, ttlSec);
        }
        return setPersist(trimmedKey);
    } finally {
        time.stop();
    }
}
/**
 * Appends values to the Redis list stored at {@code key} (RPUSH).
 *
 * <p>When {@code keepDuplicates} is {@code false}, existing occurrences of the supplied
 * values are first removed from the list, and duplicates inside {@code listValue} itself are
 * dropped (first occurrence wins) before pushing. Each value therefore ends up in the list
 * once.
 *
 * @param key            list key; {@code null} or blank keys yield {@code getEmptyResult()}
 * @param listValue      values to append; {@code null} or empty means nothing is pushed
 * @param keepDuplicates whether values already present may be added again
 * @return the full list after the push, or an empty result when nothing was pushed or the
 *         request failed
 */
@Override
public LookupResult addStringList(Object key, List<String> listValue, boolean keepDuplicates) {
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    if (listValue == null || listValue.isEmpty()) {
        // Nothing to push: skip the round trip instead of reporting it as a Redis error.
        LOG.debug("No values supplied for key <{}>, nothing to add", trimmedKey);
        return LookupResult.empty();
    }
    final Timer.Context time = redisAddStringListRequestTimer.time();
    try {
        final String[] values;
        if (keepDuplicates) {
            values = listValue.toArray(new String[0]);
        } else {
            // Drop what is already stored, then push each supplied value only once.
            removeStringList(trimmedKey, listValue);
            values = listValue.stream().distinct().toArray(String[]::new);
        }
        final Long len = this.commands.rpush(trimmedKey, values);
        if (len != null && len > 0) {
            return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build();
        }
        return LookupResult.empty();
    } catch (Exception e) {
        // Trailing throwable without a placeholder, so it is logged with its stack trace.
        LOG.error("Exception: Redis RPUSH request error for key <{}>", trimmedKey, e);
        redisAddStringListRequestErrors.mark();
        return LookupResult.empty();
    } finally {
        time.stop();
    }
}
/**
 * Replaces the Redis list stored at {@code key} with {@code listValue}.
 *
 * <p>The existing list is emptied with LTRIM 1 0 and the new values are appended with
 * RPUSH. The two commands are sent separately, so the replacement is not atomic.
 *
 * @param key       list key; {@code null} or blank keys yield {@code getEmptyResult()}
 * @param listValue new content; {@code null} or empty leaves the list cleared
 * @return the full list after the replacement, or an empty result when the new content is
 *         empty or the request failed
 */
@Override
public LookupResult setStringList(Object key, List<String> listValue) {
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    final Timer.Context time = redisSetStringListRequestTimer.time();
    try {
        // We need to replace the list, so we empty it first (start > stop trims everything).
        this.commands.ltrim(trimmedKey, 1, 0);
        if (listValue == null || listValue.isEmpty()) {
            // Replacing with nothing is complete once the list is cleared; not an error.
            return LookupResult.empty();
        }
        final Long len = this.commands.rpush(trimmedKey, listValue.toArray(new String[0]));
        if (len != null && len > 0) {
            return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build();
        }
        return LookupResult.empty();
    } catch (Exception e) {
        // Trailing throwable without a placeholder, so it is logged with its stack trace.
        LOG.error("Exception: Redis RPUSH request error for key <{}>", trimmedKey, e);
        redisSetStringListRequestErrors.mark();
        return LookupResult.empty();
    } finally {
        time.stop();
    }
}
/**
 * Replaces the Redis list stored at {@code key} and then assigns it a time to live.
 *
 * <p>Delegates to {@link #setStringList(Object, List)} followed by
 * {@link #assignTtl(Object, Long)}. If {@code assignTtl} throws a WRONGTYPE error, the error
 * is tolerated and the list is read back with LRANGE instead; the original code notes that
 * the TTL is applied in that case.
 *
 * @param key       list key; {@code null} or blank keys yield {@code getEmptyResult()}
 * @param listValue new list content
 * @param ttlSec    time to live in seconds, passed through to {@code assignTtl}
 * @return the result of {@code assignTtl}, the list read back after a tolerated WRONGTYPE
 *         error, or an empty result when the request failed
 */
@Override
public LookupResult setStringListWithTtl(Object key, List<String> listValue, Long ttlSec) {
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    final Timer.Context time = redisSetStringListWithTtlRequestTimer.time();
    try {
        setStringList(trimmedKey, listValue);
        try {
            return assignTtl(trimmedKey, ttlSec);
        } catch (RuntimeException e) {
            // Exceptions may carry no message, so the prefix check has to be null-safe.
            final String message = e.getMessage();
            if (message == null
                    || !message.startsWith("WRONGTYPE Operation against a key holding the wrong kind of value")) {
                throw e;
            }
        }
        // Tolerated WRONGTYPE: report the list as it is now stored.
        return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build();
    } catch (Exception e) {
        // Trailing throwable without a placeholder, so it is logged with its stack trace.
        LOG.error("Exception: Redis RPUSH request error for key <{}>", trimmedKey, e);
        redisSetStringListWithTtlRequestErrors.mark();
        return LookupResult.empty();
    } finally {
        time.stop();
    }
}
/**
 * Removes every occurrence of the supplied values from the Redis list stored at {@code key}.
 *
 * <p>One LREM with count 0 is issued per supplied value.
 *
 * @param key       list key; {@code null} or blank keys yield {@code getEmptyResult()}
 * @param listValue values to remove; {@code null} is treated as "nothing to remove"
 * @return the full list after the removal, or an empty result when the request failed
 */
@Override
public LookupResult removeStringList(Object key, List<String> listValue) {
    final String trimmedKey = key == null ? null : StringUtils.trimToNull(key.toString());
    if (trimmedKey == null) {
        LOG.debug("A blank key was supplied");
        return getEmptyResult();
    }
    final Timer.Context time = redisRemoveStringListRequestTimer.time();
    try {
        long removed = 0;
        if (listValue != null) {
            for (String value : listValue) {
                removed += this.commands.lrem(trimmedKey, 0, value);
            }
        }
        LOG.debug("Redis LREM for key <{}> and value <{}> deleted <{}> items", trimmedKey, listValue, removed);
        return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build();
    } catch (Exception e) {
        // Trailing throwable without a placeholder, so it is logged with its stack trace.
        LOG.error("Exception: Redis LREM request error for key <{}> and value <{}>", trimmedKey, listValue, e);
        redisRemoveStringListRequestErrors.mark();
        return LookupResult.empty();
    } finally {
        time.stop();
    }
}
/**
 * Factory for {@link RedisLookupDataAdapter} instances.
 * The {@code @Assisted("dto")} parameter matches the one on the adapter constructor.
 * NOTE(review): presumably implemented through Guice assisted injection; confirm against the plugin module binding.
 */
public interface Factory extends LookupDataAdapter.Factory2<RedisLookupDataAdapter> {
// Creates an adapter for the given definition.
@Override
RedisLookupDataAdapter create(@Assisted("dto") DataAdapterDto dto);
// Returns the descriptor of this adapter type.
@Override
Descriptor getDescriptor();
}
public static class Descriptor extends LookupDataAdapter.Descriptor<Config> {
public Descriptor() {
super(NAME, Config.class);
}
@Override
public Config defaultConfiguration() {
return Config.builder()
.type(NAME)
.redisHost("127.0.0.1")
.redisPort(6379)
.redisDB(0)
.redisKeyTTL(-1)
.redisUsername("")
.redisPassword("")
.build();
}
}
/**
 * Configuration of the Redis lookup data adapter, (de)serialized as JSON under the type
 * name {@code NAME}.
 */
@AutoValue
@WithBeanGetter
@JsonAutoDetect
@JsonDeserialize(builder = AutoValue_RedisLookupDataAdapter_Config.Builder.class)
@JsonTypeName(NAME)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public static abstract class Config implements LookupDataAdapterConfiguration {
    /** Adapter type discriminator. */
    @Override
    @JsonProperty(TYPE_FIELD)
    public abstract String type();

    /** Host name or address of the Redis server. */
    @JsonProperty("redis_host")
    @NotEmpty
    public abstract String redisHost();

    /** TCP port of the Redis server (1-65535, see {@link #validate()}). */
    @JsonProperty("redis_port")
    @Min(1)
    public abstract int redisPort();

    /** Index of the Redis database to use. */
    @JsonProperty("redis_database")
    @Min(0)
    public abstract int redisDB();

    /**
     * TTL in seconds that {@code setValue} passes to {@code setValueWithTtl}. Values not
     * greater than zero store keys without expiry. The default configuration uses -1, so -1
     * is the lowest value the constraint accepts.
     */
    @JsonProperty("redis_ttl")
    @Min(-1)
    public abstract long redisKeyTTL();

    /** Optional user name; applied to the Redis URI only when non-empty. */
    @JsonProperty("redis_username")
    @Nullable
    public abstract String redisUsername();

    /** Optional password; applied to the Redis URI only when non-empty. */
    @JsonProperty("redis_password")
    @Nullable
    public abstract String redisPassword();

    public static Builder builder() {
        return new AutoValue_RedisLookupDataAdapter_Config.Builder();
    }

    /**
     * Validates host, port and database index.
     *
     * @return the validation errors keyed by JSON property name, or an empty
     *         {@link Optional} when the configuration is valid
     */
    @Override
    public Optional<Multimap<String, String>> validate() {
        final ArrayListMultimap<String, String> errors = ArrayListMultimap.create();
        // The host is declared @NotEmpty; report a blank host here as well.
        if (StringUtils.isBlank(redisHost())) {
            errors.put("redis_host", "Value cannot be empty");
        }
        if (redisPort() < 1 || redisPort() > 65535) {
            errors.put("redis_port", "Value cannot neither be smaller than 1 nor greater than 65535");
        }
        if (redisDB() < 0) {
            errors.put("redis_database", "Value cannot be smaller than 0");
        }
        return errors.isEmpty() ? Optional.empty() : Optional.of(errors);
    }

    /** Builder used both programmatically and for JSON deserialization. */
    @AutoValue.Builder
    public abstract static class Builder {
        @JsonProperty(TYPE_FIELD)
        public abstract Builder type(String type);

        @JsonProperty("redis_host")
        public abstract Builder redisHost(String redisHost);

        @JsonProperty("redis_port")
        public abstract Builder redisPort(int redisPort);

        @JsonProperty("redis_database")
        public abstract Builder redisDB(int redisDB);

        @JsonProperty("redis_ttl")
        public abstract Builder redisKeyTTL(long redisKeyTTL);

        @JsonProperty("redis_username")
        public abstract Builder redisUsername(String redisUsername);

        @JsonProperty("redis_password")
        public abstract Builder redisPassword(String redisPassword);

        public abstract Config build();
    }
}
}