diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/Main.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/Main.java
index 362e308..811bb35 100644
--- a/src/main/java/org/texttechnologylab/project/gruppe_05_1/Main.java
+++ b/src/main/java/org/texttechnologylab/project/gruppe_05_1/Main.java
@@ -8,10 +8,18 @@ import org.texttechnologylab.project.gruppe_05_1.rest.RESTHandler;
 import org.texttechnologylab.project.gruppe_05_1.util.Logger;
 import org.texttechnologylab.project.gruppe_05_1.util.PPRUtils;
 import org.texttechnologylab.project.gruppe_05_1.xml.FileObjectFactory;
+import org.texttechnologylab.project.gruppe_05_1.xml.speeches.SpeechParser;
+import org.w3c.dom.Document;
+
 import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
+import static org.texttechnologylab.project.gruppe_05_1.util.PPRUtils.checkAndProcessNewProtocols;
 
 public class Main {
     public static boolean UPLOAD_MEMBER_PHOTOS;
@@ -111,13 +119,33 @@ public class Main {
             Logger.pink("Uploading Member Photos to DB...");
             mongoDBHandler.uploadMemberPhotos();
         }
-        mongoDBHandler.close();
-        try {
-            NlpUtils.runRemoteDriver();
-        } catch (Exception e) {
-            Logger.error("Error while running NLP remote driver");
-            Logger.error(e.getMessage());
-        }
+        NlpUtils.runRemoteDriver();
+        /*ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                NlpUtils.runRemoteDriver();
+            } catch (Exception e) {
+                Logger.error("Error while running NLP remote driver");
+                Logger.error(e.getMessage());
+            }
+            try {
+                Logger.info("Starting protocol update...");
+                Set<Document> newProtocols = checkAndProcessNewProtocols(mongoDBHandler);
+                Logger.info("New protocols found: " + newProtocols.size());
+                if (newProtocols.isEmpty()) {
+                    Logger.info("No new protocols found, skipping the upload.");
+                } else {
+                    SpeechParser speechParser = new SpeechParser();
+                    mongoDBHandler.insertSessions(speechParser.parseAllSessions(newProtocols));
+                    mongoDBHandler.insertAgendaItems(speechParser.getAgendaItems());
+                    mongoDBHandler.insertSpeeches(speechParser.getSpeeches());
+                    Logger.info("New protocols uploaded: " + newProtocols.size());
+                }
+            } catch (Exception ex) {
+                Logger.error("Error while updating protocols: " + ex.getMessage());
+            }
+        }, 0, 10, TimeUnit.MINUTES);*/
+
         RESTHandler restHandler = new RESTHandler();
         restHandler.startJavalin();
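The scheduled refresh in Main is left commented out. A minimal sketch of re-enabling it with a clean shutdown; `refreshTask` is a placeholder name standing in for the commented-out lambda above:

    // Sketch: periodic protocol refresh with shutdown handling (hypothetical wiring).
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(refreshTask, 0, 10, TimeUnit.MINUTES);
    // Make sure the scheduler thread does not keep the JVM alive after Javalin stops.
    Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));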
diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/database/MongoDBHandler.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/database/MongoDBHandler.java
index edc05c2..e7d38d4 100644
--- a/src/main/java/org/texttechnologylab/project/gruppe_05_1/database/MongoDBHandler.java
+++ b/src/main/java/org/texttechnologylab/project/gruppe_05_1/database/MongoDBHandler.java
@@ -3,6 +3,7 @@ package org.texttechnologylab.project.gruppe_05_1.database;
 import com.mongodb.MongoClientSettings;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
+import com.mongodb.WriteConcern;
 import com.mongodb.bulk.BulkWriteResult;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
@@ -693,7 +694,9 @@ public class MongoDBHandler {
 
     public void bulkWriteNlpData(List<WriteModel<Document>> bulkOperations) {
         if (!bulkOperations.isEmpty()) {
-            BulkWriteResult result = speechesCollection.bulkWrite(bulkOperations);
+            BulkWriteOptions options = new BulkWriteOptions().ordered(false);
+            // Optional: set a less strict write concern
+            BulkWriteResult result = speechesCollection.bulkWrite(bulkOperations, options);
             int modifiedCount = result.getModifiedCount();
             int matchedCount = result.getMatchedCount();
             int upsertCount = result.getUpserts().size();
@@ -766,6 +769,12 @@ public class MongoDBHandler {
         }
     }
 
+    public boolean sessionExists(String sessionNumber) {
+        Document filter = new Document("sessionNumber", sessionNumber);
+        long count = sessionsCollection.countDocuments(filter);
+        return count > 0;
+    }
+
     public void close() {
         mongoClient.close();
     }
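The new com.mongodb.WriteConcern import is not used yet; the comment in bulkWriteNlpData only hints at a relaxed write concern. A sketch of what that could look like, assuming com.mongodb.client.model.BulkWriteOptions is already imported:

    // Sketch: unordered bulk write acknowledged by the primary only (WriteConcern.W1).
    BulkWriteOptions options = new BulkWriteOptions().ordered(false);
    BulkWriteResult result = speechesCollection
            .withWriteConcern(WriteConcern.W1)  // withWriteConcern returns a new collection view
            .bulkWrite(bulkOperations, options);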
diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/NlpUtils.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/NlpUtils.java
index dbc18b1..51ad002 100644
--- a/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/NlpUtils.java
+++ b/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/NlpUtils.java
@@ -353,13 +353,15 @@ public class NlpUtils {
             JCas jCas;
             jCas = rede.toCas();
             pComposer.run(jCas);
-            System.out.println("Anzahl Sentiment-Annotationen: " + JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class).size());
+            //System.out.println("Anzahl Sentiment-Annotationen: " + JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class).size());
             Document analysisResults = extractAnnotations(jCas);
             Bson updateFilter = Filters.eq("speechKey", rede.getSpeechKey());
             Bson update = new Document("$set", new Document("analysisResults", analysisResults));
             bulkOperations.add(new UpdateOneModel<>(updateFilter, update));
         }
         if (!bulkOperations.isEmpty()) {
+            System.out.println("Processing of " + bulkOperations.size() + " documents finished");
+            System.out.println("uploading...");
             mongoDBHandler.bulkWriteNlpData(bulkOperations);
             System.out.println("Bulk write completed for " + bulkOperations.size() + " documents.");
             mongoDBHandler.close();
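NlpUtils still accumulates every UpdateOneModel and issues a single bulk write at the end, whereas XmiExtractor below flushes every BATCH_SIZE operations. If memory becomes a concern here, the same incremental pattern could be applied; a sketch (the batch size of 1000 mirrors XmiExtractor and is otherwise arbitrary):

    // Sketch: flush inside the loop over speeches instead of once at the end.
    bulkOperations.add(new UpdateOneModel<>(updateFilter, update));
    if (bulkOperations.size() >= 1000) {
        mongoDBHandler.bulkWriteNlpData(bulkOperations);
        bulkOperations.clear();
    }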
diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/XmiExtractor.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/XmiExtractor.java
index 2463891..335d73e 100644
--- a/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/XmiExtractor.java
+++ b/src/main/java/org/texttechnologylab/project/gruppe_05_1/nlp/XmiExtractor.java
@@ -1,15 +1,16 @@
 package org.texttechnologylab.project.gruppe_05_1.nlp;
 
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.WriteModel;
-import com.mongodb.client.result.UpdateResult;
 import org.apache.uima.fit.util.JCasUtil;
 import org.bson.Document;
 
 import java.io.*;
-import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.*;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -18,40 +19,51 @@ import org.bson.conversions.Bson;
 import org.apache.uima.fit.factory.JCasFactory;
 import org.apache.uima.jcas.JCas;
 import org.apache.uima.cas.impl.XmiCasDeserializer;
-import de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Token;
-import de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence;
-import de.tudarmstadt.ukp.dkpro.core.api.syntax.type.dependency.Dependency;
-import de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity;
 import org.texttechnologylab.project.gruppe_05_1.database.MongoDBHandler;
-import org.hucompute.textimager.uima.type.category.CategoryCoveredTagged;
 import org.texttechnologylab.project.gruppe_05_1.util.Logger;
 
 public class XmiExtractor {
-    private List<WriteModel<Document>> bulkOperations;
-    private MongoDBHandler mongoDBHandler;
+    private final List<WriteModel<Document>> bulkOperations = Collections.synchronizedList(new ArrayList<>());
+    private final MongoDBHandler mongoDBHandler;
     private static final int BATCH_SIZE = 1000;
-    private int processedCount = 0;
+    private static final AtomicInteger processedCount = new AtomicInteger(0);
+
     public XmiExtractor() {
         mongoDBHandler = new MongoDBHandler();
-        this.bulkOperations = new ArrayList<>();
     }
 
     public void extractAndUploadXmiData() throws IOException {
         InputStream resourceStream = getClass().getClassLoader().getResourceAsStream("speeches/20.zip");
+        if (resourceStream == null) {
+            throw new IOException("20.zip not found in resource folder /speeches");
+        }
+        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<Future<?>> futures = new ArrayList<>();
         try (ZipInputStream zis = new ZipInputStream(resourceStream)) {
             ZipEntry entry;
             while ((entry = zis.getNextEntry()) != null) {
                 if (entry.getName().endsWith(".xmi.gz")) {
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    byte[] buffer = new byte[1024];
-                    int len;
-                    while ((len = zis.read(buffer)) > 0) {
-                        baos.write(buffer, 0, len);
+                    File tempFile = File.createTempFile("xmi_entry_", ".xmi.gz");
+                    try (FileOutputStream fos = new FileOutputStream(tempFile)) {
+                        byte[] buffer = new byte[1024];
+                        int len;
+                        while ((len = zis.read(buffer)) > 0) {
+                            fos.write(buffer, 0, len);
+                        }
                     }
-                    byte[] entryData = baos.toByteArray();
-                    processXmiGzStream(new ByteArrayInputStream(entryData), entry.getName());
+                    ZipEntry finalEntry = entry;
+                    Future<?> future = executor.submit(() -> {
+                        try (FileInputStream fis = new FileInputStream(tempFile)) {
+                            processXmiGzStream(fis, finalEntry.getName());
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        } finally {
+                            tempFile.delete();
+                        }
+                    });
+                    futures.add(future);
                 }
                 zis.closeEntry();
             }
@@ -59,7 +71,16 @@ public class XmiExtractor {
             Logger.error("Error reading XMI data from ZIP file.");
             Logger.error(e.getMessage());
         }
-        flushBatch();
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        executor.shutdown();
+        flushBatch(); // Synchronously upload the remaining batch
+        mongoDBHandler.close();
     }
 
     private void processXmiGzStream(InputStream inputStream, String filename) {
@@ -68,12 +89,10 @@ public class XmiExtractor {
             jCas = JCasFactory.createJCas("src/main/resources/speeches/TypeSystem");
             XmiCasDeserializer.deserialize(gis, jCas.getCas(), true);
 
-            // Build structured analysisResults Document
             Document analysisResults = new Document();
 
-            // Tokens: Include POS, Lemma, etc.
             List<Document> tokens = new ArrayList<>();
-            for (Token token : JCasUtil.select(jCas, Token.class)) {
+            for (de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Token token : JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Token.class)) {
                 Document tokenDoc = new Document()
                         .append("text", token.getCoveredText())
                         .append("pos", token.getPos().getPosValue())
@@ -82,15 +101,13 @@ public class XmiExtractor {
             }
             analysisResults.append("tokens", tokens);
 
-            // Sentences
-            List<String> sentences = JCasUtil.select(jCas, Sentence.class).stream()
-                    .map(Sentence::getCoveredText)
+            List<String> sentences = JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence.class).stream()
+                    .map(de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence::getCoveredText)
                     .collect(Collectors.toList());
             analysisResults.append("sentences", sentences);
 
-            // Dependencies
             List<Document> dependencies = new ArrayList<>();
-            for (Dependency dep : JCasUtil.select(jCas, Dependency.class)) {
+            for (de.tudarmstadt.ukp.dkpro.core.api.syntax.type.dependency.Dependency dep : JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.syntax.type.dependency.Dependency.class)) {
                 Document depDoc = new Document()
                         .append("type", dep.getDependencyType())
                         .append("governor", dep.getGovernor().getCoveredText())
@@ -99,9 +116,8 @@ public class XmiExtractor {
             }
             analysisResults.append("dependencies", dependencies);
 
-            // Named Entities
            List<Document> namedEntities = new ArrayList<>();
-            for (NamedEntity ne : JCasUtil.select(jCas, NamedEntity.class)) {
+            for (de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity ne : JCasUtil.select(jCas, de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity.class)) {
                 Document neDoc = new Document()
                         .append("text", ne.getCoveredText())
                         .append("type", ne.getValue());
@@ -109,23 +125,16 @@ public class XmiExtractor {
             }
             analysisResults.append("namedEntities", namedEntities);
 
-            // Sentiment
             List<Document> sentiments = new ArrayList<>();
-            for (org.hucompute.textimager.uima.type.Sentiment sentiment :
-                    JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class)) {
-
+            for (org.hucompute.textimager.uima.type.Sentiment sentiment : JCasUtil.select(jCas, org.hucompute.textimager.uima.type.Sentiment.class)) {
                 Document sentimentDoc = new Document()
                         .append("begin", sentiment.getBegin())
                         .append("end", sentiment.getEnd())
                         .append("score", sentiment.getSentiment())
                         .append("subjectivity", sentiment.getSubjectivity());
-
-                // Check for VaderSentiment subtype
                 if (sentiment instanceof org.hucompute.textimager.uima.type.VaderSentiment) {
-                    org.hucompute.textimager.uima.type.VaderSentiment vader =
-                            (org.hucompute.textimager.uima.type.VaderSentiment) sentiment;
-                    sentimentDoc
-                            .append("pos", vader.getPos())
+                    org.hucompute.textimager.uima.type.VaderSentiment vader = (org.hucompute.textimager.uima.type.VaderSentiment) sentiment;
+                    sentimentDoc.append("pos", vader.getPos())
                             .append("neu", vader.getNeu())
                             .append("neg", vader.getNeg());
                 }
@@ -134,39 +143,36 @@ public class XmiExtractor {
             analysisResults.append("sentiments", sentiments);
 
             List<Document> topics = new ArrayList<>();
-            for (CategoryCoveredTagged topic : JCasUtil.select(jCas, CategoryCoveredTagged.class)) {
+            for (org.hucompute.textimager.uima.type.category.CategoryCoveredTagged topic : JCasUtil.select(jCas, org.hucompute.textimager.uima.type.category.CategoryCoveredTagged.class)) {
                 Document topicDoc = new Document()
                         .append("topic", topic.getValue())
                        .append("score", topic.getScore())
-                        .append("tags", topic.getTags())
                        .append("text", topic.getCoveredText());
                topics.add(topicDoc);
             }
             topics.sort((d1, d2) -> Double.compare(d2.getDouble("score"), d1.getDouble("score")));
             analysisResults.append("topics", topics);
-
-            // Upload structured Document to MongoDB
             String speechKey = extractSpeechKeyFromFilename(filename);
             if (speechKey != null) {
                 Bson filter = Filters.eq("speechKey", speechKey);
                 Bson update = new Document("$set", new Document("analysisResults", analysisResults));
                 UpdateOneModel<Document> updateModel = new UpdateOneModel<>(filter, update);
                 bulkOperations.add(updateModel);
-                if (bulkOperations.size() >= BATCH_SIZE) {
-                    flushBatch();
+                synchronized (bulkOperations) {
+                    if (bulkOperations.size() >= BATCH_SIZE) {
+                        Logger.info("BATCH_SIZE to Upload: " + bulkOperations.size());
+                        flushBatch();
+                    }
                 }
-                processedCount++;
-                if (processedCount % 5000 == 0) {
-                    Logger.info("Processed speeches: " + processedCount);
+                int count = processedCount.incrementAndGet();
+                if (count % 1000 == 0) {
+                    Logger.info("Processed speeches: " + count);
                 }
-
             }
-
         } catch (Exception e) {
             e.printStackTrace();
-        }
-        finally {
+        } finally {
             if (jCas != null) {
                 jCas.reset();
             }
@@ -178,23 +184,11 @@ public class XmiExtractor {
         return baseName.replace("20/", "");
     }
 
-    private void flushBatch() {
+    private synchronized void flushBatch() {
         if (!bulkOperations.isEmpty()) {
             mongoDBHandler.bulkWriteNlpData(bulkOperations);
             bulkOperations.clear();
         }
     }
-
-
-    /*
-    public static void main(String[] args) {
-        try {
-            XmiExtractor extractor = new XmiExtractor(database);
-            extractor.extractAndUploadXmiData();
-            System.out.println("Processing complete.");
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    } */
 }
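With the commented-out main method removed, XmiExtractor is now driven entirely from outside. A minimal usage sketch (note the constructor no longer takes a database argument):

    // Sketch: running the extractor standalone.
    XmiExtractor extractor = new XmiExtractor();  // opens its own MongoDBHandler
    extractor.extractAndUploadXmiData();          // waits for all futures, flushes the last batch, closes the DB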
diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/rest/RedenController.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/rest/RedenController.java
new file mode 100644
index 0000000..e69de29
diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/util/PPRUtils.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/util/PPRUtils.java
index 25e2aad..30dd722 100644
--- a/src/main/java/org/texttechnologylab/project/gruppe_05_1/util/PPRUtils.java
+++ b/src/main/java/org/texttechnologylab/project/gruppe_05_1/util/PPRUtils.java
@@ -23,6 +23,8 @@ import java.io.*;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -423,4 +425,61 @@ public abstract class PPRUtils {
         return fileNames;
     }
 
+    public static Set<org.w3c.dom.Document> checkAndProcessNewProtocols(MongoDBHandler mongoDBHandler) {
+        Set<org.w3c.dom.Document> newProtocols = new HashSet<>();
+        int offset = 0;
+        int limit = 10;
+        boolean hasMore = true;
+        Pattern sessionPattern = Pattern.compile("Plenarprotokoll der (\\d+)\\. Sitzung");
+
+        while (hasMore) {
+            String queryUrl = "https://www.bundestag.de/ajax/filterlist/de/services/opendata/866354-866354?limit="
+                    + limit + "&noFilterSet=true&offset=" + offset;
+            try {
+                org.jsoup.nodes.Document htmlDoc = Jsoup.connect(queryUrl).get();
+                Elements sessionLinks = htmlDoc.select("a.bt-link-dokument");
+                if (sessionLinks.isEmpty()) break;
+
+                for (org.jsoup.nodes.Element link : sessionLinks) {
+                    String xmlUrl = link.attr("href");
+                    String fileName = xmlUrl.substring(xmlUrl.lastIndexOf('/') + 1); // "20212.xml"
+                    // Strip the file extension
+                    String sessionNumberFull = fileName.replace(".xml", ""); // e.g. "20212"
+                    String sessionNumber;
+                    if (sessionNumberFull.startsWith("20") && sessionNumberFull.length() > 2) {
+                        sessionNumber = sessionNumberFull.substring(2);
+                    } else {
+                        sessionNumber = sessionNumberFull;
+                    }
+                    if (!mongoDBHandler.sessionExists(sessionNumber)) {
+                        try {
+                            org.w3c.dom.Document xmlDoc = downloadAndParseXML(xmlUrl);
+                            newProtocols.add(xmlDoc);
+                        } catch (Exception ex) {
+                            Logger.error("Error processing XML for session " + sessionNumber + ": " + ex.getMessage());
+                        }
+                    }
+                }
+
+                org.jsoup.nodes.Element metaSlider = htmlDoc.selectFirst("div.meta-slider");
+                if (metaSlider != null && metaSlider.hasAttr("data-nextoffset")) {
+                    int nextOffset = Integer.parseInt(metaSlider.attr("data-nextoffset"));
+                    if (nextOffset <= offset) {
+                        hasMore = false;
+                    } else {
+                        offset = nextOffset;
+                    }
+                } else {
+                    hasMore = false;
+                }
+            } catch (IOException e) {
+                Logger.error("Error loading page: " + queryUrl + " : " + e.getMessage());
+                break;
+            }
+        }
+        return newProtocols;
+    }
+
 }
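sessionPattern and the new Matcher/Pattern imports are declared in checkAndProcessNewProtocols but never applied; the session number is instead sliced out of the file name. If the regex is meant to be the more robust source, a sketch of applying it inside the link loop, assuming the anchor text contains a phrase like "Plenarprotokoll der 212. Sitzung":

    // Sketch: prefer the declared regex over file-name slicing when it matches.
    Matcher m = sessionPattern.matcher(link.text());
    if (m.find()) {
        sessionNumber = m.group(1);  // overrides the file-name based value
    }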
"20212" + String sessionNumber; + if (sessionNumberFull.startsWith("20") && sessionNumberFull.length() > 2) { + sessionNumber = sessionNumberFull.substring(2); + } else { + sessionNumber = sessionNumberFull; + } + if (!mongoDBHandler.sessionExists(sessionNumber)) { + try { + org.w3c.dom.Document xmlDoc = downloadAndParseXML(xmlUrl); + newProtocols.add(xmlDoc); + } catch (Exception ex) { + Logger.error("Error processing XML for session " + sessionNumber + ": " + ex.getMessage()); + } + } + + } + + org.jsoup.nodes.Element metaSlider = htmlDoc.selectFirst("div.meta-slider"); + if (metaSlider != null && metaSlider.hasAttr("data-nextoffset")) { + int nextOffset = Integer.parseInt(metaSlider.attr("data-nextoffset")); + if (nextOffset <= offset) { + hasMore = false; + } else { + offset = nextOffset; + } + } else { + hasMore = false; + } + } catch (IOException e) { + Logger.error("Error loading page: " + queryUrl + " : " + e.getMessage()); + break; + } + } + return newProtocols; + } + + } diff --git a/src/main/java/org/texttechnologylab/project/gruppe_05_1/xml/speeches/SpeechParser.java b/src/main/java/org/texttechnologylab/project/gruppe_05_1/xml/speeches/SpeechParser.java index c4c1ef6..1750a36 100644 --- a/src/main/java/org/texttechnologylab/project/gruppe_05_1/xml/speeches/SpeechParser.java +++ b/src/main/java/org/texttechnologylab/project/gruppe_05_1/xml/speeches/SpeechParser.java @@ -40,7 +40,6 @@ public class SpeechParser { } public List parseAllSessions() { - List sessionsEmpty = new ArrayList<>(); List sessions = new ArrayList<>(); this.speeches = new ArrayList<>(); this.agendaItems = new ArrayList<>(); @@ -61,6 +60,26 @@ public class SpeechParser { } + public List parseAllSessions(Set xmlDocuments) { + List sessions = new ArrayList<>(); + this.speeches = new ArrayList<>(); + this.agendaItems = new ArrayList<>(); + Logger.info("All new sessions parsed"); + for (org.w3c.dom.Document xmlDoc : xmlDocuments) { + try { + File tempFile = convertDocumentToFile(xmlDoc); + Session session = parseSessionFile(tempFile); + sessions.add(session); + tempFile.delete(); // Lösche die temporäre Datei nach der Verarbeitung + } catch (Exception e) { + Logger.error("Error parsing XML document."); + Logger.error(e.getMessage()); + } + } + return sessions; + + } + private Session parseSessionFile(File file) throws Exception { //file = removeDoctypeAnnotation(file.getAbsolutePath());