package Sequenic.T2;

import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import Sequenic.T2.Msg.T2Exception;
import Sequenic.T2.Seq.Trace;

/**
 * Iterates over the traces stored in one or more input files while copying
 * them to an output file.
 *
 * <p>Reading and writing happen on a single background thread: each call to
 * {@link #next()} schedules a task that first writes the trace handed out by
 * the <em>previous</em> {@code next()} call (unless {@link #remove()} was
 * called for it) and then prefetches the next trace accepted by the filter.
 * The trace handed out last is written by {@link #close()}.
 *
 * <p>The iterator methods and {@code close()} are meant to be called from one
 * consumer thread; only the counters are safe to read from elsewhere.
 */
public class StreamTraces implements Iterator<Trace>, Closeable {

    /** Decides which of the traces read from the input are handed out. */
    private final TraceFilter filter;

    /** Trace returned by the last {@link #next()}; {@code null} if none or removed. */
    private Trace prev;

    /**
     * State of the current input file as used by {@link #readNext()}:
     * {@code -2} reads until the trace reader returns {@code null},
     * {@code -1}/{@code 0} mean the file is exhausted, and a positive value is
     * the number of traces still to be read.
     */
    private int remaining_in;

    /** Input files that have not been opened yet, in processing order. */
    private final Deque<String> inputFiles;

    private ObjectInputStream in;
    private ObjectOutputStream out;

    /** Header object read from the first input file. */
    private final TrFile main;

    /** Pending (or completed) prefetch of the next trace. */
    private Future<Trace> futureNext;

    private final AtomicInteger totRead, totWritten;
    private final ExecutorService parallelIO;

    /** Set once {@link #close()} has finished waiting for the IO thread. */
    private boolean closed;

    /**
     * Opens the output file and the first input file, copies the header of the
     * first input file to the output and starts prefetching the first trace.
     *
     * @param filter  only traces for which {@link TraceFilter#add(Trace)}
     *                returns {@code true} are handed out by this iterator
     * @param outFile name of the file the surviving traces are written to
     * @param inFiles names of the input files; must contain at least one entry
     * @throws IOException            if a file cannot be opened, read or written
     * @throws ClassNotFoundException if the header of the first input file
     *                                cannot be deserialized
     * @throws T2Exception            declared for callers; NOTE(review): no
     *                                statement in this constructor visibly
     *                                throws it
     */
    public StreamTraces(TraceFilter filter, String outFile, Collection<String> inFiles)
            throws IOException, ClassNotFoundException, T2Exception {
        this.filter = filter;
        inputFiles = new ArrayDeque<String>(inFiles);
        out = TrFile.initOut(outFile);
        main = initializeMain();
        out.writeObject(main);
        // The output always carries -2 in the count field; close() appends
        // Trace.endOfStream after the last trace.
        out.writeInt(-2);
        prev = null;
        totRead = new AtomicInteger();
        totWritten = new AtomicInteger();
        parallelIO = Executors.newSingleThreadExecutor();
        futureNext = newTask();
    }

    /**
     * Streams a single input file through a filter that accepts every trace,
     * passing {@code null} as the output file name.
     *
     * @param inFile name of the input file
     * @throws IOException            see the three-argument constructor
     * @throws ClassNotFoundException see the three-argument constructor
     * @throws T2Exception            see the three-argument constructor
     */
    public StreamTraces(String inFile) throws IOException, ClassNotFoundException, T2Exception {
        this(TraceFilter.trueFilter, null, Collections.singleton(inFile));
    }

    /** Opens the first input file and reads its header and trace count. */
    private TrFile initializeMain() throws IOException, ClassNotFoundException {
        in = TrFile.initIn(inputFiles.pop());
        TrFile header = (TrFile) in.readObject();
        remaining_in = in.readInt();
        return header;
    }

    /**
     * Reads the next trace from the current input file, moving on to the next
     * input file when the current one is exhausted.
     *
     * @return the next trace, or {@code null} when all input files are exhausted
     */
    private Trace readNext() throws IOException, ClassNotFoundException, T2Exception {
        Trace t;
        switch (remaining_in) {
            case -2:
                t = Trace.alternativeSerializationRead(in);
                if (t != null) {
                    break;
                }
                // Deliberate fall-through: a null trace ends this file.
            case -1:
            case 0:
                in.close();
                if (inputFiles.isEmpty()) {
                    return null;
                }
                in = TrFile.initIn(inputFiles.pop());
                // Check the new file's header against the first file's header.
                main.compatible((TrFile) in.readObject());
                remaining_in = in.readInt();
                return readNext();
            default:
                remaining_in--;
                t = Trace.alternativeSerializationRead(in);
        }
        totRead.incrementAndGet();
        return t;
    }

    /**
     * Waits for the pending prefetch and reports whether it produced a trace.
     *
     * <p>If the prefetch failed, or the wait was interrupted, this method
     * returns {@code true} so that the following {@link #next()} call reports
     * the failure as an exception (with the cause attached) instead of the
     * stream silently appearing to end early.
     */
    @Override
    public boolean hasNext() {
        try {
            return futureNext.get() != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        } catch (ExecutionException e) {
            return true;
        }
    }

    /**
     * Returns the prefetched trace and schedules the next background task.
     *
     * @throws NoSuchElementException if the input is exhausted, or, with the
     *         underlying exception as cause, if the trace could not be read
     */
    @Override
    public Trace next() {
        final Trace current;
        try {
            current = futureNext.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw readFailure(e);
        } catch (ExecutionException e) {
            throw readFailure(e);
        }
        if (current == null) {
            throw new NoSuchElementException();
        }
        try {
            // Captures the old value of prev, so must run before prev is updated.
            futureNext = newTask();
        } catch (RejectedExecutionException e) {
            // The executor has been shut down by close().
            throw readFailure(e);
        }
        prev = current;
        return current;
    }

    /** Builds the exception thrown by {@link #next()} for a failed read. */
    private static NoSuchElementException readFailure(Exception cause) {
        NoSuchElementException failure =
                new NoSuchElementException("Could not read next: " + cause.getMessage());
        failure.initCause(cause);
        return failure;
    }

    /**
     * Drops the trace returned by the last {@link #next()} so that it is not
     * written to the output.
     */
    @Override
    public void remove() {
        prev = null;
    }

    /**
     * Submits a background task that writes the trace currently held in
     * {@code prev} (if any) and then reads traces until one is accepted by
     * the filter or the input is exhausted.
     */
    private Future<Trace> newTask() {
        final Trace previous = prev;
        return parallelIO.submit(new Callable<Trace>() {
            @Override
            public Trace call() throws Exception {
                if (previous != null) {
                    totWritten.getAndIncrement();
                    previous.alternativeSerializationWrite(out);
                }
                Trace candidate = readNext();
                while (candidate != null && !filter.add(candidate)) {
                    candidate = readNext();
                }
                return candidate;
            }
        });
    }

    /**
     * Waits for the background task to finish, writes the last handed-out
     * trace (unless removed) followed by {@code Trace.endOfStream}, and closes
     * both streams. Calling this method again after it completed has no effect.
     *
     * @throws IOException if the IO thread does not terminate within five
     *         minutes, if the wait is interrupted, or if writing or closing
     *         fails; in the first two cases the streams are left open because
     *         the IO thread may still be using them
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        parallelIO.shutdown();
        final boolean terminated;
        try {
            terminated = parallelIO.awaitTermination(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for parallel IO thread to shut down", e);
        }
        if (!terminated) {
            throw new IOException("Could not shutdown parallel IO thread within 5 minutes");
        }
        closed = true;
        try {
            if (prev != null) {
                totWritten.getAndIncrement();
                prev.alternativeSerializationWrite(out);
            }
            out.write(Trace.endOfStream);
        } finally {
            try {
                in.close();
            } finally {
                out.close();
            }
        }
    }

    /** Returns the header object read from the first input file. */
    public TrFile trFile() {
        return main;
    }

    /** Returns the number of traces read so far, including filtered-out ones. */
    public int totalRead() {
        return totRead.get();
    }

    /** Returns the number of traces written (or about to be written) so far. */
    public int totalWritten() {
        return totWritten.get();
    }

    /** Predicate selecting the traces that {@link StreamTraces} hands out. */
    public interface TraceFilter {

        /**
         * @param t a trace read from the input
         * @return {@code true} to hand the trace out, {@code false} to skip it
         */
        public boolean add(Trace t);

        /** Accepts exactly the traces for which {@code isObsolete()} is false. */
        public final TraceFilter obsolete = new TraceFilter() {
            @Override
            public boolean add(Trace t) {
                return !t.isObsolete();
            }
        };

        /** Accepts every trace. */
        public final TraceFilter trueFilter = new TraceFilter() {
            @Override
            public boolean add(Trace t) {
                return true;
            }
        };

        /** Rejects every trace. */
        public final TraceFilter falseFilter = new TraceFilter() {
            @Override
            public boolean add(Trace t) {
                return false;
            }
        };

        // NOTE(review): the type argument of these callables depends on
        // SelectiveParallelExecutor.callable, which is not visible here, so
        // they are left as raw types.
        public final Callable trueCallable = SelectiveParallelExecutor.callable(trueFilter);
        public final Callable falseCallable = SelectiveParallelExecutor.callable(falseFilter);
        public final Callable obsoleteCallable = SelectiveParallelExecutor.callable(obsolete);
    }
}