WIP: implement streams

This commit is contained in:
yo000 2024-03-03 16:30:04 +01:00
parent 97ac9b0c4b
commit 566111a982
3 changed files with 61 additions and 18 deletions

View File

@ -28,6 +28,11 @@ dramatically by making use of hot reloading. To do this, do the following:
Usage
-----
### About data types
* "Strings" lookup table will use GET/SET redis commands. Designed to set, get, expire, delete keys.
* "Streams" is designed for a "fire-and-forget" usage. Only "lookup_set_value" will work. Please note there is currently no TTL on stream, so you need to have an external process to purge your stream if you do not want it to grow indefinitely.
### Usage in pipelines
* Create data adapter, cache (or not), lookup table
* Use 'lookup_set_value(lookup_table, key, value, [ttl])' to create or update key in redis
@ -46,8 +51,8 @@ Be aware that only setting TTL with 'lookup_assign_ttl' of 'lookup_set_string_li
Known bugs
----------
Deletion via lookup_remove_string_list, lookup_clear_key or keep_duplicates=false sometimes not done.
* Deletion via lookup_remove_string_list, lookup_clear_key or keep_duplicates=false sometimes not done.
* lookup_add_string_list : Each character of a string list is considered an item
Getting started
---------------

View File

@ -60,15 +60,16 @@ import java.util.StringJoiner;
public class RedisLookupDataAdapter extends LookupDataAdapter {
private static final Logger LOG = LoggerFactory.getLogger(RedisLookupDataAdapter.class);
public static final String NAME = "RedisLookup";
private static final Duration REFRESH_INTERVAL_DURATION = Duration.ZERO;
private final Config config;
private final RedisClient client;
private RedisCommands<String, String> commands;
private StatefulRedisConnection<String, String> connection;
private final Timer redisGetRequestTimer;
private final Meter redisGetRequestErrors;
private final Timer redisSetRequestTimer;
@ -123,6 +124,9 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
this.redisRemoveStringListRequestErrors = metricRegistry.meter(MetricRegistry.name(getClass(), "redisRemoveStringListRequestErrors"));
}
/**************************************************************************
* LookupDataAdapter functions
**************************************************************************/
@Override
protected void doStart() throws Exception {
connection = this.client.connect();
@ -147,11 +151,6 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
cachePurge.purgeAll();
}
private String getSingleValue(String key) {
final String value = this.commands.get(key);
return value;
}
@Override
protected LookupResult doGet(Object key) {
final Timer.Context time = redisGetRequestTimer.time();
@ -240,6 +239,11 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
}
}
private String getSingleValue(String key) {
final String value = this.commands.get(key);
return value;
}
private LookupResult setExpire(String key, Long ttl) {
try {
if (!this.commands.expire(key, ttl)) {
@ -308,6 +312,10 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
if (!keepDuplicates) {
removeStringList(trimmedKey, listValue);
}
LOG.info("Redis addStringList: List is <{}> items", listValue.size());
LOG.info("Redis addStringList: List is <{}>", listValue);
final Long len = this.commands.rpush(trimmedKey, listValue.toArray(new String[0]));
if (len > 0) {
return LookupResult.withoutTTL().stringListValue(this.commands.lrange(trimmedKey, 0, -1)).build();
@ -410,6 +418,7 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
.redisHost("127.0.0.1")
.redisPort(6379)
.redisDB(0)
.redisType("strings")
.redisKeyTTL(-1)
.redisUsername("")
.redisPassword("")
@ -441,6 +450,10 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
@JsonProperty("redis_database")
@Min(0)
public abstract int redisDB();
@JsonProperty("redis_type")
@NotEmpty
public abstract String redisType();
@JsonProperty("redis_ttl")
@Min(0)
@ -488,6 +501,9 @@ public class RedisLookupDataAdapter extends LookupDataAdapter {
@JsonProperty("redis_database")
public abstract Builder redisDB(int redisDB);
@JsonProperty("redis_type")
public abstract Builder redisType(String redisType);
@JsonProperty("redis_ttl")
public abstract Builder redisKeyTTL(long redisKeyTTL);

View File

@ -16,13 +16,23 @@
*/
import React, { useCallback, useEffect, useRef, useState } from 'react';
import PropTypes from 'prop-types';
//import { Button } from 'components/graylog';
import ObjectUtils from 'util/ObjectUtils';
import { Input } from 'components/bootstrap';
// "Missing or invalid plugin" in "dataAdapter create/Data adapter type" list when imported
//import { Select } from './components/common';
//import { Select } from '../../../../graylog2-server/graylog2-web-interface/src/components/common';
class RedisLookupAdapterFieldSet extends React.Component {
static propTypes = {
config: PropTypes.shape({
redis_host: PropTypes.string.isRequired,
redis_port: PropTypes.number.isRequired,
redis_database: PropTypes.number.isRequired,
redis_type: PropTypes.string.isRequired,
redis_ttl: PropTypes.number.isRequired,
redis_username: PropTypes.string,
redis_password: PropTypes.string,
}).isRequired,
updateConfig: PropTypes.func.isRequired,
handleFormEvent: PropTypes.func.isRequired,
@ -30,14 +40,14 @@ class RedisLookupAdapterFieldSet extends React.Component {
validationMessage: PropTypes.func.isRequired,
};
handleSelect = (fieldName) => {
return (selectedIndicator) => {
const config = lodash.cloneDeep(this.props.config);
config[fieldName] = selectedIndicator;
this.props.updateConfig(config);
};
/* _onRedisTypeSelect = (type) => {
const { config, updateConfig } = this.props;
const newConfig = ObjectUtils.clone(config);
newConfig.redis_type = type;
updateConfig(newConfig);
};
*/
render() {
const { config } = this.props;
@ -76,6 +86,18 @@ class RedisLookupAdapterFieldSet extends React.Component {
value={config.redis_database}
labelClassName="col-sm-3"
wrapperClassName="col-sm-9" />
<Input type="text"
id="redis_type"
name="redis_type"
label="Redis data type"
required
onChange={this.props.handleFormEvent}
help={this.props.validationMessage('redis_type', 'Redis data type used for lookups. Should be one of "strings", "streams".')}
bsStyle={this.props.validationState('redis_type')}
value={config.redis_type}
labelClassName="col-sm-3"
wrapperClassName="col-sm-9">
</Input>
<Input type="text"
id="redis_ttl"
name="redis_ttl"