/* * Copyright (C) 2024 johan@nosd.in * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . */ package in.nosd.redis.dataadapters; import java.util.List; import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.graylog.autovalue.WithBeanGetter; import org.graylog2.lookup.dto.DataAdapterDto; import org.graylog2.plugin.lookup.LookupCachePurge; import org.graylog2.plugin.lookup.LookupDataAdapter; import org.graylog2.plugin.lookup.LookupDataAdapterConfiguration; import org.graylog2.plugin.lookup.LookupResult; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.google.auto.value.AutoValue; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisCommands; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotEmpty; import reactor.util.annotation.Nullable; public class RedisLookupDataAdapter extends LookupDataAdapter { private static final Logger LOG = LoggerFactory.getLogger(RedisLookupDataAdapter.class); public static final String NAME = "RedisLookup"; private static final Duration REFRESH_INTERVAL_DURATION = Duration.ZERO; private final Config config; private final RedisClient client; private RedisCommands commands; private StatefulRedisConnection connection; private final Timer redisGetRequestTimer; private final Meter redisGetRequestErrors; private final Timer redisSetRequestTimer; private final Meter redisSetRequestErrors; private final Timer redisDelRequestTimer; private final Meter redisDelRequestErrors; private final Timer redisAssignTtlRequestTimer; private final Meter redisAssignTtlRequestErrors; private final Timer redisAddStringListRequestTimer; private final Meter redisAddStringListRequestErrors; private final Timer redisSetStringListRequestTimer; private final Meter redisSetStringListRequestErrors; private final Timer redisSetStringListWithTtlRequestTimer; private final Meter redisSetStringListWithTtlRequestErrors; private final Timer redisRemoveStringListRequestTimer; private final Meter redisRemoveStringListRequestErrors; @Inject public RedisLookupDataAdapter(@Assisted("dto") DataAdapterDto dto, MetricRegistry metricRegistry) { super(dto, metricRegistry); this.config = (Config) dto.config(); RedisURI redisUri ; redisUri = RedisURI.Builder.redis(this.config.redisHost(),this.config.redisPort()) .withPort(this.config.redisPort()) .withDatabase(this.config.redisDB()) .build(); if (this.config.redisPassword() != null && this.config.redisPassword().length() > 0) { redisUri.setPassword(this.config.redisPassword()); } if (this.config.redisUsername() != null && this.config.redisUsername().length() > 0) { redisUri.setUsername(this.config.redisUsername()); } this.client = RedisClient.create(redisUri); this.redisGetRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisGetRequestTime")); this.redisGetRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisGetRequestErrors")); this.redisSetRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisSetRequestTime")); this.redisSetRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisSetRequestErrors")); this.redisDelRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisDelRequestTime")); this.redisDelRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisDelRequestErrors")); this.redisAssignTtlRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisAssignTtlRequestTime")); this.redisAssignTtlRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisAssignTtlRequestErrors")); this.redisAddStringListRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisAddStringListRequestTime")); this.redisAddStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisAddStringListRequestErrors")); this.redisSetStringListRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisSetStringListRequestTime")); this.redisSetStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisSetStringListRequestErrors")); this.redisSetStringListWithTtlRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisSetStringListWithTtlRequestTime")); this.redisSetStringListWithTtlRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisSetStringListWithTtlRequestErrors")); this.redisRemoveStringListRequestTimer = metricRegistry.timer(MetricRegistry.name(getClass(), "redisRemoveStringListRequestTime")); this.redisRemoveStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisRemoveStringListRequestErrors")); } @Override protected void doStart() throws Exception { connection = this.client.connect(); this.commands = connection.sync(); } @Override protected void doStop() throws Exception { connection.close(); client.close(); } // Returns the refresh interval for this data adapter. Use {@link Duration#ZERO} if refresh should be disabled. @Override public Duration refreshInterval() { return REFRESH_INTERVAL_DURATION; } @Override protected void doRefresh(LookupCachePurge cachePurge) throws Exception { doStart(); cachePurge.purgeAll(); } private String getSingleValue(String key) { final String value = this.commands.get(key); return value; } @Override protected LookupResult doGet(Object key) { final Timer.Context time = redisGetRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); if (trimmedKey == null) { LOG.debug("A blank key was supplied"); return getEmptyResult(); } try { // Get item type and existence final String type = this.commands.type(trimmedKey); switch(type) { case "none": LOG.debug("Redis TYPE request for key <{}> returned null, key do not exists.", trimmedKey); redisGetRequestErrors.mark(); return LookupResult.empty(); case "list": final List result = this.commands.lrange(trimmedKey, 0, -1); return LookupResult.withoutTTL().stringListValue(result).build(); default: final String value = getSingleValue(trimmedKey); return LookupResult.single(value); } } catch (Exception e) { LOG.error("Exception: Redis GET request error for key <{}>", trimmedKey, e); redisGetRequestErrors.mark(); return LookupResult.empty(); } finally { time.stop(); } } // This is deprecated, see setValue @Deprecated public void set(Object key, Object value) { } @Override public LookupResult setValue(Object key, Object value) { return setValueWithTtl(key, value, this.config.redisKeyTTL()); } @Override public LookupResult setValueWithTtl(Object key, Object value, Long ttlSec) { final Timer.Context time = redisSetRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); try { final String result; if (ttlSec > 0 ) { result = this.commands.setex(trimmedKey, ttlSec, value.toString()); } else { result = this.commands.set(trimmedKey, value.toString()); } if (!result.equals("OK")) { LOG.warn("Redis SET(EX) key <{}> to value <{}> with TTL <{}> returned {}", key, value, ttlSec, result); redisSetRequestErrors.mark(); return LookupResult.empty(); } return LookupResult.single(value.toString()); } catch (Exception e) { LOG.error("Exception: Redis SET(EX) key <{}> to value <{}> with TTL <{}> returned {}", key, value, ttlSec, e); redisSetRequestErrors.mark(); return LookupResult.withError(); } finally { time.stop(); } } @Override public void clearKey(Object key) { final Timer.Context time = redisDelRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); try { final Long result = this.commands.del(trimmedKey); if (result != 1) { LOG.debug("Redis DEL key <{}> returned {}", trimmedKey, result); redisDelRequestErrors.mark(); } return; } catch (Exception e) { LOG.error("Exception: Redis DEL key <{}> returned {}", trimmedKey, e); redisDelRequestErrors.mark(); return; } finally { time.stop(); } } private LookupResult setExpire(String key, Long ttl) { try { if (!this.commands.expire(key, ttl)) { LOG.warn("Redis EXPIRE key <{}> to <{}> returned false (key does not exist or the timeout could not be set)", key, ttl); return LookupResult.withError(); } return LookupResult.single(this.commands.get(key).toString()); } catch (Exception e) { // lettuce 6.3.0 returns "WRONGTYPE Operation against a key holding the wrong kind of value" when EXPIRE on a list key, but do the job. if (e.getMessage().startsWith("WRONGTYPE Operation against a key holding the wrong kind of value")) { } else { LOG.error("Exception: Redis EXPIRE key <{}> to <{}> returned {}", key, ttl, e); return LookupResult.withError(e.toString()); } } return LookupResult.single(this.commands.get(key).toString()); } private LookupResult setPersist(String key) { try { if (!this.commands.persist(key)) { LOG.debug("Redis PERSIST key <{}> returned false (key does not exist or does not have an associated timeout)", key); return LookupResult.withError(); } return LookupResult.single(this.commands.get(key).toString()); } catch (Exception e) { LOG.error("Exception: Redis PERSIST key <{}> returned {}", key, e); return LookupResult.withError(e.toString()); } } // TTL -1 = never expire @Override public LookupResult assignTtl(Object key, Long ttlSec) { final Timer.Context time = redisAssignTtlRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); try { if (ttlSec > 0) { return setExpire(trimmedKey, ttlSec); } else { return setPersist(trimmedKey); } } catch (Exception e) { // lettuce 6.3.0 returns "WRONGTYPE Operation against a key holding the wrong kind of value" when TTL on a list key, but do the job. if (e.getMessage().startsWith("WRONGTYPE Operation against a key holding the wrong kind of value")) { } else { LOG.error("Exception: assignTtl <{}> to key <{}> returned {}", ttlSec, trimmedKey, e); redisAssignTtlRequestErrors.mark(); return LookupResult.withError(); } } finally { time.stop(); } return LookupResult.single(this.commands.get(trimmedKey)); } @Override public LookupResult addStringList(Object key, List listValue, boolean keepDuplicates) { final Timer.Context time = redisAddStringListRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); if (trimmedKey == null) { LOG.debug("A blank key was supplied"); return getEmptyResult(); } try { if (!keepDuplicates) { removeStringList(trimmedKey, listValue); } final Long len = this.commands.rpush(trimmedKey, listValue.toArray(new String[0])); if (len > 0) { return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build(); } return LookupResult.empty(); } catch (Exception e) { LOG.error("Exception: Redis RPUSH request error for key <{}>: <{}>", trimmedKey, e); redisAddStringListRequestErrors.mark(); return LookupResult.empty(); } finally { time.stop(); } } @Override public LookupResult setStringList(Object key, List listValue) { final Timer.Context time = redisSetStringListRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); if (trimmedKey == null) { LOG.debug("A blank key was supplied"); return getEmptyResult(); } try { // We need to replace list, so we delete it first this.commands.ltrim(trimmedKey, 1, 0); final Long len = this.commands.rpush(trimmedKey, listValue.toArray(new String[0])); if (len > 0) { return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build(); } return LookupResult.empty(); } catch (Exception e) { LOG.error("Exception: Redis RPUSH request error for key <{}>: <{}>", trimmedKey, e); redisSetStringListRequestErrors.mark(); return LookupResult.empty(); } finally { time.stop(); } } @Override public LookupResult setStringListWithTtl(Object key, List listValue, Long ttlSec) { final Timer.Context time = redisSetStringListWithTtlRequestTimer.time(); try { setStringList(key, listValue); return assignTtl(key, ttlSec); } catch (Exception e) { // This exception comes from assignTtl if (e.getMessage().startsWith("WRONGTYPE Operation against a key holding the wrong kind of value")) { } else { LOG.error("Exception: Redis RPUSH request error for key <{}>: <{}>", key, e); redisSetStringListWithTtlRequestErrors.mark(); return LookupResult.empty(); } } finally { time.stop(); } return LookupResult.withoutTTL().stringListValue(this.commands.lrange(StringUtils.trimToNull(key.toString()), 0, -1)).build(); } @Override public LookupResult removeStringList(Object key, List listValue) { final Timer.Context time = redisRemoveStringListRequestTimer.time(); final String trimmedKey = StringUtils.trimToNull(key.toString()); if (trimmedKey == null) { LOG.debug("A blank key was supplied"); return getEmptyResult(); } try { long removed = 0; for (String value : listValue) { removed += this.commands.lrem(trimmedKey, 0, value); } LOG.debug("Redis LREM for key <{}> and value <{}> deleted <{}> items", trimmedKey, listValue, removed); return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build(); } catch (Exception e) { LOG.error("Exception: Redis LREM request error for key <{}> and value <{}>: <{}>", trimmedKey, listValue, e); redisRemoveStringListRequestErrors.mark(); return LookupResult.empty(); } finally { time.stop(); } } public interface Factory extends LookupDataAdapter.Factory2 { @Override RedisLookupDataAdapter create(@Assisted("dto") DataAdapterDto dto); @Override Descriptor getDescriptor(); } public static class Descriptor extends LookupDataAdapter.Descriptor { public Descriptor() { super(NAME, Config.class); } @Override public Config defaultConfiguration() { return Config.builder() .type(NAME) .redisHost("127.0.0.1") .redisPort(6379) .redisDB(0) .redisKeyTTL(-1) .redisUsername("") .redisPassword("") .build(); } } @AutoValue @WithBeanGetter @JsonAutoDetect @JsonDeserialize(builder = AutoValue_RedisLookupDataAdapter_Config.Builder.class) @JsonTypeName(NAME) @JsonInclude(JsonInclude.Include.NON_EMPTY) public static abstract class Config implements LookupDataAdapterConfiguration { @Override @JsonProperty(TYPE_FIELD) public abstract String type(); @JsonProperty("redis_host") @NotEmpty public abstract String redisHost(); @JsonProperty("redis_port") @Min(1) public abstract int redisPort(); @JsonProperty("redis_database") @Min(0) public abstract int redisDB(); @JsonProperty("redis_ttl") @Min(0) public abstract long redisKeyTTL(); @JsonProperty("redis_username") @Nullable public abstract String redisUsername(); @JsonProperty("redis_password") @Nullable public abstract String redisPassword(); public static Builder builder() { return new AutoValue_RedisLookupDataAdapter_Config.Builder(); } @Override public Optional> validate() { final ArrayListMultimap errors = ArrayListMultimap.create(); if (redisPort() < 1 || redisPort() > 65535) { errors.put("redis_port", "Value cannot neither be smaller than 1 nor greater than 65535"); } if (redisDB() < 0) { errors.put("redis_database", "Value cannot be smaller than 0"); } return errors.isEmpty() ? Optional.empty() : Optional.of(errors); } @AutoValue.Builder public abstract static class Builder { @JsonProperty(TYPE_FIELD) public abstract Builder type(String type); @JsonProperty("redis_host") public abstract Builder redisHost(String redisHost); @JsonProperty("redis_port") public abstract Builder redisPort(int redisPort); @JsonProperty("redis_database") public abstract Builder redisDB(int redisDB); @JsonProperty("redis_ttl") public abstract Builder redisKeyTTL(long redisKeyTTL); @JsonProperty("redis_username") public abstract Builder redisUsername(String redisUsername); @JsonProperty("redis_password") public abstract Builder redisPassword(String redisPassword); public abstract Config build(); } } }