View Javadoc

1   package org.opensync.engine.server;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.StringReader;
6   import java.text.SimpleDateFormat;
7   import java.util.GregorianCalendar;
8   import java.util.Iterator;
9   import java.util.List;
10  import java.util.StringTokenizer;
11  
12  import javax.xml.transform.TransformerException;
13  
14  import org.apache.log4j.Category;
15  import org.apache.xindice.util.XindiceException;
16  import org.dom4j.Document;
17  import org.dom4j.DocumentException;
18  import org.dom4j.DocumentHelper;
19  import org.dom4j.Element;
20  import org.dom4j.XPath;
21  import org.dom4j.io.SAXReader;
22  import org.opensync.engine.server.connector.BDConnector;
23  import org.opensync.engine.util.FileHelper;
24  import org.opensync.engine.util.I18n;
25  import org.opensync.tools.Utils;
26  import org.opensync.xmldb.DatabaseManager;
27  import org.xml.sax.SAXException;
28  import org.xmldb.api.base.Collection;
29  import org.xmldb.api.base.XMLDBException;
30  
31  /***
32   * This object represents the motor of the OpenSync application.
33   */
34  
35  public class Synchronizer{
36  
37    static protected SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss dd/MM/yyyy");
38  
39    /***
40     * The mapper use to synchronize the and transform views of tasks
41     *
42     * @see	Mapper
43     */
44    static protected Mapper mapper = new Mapper();
45    static public Task currentTask = null;
46    /***
47     * Execute a task
48     *
49     * @param	task the task to execute
50     * @exception	TransformerException
51     * @exception	IOException
52     * @exception	Exception
53     */
54    static public void executeTask(Task task)throws Exception{
55      String fileSeparator = System.getProperty("file.separator");
56  
57      currentTask = task;
58  
59      Source sourceFrom = null;
60      Source sourceTo = null;
61      View viewFrom = null;
62      View viewTo = null;
63  
64      Protocol protocolFrom = null;
65      Protocol protocolTo = null;
66  
67      Connector connectorTo  = null;
68      Connector connectorFrom = null;
69  
70      Adapter adapterFrom = null;
71      Adapter adapterTo = null;
72  
73      // Incremental synchro. support
74      DatabaseManager dbMgr = null;
75      Collection col = null;
76      Collection tx_col = null;
77  
78      Statistic stat = new Statistic();
79      Window w = null;
80  
81      boolean incrementalMode = false;
82      try{
83  
84        long begin = System.currentTimeMillis();
85        logInfo(task,"info.synch.start",null);
86        Synchronization synchronization = task.getSynchronization();
87        //from objects
88        sourceFrom = synchronization.getFromSource();
89        viewFrom = synchronization.getFromView();
90        connectorFrom = sourceFrom.getConnector();
91        if(connectorFrom.getType().equals("bd"))
92          w = new Window(0,((BDConnector)connectorFrom).getWindowSize() );
93  
94        logDebug(task,"info.synch.use-connector",
95          new Object[]{connectorFrom.getType(),sourceFrom.getName(),connectorFrom.getUrl()}
96        );
97        adapterFrom = connectorFrom.getAdapter();
98  
99        logDebug(task,"info.synch.use-adapter",
100         new Object[]{getClassName(adapterFrom),viewFrom.getName(),sourceFrom.getName()}
101       );
102       adapterFrom.startReadInputFile(true);
103       protocolFrom = connectorFrom.getProtocol();
104       if(protocolFrom != null){
105         logDebug(task,"info.synch.use-protocol-get",
106           new Object[]{protocolFrom.getName(),viewFrom.getName(),sourceFrom.getName()}
107         );
108       }
109       //to objects
110       sourceTo = synchronization.getToSource();
111       viewTo = synchronization.getToView();
112       connectorTo = sourceTo.getConnector();
113       logDebug(task,"info.synch.use-connector",
114         new Object[]{connectorTo.getType(),sourceTo.getName(),connectorTo.getUrl()}
115       );
116       adapterTo = sourceTo.getConnector().getAdapter();
117 
118       logDebug(task,"info.synch.use-adapter",
119         new Object[]{getClassName(adapterTo),viewTo.getName(),sourceTo.getName()}
120       );
121       // Init adapter: if BDAdapter, set stmt used for batch !
122       adapterTo.init();
123       Document themes = adapterTo.readThemeDescriptor();
124       protocolTo = connectorTo.getProtocol();
125       if(protocolTo != null){
126         logDebug(task,"info.synch.use-protocol-put",
127           new Object[]{protocolTo.getName(),viewTo.getName(),sourceTo.getName()}
128         );
129 	protocolTo.init();
130       }
131       //Get the views in source format
132       String[] inViews = null;
133       if(protocolFrom != null){
134         logDebug(task,"info.synch.get-input-views",
135           new Object[]{viewFrom.getName(),connectorFrom.getUrl(),protocolFrom.getFolder()}
136         );
137 
138         inViews = protocolFrom.getViews(viewFrom.getFile());
139         if(inViews.length == 0){
140           logDebug(task,"info.synch.no-view",null);
141         }
142       }
143       else{
144         //it's the adapter who get the view
145         inViews = new String[]{""};
146       }
147 
148       String id = sourceFrom.getName() + "_" + sourceTo.getName() + "_" + viewFrom.getName() + "_" + viewTo.getName();
149 
150       incrementalMode = OpenSync.getInstance().getIncrementalMode();
151       if (incrementalMode) {
152 	// Init the XML internal database for incremental synchronizations.
153 	dbMgr = new DatabaseManager();
154 	// Initialization of the collections for incremental synchronizationss.
155 	if ( (col = dbMgr.getCollection("sync_col_"+id)) == null) col = dbMgr.createCollection("sync_col_"+id);
156 	tx_col = dbMgr.tx_start(col);
157       }
158 
159       boolean endOfviews = true;
160       File xmlMappingFile = null;
161       for(int i=0;i<inViews.length;i++)  {
162           if (inViews[i] != null) endOfviews = false;
163       }
164       int line = 0;
165       while (!endOfviews) {
166         for(int i=0;i<inViews.length;i++) {
167           //Adapt the view to the xml OpenSync format
168           logInfo(task,"info.synch.adapt-input-view",
169             new Object[]{viewFrom.getName(),getClassName(adapterFrom)}
170           );
171           String xmlView = null;
172           if (inViews[i] != null) {
173 
174             xmlView = adapterFrom.adaptInputView(inViews[i],viewFrom, stat, w);
175             if(connectorFrom.getType().equals("bd")) {
176               if(w.getStartline() != -1) inViews[i] = "NOT FINISH";
177               else if(inViews[i].equals("NOT FINISH")) inViews[i] = "";
178             }
179 
180             //Get the mapping file
181             logDebug(task,"info.synch.get-mapping",new Object[]{synchronization.getUrlMapping()});
182             String urlMapping = synchronization.getUrlMapping();
183             if (xmlMappingFile == null) {
184               if(urlMapping != null && urlMapping.length() > 0){
185 		String configFolder = System.getProperty("opensync.configfolder");
186 		configFolder = (configFolder == null ? "" : configFolder);
187 
188                 String xmlMapping = FileHelper.fileToString(
189                   OpenSync.getInstance().getFilePath(
190                     fileSeparator+"etc"+fileSeparator+configFolder+fileSeparator+"mapping"+fileSeparator + synchronization.getUrlMapping(),true
191                   )
192                 );
193                 xmlMappingFile = new File(OpenSync.getInstance().getFilePath(
194                                                 fileSeparator+"etc"+fileSeparator+configFolder+fileSeparator+"mapping"+fileSeparator + synchronization.getUrlMapping(),true));
195               }
196             }
197             //Transform the view with xslt process
198             logDebug(task,"info.synch.transform",null);
199             logDebug(task,"info.synch.get-mapping",new Object[]{synchronization.getUrlMapping()});
200             //xmlView = mapper.map(xmlMapping,xmlView); (
201             // In order to use the external files in the mapping specification
202             // such as the mapping.dtd file !
203             // (cf. the method generateXslMapping from the Mapper class)
204 	    logDebug(task,"info.synch.adapt-input-view", new Object[]{xmlView});
205             OpenSync.getInstance().getLog().debug(Log.ROOT, "Input view: \n"+xmlView);
206 	    xmlView = mapper.map(xmlMappingFile,xmlView);
207 
208 	    if (connectorTo.getIncremental()) {
209 	      // Incremental synchronizations support
210 	      // The xmlView is reduced to the data not already synchronized.
211 	      // col: contains the synchronized data
212 	      //
213 	      org.xmldb.api.modules.XMLResource res;
214 	      if (incrementalMode) {
215 		if ( (res = dbMgr.retrieveDocument(col, id)) != null) {
216 		  SAXReader xmlReader = new SAXReader();
217 		  Document doc = xmlReader.read(new StringReader((String)res.getContent()));
218 		  Document curr_doc = xmlReader.read(new StringReader(xmlView));
219 		  // Get the primary keys of the theme
220 		  // By convention, the name of the view must be the name of the theme
221 		  // This convention will be changed after testing.
222 		  List pks = adapterTo.getPKs(themes, viewTo.getTheme());
223 		  String xmlView_diff = prepareDocToAdd(doc, curr_doc, pks);
224 		  String xmlView_merge = mergeDoc(xmlReader.read(new StringReader(xmlView_diff)), doc);
225 		  dbMgr.addDocument(tx_col, id, xmlView_merge);
226 		  xmlView = xmlView_diff;
227 		} else {
228 		  dbMgr.addDocument(tx_col, id, xmlView );
229 		}
230 		// End of Incremental synchronizations support
231 	      }
232 	    }
233 
234 	    logDebug(task,"info.synch.adapt-output-view", new Object[]{xmlView});
235 
236 	    //Adapt the xml view to the source format
237             logInfo(task,"info.synch.adapt-output-view",
238               new Object[]{viewTo.getName(),getClassName(adapterTo),getClassName(sourceTo)}
239             );
240 	    String outView = adapterTo.adaptOutputView(xmlView,viewTo, stat);
241 
242             //Put the view to the source
243             if(protocolTo != null){
244 		logDebug(task,"info.synch.put-output-view",
245 			 new Object[]{viewTo.getName(),protocolTo.getName(),protocolTo.getFolder(),connectorTo.getUrl()}
246 		);
247 		protocolTo.putView(outView,viewTo.getFile());
248 		logInfo(task, "info.synch.put-output-records", new Object[]{outView});
249             }
250 
251           }
252           //Thread.sleep((int)(20000*Math.random()));
253         }
254 	// This setting must be done before getting the
255 	// next views (to avoid for instance to skip
256 	// again the first lines of an input txt file.
257 	adapterFrom.startReadInputFile(false);
258 
259         if (protocolFrom != null)  {
260           inViews = protocolFrom.getViews(viewFrom.getFile());
261         }
262         endOfviews = true;
263         for(int i=0;i<inViews.length;i++)  {
264           if (inViews[i] != null && inViews[i]!="") endOfviews = false;
265         }
266 
267       }
268 
269 
270       // Release resource used by the connector
271       if (connectorTo != null) {
272 	connectorTo.release();
273       }
274       if (connectorFrom != null) {
275 	connectorFrom.release();
276       }
277       // Commit the current synchronizations for incremental synchro.
278       if (incrementalMode) dbMgr.tx_commit(col);
279     }
280 
281     catch(Exception e){
282        stat.error.add(e.toString());
283        // If we get an Exception, we consider the current synchronization
284       // not done thus rollback.
285       if (incrementalMode) dbMgr.tx_rollback(col);
286 
287       if (connectorTo != null) {
288 	connectorFrom.releaseWithException();
289 	connectorTo.releaseWithException();
290       }
291       logError(task, e);
292       throw e;
293     }
294     finally {
295      stat.print();
296      stat.end = java.util.Calendar.getInstance().getTime();
297      logStat(task, stat);
298      if (incrementalMode) {
299        dbMgr.close(col);
300        dbMgr.close(tx_col);
301      }
302      if(protocolFrom != null){
303        logDebug(task,"info.synch.protocol-clean",null);
304        protocolFrom.clean(viewFrom.getFile());
305      }
306      if(protocolTo != null){
307        logDebug(task,"info.synch.protocol-clean",null);
308        protocolTo.clean(viewTo.getFile());
309      }
310 
311      logDebug(task,"info.synch.finish",null);
312     }
313 
314 
315   }
316 
317   static private String mergeDoc(Document doc_diff, Document doc_target) {
318     Element root_target = doc_target.getRootElement();
319     Element root_diff = doc_diff.getRootElement();
320     Iterator elementIterator = root_diff.elementIterator();
321     while (elementIterator.hasNext()) {
322 	Element element = (Element)elementIterator.next();
323 	root_target.add(element.createCopy());
324     }
325     return root_target.asXML();
326   }
327 
328   static private String prepareDocToAdd(Document doc, Document doc2add, List pks) throws XindiceException, XMLDBException, DocumentException {
329     Document doc_result = DocumentHelper.createDocument();
330 
331     Element doc_root = doc.getRootElement();
332     XPath xpathSelector = DocumentHelper.createXPath("/DATA/ROW");
333     Element root_result = doc_result.addElement( "DATA" );
334     List results = xpathSelector.selectNodes(doc2add);
335     for ( Iterator iter = results.iterator(); iter.hasNext(); ) {
336       Element root = (Element) iter.next();
337       Iterator  elementIterator = root.elementIterator();
338       StringBuffer xpathQuery = new StringBuffer("/DATA/ROW[");
339       while (elementIterator.hasNext()) {
340 	Element element = (Element)elementIterator.next();
341 	if (pks.contains(element.getName())) {
342 	  xpathQuery.append(element.getName()+"=\""+Utils.checkSpecialCharXml(element.getTextTrim()) + "\" and ");
343 	  //System.out.println(element.getName()+ ": " +Utils.checkSpecialCharXml(element.getTextTrim()));
344 	}
345       }
346       String xpathQueryString = xpathQuery.toString();
347       if (!xpathQueryString.equals("/DATA/ROW[")) {
348 	if (xpathQueryString.lastIndexOf("and") > 0) {
349 	  xpathQueryString = xpathQueryString.substring(0, xpathQueryString.lastIndexOf("and"));
350 	}
351 	xpathQueryString += "]";
352       //System.out.println(xpathQueryString);
353 	XPath xpathSelector2 = DocumentHelper.createXPath(xpathQueryString);
354 	List results2 = xpathSelector2.selectNodes(doc);
355 	Iterator iter_res2 = results2.iterator();
356 	if (!iter_res2.hasNext()) {
357 	  root_result.add(root.createCopy());
358 	}
359       } else
360 	root_result.add(root.createCopy());
361     }
362     //System.out.println("root_result:"+root_result.asXML());
363 
364     return root_result.asXML();
365   }
366   /***
367    * Log information
368    *
369    * @param	task the task executed
370    * @param	msgKey the message to log
371    * @param	params the message parameters
372    */
373   static protected void logDebug(Task task,String msgKey,Object[]params){
374     Log log = OpenSync.getInstance().getLog();
375     Category cat = Category.getInstance(task.getName());
376     log.debug(cat,msgKey,params);
377     String str;
378     if(params == null){
379       str =  I18n.getInstance().get(msgKey);
380     }
381     else{
382       str =  I18n.getInstance().format(msgKey,params);
383     }
384     log.debug(Log.ROOT,task.getName() + " : " + str);
385 
386   }
387 
388   /***
389    * Log information
390    *
391    * @param	task the task executed
392    * @param	msgKey the message to log
393    * @param	params the message parameters
394    */
395   static protected void logInfo(Task task,String msgKey,Object[]params){
396     Log log = OpenSync.getInstance().getLog();
397     Category cat = Category.getInstance(task.getName());
398     log.debug(cat,msgKey,params);
399     String str;
400     if(params == null){
401       str =  I18n.getInstance().get(msgKey);
402     }
403     else{
404       str =  I18n.getInstance().format(msgKey,params);
405     }
406     log.info(Log.ROOT,task.getName() + " : " + str);
407 
408   }
409 
410   /***
411     * Log information
412     *
413     * @param	task the task executed
414     * @param	msgKey the message to log
415     * @param	params the message parameters
416     */
417    static protected void logStat(Task task,Statistic stat){
418      Log log = OpenSync.getInstance().getLog();
419      String str = stat.toString();
420      log.info(Log.STATISTIC,"Start : "+dateFormat.format(stat.start));
421      log.info(Log.STATISTIC,"task name : "+task.getName());
422      log.info(Log.STATISTIC,"\t\tINPUT\t= "+stat.query);
423      log.info(Log.STATISTIC,"\t\tUPDATE\t= "+stat.update);
424      log.info(Log.STATISTIC,"\t\tINSERT\t= "+stat.insert);
425      log.info(Log.STATISTIC,"\t\tERROR\t= "+stat.error.size());
426      if(stat.error.size() > 0) log.info(Log.STATISTIC,"Error messages : ");
427      for(int i=0; i<stat.error.size(); i++) {
428        log.info(Log.STATISTIC,"\t"+stat.error.get(i));
429      }
430      log.info(Log.STATISTIC,"End : "+dateFormat.format(stat.end));
431   }
432 
433 
434     /***
435    * Log information
436    *
437    * @param	task the task executed
438    * @param	msgKey the message to log
439    * @param	params the message parameters
440    */
441   static public void logWarning(Task task,String msgKey,Object[]params){
442     Log log = OpenSync.getInstance().getLog();
443     Category cat = Category.getInstance(task.getName());
444     log.warn(cat,msgKey,params);
445     String str;
446     if(params == null){
447       str =  I18n.getInstance().get(msgKey);
448     }
449     else{
450       str =  I18n.getInstance().format(msgKey,params);
451     }
452     log.warn(Log.ROOT,task.getName() + " : " + str);
453 
454   }
455 
456   /***
457    * Log error
458    *
459    * @param	task the executed task
460    * @param	ex the exception log
461    */
462   static public void logError(Task task,Exception ex){
463     Log log = OpenSync.getInstance().getLog();
464     Category cat = Category.getInstance(task.getName());
465     if(ex instanceof OpenSyncException){
466       log.error(Log.SYNCHRONIZER,
467         I18n.getInstance().format(
468           "info.task.fail",new Object[]{
469             task.getName(),Log.dateFormat.format(
470               GregorianCalendar.getInstance().getTime()
471             ),ex
472           }
473         )
474       );
475       log.error(cat,ex.getMessage());
476     } else if (ex instanceof SAXException && (((SAXException)ex).getException() != null)) {
477       ex = ((SAXException)ex).getException();
478       log.error(Log.SYNCHRONIZER,
479         I18n.getInstance().format(
480           "info.task.fail",new Object[]{
481             task.getName(),Log.dateFormat.format(
482               GregorianCalendar.getInstance().getTime()
483             ),ex
484           }
485         )
486       );
487       log.error(cat,ex.getMessage());
488     } else{
489       /*log.fatal(cat,
490         I18n.getInstance().format(
491           "info.task.fail",new Object[]{
492             task.getName(),log.dateFormat.format(
493               GregorianCalendar.getInstance().getTime()
494             ),log.formatStackTrace(ex)
495           }
496         )
497       );*/
498       log.fatal(cat,
499         I18n.getInstance().format(
500           "info.task.fail",new Object[]{
501             task.getName(),Log.dateFormat.format(
502               GregorianCalendar.getInstance().getTime()
503             ),ex
504           }
505         )
506       );
507     }
508   }
509   /***
510    * Trunc the full name of the class
511    *
512    * @param	obj the class
513    */
514   public static String getClassName(Object obj){
515    StringTokenizer tokens = new StringTokenizer(obj.getClass().getName(),".");
516    String name = null;
517    while(tokens.hasMoreTokens()){
518     name = tokens.nextToken();
519    }
520    return name;
521   }
522 }
523 
524 
525 
526