/[projects]/dao/DaoAdresseVedligehold/src/main/java/dk/daoas/adressevedligehold/afstandandenrute/AfstandAndenRuteTask.java
ViewVC logotype

Diff of /dao/DaoAdresseVedligehold/src/main/java/dk/daoas/adressevedligehold/afstandandenrute/AfstandAndenRuteTask.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2903 by torben, Wed Feb 3 18:45:33 2016 UTC revision 2994 by torben, Tue Mar 29 20:36:40 2016 UTC
# Line 7  import java.util.Map; Line 7  import java.util.Map;
7  import java.util.Queue;  import java.util.Queue;
8  import java.util.Set;  import java.util.Set;
9  import java.util.concurrent.CyclicBarrier;  import java.util.concurrent.CyclicBarrier;
10    import java.util.concurrent.ExecutorService;
11  import java.util.concurrent.Executors;  import java.util.concurrent.Executors;
12  import java.util.concurrent.ThreadFactory;  import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;  
13  import java.util.concurrent.atomic.AtomicInteger;  import java.util.concurrent.atomic.AtomicInteger;
14    
15  import ags.utils.dataStructures.trees.thirdGenKD.KdTree;  import ags.utils.dataStructures.trees.thirdGenKD.KdTree;
16  import dk.daoas.adressevedligehold.ReloadHelper;  import dk.daoas.adressevedligehold.ReloadTask;
17  import dk.daoas.adressevedligehold.ServiceConfig;  import dk.daoas.adressevedligehold.ServiceConfig;
18  import dk.daoas.adressevedligehold.beans.Address;  import dk.daoas.adressevedligehold.beans.Address;
19  import dk.daoas.adressevedligehold.db.DBConnection;  import dk.daoas.adressevedligehold.db.DBConnection;
# Line 23  import dk.daoas.adressevedligehold.tasks Line 23  import dk.daoas.adressevedligehold.tasks
23    
24    
25  public class AfstandAndenRuteTask extends Task {  public class AfstandAndenRuteTask extends Task {
26            
27    
28          private TaskLogger logger = TaskLogger.getInstance();          private TaskLogger logger = TaskLogger.getInstance();
29    
30          final boolean verbose = false;//          final static boolean VERBOSE = false;//
31            
32            public final static String INCREMENTAL = "INCREMENTAL";
33            public final static String FULL = "FULL";
34    
35    
36          static boolean test_mode;          static boolean test_mode;
37    
38    
39          String distributor;          String distributor;
40          int antalIkkeDaekkede = -1;          int antalIkkeDaekkede = 0;
41                    
42          AtomicInteger antalBeregnet = new AtomicInteger();          AtomicInteger antalBeregnet = new AtomicInteger();
43    
44    
45    
46          ThreadPoolExecutor threadPool;          ExecutorService threadPool;
47    
48            boolean isIncremental;
49    
50    
51          public AfstandAndenRuteTask(String distributor) {          public AfstandAndenRuteTask(String distributor, String type) {
52                  this.distributor = distributor;                  this.distributor = distributor;
53                    
54                    type = type.toUpperCase();
55                    switch(type) {
56                    case FULL:
57                            isIncremental = false;
58                            break;
59                    case INCREMENTAL:
60                            isIncremental = true;
61                            break;
62                    default:
63                            throw new RuntimeException("Unknown type" + type);                      
64                    }
65          }          }
66    
67    
68          @Override          @Override
69          protected void taskRun() throws Exception {          protected void taskRun() throws Exception {
70                    
   
   
   
                 //Setup j.u.l Logger  
                 //Logger root = Logger.getLogger("");  
                 //FileHandler fhandler = new FileHandler("fulddaekning.log"); // Ingen max størrelse, ingen rotation og ingen append  
                 //fhandler.setFormatter( new SimpleFormatter() );  
                 //root.addHandler( fhandler );  
   
   
71    
72                  Constants.init(distributor);                  Constants.init(distributor);
73                  Constants consts = Constants.getInstance();                  Constants consts = Constants.getInstance();
# Line 78  public class AfstandAndenRuteTask extend Line 85  public class AfstandAndenRuteTask extend
85                          max_workers = 1;                          max_workers = 1;
86                  }                  }
87                  logger.info("Starting with MAX_WORKERS:" + max_workers);                          logger.info("Starting with MAX_WORKERS:" + max_workers);        
                 setupThreadPool(max_workers);  
88    
89                    threadPool = Executors.newFixedThreadPool(max_workers, new WorkerThreadFactory() );
90                    
91                    logger.info("Starting with INCREMENTAL:" + isIncremental);
92    
93    
                 try (Connection conn = DBConnection.getConnection() ) {  
                         Database db = new Database(conn);  
94    
95                          Queue<Address> ikkeDaekkede = db.hentAlleIkkedaekkedeAdresser(consts.getMinPostnr(), consts.getMaxPostnr() );                  
                         antalIkkeDaekkede = ikkeDaekkede.size();  
                         logger.info("Antal ikke-daekkede: " + antalIkkeDaekkede );  
96    
97    
                         boolean testRun= false;  
98    
99                          AtomicInteger antalFundne = new AtomicInteger(0);                  AtomicInteger antalFundne = new AtomicInteger(0);
100    
101                          long beregnStart =0;                  long start = System.currentTimeMillis();
102                          long start = System.currentTimeMillis();                  AtomicInteger antalDaekkedeAdresser = new AtomicInteger(-1);
                         int antalDaekkedeAdresser = -1;  
103    
                         if (testRun == false) {  
104    
105                                  logger.info("Finder postnumre");                  // MAIN RUN START
                                 Set<Short> postnumre = db.hentPostnumreCache();  
   
                                 // Først validerer vi BBox på alle postnummre, for at undgå fuldt stop midt i beregningen  
                                 for(short postnr : postnumre) { //  
                                         logger.info("Validerer BBox for " + postnr);  
                                         BoundingBox bbox = db.getBoundingbox(postnr);  
                                         bbox.validateBbox();  
                                 }  
106    
107                    logger.info("======================================================================");
108                    
109                    mainRun(consts, isIncremental, "ruteMa", max_workers,  antalFundne, antalDaekkedeAdresser);
110                    
111                    logger.info("======================================================================");
112                    
113                    mainRun(consts, true, "ruteLo", max_workers,  antalFundne, antalDaekkedeAdresser);
114                    
115                    logger.info("======================================================================");
116                    
117                    mainRun(consts, true, "ruteSo", max_workers,  antalFundne, antalDaekkedeAdresser);
118                    
119                    logger.info("======================================================================");
120    
121                                  logger.info("Henter alle daekkede adresser");                  // END OF MAIN run
122                                  antalDaekkedeAdresser = db.hentAlleDaekkedeAdresser(distributor);                  
123                                  logger.info( "AlleDaekkedeAdresser.length=" + antalDaekkedeAdresser );                  threadPool.shutdown(); //Calc is done now
124                    threadPool = null;//release early for GC
125                            
126    
127                                  Map<Short, List<Address>> addrHoList = db.getDaekkedeAdresserHO();                  manager.submitTask( new ReloadTask("AfstandAndenRute/" + distributor) );
128                    
129    
130                                  Map<Short, KdTree<Address>> hoTrees = new HashMap<Short,KdTree<Address>>();                  long now = System.currentTimeMillis();
131                    long elapsed = now - start ;
132    
                                 for ( Map.Entry<Short, List<Address>> entry : addrHoList.entrySet() ) {  
                                         short ho = entry.getKey();  
                                         List<Address> geopoints = entry.getValue();  
133    
                                         logger.info("Opbygger KDTree for " + ho + " - antal=" + geopoints.size() );  
   
                                         int bucketSize = 96*12;  
   
                                         KdTree<Address> addressTree = new KdTree<Address>( 3, bucketSize );  
                                         for(Address a: geopoints) {  
                                                 addressTree.addPoint(a.xyz, a);  
                                         }  
   
                                         hoTrees.put(ho, addressTree);                            
                                 }  
134    
135                                  db.resetResultTable();                  logger.info("Fuld load done : " + formatMilliSeconds(elapsed) );
136                    logger.info("Antal daekkede : " + antalDaekkedeAdresser );
137                                  beregnStart = System.currentTimeMillis();                  logger.info("Antal ikke-daekkede : " + antalIkkeDaekkede );
138                                  logger.info("Starter beregning");                  logger.info("Heraf, antal fundne : " + antalFundne );
   
                                 //pre-check er ok - reset tmp tabel og start søgningen  
   
                                 CyclicBarrier barrier = new CyclicBarrier(max_workers + 1);  
   
                                 for (int i=0; i<max_workers; i++) {  
                                         LookupWorker worker = new LookupWorker(i, this, barrier, ikkeDaekkede, hoTrees,antalFundne,antalBeregnet,db,verbose,consts);  
                                         threadPool.submit( worker );  
                                 }  
   
                                 barrier.await(); // Afvent at workerne bliver færdige  
   
                                 logger.info("Calc is done - cleaning up remaining bits");  
   
                                 threadPool.shutdown(); //Calc is done now  
139    
140                    logger.info( String.format("Fandt adresser til : %.2f %%", (antalFundne.get() *100.0)/antalIkkeDaekkede ) );
141            }
142    
                                 db.saveBatch();  
143    
144                                  if (test_mode == false && this.isAborted() == false) {          private void mainRun(Constants consts, boolean localIsIncremental, String ugedag, int max_workers, AtomicInteger antalFundne, AtomicInteger antalDaekkedeAdresser) throws Exception {
145                                          db.renameResultTables();                  
146                                                            logger.info("MainRun() isIncremental=" + localIsIncremental + " ugedag=" + ugedag);
147                                          ReloadHelper.triggerReload( "AfstandAndenRute/" + distributor );                  
148                    try (Connection conn = DBConnection.getConnection() ) {
149                            DatabaseRouteDistance db = new DatabaseRouteDistance(conn, localIsIncremental, ugedag);
150                            
151                            if (localIsIncremental) {
152                                    db.prepareIncrementalSearch();
153                            }
154    
155                            Queue<Address> ikkeDaekkede = db.hentAlleIkkedaekkedeAdresser(consts.getMinPostnr(), consts.getMaxPostnr() );
156                            antalIkkeDaekkede += ikkeDaekkede.size();
157                            logger.info("Antal ikke-daekkede: " + antalIkkeDaekkede );
158                    
159                    
160    
161                                  } else {                          logger.info("Finder postnumre");
162                                          logger.info( "Rename tables is disabled !!!" );                          Set<Short> postnumre = db.hentPostnumreCache();
163            
164                            // Først validerer vi BBox på alle postnummre, for at undgå fuldt stop midt i beregningen
165                            for(short postnr : postnumre) { //
166                                    logger.info("Validerer BBox for " + postnr);
167                                    BoundingBox bbox = db.getBoundingbox(postnr);
168                                    bbox.validateBbox();
169                            }
170            
171            
172                            logger.info("Henter alle daekkede adresser");
173                            int tmpAntalDaekkede = db.hentAlleDaekkedeAdresser(distributor);
174                            antalDaekkedeAdresser.set(tmpAntalDaekkede);
175                            logger.info( "AlleDaekkedeAdresser.length=" + antalDaekkedeAdresser );
176            
177                            Map<Short, List<Address>> addrHoList = db.getDaekkedeAdresserHO();
178            
179                            Map<Short, KdTree<Address>> hoTrees = new HashMap<Short,KdTree<Address>>();
180            
181                            for ( Map.Entry<Short, List<Address>> entry : addrHoList.entrySet() ) {
182                                    short ho = entry.getKey();
183                                    List<Address> geopoints = entry.getValue();
184            
185                                    logger.info("Opbygger KDTree for " + ho + " - antal=" + geopoints.size() );
186            
187                                    int bucketSize = 96*12;
188            
189                                    KdTree<Address> addressTree = new KdTree<Address>( 3, bucketSize );
190                                    for(Address a: geopoints) {
191                                            addressTree.addPoint(a.xyz, a);
192                                  }                                  }
193            
194                                    hoTrees.put(ho, addressTree);                          
195                            }
196            
197                            db.resetResultTable();
198            
199    
200                            logger.info("Starter beregning");
201            
202                            //pre-check er ok - reset tmp tabel og start søgningen
203            
204                            CyclicBarrier barrier = new CyclicBarrier(max_workers + 1);
205            
206                            for (int i=0; i<max_workers; i++) {
207                                    LookupWorker worker = new LookupWorker(i, this, barrier, ikkeDaekkede, hoTrees,antalFundne,antalBeregnet,db,VERBOSE,consts);
208                                    threadPool.submit( worker );
209                            }
210            
211                            barrier.await(); // Afvent at workerne bliver færdige
212            
213                            logger.info("Calc is done - cleaning up remaining bits");
214            
215            
216                            db.saveBatch();
217            
218                            if (test_mode == false && this.isAborted() == false) {
219                                    db.renameResultTables();                                
220            
221            
222                          } else {                          } else {
223                                  /// Test                                  logger.info( "Rename tables is disabled !!!" );
                                 /*  
                                 db.resetResultTable();  
   
   
                                 alleDaekkedeAdresser = db.hentAlleDaekkedeAdresser();  
                                 logger.info( "AlleDaekkedeAdresser.length=" + alleDaekkedeAdresser.size());  
   
                                 short post = (short) 2700;                        
                                 Lookup lookup = new Lookup(post, db, threadPool);  
                                 lookup.doLookup();*/                                                              
224                          }                          }
225    
   
                         long now = System.currentTimeMillis();  
                         long elapsed = now - start ;  
                         long elapsedBeregn = now - beregnStart;  
   
   
                         logger.info("Fuld load done : " + formatMilliSeconds(elapsed) );  
                         logger.info("Fuld load done (beregning) : " + formatMilliSeconds(elapsedBeregn) );  
                         logger.info("Antal daekkede : " + antalDaekkedeAdresser );  
                         logger.info("Antal ikke-daekkede : " + antalIkkeDaekkede );  
                         logger.info("Heraf, antal fundne : " + antalFundne );  
   
                         logger.info( String.format("Fandt adresser til : %.2f %%", (antalFundne.get() *100.0)/antalIkkeDaekkede ) );  
                         //logger.info("Fandt adresser til : " + (antalFundne*100.0)/ikkeDaekkede.length + "%" );  
   
226                  }                  }
227          }          }
228    
# Line 213  public class AfstandAndenRuteTask extend Line 237  public class AfstandAndenRuteTask extend
237                  return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, mseconds);                  return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, mseconds);
238          }          }
239    
         private void setupThreadPool(int max_workers) {    
                 threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(max_workers, new WorkerThreadFactory() );          
         }  
240    
241          static class WorkerThreadFactory implements ThreadFactory {                static class WorkerThreadFactory implements ThreadFactory {      
242                  int count = 0;                    int count = 0;  
# Line 240  public class AfstandAndenRuteTask extend Line 261  public class AfstandAndenRuteTask extend
261    
262          @Override          @Override
263          public String getDetail() {          public String getDetail() {
264                  // TODO Auto-generated method stub                  if (isIncremental) {
265                  return "";                          return "Type: incremental";
266                    } else {
267                            return "Type: full";
268                    }
269          }          }
270    
271    
272    
273          @Override          @Override
274          public double getPercentCompleted() {          public double getPercentCompleted() {
275                    if (antalIkkeDaekkede == 0) {//avoid division by zero
276                            return -1;
277                    }
278                    
279                  return (antalBeregnet.get() / ((float)antalIkkeDaekkede)) * 100.0;                  return (antalBeregnet.get() / ((float)antalIkkeDaekkede)) * 100.0;
280          }          }
281  }  }

Legend:
Removed from v.2903  
changed lines
  Added in v.2994

  ViewVC Help
Powered by ViewVC 1.1.20