1 package org.opensync.engine.server;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.StringReader;
6 import java.text.SimpleDateFormat;
7 import java.util.GregorianCalendar;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.StringTokenizer;
11
12 import javax.xml.transform.TransformerException;
13
14 import org.apache.log4j.Category;
15 import org.apache.xindice.util.XindiceException;
16 import org.dom4j.Document;
17 import org.dom4j.DocumentException;
18 import org.dom4j.DocumentHelper;
19 import org.dom4j.Element;
20 import org.dom4j.XPath;
21 import org.dom4j.io.SAXReader;
22 import org.opensync.engine.server.connector.BDConnector;
23 import org.opensync.engine.util.FileHelper;
24 import org.opensync.engine.util.I18n;
25 import org.opensync.tools.Utils;
26 import org.opensync.xmldb.DatabaseManager;
27 import org.xml.sax.SAXException;
28 import org.xmldb.api.base.Collection;
29 import org.xmldb.api.base.XMLDBException;
30
31 /***
32 * This class is the engine of the OpenSync application: it executes synchronization tasks.
33 */
34
35 public class Synchronizer{
36
37 static protected SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss dd/MM/yyyy");
38
39 /***
40 * The mapper used to synchronize and transform the views of tasks
41 *
42 * @see Mapper
43 */
44 static protected Mapper mapper = new Mapper();
45 static public Task currentTask = null;
46 /***
47 * Execute a task
48 *
49 * @param task the task to execute
50 * @exception TransformerException
51 * @exception IOException
52 * @exception Exception
53 */
54 static public void executeTask(Task task)throws Exception{
55 String fileSeparator = System.getProperty("file.separator");
56
57 currentTask = task;
58
59 Source sourceFrom = null;
60 Source sourceTo = null;
61 View viewFrom = null;
62 View viewTo = null;
63
64 Protocol protocolFrom = null;
65 Protocol protocolTo = null;
66
67 Connector connectorTo = null;
68 Connector connectorFrom = null;
69
70 Adapter adapterFrom = null;
71 Adapter adapterTo = null;
72
73
74 DatabaseManager dbMgr = null;
75 Collection col = null;
76 Collection tx_col = null;
77
78 Statistic stat = new Statistic();
79 Window w = null;
80
81 boolean incrementalMode = false;
82 try{
83
84 long begin = System.currentTimeMillis();
85 logInfo(task,"info.synch.start",null);
86 Synchronization synchronization = task.getSynchronization();
87
88 sourceFrom = synchronization.getFromSource();
89 viewFrom = synchronization.getFromView();
90 connectorFrom = sourceFrom.getConnector();
91 if(connectorFrom.getType().equals("bd"))
92 w = new Window(0,((BDConnector)connectorFrom).getWindowSize() );
93
94 logDebug(task,"info.synch.use-connector",
95 new Object[]{connectorFrom.getType(),sourceFrom.getName(),connectorFrom.getUrl()}
96 );
97 adapterFrom = connectorFrom.getAdapter();
98
99 logDebug(task,"info.synch.use-adapter",
100 new Object[]{getClassName(adapterFrom),viewFrom.getName(),sourceFrom.getName()}
101 );
102 adapterFrom.startReadInputFile(true);
103 protocolFrom = connectorFrom.getProtocol();
104 if(protocolFrom != null){
105 logDebug(task,"info.synch.use-protocol-get",
106 new Object[]{protocolFrom.getName(),viewFrom.getName(),sourceFrom.getName()}
107 );
108 }
109
110 sourceTo = synchronization.getToSource();
111 viewTo = synchronization.getToView();
112 connectorTo = sourceTo.getConnector();
113 logDebug(task,"info.synch.use-connector",
114 new Object[]{connectorTo.getType(),sourceTo.getName(),connectorTo.getUrl()}
115 );
116 adapterTo = sourceTo.getConnector().getAdapter();
117
118 logDebug(task,"info.synch.use-adapter",
119 new Object[]{getClassName(adapterTo),viewTo.getName(),sourceTo.getName()}
120 );
121
122 adapterTo.init();
123 Document themes = adapterTo.readThemeDescriptor();
124 protocolTo = connectorTo.getProtocol();
125 if(protocolTo != null){
126 logDebug(task,"info.synch.use-protocol-put",
127 new Object[]{protocolTo.getName(),viewTo.getName(),sourceTo.getName()}
128 );
129 protocolTo.init();
130 }
131
132 String[] inViews = null;
133 if(protocolFrom != null){
134 logDebug(task,"info.synch.get-input-views",
135 new Object[]{viewFrom.getName(),connectorFrom.getUrl(),protocolFrom.getFolder()}
136 );
137
138 inViews = protocolFrom.getViews(viewFrom.getFile());
139 if(inViews.length == 0){
140 logDebug(task,"info.synch.no-view",null);
141 }
142 }
143 else{
144
145 inViews = new String[]{""};
146 }
147
148 String id = sourceFrom.getName() + "_" + sourceTo.getName() + "_" + viewFrom.getName() + "_" + viewTo.getName();
149
150 incrementalMode = OpenSync.getInstance().getIncrementalMode();
151 if (incrementalMode) {
152
153 dbMgr = new DatabaseManager();
154
155 if ( (col = dbMgr.getCollection("sync_col_"+id)) == null) col = dbMgr.createCollection("sync_col_"+id);
156 tx_col = dbMgr.tx_start(col);
157 }
158
159 boolean endOfviews = true;
160 File xmlMappingFile = null;
161 for(int i=0;i<inViews.length;i++) {
162 if (inViews[i] != null) endOfviews = false;
163 }
164 int line = 0;
165 while (!endOfviews) {
166 for(int i=0;i<inViews.length;i++) {
167
168 logInfo(task,"info.synch.adapt-input-view",
169 new Object[]{viewFrom.getName(),getClassName(adapterFrom)}
170 );
171 String xmlView = null;
172 if (inViews[i] != null) {
173
174 xmlView = adapterFrom.adaptInputView(inViews[i],viewFrom, stat, w);
175 if(connectorFrom.getType().equals("bd")) {
176 if(w.getStartline() != -1) inViews[i] = "NOT FINISH";
177 else if(inViews[i].equals("NOT FINISH")) inViews[i] = "";
178 }
179
180
181 logDebug(task,"info.synch.get-mapping",new Object[]{synchronization.getUrlMapping()});
182 String urlMapping = synchronization.getUrlMapping();
183 if (xmlMappingFile == null) {
184 if(urlMapping != null && urlMapping.length() > 0){
185 String configFolder = System.getProperty("opensync.configfolder");
186 configFolder = (configFolder == null ? "" : configFolder);
187
188 String xmlMapping = FileHelper.fileToString(
189 OpenSync.getInstance().getFilePath(
190 fileSeparator+"etc"+fileSeparator+configFolder+fileSeparator+"mapping"+fileSeparator + synchronization.getUrlMapping(),true
191 )
192 );
193 xmlMappingFile = new File(OpenSync.getInstance().getFilePath(
194 fileSeparator+"etc"+fileSeparator+configFolder+fileSeparator+"mapping"+fileSeparator + synchronization.getUrlMapping(),true));
195 }
196 }
197
198 logDebug(task,"info.synch.transform",null);
199 logDebug(task,"info.synch.get-mapping",new Object[]{synchronization.getUrlMapping()});
200
201
202
203
204 logDebug(task,"info.synch.adapt-input-view", new Object[]{xmlView});
205 OpenSync.getInstance().getLog().debug(Log.ROOT, "Input view: \n"+xmlView);
206 xmlView = mapper.map(xmlMappingFile,xmlView);
207
208 if (connectorTo.getIncremental()) {
209
210
211
212
213 org.xmldb.api.modules.XMLResource res;
214 if (incrementalMode) {
215 if ( (res = dbMgr.retrieveDocument(col, id)) != null) {
216 SAXReader xmlReader = new SAXReader();
217 Document doc = xmlReader.read(new StringReader((String)res.getContent()));
218 Document curr_doc = xmlReader.read(new StringReader(xmlView));
219
220
221
222 List pks = adapterTo.getPKs(themes, viewTo.getTheme());
223 String xmlView_diff = prepareDocToAdd(doc, curr_doc, pks);
224 String xmlView_merge = mergeDoc(xmlReader.read(new StringReader(xmlView_diff)), doc);
225 dbMgr.addDocument(tx_col, id, xmlView_merge);
226 xmlView = xmlView_diff;
227 } else {
228 dbMgr.addDocument(tx_col, id, xmlView );
229 }
230
231 }
232 }
233
234 logDebug(task,"info.synch.adapt-output-view", new Object[]{xmlView});
235
236
237 logInfo(task,"info.synch.adapt-output-view",
238 new Object[]{viewTo.getName(),getClassName(adapterTo),getClassName(sourceTo)}
239 );
240 String outView = adapterTo.adaptOutputView(xmlView,viewTo, stat);
241
242
243 if(protocolTo != null){
244 logDebug(task,"info.synch.put-output-view",
245 new Object[]{viewTo.getName(),protocolTo.getName(),protocolTo.getFolder(),connectorTo.getUrl()}
246 );
247 protocolTo.putView(outView,viewTo.getFile());
248 logInfo(task, "info.synch.put-output-records", new Object[]{outView});
249 }
250
251 }
252
253 }
254
255
256
257 adapterFrom.startReadInputFile(false);
258
259 if (protocolFrom != null) {
260 inViews = protocolFrom.getViews(viewFrom.getFile());
261 }
262 endOfviews = true;
263 for(int i=0;i<inViews.length;i++) {
264 if (inViews[i] != null && inViews[i]!="") endOfviews = false;
265 }
266
267 }
268
269
270
271 if (connectorTo != null) {
272 connectorTo.release();
273 }
274 if (connectorFrom != null) {
275 connectorFrom.release();
276 }
277
278 if (incrementalMode) dbMgr.tx_commit(col);
279 }
280
281 catch(Exception e){
282 stat.error.add(e.toString());
283
284
285 if (incrementalMode) dbMgr.tx_rollback(col);
286
287 if (connectorTo != null) {
288 connectorFrom.releaseWithException();
289 connectorTo.releaseWithException();
290 }
291 logError(task, e);
292 throw e;
293 }
294 finally {
295 stat.print();
296 stat.end = java.util.Calendar.getInstance().getTime();
297 logStat(task, stat);
298 if (incrementalMode) {
299 dbMgr.close(col);
300 dbMgr.close(tx_col);
301 }
302 if(protocolFrom != null){
303 logDebug(task,"info.synch.protocol-clean",null);
304 protocolFrom.clean(viewFrom.getFile());
305 }
306 if(protocolTo != null){
307 logDebug(task,"info.synch.protocol-clean",null);
308 protocolTo.clean(viewTo.getFile());
309 }
310
311 logDebug(task,"info.synch.finish",null);
312 }
313
314
315 }
316
317 static private String mergeDoc(Document doc_diff, Document doc_target) {
318 Element root_target = doc_target.getRootElement();
319 Element root_diff = doc_diff.getRootElement();
320 Iterator elementIterator = root_diff.elementIterator();
321 while (elementIterator.hasNext()) {
322 Element element = (Element)elementIterator.next();
323 root_target.add(element.createCopy());
324 }
325 return root_target.asXML();
326 }
327
328 static private String prepareDocToAdd(Document doc, Document doc2add, List pks) throws XindiceException, XMLDBException, DocumentException {
329 Document doc_result = DocumentHelper.createDocument();
330
331 Element doc_root = doc.getRootElement();
332 XPath xpathSelector = DocumentHelper.createXPath("/DATA/ROW");
333 Element root_result = doc_result.addElement( "DATA" );
334 List results = xpathSelector.selectNodes(doc2add);
335 for ( Iterator iter = results.iterator(); iter.hasNext(); ) {
336 Element root = (Element) iter.next();
337 Iterator elementIterator = root.elementIterator();
338 StringBuffer xpathQuery = new StringBuffer("/DATA/ROW[");
339 while (elementIterator.hasNext()) {
340 Element element = (Element)elementIterator.next();
341 if (pks.contains(element.getName())) {
342 xpathQuery.append(element.getName()+"=\""+Utils.checkSpecialCharXml(element.getTextTrim()) + "\" and ");
343
344 }
345 }
346 String xpathQueryString = xpathQuery.toString();
347 if (!xpathQueryString.equals("/DATA/ROW[")) {
348 if (xpathQueryString.lastIndexOf("and") > 0) {
349 xpathQueryString = xpathQueryString.substring(0, xpathQueryString.lastIndexOf("and"));
350 }
351 xpathQueryString += "]";
352
353 XPath xpathSelector2 = DocumentHelper.createXPath(xpathQueryString);
354 List results2 = xpathSelector2.selectNodes(doc);
355 Iterator iter_res2 = results2.iterator();
356 if (!iter_res2.hasNext()) {
357 root_result.add(root.createCopy());
358 }
359 } else
360 root_result.add(root.createCopy());
361 }
362
363
364 return root_result.asXML();
365 }
366 /***
367 * Log information
368 *
369 * @param task the task executed
370 * @param msgKey the message to log
371 * @param params the message parameters
372 */
373 static protected void logDebug(Task task,String msgKey,Object[]params){
374 Log log = OpenSync.getInstance().getLog();
375 Category cat = Category.getInstance(task.getName());
376 log.debug(cat,msgKey,params);
377 String str;
378 if(params == null){
379 str = I18n.getInstance().get(msgKey);
380 }
381 else{
382 str = I18n.getInstance().format(msgKey,params);
383 }
384 log.debug(Log.ROOT,task.getName() + " : " + str);
385
386 }
387
388 /***
389 * Log information
390 *
391 * @param task the task executed
392 * @param msgKey the message to log
393 * @param params the message parameters
394 */
395 static protected void logInfo(Task task,String msgKey,Object[]params){
396 Log log = OpenSync.getInstance().getLog();
397 Category cat = Category.getInstance(task.getName());
398 log.debug(cat,msgKey,params);
399 String str;
400 if(params == null){
401 str = I18n.getInstance().get(msgKey);
402 }
403 else{
404 str = I18n.getInstance().format(msgKey,params);
405 }
406 log.info(Log.ROOT,task.getName() + " : " + str);
407
408 }
409
410 /***
411 * Log information
412 *
413 * @param task the task executed
414 * @param msgKey the message to log
415 * @param params the message parameters
416 */
417 static protected void logStat(Task task,Statistic stat){
418 Log log = OpenSync.getInstance().getLog();
419 String str = stat.toString();
420 log.info(Log.STATISTIC,"Start : "+dateFormat.format(stat.start));
421 log.info(Log.STATISTIC,"task name : "+task.getName());
422 log.info(Log.STATISTIC,"\t\tINPUT\t= "+stat.query);
423 log.info(Log.STATISTIC,"\t\tUPDATE\t= "+stat.update);
424 log.info(Log.STATISTIC,"\t\tINSERT\t= "+stat.insert);
425 log.info(Log.STATISTIC,"\t\tERROR\t= "+stat.error.size());
426 if(stat.error.size() > 0) log.info(Log.STATISTIC,"Error messages : ");
427 for(int i=0; i<stat.error.size(); i++) {
428 log.info(Log.STATISTIC,"\t"+stat.error.get(i));
429 }
430 log.info(Log.STATISTIC,"End : "+dateFormat.format(stat.end));
431 }
432
433
434 /***
435 * Log information
436 *
437 * @param task the task executed
438 * @param msgKey the message to log
439 * @param params the message parameters
440 */
441 static public void logWarning(Task task,String msgKey,Object[]params){
442 Log log = OpenSync.getInstance().getLog();
443 Category cat = Category.getInstance(task.getName());
444 log.warn(cat,msgKey,params);
445 String str;
446 if(params == null){
447 str = I18n.getInstance().get(msgKey);
448 }
449 else{
450 str = I18n.getInstance().format(msgKey,params);
451 }
452 log.warn(Log.ROOT,task.getName() + " : " + str);
453
454 }
455
456 /***
457 * Log error
458 *
459 * @param task the executed task
460 * @param ex the exception log
461 */
462 static public void logError(Task task,Exception ex){
463 Log log = OpenSync.getInstance().getLog();
464 Category cat = Category.getInstance(task.getName());
465 if(ex instanceof OpenSyncException){
466 log.error(Log.SYNCHRONIZER,
467 I18n.getInstance().format(
468 "info.task.fail",new Object[]{
469 task.getName(),Log.dateFormat.format(
470 GregorianCalendar.getInstance().getTime()
471 ),ex
472 }
473 )
474 );
475 log.error(cat,ex.getMessage());
476 } else if (ex instanceof SAXException && (((SAXException)ex).getException() != null)) {
477 ex = ((SAXException)ex).getException();
478 log.error(Log.SYNCHRONIZER,
479 I18n.getInstance().format(
480 "info.task.fail",new Object[]{
481 task.getName(),Log.dateFormat.format(
482 GregorianCalendar.getInstance().getTime()
483 ),ex
484 }
485 )
486 );
487 log.error(cat,ex.getMessage());
488 } else{
489
490
491
492
493
494
495
496
497
498 log.fatal(cat,
499 I18n.getInstance().format(
500 "info.task.fail",new Object[]{
501 task.getName(),Log.dateFormat.format(
502 GregorianCalendar.getInstance().getTime()
503 ),ex
504 }
505 )
506 );
507 }
508 }
509 /***
510 * Trunc the full name of the class
511 *
512 * @param obj the class
513 */
514 public static String getClassName(Object obj){
515 StringTokenizer tokens = new StringTokenizer(obj.getClass().getName(),".");
516 String name = null;
517 while(tokens.hasMoreTokens()){
518 name = tokens.nextToken();
519 }
520 return name;
521 }
522 }
523
524
525
526