package Sequenic.T2; import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import Sequenic.T2.Engines.Util; import Sequenic.T2.Seq.ReportersPool; import Sequenic.T2.Seq.Trace; import Sequenic.T2.StreamTraces.TraceFilter; //Uses a parallel thread to read a TrFile and selectively executes public class SelectiveParallelExecutor implements Closeable { static final Trace poison = new Trace(); TrFile main; private final ExecutorService parThreads; private final BlockingQueue fromDisk, toExecutor; public SelectiveParallelExecutor() { parThreads = Executors.newFixedThreadPool(2); fromDisk = new ArrayBlockingQueue(5000); toExecutor = new ArrayBlockingQueue(5000); } public int execute(final String fname, final TraceFilter filter) throws InterruptedException, ClassNotFoundException { return execute(fname,callable(filter),true); } public int execute(final String trFile, final Callable filter, final boolean execute) throws InterruptedException, ClassNotFoundException { parThreads.submit(new FilterRunner(filter, fromDisk, toExecutor)); parThreads.submit(new Reader(trFile, fromDisk)); return execute(execute); } private int execute(final boolean execute) throws InterruptedException, ClassNotFoundException { final BlockingQueue locQueue = toExecutor; final Trace locPoison = poison; Trace t; int count = 0; locQueue.take();//will unblock once main has initialized final Class c = Class.forName(main.CUTname); final Pool p = (Pool) Util.mkAnInstance(main.poolClassName); final List classinv = new ArrayList(); final ReportersPool nullReporter = ReportersPool.NULLreporter; for (String cowner : main.classinvariantOwners) { classinv.add(Util.getClassINV(Class.forName(cowner))); } while((t=locQueue.take())!=locPoison) { if(execute) t.exec(c, p, classinv, t.diverging, false, nullReporter); count++; } return count; } @Override public void close() throws IOException { parThreads.shutdown(); try { parThreads.awaitTermination(5, TimeUnit.MINUTES); } catch (InterruptedException e) { throw new IOException(e); } } private class Reader implements Runnable { private final String fname; private final BlockingQueue out; private Reader(final String fname, final BlockingQueue out) { this.fname = fname; this.out = out; } @Override public void run() { try { final ObjectInputStream ois = TrFile.initIn(fname); main = (TrFile) ois.readObject(); toExecutor.put(poison); //signal that main is initialized, bypass filter final BlockingQueue locQueue = out; int size = ois.readInt(); while(size>0) { locQueue.put(Trace.alternativeSerializationRead(ois)); size--; } if(size==-2) { while(true) { Trace t = Trace.alternativeSerializationRead(ois); if(t==null) break; else locQueue.put(t); } } locQueue.put(poison);//signal that all traces have been read; ois.close(); } catch (Exception e) { System.err.println("Exception on file read"); e.printStackTrace(); } } } private class FilterRunner implements Callable{ private FilterRunner(final Callable filter, final BlockingQueue in, final BlockingQueue out) { this.filter = filter; this.in = in; this.out = out; } private final Callable filter; private final BlockingQueue in, out; @Override public Object call() throws Exception { final BlockingQueue in = this.in; final BlockingQueue out = this.out; final TraceFilter locFilter = filter.call(); final Trace locPoison = poison; Trace t; try { while((t=in.take())!=locPoison) { if(locFilter.add(t)); out.put(t); } out.put(t);//signal end to executor } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } } public static Callable callable(final T object) { return new Callable() { @Override public T call() throws Exception { return object; } }; } }