提交 a1ea0aca 作者: 925993793@qq.com

【fix】打开kafka消费以及清空事件数据时逻辑补充

上级 4ad22e47
......@@ -34,7 +34,6 @@ import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
......@@ -85,6 +84,9 @@ public class EventDataController {
@Value("${clb.eventDelete:}")
private String CLB_EVENT_DELETE;
@Value("${clb.clearData:}")
private String CLB_CLEAR_DATA;
private static final String prefix = "http://obs.ciglobal.cn/";
......@@ -335,6 +337,18 @@ public class EventDataController {
*/
@GetMapping("/clearData")
public Result<?> clearData(@RequestParam String eventId){
if (StringUtils.isNotEmpty(CLB_CLEAR_DATA)) {
try {
Map<String, String> params = new HashMap<>();
params.put("eventId", eventId);
UserVo loginUser = UserUtil.getLoginUser();
Map<String, String> headers = new HashMap<>();
params.put("loginUser", JSON.toJSONString(loginUser));
HttpUtil.doGet(CLB_CLEAR_DATA, params, headers,"utf-8");
} catch (Exception e) {
return Result.FAIL(500, "调用克虏宝清空专题数据异常!");
}
}
eventService.clearSubjectData(eventId);
return Result.OK("正在清空数据");
}
......
//package com.zzsn.event.kafka;
//
//import com.alibaba.fastjson2.JSON;
//import com.alibaba.fastjson2.JSONObject;
//import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
//import com.baomidou.mybatisplus.core.toolkit.Wrappers;
//import com.zzsn.event.constant.Constants;
//import com.zzsn.event.entity.EventAnalysisReport;
//import com.zzsn.event.entity.SubjectAnalysis;
//import com.zzsn.event.service.EventAnalysisReportService;
//import com.zzsn.event.service.SubjectAnalysisService;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.commons.lang3.StringUtils;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.kafka.annotation.KafkaListener;
//import org.springframework.stereotype.Component;
//
//import java.util.List;
//
///**
// * @author lkg
// * 监听消费kafka消息
// * 消费topic中消息(注意:如果监听的topic不存在则会自动创建)
// * @date 2022/7/15
// */
//@Slf4j
//@Component
//public class KafkaConsumer {
// @Autowired
// private SubjectAnalysisService subjectAnalysisService;
// @Autowired
// private EventAnalysisReportService eventAnalysisReportService;
//
// /**
// * 获取-事件脉络-分析结果数据,并入库
// *
// * @param record 接收的kafka数据
// */
// @KafkaListener(topics = {Constants.EVENT_CONTEXT_RECEIVE_TOPIC})
// public void eventContext(ConsumerRecord<String, String> record) {
// String value = record.value();
// if (StringUtils.isNotEmpty(value)) {
// String subjectId;
// Integer category = 2;
// try {
// List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class);
// subjectId = subjectAnalyses.get(0).getSubjectId();
// subjectAnalyses.forEach(e -> {
// e.setCategory(category);
// e.setTitle(removeNonBmpUniCodes(e.getTitle()));
// });
// LambdaQueryWrapper<SubjectAnalysis> queryWrapper = Wrappers.lambdaQuery();
// queryWrapper.eq(SubjectAnalysis::getSubjectId, subjectId).eq(SubjectAnalysis::getCategory, category);
// int count = subjectAnalysisService.count(queryWrapper);
// if (count > 0) {
// subjectAnalysisService.remove(queryWrapper);
// }
// subjectAnalysisService.saveBatch(subjectAnalyses);
// log.info("id为-{}-的专题,此次-事件脉络-数据更新完成", subjectId);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// }
//
// /**
// * 获取-伪事件脉络-分析结果数据,并入库
// *
// * @param record 接收的kafka数据
// */
// @KafkaListener(topics = {Constants.FAKE_EVENT_CONTEXT_RECEIVE_TOPIC})
// public void eventContext_fake(ConsumerRecord<String, String> record) {
// String value = record.value();
// if (StringUtils.isNotEmpty(value)) {
// String subjectId;
// Integer category = 3;
// try {
// List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class);
// subjectId = subjectAnalyses.get(0).getSubjectId();
// subjectAnalyses.forEach(e -> {
// e.setCategory(category);
// e.setTitle(removeNonBmpUniCodes(e.getTitle()));
// });
// LambdaQueryWrapper<SubjectAnalysis> queryWrapper = Wrappers.lambdaQuery();
// queryWrapper.eq(SubjectAnalysis::getSubjectId, subjectId).eq(SubjectAnalysis::getCategory, category);
// int count = subjectAnalysisService.count(queryWrapper);
// if (count > 0) {
// subjectAnalysisService.remove(queryWrapper);
// }
// subjectAnalysisService.saveBatch(subjectAnalyses);
// log.info("id为-{}-的专题,此次-伪事件脉络-数据更新完成", subjectId);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// }
//
// /**
// * 获取事件的分析报告数据
// *
// * @param record 接收的kafka数据
// * @author lkg
// * @date 2024/4/12
// */
// @KafkaListener(topics = {Constants.EVENT_REPORT_RECEIVE_TOPIC})
// public void eventReport(ConsumerRecord<String, String> record) {
// String value = record.value();
// EventAnalysisReport eventAnalysisReport = JSONObject.parseObject(value, EventAnalysisReport.class);
// eventAnalysisReportService.modify(eventAnalysisReport.getEventId(), eventAnalysisReport.getFilePath());
// log.info("id为-{}-的事件,分析报告更新完成", eventAnalysisReport.getEventId());
// }
//
// //去除特殊的字符,例如表情符
// private String removeNonBmpUniCodes(String str) {
// return StringUtils.isEmpty(str) ? null : str.replaceAll("[^\\u0000-\\uFFFF]", "");
// }
//}
package com.zzsn.event.kafka;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.zzsn.event.constant.Constants;
import com.zzsn.event.entity.EventAnalysisReport;
import com.zzsn.event.entity.SubjectAnalysis;
import com.zzsn.event.service.EventAnalysisReportService;
import com.zzsn.event.service.SubjectAnalysisService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author lkg
* 监听消费kafka消息
* 消费topic中消息(注意:如果监听的topic不存在则会自动创建)
* @date 2022/7/15
*/
@Slf4j
@Component
public class KafkaConsumer {
@Autowired
private SubjectAnalysisService subjectAnalysisService;
@Autowired
private EventAnalysisReportService eventAnalysisReportService;
/**
* 获取-事件脉络-分析结果数据,并入库
*
* @param record 接收的kafka数据
*/
@KafkaListener(topics = {Constants.EVENT_CONTEXT_RECEIVE_TOPIC})
public void eventContext(ConsumerRecord<String, String> record) {
String value = record.value();
if (StringUtils.isNotEmpty(value)) {
String subjectId;
Integer category = 2;
try {
List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class);
subjectId = subjectAnalyses.get(0).getSubjectId();
subjectAnalyses.forEach(e -> {
e.setCategory(category);
e.setTitle(removeNonBmpUniCodes(e.getTitle()));
});
LambdaQueryWrapper<SubjectAnalysis> queryWrapper = Wrappers.lambdaQuery();
queryWrapper.eq(SubjectAnalysis::getSubjectId, subjectId).eq(SubjectAnalysis::getCategory, category);
int count = subjectAnalysisService.count(queryWrapper);
if (count > 0) {
subjectAnalysisService.remove(queryWrapper);
}
subjectAnalysisService.saveBatch(subjectAnalyses);
log.info("id为-{}-的专题,此次-事件脉络-数据更新完成", subjectId);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 获取-伪事件脉络-分析结果数据,并入库
*
* @param record 接收的kafka数据
*/
@KafkaListener(topics = {Constants.FAKE_EVENT_CONTEXT_RECEIVE_TOPIC})
public void eventContext_fake(ConsumerRecord<String, String> record) {
String value = record.value();
if (StringUtils.isNotEmpty(value)) {
String subjectId;
Integer category = 3;
try {
List<SubjectAnalysis> subjectAnalyses = JSON.parseArray(value, SubjectAnalysis.class);
subjectId = subjectAnalyses.get(0).getSubjectId();
subjectAnalyses.forEach(e -> {
e.setCategory(category);
e.setTitle(removeNonBmpUniCodes(e.getTitle()));
});
LambdaQueryWrapper<SubjectAnalysis> queryWrapper = Wrappers.lambdaQuery();
queryWrapper.eq(SubjectAnalysis::getSubjectId, subjectId).eq(SubjectAnalysis::getCategory, category);
int count = subjectAnalysisService.count(queryWrapper);
if (count > 0) {
subjectAnalysisService.remove(queryWrapper);
}
subjectAnalysisService.saveBatch(subjectAnalyses);
log.info("id为-{}-的专题,此次-伪事件脉络-数据更新完成", subjectId);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 获取事件的分析报告数据
*
* @param record 接收的kafka数据
* @author lkg
* @date 2024/4/12
*/
@KafkaListener(topics = {Constants.EVENT_REPORT_RECEIVE_TOPIC})
public void eventReport(ConsumerRecord<String, String> record) {
String value = record.value();
EventAnalysisReport eventAnalysisReport = JSONObject.parseObject(value, EventAnalysisReport.class);
eventAnalysisReportService.modify(eventAnalysisReport.getEventId(), eventAnalysisReport.getFilePath());
log.info("id为-{}-的事件,分析报告更新完成", eventAnalysisReport.getEventId());
}
//去除特殊的字符,例如表情符
private String removeNonBmpUniCodes(String str) {
return StringUtils.isEmpty(str) ? null : str.replaceAll("[^\\u0000-\\uFFFF]", "");
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论