package dk.daoas.adressevedligehold.afstandandenrute; import java.sql.Connection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import ags.utils.dataStructures.trees.thirdGenKD.KdTree; import dk.daoas.adressevedligehold.ReloadHelper; import dk.daoas.adressevedligehold.ServiceConfig; import dk.daoas.adressevedligehold.beans.Address; import dk.daoas.adressevedligehold.db.DBConnection; import dk.daoas.adressevedligehold.tasks.Task; import dk.daoas.adressevedligehold.tasks.TaskLogger; public class AfstandAndenRuteTask extends Task { private TaskLogger logger = TaskLogger.getInstance(); final static boolean VERBOSE = false;// public final static String INCREMENTAL = "INCREMENTAL"; public final static String FULL = "FULL"; static boolean test_mode; String distributor; int antalIkkeDaekkede = 0; AtomicInteger antalBeregnet = new AtomicInteger(); ThreadPoolExecutor threadPool; boolean isIncremental; public AfstandAndenRuteTask(String distributor, String type) { this.distributor = distributor; type = type.toUpperCase(); switch(type) { case FULL: isIncremental = false; break; case INCREMENTAL: isIncremental = true; break; default: throw new RuntimeException("Unknown type" + type); } } @Override protected void taskRun() throws Exception { Constants.init(distributor); Constants consts = Constants.getInstance(); int max_workers = ServiceConfig.getInstance().maxWorkers; if (max_workers <= 0) { logger.info("!!! AUTO-DETECT MAX_WORKERS !!!"); int cores = Runtime.getRuntime().availableProcessors(); cores -= 1;//Efterlad 1 core/cpu i reserve til systemet max_workers = Math.max(1, cores); //Dog skal der som minimum være 1 core til beregning } if (test_mode) { max_workers = 1; } logger.info("Starting with MAX_WORKERS:" + max_workers); setupThreadPool(max_workers); logger.info("Starting with INCREMENTAL:" + isIncremental); AtomicInteger antalFundne = new AtomicInteger(0); long start = System.currentTimeMillis(); AtomicInteger antalDaekkedeAdresser = new AtomicInteger(-1); // MAIN RUN START logger.info("======================================================================"); mainRun(consts, isIncremental, "ruteMa", max_workers, antalFundne, antalDaekkedeAdresser); logger.info("======================================================================"); mainRun(consts, true, "ruteLo", max_workers, antalFundne, antalDaekkedeAdresser); logger.info("======================================================================"); mainRun(consts, true, "ruteSo", max_workers, antalFundne, antalDaekkedeAdresser); logger.info("======================================================================"); // END OF MAIN run threadPool.shutdown(); //Calc is done now long now = System.currentTimeMillis(); long elapsed = now - start ; logger.info("Fuld load done : " + formatMilliSeconds(elapsed) ); logger.info("Antal daekkede : " + antalDaekkedeAdresser ); logger.info("Antal ikke-daekkede : " + antalIkkeDaekkede ); logger.info("Heraf, antal fundne : " + antalFundne ); logger.info( String.format("Fandt adresser til : %.2f %%", (antalFundne.get() *100.0)/antalIkkeDaekkede ) ); } private void mainRun(Constants consts, boolean localIsIncremental, String ugedag, int max_workers, AtomicInteger antalFundne, AtomicInteger antalDaekkedeAdresser) throws Exception { logger.info("MainRun() isIncremental=" + localIsIncremental + " ugedag=" + ugedag); try (Connection conn = DBConnection.getConnection() ) { DatabaseRouteDistance db = new DatabaseRouteDistance(conn, localIsIncremental, ugedag); if (localIsIncremental) { db.prepareIncrementalSearch(); } Queue
ikkeDaekkede = db.hentAlleIkkedaekkedeAdresser(consts.getMinPostnr(), consts.getMaxPostnr() ); antalIkkeDaekkede += ikkeDaekkede.size(); logger.info("Antal ikke-daekkede: " + antalIkkeDaekkede ); logger.info("Finder postnumre"); Set postnumre = db.hentPostnumreCache(); // Først validerer vi BBox på alle postnummre, for at undgå fuldt stop midt i beregningen for(short postnr : postnumre) { // logger.info("Validerer BBox for " + postnr); BoundingBox bbox = db.getBoundingbox(postnr); bbox.validateBbox(); } logger.info("Henter alle daekkede adresser"); int tmpAntalDaekkede = db.hentAlleDaekkedeAdresser(distributor); antalDaekkedeAdresser.set(tmpAntalDaekkede); logger.info( "AlleDaekkedeAdresser.length=" + antalDaekkedeAdresser ); Map> addrHoList = db.getDaekkedeAdresserHO(); Map> hoTrees = new HashMap>(); for ( Map.Entry> entry : addrHoList.entrySet() ) { short ho = entry.getKey(); List
geopoints = entry.getValue(); logger.info("Opbygger KDTree for " + ho + " - antal=" + geopoints.size() ); int bucketSize = 96*12; KdTree
addressTree = new KdTree
( 3, bucketSize ); for(Address a: geopoints) { addressTree.addPoint(a.xyz, a); } hoTrees.put(ho, addressTree); } db.resetResultTable(); logger.info("Starter beregning"); //pre-check er ok - reset tmp tabel og start søgningen CyclicBarrier barrier = new CyclicBarrier(max_workers + 1); for (int i=0; i