/[projects]/dao/DaoAdresseVedligehold/src/main/java/dk/daoas/adressevedligehold/afstandandenrute/AfstandAndenRuteTask.java
ViewVC logotype

Diff of /dao/DaoAdresseVedligehold/src/main/java/dk/daoas/adressevedligehold/afstandandenrute/AfstandAndenRuteTask.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 2878 by torben, Sat Jan 30 14:05:53 2016 UTC revision 3074 by torben, Thu Jul 28 11:37:37 2016 UTC
# Line 1  Line 1 
1  package dk.daoas.adressevedligehold.afstandandenrute;  package dk.daoas.adressevedligehold.afstandandenrute;
2    
 import java.io.File;  
 import java.io.FileReader;  
3  import java.sql.Connection;  import java.sql.Connection;
4  import java.util.HashMap;  import java.util.HashMap;
5  import java.util.List;  import java.util.List;
# Line 9  import java.util.Map; Line 7  import java.util.Map;
7  import java.util.Queue;  import java.util.Queue;
8  import java.util.Set;  import java.util.Set;
9  import java.util.concurrent.CyclicBarrier;  import java.util.concurrent.CyclicBarrier;
10    import java.util.concurrent.ExecutorService;
11  import java.util.concurrent.Executors;  import java.util.concurrent.Executors;
12  import java.util.concurrent.ThreadFactory;  import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;  
13  import java.util.concurrent.atomic.AtomicInteger;  import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.FileHandler;  
 import java.util.logging.Logger;  
 import java.util.logging.SimpleFormatter;  
14    
15  import ags.utils.dataStructures.trees.thirdGenKD.KdTree;  import ags.utils.dataStructures.trees.thirdGenKD.KdTree;
16    import dk.daoas.adressevedligehold.ReloadTask;
17  import dk.daoas.adressevedligehold.ServiceConfig;  import dk.daoas.adressevedligehold.ServiceConfig;
18  import dk.daoas.adressevedligehold.beans.Address;  import dk.daoas.adressevedligehold.beans.Address;
19  import dk.daoas.adressevedligehold.db.DBConnection;  import dk.daoas.adressevedligehold.db.DBConnection;
20  import dk.daoas.adressevedligehold.tasks.Task;  import dk.daoas.adressevedligehold.tasks.Task;
21  import dk.daoas.adressevedligehold.util.HttpUtil;  import dk.daoas.adressevedligehold.tasks.TaskLogger;
22    
23    
24    
25  public class AfstandAndenRuteTask extends Task {  public class AfstandAndenRuteTask extends Task {
26            
27    
28          final Logger logger = Logger.getLogger( AfstandAndenRuteTask.class.toString() );          private TaskLogger logger = TaskLogger.getInstance();
29    
30          final boolean verbose = false;//          final static boolean VERBOSE = false;//
31            
32            public final static String INCREMENTAL = "INCREMENTAL";
33            public final static String FULL = "FULL";
34    
35    
36          static boolean test_mode;          static boolean test_mode;
37    
38    
39          String distributor;          String distributor;
40            int antalIkkeDaekkede = 0;
41            
42            AtomicInteger antalBeregnet = new AtomicInteger();
43    
44    
45    
46          ThreadPoolExecutor threadPool;          ExecutorService threadPool;
47    
48            final boolean isIncremental;    
49            final int max_workers;
50    
51    
52          public AfstandAndenRuteTask(String distributor) {          public AfstandAndenRuteTask(String distributor, String type) {
53                  this.distributor = distributor;                  this.distributor = distributor;
54          }                  
55                    type = type.toUpperCase();
56                    switch(type) {
57          @Override                  case FULL:
58          protected void taskRun() throws Exception {                          isIncremental = false;
59                            break;
60                    case INCREMENTAL:
61                            isIncremental = true;
62                            break;
63                  //Setup j.u.l Logger                  default:
64                  Logger root = Logger.getLogger("");                          throw new RuntimeException("Unknown type" + type);                      
65                  FileHandler fhandler = new FileHandler("fulddaekning.log"); // Ingen max størrelse, ingen rotation og ingen append                  }
66                  fhandler.setFormatter( new SimpleFormatter() );                  
67                  root.addHandler( fhandler );                  int tmp_max_workers = ServiceConfig.getInstance().maxWorkers;    
68                    if (tmp_max_workers <= 0) {      
   
   
                 Constants.init(distributor);  
                 Constants consts = Constants.getInstance();  
   
                 int max_workers = ServiceConfig.getInstance().maxWorkers;          
                 if (max_workers <= 0) {    
69                          logger.info("!!! AUTO-DETECT MAX_WORKERS !!!");                            logger.info("!!! AUTO-DETECT MAX_WORKERS !!!");  
70                          int cores = Runtime.getRuntime().availableProcessors();                            int cores = Runtime.getRuntime().availableProcessors();  
71                          cores -= 1;//Efterlad 1 core/cpu i reserve til systemet                            cores -= 1;//Efterlad 1 core/cpu i reserve til systemet  
72    
73                          max_workers = Math.max(1, cores); //Dog skal der som minimum være 1 core til beregning                            tmp_max_workers = Math.max(1, cores); //Dog skal der som minimum være 1 core til beregning      
74    
75                  }                          }        
76                  if (test_mode) {                  if (test_mode) {
77                          max_workers = 1;                          tmp_max_workers = 1;
78                  }                  }
79                  logger.info("Starting with MAX_WORKERS:" + max_workers);                          
80                  setupThreadPool(max_workers);                  max_workers = tmp_max_workers;
81            }
                 int antalIkkeDaekkede = -1;  
   
                 try (Connection conn = DBConnection.getConnection() ) {  
                         Database db = new Database(conn);  
   
                         Queue<Address> ikkeDaekkede = db.hentAlleIkkedaekkedeAdresser(consts.getMinPostnr(), consts.getMaxPostnr() );  
                         antalIkkeDaekkede = ikkeDaekkede.size();  
                         logger.info("Antal ikke-daekkede: " + antalIkkeDaekkede );  
   
   
                         boolean testRun= false;  
   
                         AtomicInteger antalFundne = new AtomicInteger(0);  
   
                         long beregnStart =0;  
                         long start = System.currentTimeMillis();  
                         int antalDaekkedeAdresser = -1;  
   
                         if (testRun == false) {  
82    
                                 logger.info("Finder postnumre");  
                                 Set<Short> postnumre = db.hentPostnumreCache();  
83    
84                                  // Først validerer vi BBox på alle postnummre, for at undgå fuldt stop midt i beregningen          @Override
85                                  for(short postnr : postnumre) { //          protected void taskRun() throws Exception {
86                                          logger.info("Validerer BBox for " + postnr);                  
                                         BoundingBox bbox = db.getBoundingbox(postnr);  
                                         bbox.validateBbox();  
                                 }  
87    
88                    Constants.init(distributor);
89                    Constants consts = Constants.getInstance();
90    
91                                  logger.info("Henter alle daekkede adresser");                  
92                                  antalDaekkedeAdresser = db.hentAlleDaekkedeAdresser(distributor);                  logger.info("Starting with MAX_WORKERS:" + max_workers);        
                                 logger.info( "AlleDaekkedeAdresser.length=" + antalDaekkedeAdresser );  
93    
94                                  Map<Short, List<Address>> addrHoList = db.getDaekkedeAdresserHO();                  threadPool = Executors.newFixedThreadPool(max_workers, new WorkerThreadFactory() );
95                    
96                    logger.info("Starting with INCREMENTAL:" + isIncremental);
97    
                                 Map<Short, KdTree<Address>> hoTrees = new HashMap<Short,KdTree<Address>>();  
98    
                                 for ( Map.Entry<Short, List<Address>> entry : addrHoList.entrySet() ) {  
                                         short ho = entry.getKey();  
                                         List<Address> geopoints = entry.getValue();  
99    
100                                          logger.info("Opbygger KDTree for " + ho + " - antal=" + geopoints.size() );                  
101    
                                         int bucketSize = 96*12;  
102    
                                         KdTree<Address> addressTree = new KdTree<Address>( 3, bucketSize );  
                                         for(Address a: geopoints) {  
                                                 addressTree.addPoint(a.xyz, a);  
                                         }  
103    
104                                          hoTrees.put(ho, addressTree);                                            AtomicInteger antalFundne = new AtomicInteger(0);
                                 }  
105    
106                                  db.resetResultTable();                  long start = System.currentTimeMillis();
107                    AtomicInteger antalDaekkedeAdresser = new AtomicInteger(-1);
108    
                                 beregnStart = System.currentTimeMillis();  
                                 logger.info("Starter beregning");  
109    
110                                  //pre-check er ok - reset tmp tabel og start søgningen                  // MAIN RUN START
111    
112                                  CyclicBarrier barrier = new CyclicBarrier(max_workers + 1);                  logger.info("======================================================================");
113                    
114                    mainRun(consts, isIncremental, "ruteMa", antalFundne, antalDaekkedeAdresser);
115                    
116                    logger.info("======================================================================");
117                    
118                    mainRun(consts, true, "ruteLo", antalFundne, antalDaekkedeAdresser);
119                    
120                    logger.info("======================================================================");
121                    
122                    mainRun(consts, true, "ruteSo",  antalFundne, antalDaekkedeAdresser);
123                    
124                    logger.info("======================================================================");
125    
126                                  for (int i=0; i<max_workers; i++) {                  // END OF MAIN run
127                                          LookupWorker worker = new LookupWorker(i, this, barrier, ikkeDaekkede, hoTrees,antalFundne,db,verbose,consts);                  
128                                          threadPool.submit( worker );                  threadPool.shutdown(); //Calc is done now
129                                  }                  threadPool = null;//release early for GC
130                            
131    
132                                  barrier.await(); // Afvent at workerne bliver færdige                  manager.submitTask( new ReloadTask("AfstandAndenRute/" + distributor) );
133                    
134    
135                                  logger.info("Calc is done - cleaning up remaining bits");                  long now = System.currentTimeMillis();
136                    long elapsed = now - start ;
137    
                                 threadPool.shutdown(); //Calc is done now  
138    
139    
140                                  db.saveBatch();                  logger.info("Fuld load done : " + formatMilliSeconds(elapsed) );
141                    logger.info("Antal daekkede : " + antalDaekkedeAdresser );
142                    logger.info("Antal ikke-daekkede : " + antalIkkeDaekkede );
143                    logger.info("Heraf, antal fundne : " + antalFundne );
144    
145                                  if (test_mode == false && this.isAborted() == false) {                  logger.info( String.format("Fandt adresser til : %.2f %%", (antalFundne.get() *100.0)/antalIkkeDaekkede ) );
146                                          db.renameResultTables();          }
147    
                                         String trigger_url = ServiceConfig.getInstance().trigger_url;  
148    
149                                          logger.info("Calling trigger URL");          private void mainRun(Constants consts, boolean localIsIncremental, String ugedag, AtomicInteger antalFundne, AtomicInteger antalDaekkedeAdresser) throws Exception {
150                                          String url = trigger_url + "&trigger=AfstandAndenRute/" + distributor;                  
151                    logger.info("MainRun() isIncremental=" + localIsIncremental + " ugedag=" + ugedag);
152                    
153                    try (Connection conn = DBConnection.getConnection() ) {
154                            DatabaseRouteDistance db = new DatabaseRouteDistance(conn, localIsIncremental, ugedag);
155                            
156                            if (localIsIncremental) {
157                                    db.prepareIncrementalSearch();
158                            }
159    
160                                          HttpUtil.getContentString(url, 500);                          Queue<Address> ikkeDaekkede = db.hentAlleIkkedaekkedeAdresser(consts.getMinPostnr(), consts.getMaxPostnr() );
161                            antalIkkeDaekkede += ikkeDaekkede.size();
162                            logger.info("Antal ikke-daekkede: " + antalIkkeDaekkede );
163                    
164                    
165    
166                                  } else {                          logger.info("Finder postnumre");
167                                          logger.info( "Rename tables is disabled !!!" );                          Set<Short> postnumre = db.hentPostnumreCache();
168            
169                            // Først validerer vi BBox på alle postnummre, for at undgå fuldt stop midt i beregningen
170                            for(short postnr : postnumre) { //
171                                    logger.info("Validerer BBox for " + postnr);
172                                    BoundingBox bbox = db.getBoundingbox(postnr);
173                                    bbox.validateBbox();
174                            }
175            
176            
177                            logger.info("Henter alle daekkede adresser");
178                            int tmpAntalDaekkede = db.hentAlleDaekkedeAdresser(distributor);
179                            antalDaekkedeAdresser.set(tmpAntalDaekkede);
180                            logger.info( "AlleDaekkedeAdresser.length=" + antalDaekkedeAdresser );
181            
182                            Map<Short, List<Address>> addrHoList = db.getDaekkedeAdresserHO();
183            
184                            Map<Short, KdTree<Address>> hoTrees = new HashMap<Short,KdTree<Address>>();
185            
186                            for ( Map.Entry<Short, List<Address>> entry : addrHoList.entrySet() ) {
187                                    short ho = entry.getKey();
188                                    List<Address> geopoints = entry.getValue();
189            
190                                    logger.info("Opbygger KDTree for " + ho + " - antal=" + geopoints.size() );
191            
192                                    int bucketSize = 96*12;
193            
194                                    KdTree<Address> addressTree = new KdTree<Address>( 3, bucketSize );
195                                    for(Address a: geopoints) {
196                                            addressTree.addPoint(a.xyz, a);
197                                  }                                  }
198            
199                                    hoTrees.put(ho, addressTree);                          
200                            }
201            
202                            db.resetResultTable();
203            
204    
205                            logger.info("Starter beregning");
206            
207                            //pre-check er ok - reset tmp tabel og start søgningen
208            
209                            CyclicBarrier barrier = new CyclicBarrier(max_workers + 1);
210            
211                            for (int i=0; i<max_workers; i++) {
212                                    LookupWorker worker = new LookupWorker(i, this, barrier, ikkeDaekkede, hoTrees,antalFundne,antalBeregnet,db,VERBOSE,consts);
213                                    threadPool.submit( worker );
214                            }
215            
216                            barrier.await(); // Afvent at workerne bliver færdige
217            
218                            logger.info("Calc is done - cleaning up remaining bits");
219            
220            
221                            db.saveBatch();
222            
223                            if (test_mode == false && this.isAborted() == false) {
224                                    db.renameResultTables();                                
225            
226            
227                          } else {                          } else {
228                                  /// Test                                  logger.info( "Rename tables is disabled !!!" );
                                 /*  
                                 db.resetResultTable();  
   
   
                                 alleDaekkedeAdresser = db.hentAlleDaekkedeAdresser();  
                                 logger.info( "AlleDaekkedeAdresser.length=" + alleDaekkedeAdresser.size());  
   
                                 short post = (short) 2700;                        
                                 Lookup lookup = new Lookup(post, db, threadPool);  
                                 lookup.doLookup();*/                                                              
229                          }                          }
230    
   
                         long now = System.currentTimeMillis();  
                         long elapsed = now - start ;  
                         long elapsedBeregn = now - beregnStart;  
   
   
                         logger.info("Fuld load done : " + formatMilliSeconds(elapsed) );  
                         logger.info("Fuld load done (beregning) : " + formatMilliSeconds(elapsedBeregn) );  
                         logger.info("Antal daekkede : " + antalDaekkedeAdresser );  
                         logger.info("Antal ikke-daekkede : " + antalIkkeDaekkede );  
                         logger.info("Heraf, antal fundne : " + antalFundne );  
   
                         logger.info( String.format("Fandt adresser til : %.2f %%", (antalFundne.get() *100.0)/antalIkkeDaekkede ) );  
                         //logger.info("Fandt adresser til : " + (antalFundne*100.0)/ikkeDaekkede.length + "%" );  
   
231                  }                  }
232          }          }
233    
# Line 218  public class AfstandAndenRuteTask extend Line 242  public class AfstandAndenRuteTask extend
242                  return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, mseconds);                  return String.format("%02d:%02d:%02d.%03d", hours, minutes, seconds, mseconds);
243          }          }
244    
         private void setupThreadPool(int max_workers) {    
                 threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(max_workers, new WorkerThreadFactory() );          
         }  
245    
246          static class WorkerThreadFactory implements ThreadFactory {                static class WorkerThreadFactory implements ThreadFactory {      
247                  int count = 0;                    int count = 0;  
# Line 238  public class AfstandAndenRuteTask extend Line 259  public class AfstandAndenRuteTask extend
259    
260          @Override          @Override
261          public String getDescription() {          public String getDescription() {
262                  return "Afstand anden rute";                  return "Afstand anden rute/" + distributor;
263          }          }
264    
265    
266    
267          @Override          @Override
268          public String getDetail() {          public String getDetail() {
269                  // TODO Auto-generated method stub                  if (isIncremental) {
270                  return null;                          return "Type: incremental";
271                    } else {
272                            return "Type: full";
273                    }
274          }          }
275    
276    
277    
278          @Override          @Override
279          public short getPercentCompleted() {          public double getPercentCompleted() {
280                  // TODO Auto-generated method stub                  if (antalIkkeDaekkede == 0) {//avoid division by zero
281                  return 0;                          return -1;
282                    }
283                    
284                    return (antalBeregnet.get() / ((float)antalIkkeDaekkede)) * 100.0;
285          }          }
286  }  }

Legend:
Removed from v.2878  
changed lines
  Added in v.3074

  ViewVC Help
Powered by ViewVC 1.1.20