speeches import multithread

Picman2000 2025-03-17 14:12:38 +01:00
parent b7ffa45fa9
commit 47f5afcd54
7 changed files with 188 additions and 77 deletions

View file

@@ -8,10 +8,18 @@ import org.texttechnologylab.project.gruppe_05_1.rest.RESTHandler;
import org.texttechnologylab.project.gruppe_05_1.util.Logger;
import org.texttechnologylab.project.gruppe_05_1.util.PPRUtils;
import org.texttechnologylab.project.gruppe_05_1.xml.FileObjectFactory;
import org.texttechnologylab.project.gruppe_05_1.xml.speeches.SpeechParser;
import org.w3c.dom.Document;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.texttechnologylab.project.gruppe_05_1.util.PPRUtils.checkAndProcessNewProtocols;
public class Main {
public static boolean UPLOAD_MEMBER_PHOTOS;
@@ -111,13 +119,33 @@ public class Main {
Logger.pink("Uploading Member Photos to DB...");
mongoDBHandler.uploadMemberPhotos();
}
mongoDBHandler.close();
try {
NlpUtils.runRemoteDriver();
} catch (Exception e) {
Logger.error("Error while running NLP remote driver");
Logger.error(e.getMessage());
}
NlpUtils.runRemoteDriver();
/*ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
NlpUtils.runRemoteDriver();
} catch (Exception e) {
Logger.error("Error while running NLP remote driver");
Logger.error(e.getMessage());
}
try {
Logger.info("Starte Aktualisierung der Protokolle...");
Set<Document> newProtocols = checkAndProcessNewProtocols(mongoDBHandler);
Logger.info("Neue Protokolle gefunden: " + newProtocols.size());
if (newProtocols.isEmpty()) {
Logger.info("Keine neuen Protokolle gefunden, Upload wird übersprungen.");
} else {
SpeechParser speechParser = new SpeechParser();
mongoDBHandler.insertSessions(speechParser.parseAllSessions(newProtocols));
mongoDBHandler.insertAgendaItems(speechParser.getAgendaItems());
mongoDBHandler.insertSpeeches(speechParser.getSpeeches());
Logger.info("Neuer Protokolle uploaded: " + newProtocols.size());
}
} catch (Exception ex) {
Logger.error("Fehler bei der Protokollaktualisierung: " + ex.getMessage());
}
}, 0, 10, TimeUnit.MINUTES);*/
RESTHandler restHandler = new RESTHandler();
restHandler.startJavalin();
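
A note on the scheduler block above, which this commit leaves commented out: scheduleAtFixedRate runs on a non-daemon pool thread, so if the block is re-enabled the refresh task would outlive startJavalin() with no clean way to stop it. A minimal sketch, assuming the block is uncommented, of a shutdown hook for the scheduler (this hook is not part of the commit):

// Hypothetical addition, not in this commit: stop the scheduler on JVM exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    scheduler.shutdown(); // stop scheduling new runs
    try {
        // give an in-flight protocol refresh up to 30 seconds to finish
        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
            scheduler.shutdownNow();
        }
    } catch (InterruptedException e) {
        scheduler.shutdownNow();
        Thread.currentThread().interrupt();
    }
}));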

View file

@@ -3,6 +3,7 @@ package org.texttechnologylab.project.gruppe_05_1.database;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -693,7 +694,9 @@ public class MongoDBHandler {
public void bulkWriteNlpData(List<WriteModel<Document>> bulkOperations) {
if (!bulkOperations.isEmpty()) {
BulkWriteResult result = speechesCollection.bulkWrite(bulkOperations);
BulkWriteOptions options = new BulkWriteOptions().ordered(false);
// Optional: set a less strict write concern
BulkWriteResult result = speechesCollection.bulkWrite(bulkOperations, options);
int modifiedCount = result.getModifiedCount();
int matchedCount = result.getMatchedCount();
int upsertCount = result.getUpserts().size();
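
The comment in this hunk mentions optionally relaxing the write concern, but the code only switches to unordered bulk writes; the WriteConcern import added above is otherwise unused here. A minimal sketch of what that relaxation could look like, assuming acknowledgement from a single node is acceptable (WriteConcern.W1 is an assumption, not something this commit sets):

// Hypothetical variant, not in this commit: unordered bulk write with a relaxed write concern
MongoCollection<Document> relaxed = speechesCollection.withWriteConcern(WriteConcern.W1);
BulkWriteResult result = relaxed.bulkWrite(bulkOperations, new BulkWriteOptions().ordered(false));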
@@ -766,6 +769,12 @@ public class MongoDBHandler {
}
}
public boolean sessionExists(String sessionNumber) {
Document filter = new Document("sessionNumber", sessionNumber);
long count = sessionsCollection.countDocuments(filter);
return count > 0;
}
public void close() {
mongoClient.close();
}
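
sessionExists above issues one countDocuments query per protocol link found by checkAndProcessNewProtocols. A minimal sketch, assuming sessionNumber is not yet indexed, of a one-time index so those existence checks do not scan the collection (not part of this commit):

// Hypothetical one-time setup: index sessionNumber for cheap existence checks
sessionsCollection.createIndex(com.mongodb.client.model.Indexes.ascending("sessionNumber"));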

View file

@@ -353,13 +353,15 @@ public class NlpUtils {
JCas jCas;
jCas = rede.toCas();
pComposer.run(jCas);
System.out.println("Anzahl Sentiment-Annotationen: " + JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class).size());
//System.out.println("Anzahl Sentiment-Annotationen: " + JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class).size());
Document analysisResults = extractAnnotations(jCas);
Bson updateFilter = Filters.eq("speechKey", rede.getSpeechKey());
Bson update = new Document("$set", new Document("analysisResults", analysisResults));
bulkOperations.add(new UpdateOneModel<>(updateFilter, update));
}
if (!bulkOperations.isEmpty()) {
System.out.println("Processing of " + bulkOperations.size() + " documents finished");
System.out.println("uploading...");
mongoDBHandler.bulkWriteNlpData(bulkOperations);
System.out.println("Bulk write completed for " + bulkOperations.size() + " documents.");
mongoDBHandler.close();

View file

@@ -1,15 +1,16 @@
package org.texttechnologylab.project.gruppe_05_1.nlp;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.UpdateResult;
import org.apache.uima.fit.util.JCasUtil;
import org.bson.Document;
import java.io.*;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.*;
import java.util.*;
import java.util.stream.Collectors;
@@ -18,40 +19,51 @@ import org.bson.conversions.Bson;
import org.apache.uima.fit.factory.JCasFactory;
import org.apache.uima.jcas.JCas;
import org.apache.uima.cas.impl.XmiCasDeserializer;
import de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Token;
import de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence;
import de.tudarmstadt.ukp.dkpro.core.api.syntax.type.dependency.Dependency;
import de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity;
import org.texttechnologylab.project.gruppe_05_1.database.MongoDBHandler;
import org.hucompute.textimager.uima.type.category.CategoryCoveredTagged;
import org.texttechnologylab.project.gruppe_05_1.util.Logger;
public class XmiExtractor {
private List<WriteModel<Document>> bulkOperations;
private MongoDBHandler mongoDBHandler;
private final List<WriteModel<Document>> bulkOperations = Collections.synchronizedList(new ArrayList<>());
private final MongoDBHandler mongoDBHandler;
private static final int BATCH_SIZE = 1000;
private int processedCount = 0;
private static final AtomicInteger processedCount = new AtomicInteger(0);
public XmiExtractor() {
mongoDBHandler = new MongoDBHandler();
this.bulkOperations = new ArrayList<>();
}
public void extractAndUploadXmiData() throws IOException {
InputStream resourceStream = getClass().getClassLoader().getResourceAsStream("speeches/20.zip");
if (resourceStream == null) {
throw new IOException("20.zip nicht gefunden im Ressourcenordner /speeches");
}
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<?>> futures = new ArrayList<>();
try (ZipInputStream zis = new ZipInputStream(resourceStream)) {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
if (entry.getName().endsWith(".xmi.gz")) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int len;
while ((len = zis.read(buffer)) > 0) {
baos.write(buffer, 0, len);
File tempFile = File.createTempFile("xmi_entry_", ".xmi.gz");
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
byte[] buffer = new byte[1024];
int len;
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
}
byte[] entryData = baos.toByteArray();
processXmiGzStream(new ByteArrayInputStream(entryData), entry.getName());
ZipEntry finalEntry = entry;
Future<?> future = executor.submit(() -> {
try (FileInputStream fis = new FileInputStream(tempFile)) {
processXmiGzStream(fis, finalEntry.getName());
} catch (IOException e) {
e.printStackTrace();
} finally {
tempFile.delete();
}
});
futures.add(future);
}
zis.closeEntry();
}
@@ -59,7 +71,16 @@ public class XmiExtractor {
Logger.error("Error reading XMI data from ZIP file.");
Logger.error(e.getMessage());
}
flushBatch();
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
flushBatch(); // Synchronously upload the remaining batch
mongoDBHandler.close();
}
private void processXmiGzStream(InputStream inputStream, String filename) {
@@ -68,12 +89,10 @@ public class XmiExtractor {
jCas = JCasFactory.createJCas("src/main/resources/speeches/TypeSystem");
XmiCasDeserializer.deserialize(gis, jCas.getCas(), true);
// Build structured analysisResults Document
Document analysisResults = new Document();
// Tokens: Include POS, Lemma, etc.
List<Document> tokens = new ArrayList<>();
for (Token token : JCasUtil.select(jCas, Token.class)) {
for (de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Token token : JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Token.class)) {
Document tokenDoc = new Document()
.append("text", token.getCoveredText())
.append("pos", token.getPos().getPosValue())
@@ -82,15 +101,13 @@ }
}
analysisResults.append("tokens", tokens);
// Sentences
List<String> sentences = JCasUtil.select(jCas, Sentence.class).stream()
.map(Sentence::getCoveredText)
List<String> sentences = JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence.class).stream()
.map(de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence::getCoveredText)
.collect(Collectors.toList());
analysisResults.append("sentences", sentences);
// Dependencies
List<Document> dependencies = new ArrayList<>();
for (Dependency dep : JCasUtil.select(jCas, Dependency.class)) {
for (de.tudarmstadt.ukp.dkpro.core.api.syntax.type.dependency.Dependency dep : JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.syntax.type.dependency.Dependency.class)) {
Document depDoc = new Document()
.append("type", dep.getDependencyType())
.append("governor", dep.getGovernor().getCoveredText())
@@ -99,9 +116,8 @@ }
}
analysisResults.append("dependencies", dependencies);
// Named Entities
List<Document> namedEntities = new ArrayList<>();
for (NamedEntity ne : JCasUtil.select(jCas, NamedEntity.class)) {
for (de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity ne : JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity.class)) {
Document neDoc = new Document()
.append("text", ne.getCoveredText())
.append("type", ne.getValue());
@@ -109,23 +125,16 @@ }
}
analysisResults.append("namedEntities", namedEntities);
// Sentiment
List<Document> sentiments = new ArrayList<>();
for (org.hucompute.textimager.uima.type.Sentiment sentiment :
JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class)) {
for (org.hucompute.textimager.uima.type.Sentiment sentiment : JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class)) {
Document sentimentDoc = new Document()
.append("begin", sentiment.getBegin())
.append("end", sentiment.getEnd())
.append("score", sentiment.getSentiment())
.append("subjectivity", sentiment.getSubjectivity());
// Check for VaderSentiment subtype
if (sentiment instanceof org.hucompute.textimager.uima.type.VaderSentiment) {
org.hucompute.textimager.uima.type.VaderSentiment vader =
(org.hucompute.textimager.uima.type.VaderSentiment) sentiment;
sentimentDoc
.append("pos", vader.getPos())
org.hucompute.textimager.uima.type.VaderSentiment vader = (org.hucompute.textimager.uima.type.VaderSentiment) sentiment;
sentimentDoc.append("pos", vader.getPos())
.append("neu", vader.getNeu())
.append("neg", vader.getNeg());
}
@@ -134,39 +143,36 @@ public class XmiExtractor {
analysisResults.append("sentiments", sentiments);
List<Document> topics = new ArrayList<>();
for (CategoryCoveredTagged topic : JCasUtil.select(jCas, CategoryCoveredTagged.class)) {
for (org.hucompute.textimager.uima.type.category.CategoryCoveredTagged topic : JCasUtil.select(jCas, org.hucompute.textimager.uima.type.category.CategoryCoveredTagged.class)) {
Document topicDoc = new Document()
.append("topic", topic.getValue())
.append("score", topic.getScore())
.append("tags", topic.getTags())
.append("text", topic.getCoveredText());
topics.add(topicDoc);
}
topics.sort((d1, d2) -> Double.compare(d2.getDouble("score"), d1.getDouble("score")));
analysisResults.append("topics", topics);
// Upload structured Document to MongoDB
String speechKey = extractSpeechKeyFromFilename(filename);
if (speechKey != null) {
Bson filter = Filters.eq("speechKey", speechKey);
Bson update = new Document("$set", new Document("analysisResults", analysisResults));
UpdateOneModel<Document> updateModel = new UpdateOneModel<>(filter, update);
bulkOperations.add(updateModel);
if (bulkOperations.size() >= BATCH_SIZE) {
flushBatch();
synchronized (bulkOperations) {
if (bulkOperations.size() >= BATCH_SIZE) {
Logger.info("BATCH_SIZE to Upload: " + bulkOperations.size());
flushBatch();
}
}
processedCount++;
if (processedCount % 5000 == 0) {
Logger.info("Processed speeches: " + processedCount);
int count = processedCount.incrementAndGet();
if (count % 1000 == 0) {
Logger.info("Processed speeches: " + count);
}
}
} catch (Exception e) {
e.printStackTrace();
}
finally {
} finally {
if (jCas != null) {
jCas.reset();
}
@@ -178,23 +184,11 @@ public class XmiExtractor {
return baseName.replace("20/", "");
}
private void flushBatch() {
private synchronized void flushBatch() {
if (!bulkOperations.isEmpty()) {
mongoDBHandler.bulkWriteNlpData(bulkOperations);
bulkOperations.clear();
}
}
/*
public static void main(String[] args) {
try {
XmiExtractor extractor = new XmiExtractor(); // constructor takes no arguments
extractor.extractAndUploadXmiData();
System.out.println("Processing complete.");
} catch (Exception e) {
e.printStackTrace();
}
} */
}

View file

@@ -23,6 +23,8 @@ import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -423,4 +425,61 @@ public abstract class PPRUtils {
return fileNames;
}
public static Set<org.w3c.dom.Document> checkAndProcessNewProtocols(MongoDBHandler mongoDBHandler) {
Set<org.w3c.dom.Document> newProtocols = new HashSet<>();
int offset = 0;
int limit = 10;
boolean hasMore = true;
Pattern sessionPattern = Pattern.compile("Plenarprotokoll der (\\d+)\\. Sitzung");
while (hasMore) {
String queryUrl = "https://www.bundestag.de/ajax/filterlist/de/services/opendata/866354-866354?limit="
+ limit + "&noFilterSet=true&offset=" + offset;
try {
org.jsoup.nodes.Document htmlDoc = Jsoup.connect(queryUrl).get();
Elements sessionLinks = htmlDoc.select("a.bt-link-dokument");
if (sessionLinks.isEmpty()) break;
for (org.jsoup.nodes.Element link : sessionLinks) {
String xmlUrl = link.attr("href");
String fileName = xmlUrl.substring(xmlUrl.lastIndexOf('/') + 1); // e.g. "20212.xml"
// Remove the file extension
String sessionNumberFull = fileName.replace(".xml", ""); // e.g. "20212"
String sessionNumber;
if (sessionNumberFull.startsWith("20") && sessionNumberFull.length() > 2) {
sessionNumber = sessionNumberFull.substring(2);
} else {
sessionNumber = sessionNumberFull;
}
if (!mongoDBHandler.sessionExists(sessionNumber)) {
try {
org.w3c.dom.Document xmlDoc = downloadAndParseXML(xmlUrl);
newProtocols.add(xmlDoc);
} catch (Exception ex) {
Logger.error("Error processing XML for session " + sessionNumber + ": " + ex.getMessage());
}
}
}
org.jsoup.nodes.Element metaSlider = htmlDoc.selectFirst("div.meta-slider");
if (metaSlider != null && metaSlider.hasAttr("data-nextoffset")) {
int nextOffset = Integer.parseInt(metaSlider.attr("data-nextoffset"));
if (nextOffset <= offset) {
hasMore = false;
} else {
offset = nextOffset;
}
} else {
hasMore = false;
}
} catch (IOException e) {
Logger.error("Error loading page: " + queryUrl + " : " + e.getMessage());
break;
}
}
return newProtocols;
}
}
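
downloadAndParseXML is called in the loop above but is not shown in this diff. A minimal sketch of such a helper, assuming a plain DOM parse over HTTP is sufficient (the DTD feature flag is also an assumption; Bundestag protocol XML references an external DTD that would otherwise be fetched on every parse):

// Hypothetical helper, not shown in this diff: fetch and DOM-parse a protocol XML
private static org.w3c.dom.Document downloadAndParseXML(String xmlUrl) throws Exception {
    javax.xml.parsers.DocumentBuilderFactory factory = javax.xml.parsers.DocumentBuilderFactory.newInstance();
    // skip the external DTD so parsing does not need a second network round trip
    factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
    try (InputStream in = new URL(xmlUrl).openStream()) {
        return factory.newDocumentBuilder().parse(in);
    }
}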

View file

@@ -40,7 +40,6 @@ public class SpeechParser {
}
public List<Session> parseAllSessions() {
List<Session> sessionsEmpty = new ArrayList<>();
List<Session> sessions = new ArrayList<>();
this.speeches = new ArrayList<>();
this.agendaItems = new ArrayList<>();
@@ -61,6 +60,26 @@ }
}
public List<Session> parseAllSessions(Set<Document> xmlDocuments) {
List<Session> sessions = new ArrayList<>();
this.speeches = new ArrayList<>();
this.agendaItems = new ArrayList<>();
Logger.info("All new sessions parsed");
for (org.w3c.dom.Document xmlDoc : xmlDocuments) {
try {
File tempFile = convertDocumentToFile(xmlDoc);
Session session = parseSessionFile(tempFile);
sessions.add(session);
tempFile.delete(); // delete the temporary file after processing
} catch (Exception e) {
Logger.error("Error parsing XML document.");
Logger.error(e.getMessage());
}
}
return sessions;
}
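
convertDocumentToFile is referenced above but not part of this diff. A minimal sketch of such a helper, assuming an identity transform to a temporary file (this implementation is an assumption, not the project's code):

// Hypothetical helper, not shown in this diff: serialize a DOM Document to a temp XML file
private File convertDocumentToFile(org.w3c.dom.Document doc) throws Exception {
    File tempFile = File.createTempFile("session_", ".xml");
    javax.xml.transform.Transformer t = javax.xml.transform.TransformerFactory.newInstance().newTransformer();
    t.transform(new javax.xml.transform.dom.DOMSource(doc),
            new javax.xml.transform.stream.StreamResult(tempFile));
    return tempFile;
}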
private Session parseSessionFile(File file) throws Exception {
//file = removeDoctypeAnnotation(file.getAbsolutePath());