/*
 * Cumulus4j - Securing your data in the cloud - http://cumulus4j.org
 * Copyright (C) 2011 NightLabs Consulting GmbH
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package org.cumulus4j.store;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import javax.jdo.JDOHelper;
import javax.jdo.PersistenceManager;
import javax.jdo.PersistenceManagerFactory;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.cumulus4j.store.model.ClassMeta;
import org.cumulus4j.store.model.DataEntry;
import org.cumulus4j.store.model.DatastoreVersion;
import org.cumulus4j.store.model.EmbeddedClassMeta;
import org.cumulus4j.store.model.EmbeddedFieldMeta;
import org.cumulus4j.store.model.EncryptionCoordinateSet;
import org.cumulus4j.store.model.FieldMeta;
import org.cumulus4j.store.model.IndexEntry;
import org.cumulus4j.store.model.IndexEntryContainerSize;
import org.cumulus4j.store.model.Sequence2;
import org.cumulus4j.store.resource.ResourceHelper;
import org.datanucleus.ExecutionContext;
import org.datanucleus.PersistenceConfiguration;
import org.datanucleus.store.StoreManager;
import org.datanucleus.store.connection.AbstractConnectionFactory;
import org.datanucleus.store.connection.AbstractManagedConnection;
import org.datanucleus.store.connection.ManagedConnection;
import org.datanucleus.util.NucleusLogger;
import org.datanucleus.util.StringUtils;

/**
 * <p>
 * Connection factory implementation for Cumulus4j-connections.
 * </p><p>
 * A "connection" in Cumulus4J is a <code>PersistenceManager</code> for the backing datastore.
 * When the transaction in Cumulus4J is committed, the equivalent transaction is committed in the PM(s) of the
 * backing datastore(s).
 * </p><p>
 * How to configure a connection factory is documented on
 * <a href="http://cumulus4j.org/1.2.0-SNAPSHOT/documentation/persistence-api.html">Persistence API</a>.
 * </p>
 * @author Marco หงุ่ยตระกูล-Schulze - marco at nightlabs dot de
 */
public class Cumulus4jConnectionFactory extends AbstractConnectionFactory
{
	/** PMF for DataEntry, ClassMeta+FieldMeta, and optionally index data (if not using pmfIndex). */
	private PersistenceManagerFactory pmf;

	/** Optional PMF for index data. Stays <code>null</code> when no index-specific property is configured. */
	private PersistenceManagerFactory pmfIndex;

	/** Properties of the front-end configuration that are copied as-is (with lower-cased key) to the backend. */
	private static final String[] PROPERTIES_TO_FORWARD = {
		"datanucleus.ConnectionDriverName",
		"datanucleus.ConnectionURL",
		"datanucleus.ConnectionUserName",
		"datanucleus.ConnectionFactory",
		"datanucleus.ConnectionFactoryName",
		"datanucleus.ConnectionFactory2",
		"datanucleus.ConnectionFactory2Name"
	};

	private static final String CUMULUS4J_PROPERTY_PREFIX = "cumulus4j.";
	private static final String CUMULUS4J_INDEX_PROPERTY_PREFIX = "cumulus4j.index.";

	/** Prefixes of properties that are forwarded to the data backend after stripping "cumulus4j.". */
	private static final String[] CUMULUS4J_FORWARD_PROPERTY_PREFIXES = {
		CUMULUS4J_PROPERTY_PREFIX + "datanucleus.",
		CUMULUS4J_PROPERTY_PREFIX + "javax."
	};

	/** Prefixes of properties that are forwarded to the index backend after stripping "cumulus4j.index.". */
	private static final String[] CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES = {
		CUMULUS4J_INDEX_PROPERTY_PREFIX + "datanucleus.",
		CUMULUS4J_INDEX_PROPERTY_PREFIX + "javax."
	};

	/** Key under which the connection password is handed to the backend PMF. */
	private static final String CONNECTION_PASSWORD_KEY = "datanucleus.ConnectionPassword".toLowerCase(Locale.ENGLISH);

	/**
	 * Create the connection factory: assemble the backend configuration from the front-end
	 * configuration, create the backend PMF(s) and initialise the meta-data of the Cumulus4j
	 * schema classes by obtaining an extent for each of them.
	 *
	 * @param storeMgr the store manager this factory belongs to; it is cast to
	 * {@link Cumulus4jStoreManager} when the index meta-data is initialised.
	 * @param resourceType passed unchanged to the super constructor.
	 */
	public Cumulus4jConnectionFactory(StoreManager storeMgr, String resourceType) {
		super(storeMgr, resourceType);

		Map<String, Object> backendProperties = ResourceHelper.getCumulus4jBackendProperties();
		Map<String, Object> persistenceProperties = ResourceHelper.getCumulus4jPersistenceProperties();

		PersistenceConfiguration persistenceConfiguration = storeMgr.getNucleusContext().getPersistenceConfiguration();
		for (Map.Entry<String, Object> me : persistenceProperties.entrySet()) {
			if (me.getKey() == null) // can't happen, but better play safe
				continue;

			if (!persistenceConfiguration.hasProperty(me.getKey())) // we load our defaults after the user config, hence we must not override!
				persistenceConfiguration.setProperty(me.getKey(), me.getValue());
		}

		// Copy the properties that are directly (as is) forwarded.
		for (String propKey : PROPERTIES_TO_FORWARD) {
			Object propValue = persistenceConfiguration.getProperty(propKey);
			if (propValue != null)
				backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), propValue);
		}

		// Copy the properties that are prefixed with "cumulus4j." and thus forwarded.
		// Index-specific properties are collected separately and merged only after this loop, so that
		// the index configuration inherits ALL forwarded backend properties, independent of the
		// iteration order of the property map.
		Map<String, Object> indexOverrides = new HashMap<String, Object>();
		for (Map.Entry<String, Object> me : persistenceConfiguration.getPersistenceProperties().entrySet()) {
			String key = me.getKey();
			if (key == null) // don't know if null keys can ever occur, but better play safe
				continue;

			for (String prefix : CUMULUS4J_FORWARD_PROPERTY_PREFIXES) {
				if (key.startsWith(prefix)) {
					String propKey = key.substring(CUMULUS4J_PROPERTY_PREFIX.length());
					backendProperties.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
				}
			}

			for (String prefix : CUMULUS4J_INDEX_FORWARD_PROPERTY_PREFIXES) {
				if (key.startsWith(prefix)) {
					String propKey = key.substring(CUMULUS4J_INDEX_PROPERTY_PREFIX.length());
					indexOverrides.put(propKey.toLowerCase(Locale.ENGLISH), me.getValue());
				}
			}
		}

		// A separate index PMF is only used when at least one index-specific property exists.
		Map<String, Object> backendIndexProperties = null;
		if (!indexOverrides.isEmpty()) {
			backendIndexProperties = new HashMap<String, Object>(backendProperties);
			backendIndexProperties.putAll(indexOverrides);
		}

		// The password might be encrypted, but the getConnectionPassword(...) method decrypts it.
		// NOTE(review): the password obtained here is not copied into the index properties (they are
		// derived before this point) - confirm that the index store is meant to get its password
		// exclusively via "cumulus4j.index."-prefixed properties.
		String pw = storeMgr.getConnectionPassword();
		if (pw != null) {
			backendProperties.put(CONNECTION_PASSWORD_KEY, pw);
		}

		// An alternative to obtaining the extent of each Cumulus4j schema class below would be to use
		// the auto-start mechanism "Classes" of the backend with the list of all schema class names.

		// PMF for data (and optionally index)
		if (backendIndexProperties == null) {
			NucleusLogger.GENERAL.debug("Creating PMF for Data+Index with the following properties : "+describeForLog(backendProperties));
		}
		else {
			NucleusLogger.GENERAL.debug("Creating PMF for Data with the following properties : "+describeForLog(backendProperties));
		}
		pmf = JDOHelper.getPersistenceManagerFactory(backendProperties);

		// initialise meta-data (which partially tests it)
		PersistenceManager pm = pmf.getPersistenceManager();
		try {
			// Class structure meta-data
			pm.getExtent(ClassMeta.class);
			pm.getExtent(FieldMeta.class);
			pm.getExtent(EmbeddedClassMeta.class);
			pm.getExtent(EmbeddedFieldMeta.class);

			// Data
			pm.getExtent(DataEntry.class);

			// Sequence for ID generation
			pm.getExtent(Sequence2.class);

			// Mapping for encryption settings (encryption algorithm, mode, padding, MAC, etc.
			// are mapped to a number which reduces the size of each record)
			pm.getExtent(EncryptionCoordinateSet.class);

			// versioning of datastore structure
			pm.getExtent(DatastoreVersion.class);

			if (backendIndexProperties == null) {
				// Index (stored together with the data)
				initialiseIndexMetaData(pm, storeMgr);
			}
		} finally {
			pm.close();
		}

		if (backendIndexProperties != null) {
			// PMF for index data
			NucleusLogger.GENERAL.debug("Creating PMF for Index data with the following properties : "+describeForLog(backendIndexProperties));
			pmfIndex = JDOHelper.getPersistenceManagerFactory(backendIndexProperties);

			PersistenceManager pmIndex = pmfIndex.getPersistenceManager();
			try {
				// Class structure meta-data
				pmIndex.getExtent(ClassMeta.class);
				pmIndex.getExtent(FieldMeta.class);

				// Index
				initialiseIndexMetaData(pmIndex, storeMgr);

				// versioning of datastore structure
				// (must use pmIndex: the data PM 'pm' has already been closed above)
				pmIndex.getExtent(DatastoreVersion.class);
			} finally {
				pmIndex.close();
			}
		}
	}

	/**
	 * Render the given properties for a log message, replacing the value of every entry whose key
	 * contains "password" (case-insensitive), so that no clear-text password is written to the log.
	 * The passed map itself is not modified.
	 *
	 * @param properties the properties to render; must not be <code>null</code>.
	 * @return the string produced by {@link StringUtils#mapToString(Map)} for the masked copy.
	 */
	private static String describeForLog(Map<String, Object> properties) {
		Map<String, Object> printable = new HashMap<String, Object>(properties);
		for (Map.Entry<String, Object> me : printable.entrySet()) {
			String key = me.getKey();
			if (key != null && key.toLowerCase(Locale.ENGLISH).contains("password"))
				me.setValue("*****");
		}
		return StringUtils.mapToString(printable);
	}

	/**
	 * Initialise the meta-data of all index-related classes by obtaining their extents.
	 *
	 * @param pm the persistence manager of the datastore holding the index data.
	 * @param storeMgr the store manager; must be a {@link Cumulus4jStoreManager}.
	 */
	private static void initialiseIndexMetaData(PersistenceManager pm, StoreManager storeMgr)
	{
		// While it is not necessary to initialise the meta-data now (can be done lazily,
		// when the index is used), it is still better as it prevents delays when the
		// data is persisted.
		// Furthermore, if the underlying database uses transactional DDL (like PostgreSQL, MSSQL
		// and others), and a separate JDBC connection is used for DDL (like it must be in
		// a JEE server), it is essentially required to initialise the meta-data in a separate
		// transaction before actually using the tables.
		pm.getExtent(IndexEntryContainerSize.class);

		// The index-entry classes come from the index factory registry (an earlier approach looked
		// them up via the plugin extension point "org.cumulus4j.store.index_mapping").
		Set<Class<? extends IndexEntry>> indexEntryClasses = ((Cumulus4jStoreManager)storeMgr).getIndexFactoryRegistry().getIndexEntryClasses();
		for (Class<? extends IndexEntry> indexEntryClass : indexEntryClasses) {
			pm.getExtent(indexEntryClass);
		}
	}

	/**
	 * @return the PMF used for the data (and for the index, if there is no separate index PMF).
	 */
	public PersistenceManagerFactory getPMFData() {
		return pmf;
	}

	/**
	 * @return the PMF used for the index data or <code>null</code>, if no separate index PMF was created.
	 */
	public PersistenceManagerFactory getPMFIndex() {
		return pmfIndex;
	}

	/**
	 * Create a new {@link Cumulus4jManagedConnection} for the given execution context.
	 * The backend persistence manager(s) are not obtained here, but lazily on the first call of
	 * {@link ManagedConnection#getConnection()}.
	 */
	@Override
	public ManagedConnection createManagedConnection(ExecutionContext ec, @SuppressWarnings("rawtypes") Map transactionOptions)
	{
		return new Cumulus4jManagedConnection(ec, transactionOptions);
	}

	/**
	 * Cumulus4j-specific {@link ManagedConnection} implementation. Avoid accessing this specific class whenever possible!
	 * @author mschulze
	 */
	class Cumulus4jManagedConnection extends AbstractManagedConnection
	{
		private final ExecutionContext ec;

		@SuppressWarnings({"rawtypes","unused"})
		private final Map options;

		/** The wrapped backend persistence manager(s); <code>null</code> until first requested and after {@link #close()}. */
		PersistenceManagerConnection pmConnection;

		/**
		 * @return a new {@link Cumulus4jXAResource} operating on this connection's backend persistence manager(s).
		 */
		@Override
		public XAResource getXAResource() {
			return new Cumulus4jXAResource((PersistenceManagerConnection)getConnection());
		}

		public Cumulus4jManagedConnection(ExecutionContext ec, @SuppressWarnings("rawtypes") Map options) {
			this.ec = ec;
			this.options = options;
		}

		public ExecutionContext getExecutionContext() {
			return ec;
		}

		/**
		 * Close the data PM and - if the index has its own PM - the index PM.
		 * Does nothing, if no connection is currently open.
		 */
		@Override
		public void close() {
			if (pmConnection != null) {
				PersistenceManager dataPM = pmConnection.getDataPM();
				dataPM.close();
				if (pmConnection.indexHasOwnPM()) {
					PersistenceManager indexPM = pmConnection.getIndexPM();
					indexPM.close();
				}
				pmConnection = null;
			}
		}

		/**
		 * @return the {@link PersistenceManagerConnection} of this managed connection; it is created
		 * on demand from the data PMF and (if present) the index PMF.
		 */
		@Override
		public Object getConnection() {
			if (pmConnection == null) {
				PersistenceManager dataPM = pmf.getPersistenceManager();
				PersistenceManager indexPM = pmfIndex != null ? pmfIndex.getPersistenceManager() : null;
				this.pmConnection = new PersistenceManagerConnection(dataPM, indexPM);
			}
			return pmConnection;
		}
	}

	/**
	 * {@link XAResource} mapping start/commit/rollback onto the local transactions of the backend
	 * persistence manager(s). The passed {@link Xid}s are not tracked or validated.
	 */
	class Cumulus4jXAResource implements XAResource {
		private PersistenceManagerConnection pmConnection;

		Cumulus4jXAResource(PersistenceManagerConnection pmConn) {
			this.pmConnection = pmConn;
		}

		/** Begin the transaction of the data PM and, if the index has its own PM, of the index PM. */
		@Override
		public void start(Xid xid, int arg1) throws XAException {
			PersistenceManager dataPM = pmConnection.getDataPM();
			dataPM.currentTransaction().begin();
			if (pmConnection.indexHasOwnPM()) {
				PersistenceManager indexPM = pmConnection.getIndexPM();
				indexPM.currentTransaction().begin();
			}
		}

		/**
		 * Commit the transaction of the data PM and then, if the index has its own PM, of the index PM.
		 * The two commits are executed one after the other; a failure of the second one does not
		 * undo the first one.
		 */
		@Override
		public void commit(Xid xid, boolean arg1) throws XAException {
			PersistenceManager dataPM = pmConnection.getDataPM();
			dataPM.currentTransaction().commit();
			if (pmConnection.indexHasOwnPM()) {
				PersistenceManager indexPM = pmConnection.getIndexPM();
				indexPM.currentTransaction().commit();
			}
		}

		/** Roll back the transaction of the data PM and, if the index has its own PM, of the index PM. */
		@Override
		public void rollback(Xid xid) throws XAException {
			PersistenceManager dataPM = pmConnection.getDataPM();
			dataPM.currentTransaction().rollback();
			if (pmConnection.indexHasOwnPM()) {
				PersistenceManager indexPM = pmConnection.getIndexPM();
				indexPM.currentTransaction().rollback();
			}
		}

		@Override
		public void end(Xid arg0, int arg1) throws XAException {
			//ignore
		}

		@Override
		public void forget(Xid arg0) throws XAException {
			//ignore
		}

		@Override
		public int getTransactionTimeout() throws XAException {
			return 0;
		}

		/**
		 * @return <code>true</code>, if the other resource is a {@link Cumulus4jXAResource} whose
		 * connection equals the one of this instance.
		 */
		@Override
		public boolean isSameRM(XAResource resource) throws XAException {
			return (resource instanceof Cumulus4jXAResource)
					&& pmConnection.equals(((Cumulus4jXAResource)resource).pmConnection);
		}

		@Override
		public int prepare(Xid arg0) throws XAException {
			return 0;
		}

		@Override
		public Xid[] recover(int arg0) throws XAException {
			throw new XAException("Unsupported operation");
		}

		@Override
		public boolean setTransactionTimeout(int arg0) throws XAException {
			return false;
		}
	}
}