下载安装
下载后在解压目录中,输入cmd执行exe文件
浏览器访问localhost:8086
选择快速开始,填写用户信息,组织信息相关概念
InfluxDB是一个由InfluxData开发的开源时序型数据。它由Go写成,着力于高性能地查询与存储时序型数据。InfluxDB被广泛应用于存储系统的监控数据,IoT行业的实时数据等场景。
名词
bucket:相当于mysql中的数据库
measurement:相当于mysql中的数据表
tag:标签可以有多个,相当于索引
time:时间戳
field:字段
数据操作
Line Protocol
选择Enter Manually执行语句
语法
InfluxDB使用行协议写入数据点。它是一种基于文本的格式,提供数据点的度量、标记集、字段集和时间戳。measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200--------------- --------------- --------------------- ------------------- | | | | Measurement Tag set Field set Timestamp例:myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000由换行符分隔的行 \ n表示InfluxDB中的单个点。线路协议对空格敏感。
Explore
可进行页面上的筛选,点击script Editor查看执行的语句可切换数据呈现的样式
查询数据
声明数据库:from(bucket:"example-bucket")
指定查询范围:|> range(start: -1h)
设置筛选条件:|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
输出结果:yield()
Flux自动假定在每个脚本的末尾有一个yield()函数,用于输出和可视化数据。只有在同一个Flux查询中包含多个查询时,才需要显式地调用yield()。每一组返回的数据都需要使用yield()函数命名。
完整语句:
from(bucket: "example-bucket") |> range(start: -15m) |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total") |> yield(name: "test")
java开发
引入依赖
<dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>3.1.0</version> </dependency>
数据模型
@Data @Accessors(chain = true) @Measurement(name = "monitoring_data") public class MonData { @Column(tag = true) private String pointName; @Column(tag = true) private String indexName; @Column private Double value; @Column(timestamp = true) private Instant time; }
@InfluxColumn为自定义注解,用于拼接查询语句构造map函数使用
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface InfluxColumn { String value(); }
@Data public class MonDataDTO { // tag1名称 @InfluxColumn("pointName") private String pointName; // tag2名称 @InfluxColumn("indexName") private String indexName; // 时间片开始时间 @InfluxColumn("_start") private String start; // 时间片结束时间 @InfluxColumn("_stop") private String stop; // 数据产生时间 @InfluxColumn("_time") private String time; // 值 @InfluxColumn("_value") private String value; }
@Data public class SearchParams<T> { // 查询时间范围开始时间 private String start; // 时间戳字段排序规则,true:降序 private Boolean sortRule = true; // 查询时间范围结束时间 private String end; // 时间间隔 private String every; // 筛选条件 private List<String> filterList; // map构造的目标类对象 private Class<? extends T> mapClazz; }
功能类
@Repository @Slf4j public class InfluxRepository { @Autowired private WriteApi writeApi; @Autowired private QueryApi queryApi; @Autowired private InfluxdbConfigProp influxdbConfigProp; /** * 向influx写入数据 * * @param data 写入数据实体 */ public <T> void writeData(T data) { writeApi.writeMeasurement( influxdbConfigProp.getBucket(), influxdbConfigProp.getOrg(), WritePrecision.MS, data); } /** * 查询数据 * * @param params 查询参数 */ public <T> List<FluxTable> findMonitoringData(SearchParams<T> params) { StringBuffer queryBuffer = new StringBuffer(); // BUCKET queryBuffer.append("from(bucket: \""); queryBuffer.append(influxdbConfigProp.getBucket()); // 时间范围条件 queryBuffer.append("\") \n|> range(start: "); queryBuffer.append(params.getStart()); queryBuffer.append(", stop: "); queryBuffer.append(params.getEnd()); queryBuffer.append(")\n"); List<String> filterList = params.getFilterList(); if (!CollectionUtils.isEmpty(filterList)) { queryBuffer.append(" |> filter(fn: (r) => "); // 拼接查询条件 for (int i = 0; i < filterList.size(); i++) { String[] filters = filterList.get(i).split(">"); queryBuffer.append("r[\""); queryBuffer.append(filters[0]); queryBuffer.append("\"]"); queryBuffer.append(filters[1]); if (i < filterList.size() - 1) queryBuffer.append(" and "); } queryBuffer.append(")\n"); } // aggregateWindow函数 queryBuffer.append(" |> aggregateWindow(every: "); queryBuffer.append(params.getEvery()); queryBuffer.append(",fn: first, createEmpty: true)\n"); // 为查询结果添加排序 queryBuffer.append(" |> sort(columns: [\"_time\"], desc: "); queryBuffer.append(params.getSortRule().booleanValue()); queryBuffer.append(")\n"); // map函数语句拼接 Class<? extends T> mapClazz = params.getMapClazz(); if (!ObjectUtils.isEmpty(mapClazz)) { queryBuffer.append(" |> map("); queryBuffer.append(" fn:(r) => { \n"); queryBuffer.append(" return {\n"); Field[] fields = mapClazz.getDeclaredFields(); // 目标实体字段和influx查询结果字段的映射 Map<String, String> fieldMap = new HashMap<>(); for (Field field : fields) { InfluxColumn influxColumn = field.getAnnotation(InfluxColumn.class); if (influxColumn != null) { fieldMap.put(field.getName(), influxColumn.value()); } } // 若有需要映射的字段则构建语句 if (!CollectionUtils.isEmpty(fieldMap)) { for (String key : fieldMap.keySet()) { queryBuffer.append(key); queryBuffer.append(": r[\""); queryBuffer.append(fieldMap.get(key)); queryBuffer.append("\"],\n"); } queryBuffer.append("}})\n"); } } String influxQl = queryBuffer.toString(); log.info("查询语句, {}", influxQl); List<FluxTable> queryData = queryApi.query(influxQl, influxdbConfigProp.getOrg()); return queryData; } }
@Service @Slf4j public class InfluxQueryService { @Autowired private ObjectMapper objectMapper; @Autowired private InfluxRepository influxRepository; /** * 监测数据查询 * * @param start 起始范围时间点 * @param end 结束范围时间点 * @param every 时间片 * @param filterList 筛选条件集合(集合内元素例:pointName>csd-001) * @param clazz 去除数据时map对象映射的类对象 * @param sort 时间字段排序规则 */ public <T> List<T> findMonitoringDataInFluxDB( String start, String end, String every, List<String> filterList, Class<? extends T> clazz, boolean sort) { // mainTag和 subTag需要特殊处理,将逗号替换成"|"正则表达 filterList = filterList.stream() .map(filter -> StringUtils.replace(filter, ",", "|")) .collect(Collectors.toList()); SearchParams<T> searchParams = new SearchParams<>(); searchParams.setStart(start); searchParams.setEnd(end); searchParams.setEvery(every); searchParams.setFilterList(filterList); searchParams.setMapClazz(clazz); searchParams.setSortRule(sort); List<FluxTable> fluxTableList = influxRepository.findMonitoringData(searchParams); return mapFluxData(fluxTableList, clazz); } /** * 解析原始数据 * * @param data 原始数据 */ public <T> List<T> mapFluxData(List<FluxTable> data, Class<? extends T> clazz) { List<T> result = new LinkedList<>(); for (FluxTable ft : data) { List<FluxRecord> records = ft.getRecords(); for (FluxRecord rc : records) { try { T originData = objectMapper.readValue(objectMapper.writeValueAsString(rc.getValues()), clazz); result.add(originData); } catch (JsonProcessingException e) { log.error("influx查询数据转换为DTO时解析出错"); throw new RuntimeException(e); } } } return result; } }
业务Service构造查询条件,并提供相应的:查询结果实体 => 实体之间的转换方法
/** * 设备指标监测值 * * @param start 起始范围时间点 * @param end 结束范围时间点 * @param every 时间片 * @param tagName 设备id */ public List<MonDataDTO> getMonitoringData( String start, String end, String every, String tagName) { // 筛选条件 List<String> filterList = new ArrayList<>(); filterList.add("_measurement> == \"monitoring_data\""); filterList.add("tagName> =~/" + tagName + "/"); // 处理时间参数 String startDate; String endDate; LocalDate startLocalDate = LocalDate.parse(start).plusDays(-1); // 一天内的数据 (开始时间的前一天的23点,到结束时间的23点,时区原因查询时时间减去8小时) // 跨天的数据(开始和结束时间减8小时) String endTime = (start.equals(end) ? "T15:00:00Z" : "T16:00:00Z"); startDate = startLocalDate + endTime; endDate = end + endTime; List<MonDataDTO> dataInFluxDB = influxQueryService.findMonitoringDataInFluxDB( startDate, endDate, every, filterList, MonDataDTO.class, false); return dataInFluxDB; }
配置类
@Data @ConfigurationProperties(prefix = "influxdb") @Component public class InfluxdbConfigProp { private String token; private String bucket; private String org; private String url; }
@Configuration public class InfluxdbConfig { @Autowired private InfluxdbConfigProp influxdbConfigProp; @Bean public InfluxDBClient influxDBClient() { InfluxDBClient influxClient = InfluxDBClientFactory.create( influxdbConfigProp.getUrl(), influxdbConfigProp.getToken().toCharArray()); influxClient.setLogLevel(LogLevel.BASIC); return influxClient; } @Bean public WriteApi writeApi(InfluxDBClient influxDBClient) { WriteOptions writeOptions = WriteOptions.builder() .batchSize(5000) .flushInterval(1000) .bufferLimit(10000) .jitterInterval(1000) .retryInterval(5000) .build(); return influxDBClient.getWriteApi(writeOptions); } @Bean public QueryApi queryApi(InfluxDBClient influxDBClient) { return influxDBClient.getQueryApi(); } }
常用函数
window()
使用window()函数根据时间界限对数据进行分组。window()传递的最常用参数是every,它定义了窗口之间的持续时间。也可以使用其他参数,但是对于本例,将基本数据集窗口化为一分钟窗口。dataSet |> window(every: 1m)
first()和last()
获取查询结果的第一条或最后一条
drop()
删除查询结果的指定列|> drop(columns: ["host"])
sort()和limit()
排序和分页|> sort(columns: ["index", "time"], desc: true)|> limit(n: 10)n参数为pageSize
timedMovingAverage()
对于表中的每一行,timedMovingAverage()返回当前值和上一个周期(持续时间)中所有行值的平均值。它以每个参数定义的频率返回移动平均线。|> timedMovingAverage(every: 1h, period: 1h)
aggregateWindow()
|> aggregateWindow(every: 1h, fn: first, createEmpty: true)每一小时时间片的第一条记录,空数据以null填充
map()
|> map( fn:(r) => { return { code: r["code"], time: r["_time"], value: r["_value"], index: r["indexName"] } } )
注意事项
tag与tag之间用逗号分隔
field与field之间用逗号分隔
tag与field之间用空格分隔
tag都是string类型,不需要引号将value包裹
tag的值不能有空格
写入数据时,若tag和时间戳都相同的多条记录,则最后只会保存一条
标签:
留言评论