001/* 002 * $RCSfile: Entity.java,v $ 003 * $Revision: 1.61.2.5 $ 004 * $Date: 2015/05/05 14:21:48 $ 005 * 006 * Copyright 2012, 2013 Cloud Software Group, Inc. ALL RIGHTS RESERVED. 007 * Cloud Software Group, Inc. Confidential Information 008 */ 009package com.tibco.xp.runtime; 010 011import java.io.Serializable; 012import java.util.ArrayList; 013import java.util.HashMap; 014import java.util.List; 015 016import com.kabira.platform.ResourceUnavailableException; 017import com.kabira.platform.swbuiltin.TransactionNotifier; 018import com.tibco.cep.kernel.core.rete.ReteWM; 019import com.tibco.cep.kernel.model.knowledgebase.DuplicateExtIdException; 020import com.tibco.cep.kernel.service.logging.Level; 021import com.tibco.cep.runtime.model.element.impl.EntityImpl; 022import com.tibco.cep.runtime.model.element.impl.ManagedObjectManager; 023import com.tibco.cep.runtime.session.RuleServiceProviderManager; 024import com.tibco.cep.runtime.session.RuleSession; 025import com.tibco.cep.runtime.session.RuleSessionManager; 026import com.tibco.cep.runtime.session.impl.RuleSessionImpl; 027import com.tibco.cep.runtime.session.impl.RuleSessionManagerImpl; 028 029/** 030 * Concept and Event base class. 031 */ 032public abstract class Entity implements Serializable 033{ 034 /** 035 * Invalid type identifier constant 036 */ 037 protected static final int INVALID_TYPE_IDENTIFIER = 0; 038 039 // TODO - Find constant value in BE source for the magic 0 040 /** 041 * Empty entity identifier 042 */ 043 static final long EMPTY_ENTITY_IDENTIFIER = 0; 044 045 046 /** 047 * Load an entity into working memory. 048 * <p> 049 * The entity is not asserted, it does not 050 * cause any rules to be triggered. 051 * <p> 052 * Calling this method on entities that are already 053 * loaded into working memory quietly does nothing. 054 * @throws ResourceUnavailableException Rules engine not running 055 */ 056 final public void load() throws ResourceUnavailableException 057 { 058 EntityImpl entityImpl = getHandle(this); 059 assert entityImpl != null : getId(); 060 061 if (TransactionService.isLoggerEnabledFor(Level.TRACE) == true) 062 { 063 TransactionService.log(Level.TRACE, "Loaded " + entityImpl); 064 } 065 } 066 067 /** 068 * Get the unique identifier for this entity 069 * @return Entity identifier 070 */ 071 public final long getId() 072 { 073 if (TransactionService.isRulesEngineActive() == false) 074 { 075 return EMPTY_ENTITY_IDENTIFIER; 076 } 077 078 assert TransactionService.getInstance() != null; 079 long identifier = getIdentifierFromRegistry(); 080 081 // 082 // If no identifier registered for this entity do it now. 083 // Identifiers are not assigned to entities created from Java 084 // until required. 085 // 086 if (identifier == IdentifierGenerator.RESERVED_FOR_EMPTY_INDICATOR) 087 { 088 identifier = TransactionService.getInstance().generateId(getClass()); 089 register(identifier); 090 } 091 092 return identifier; 093 } 094 095 @Override 096 public String toString() 097 { 098 return getClass().getName() + "@id=" + getId() + "@extId=" + getExtId(); 099 } 100 101 /** 102 * Remove this identifier from thread local storage 103 * @param id Identifier to remove 104 */ 105 static void unRegisterId(long id) 106 { 107 HashMap<Long,Holder> map = m_byIdentifierRegistry.get(); 108 assert map != null : Thread.currentThread().getName(); 109 assert map.containsKey(id) == true : id; 110 map.remove(id); 111 } 112 113 /** 114 * Assert an entity 115 * @param startRTC True starts an RTC. False just asserts the entity 116 * @throws DuplicateExtIdException Duplicated external identifier 117 */ 118 protected void assertEntity(final Boolean startRTC) throws DuplicateExtIdException 119 { 120 checkActive(); 121 122 RuleSession session = getAndSetActiveRuleSession(); 123 assert session != null : getClass().getName(); 124 125 // 126 // If RTC being started mark the thread as being in an RTC 127 // 128 if (startRTC == true) 129 { 130 pushRTCStack(); 131 } 132 133 // 134 // Get the handle - don't load it into working 135 // memory since that will prevent the assert 136 // from working 137 // 138 EntityImpl entityImpl = getHandle(false); 139 140 if (TransactionService.isLoggerEnabledFor(Level.TRACE) == true) 141 { 142 TransactionService.log(Level.TRACE, "Asserting entity " + entityImpl); 143 } 144 145 try 146 { 147 session.assertObject(entityImpl, startRTC); 148 } 149 finally 150 { 151 // 152 // If we executed an RTC, pop the RTC stack 153 // 154 if (startRTC == true) 155 { 156 popRTCStack(); 157 } 158 } 159 } 160 161 /** 162 * Get the external identifier for this entity. 163 * <p> 164 * Default implementation returns an empty string. 165 * Overridden in sub-types if they have mapped a user 166 * key to an external identifier 167 * @return Externalization identifier 168 */ 169 protected String getExtId() 170 { 171 return ""; 172 } 173 174 /** 175 * Register this entity 176 * @param id Entity identifier 177 */ 178 final protected void register(long id) 179 { 180 registerByEntity(id); 181 registerByIdentifier(null); 182 } 183 184 /** 185 * Lookup an entity by its unique identifier 186 * @param id Unique identifier 187 * @return Entity with unique identifier, or null if not found 188 */ 189 static Entity lookupById(long id) 190 { 191 if (id == EMPTY_ENTITY_IDENTIFIER) 192 { 193 return null; 194 } 195 196 // 197 // Check thread registry first 198 // 199 return getEntityFromRegistry(id); 200 } 201 202 /** 203 * Map a RETE handle to a managed object entity 204 * <p> 205 * If the handle is dirty, shared memory is updated 206 * before returning the mapped entity. 207 * @param handle Handle to map. Can be null. 208 * @return Mapped entity, or null if no entity to map 209 */ 210 // TODO - Currently required to be public because it is used in the 211 // wrapper catalog function stubs 212 public static Entity mapHandle(com.tibco.cep.kernel.model.entity.Entity handle) 213 { 214 if (handle == null) 215 { 216 return null; 217 } 218 219 Entity me = Entity.lookupById(handle.getId()); 220 221 // 222 // If we didn't find the entity we need to insert it. This can happen 223 // if an entity is created in an RTC and then a catalog function is called 224 // 225 if (me == null) 226 { 227 TransactionService.getInstance().insert((EntityImpl)handle); 228 me = Entity.lookupById(handle.getId()); 229 assert me != null : handle; 230 } 231 232 return me; 233 } 234 235 /** 236 * Map an array of RETE handles to managed entities 237 * @param handles Handle array to map 238 * @return Entities mapped from handle objects returned here 239 */ 240 // TODO - Currently required to be public because it is used in the 241 // wrapper catalog function stubs 242 public static ArrayList<Entity> mapHandleArray( 243 com.tibco.cep.kernel.model.entity.Entity [ ] handles) 244 { 245 if (handles == null) 246 { 247 return new ArrayList<Entity>(); 248 } 249 250 ArrayList<Entity> mapped = new ArrayList<Entity>(handles.length); 251 252 for (com.tibco.cep.kernel.model.entity.Entity e : handles) 253 { 254 mapped.add(mapHandle(e)); 255 } 256 return mapped; 257 } 258 259 /** 260 * Map an array of entities to their implementation objects 261 * @param entities Entity array to map 262 * @return Entity implementations mapped from entity objects 263 */ 264 // TODO - Currently required to be public because it is used in the 265 // wrapper catalog function stubs 266 public static ArrayList<com.tibco.cep.kernel.model.entity.Entity> mapArray(Entity [ ] entities) 267 { 268 assert entities != null; 269 270 ArrayList<com.tibco.cep.kernel.model.entity.Entity> mapped = 271 new ArrayList<com.tibco.cep.kernel.model.entity.Entity>(entities.length); 272 273 for (Entity e : entities) 274 { 275 mapped.add(Entity.getHandle(e)); 276 } 277 return mapped; 278 } 279 280 /** 281 * Check that the rules engine is running in this JVM 282 */ 283 protected static void checkActive() 284 { 285 // 286 // Throw a resource unavailable exception if the 287 // rules engine isn't active 288 // 289 if (ServiceProvider.isRulesEngineActive() == false) 290 { 291 String message = "Rules engine not active in this JVM"; 292 TransactionService.log(Level.FATAL, message); 293 throw new com.kabira.platform.ResourceUnavailableException(message); 294 } 295 } 296 297 /** 298 * Push an RTC stack 299 */ 300 static void pushRTCStack() 301 { 302 assert TransactionService.getInstance() != null; 303 304 Long count = m_rtcState.get(); 305 306 // 307 // If count is 0, we are starting an RTC stack 308 // If using transaction scope, create a transaction 309 // notifier to do the reseting at the transaction boundary 310 // An additional stack frame is pushed to hold the stack open 311 // until the transaction completes. 312 // 313 if (count == 0) 314 { 315 if (TransactionService.getInstance().isTransactionWorkingMemoryScope() == true) 316 { 317 // 318 // Add transaction stack frame 319 // 320 count++; 321 new Notifier(); 322 } 323 324 if (TransactionService.isLoggerEnabledFor(Level.TRACE) == true) 325 { 326 String scope = "RUN_TO_COMPLETION"; 327 if (TransactionService.getInstance().isTransactionWorkingMemoryScope() == true) 328 { 329 scope = "TRANSACTION"; 330 } 331 332 TransactionService.log(Level.TRACE, 333 "Starting " + scope + " scope for working memory. Thread " 334 + Thread.currentThread().getId() 335 + ":" 336 + Thread.currentThread().getName()); 337 } 338 } 339 count++; 340 m_rtcState.set(count); 341 } 342 343 /** 344 * Pop an RTC stack frame 345 */ 346 static void popRTCStack() 347 { 348 Long count = m_rtcState.get(); 349 assert count > 0; 350 count--; 351 m_rtcState.set(count); 352 353 // 354 // If stack is empty - clean up 355 // 356 if (count == 0) 357 { 358 reset(); 359 } 360 } 361 362 /** 363 * Return the current RTC stack depth 364 */ 365 static Long getRTCStackDepth() 366 { 367 return m_rtcState.get(); 368 } 369 370 /** 371 * Set the active rule session 372 * @return Active rule session 373 */ 374 static RuleSession getAndSetActiveRuleSession() 375 { 376 assert RuleServiceProviderManager.getInstance() != null; 377 assert RuleServiceProviderManager.getInstance().getDefaultProvider() != null; 378 assert RuleServiceProviderManager.getInstance().getDefaultProvider() instanceof BexRuleServiceProvider; 379 380 // 381 // If we are called from the context of a rule or rule function 382 // the current rule session has already been set - use it 383 // 384 RuleSession session = RuleSessionManager.getCurrentRuleSession(); 385 386 if (session != null) 387 { 388 return session; 389 } 390 391 // 392 // Get the rule session for this thread and make it active 393 // 394 session = m_ruleSessions.get(); 395 assert session != null; 396 assert session.getRuleRuntime() instanceof RuleSessionManagerImpl 397 : session.getRuleRuntime().getClass().getName(); 398 ((RuleSessionManagerImpl)session.getRuleRuntime()).setCurrentRuleSession(session); 399 400 return session; 401 } 402 403 /** 404 * Get the RETE handle for an entity 405 * <p> 406 * N.B. - this method is static so that it isn't 407 * dispatched on remote objects since it is accessing 408 * local POJOs 409 * @param entity Entity for which to get associated handle object. May be null. 410 * @return RETE handle, or null. 411 */ 412 // TODO - Currently required to be public because it is used in the 413 // wrapper catalog function stubs 414 public static EntityImpl getHandle(final Entity entity) 415 { 416 if (entity == null) 417 { 418 return null; 419 } 420 421 checkActive(); 422 423 return entity.getHandle(true); 424 } 425 426 /** 427 * Get a RETE handle 428 * @param load True if handle should be loaded into working 429 * memory if not already loaded, false do not load into working memory. 430 * @return RETE handle 431 */ 432 protected final EntityImpl getHandle(boolean load) 433 { 434 // 435 // Check to see if this entity is already loaded into working memory 436 // 437 EntityImpl handle = getHandleFromRegistry(getId()); 438 439 if (handle != null) 440 { 441 return handle; 442 } 443 444 EntityProxy entityProxy = EntityProxy.getProxy(this); 445 assert entityProxy != null : this; 446 447 handle = entityProxy.getHandle(); 448 assert handle != null : this; 449 450 // 451 // Register the entity with the identifier registry 452 // 453 registerByIdentifier(handle); 454 455 // 456 // Load the handle into the RTC working memory 457 // 458 if (load == true) 459 { 460 handle = ManagedObjectManager.loadIntoRtc(handle); 461 } 462 463 return handle; 464 } 465 // 466 // Reset per thread context 467 // 468 private static void reset() 469 { 470 assert getRTCStackDepth() == 0 : getRTCStackDepth(); 471 472 EntityProxy.clearProxyObjects(); 473 clearRegistry(); 474 475 // 476 // Clean up the rule session if still set. This is the 477 // case when asserts were done from Java 478 // 479 RuleSession session = RuleSessionManager.getCurrentRuleSession(); 480 481 if (session != null) 482 { 483 try 484 { 485 // 486 // Retract objects from working memory 487 // 488 // N.B. - Do not use session.reset() which spans threads 489 // 490 // TODO - Verify that this only returns objects in current thread 491 List<?> list = session.getObjects(); 492 for (Object o : list) 493 { 494 if (TransactionService.isLoggerEnabledFor(Level.TRACE) == true) 495 { 496 TransactionService.log(Level.TRACE, "Retracting " + o); 497 } 498 session.retractObject(o, false); 499 } 500 501 // 502 // Reset the rule agenda - the rules engine is not resetting this 503 // correctly on exceptions 504 // 505 assert session instanceof RuleSessionImpl : session; 506 assert ((RuleSessionImpl)session).getWorkingMemory() != null; 507 assert ((RuleSessionImpl)session).getWorkingMemory() instanceof ReteWM : 508 ((RuleSessionImpl)session).getWorkingMemory(); 509 510 ReteWM workingMemory = (ReteWM)((RuleSessionImpl)session).getWorkingMemory(); 511 workingMemory.cleanResolver(); 512 } 513 catch (Exception e) 514 { 515 e.printStackTrace(); 516 } 517 } 518 519 if (TransactionService.isLoggerEnabledFor(Level.TRACE) == true) 520 { 521 TransactionService.log( 522 Level.TRACE, "Reset thread context for thread " 523 + Thread.currentThread().getId() 524 + ":" 525 + Thread.currentThread().getName()); 526 } 527 } 528 529 // 530 // Register identifier in entity registry 531 // 532 private void registerByEntity(long id) 533 { 534 HashMap<Entity,Long> map = m_byEntityRegistry.get(); 535 assert map != null : Thread.currentThread().getName(); 536 assert map.get(this) == null : id + " already registered"; 537 map.put(this, id); 538 } 539 540 // 541 // Get the entity from the registry 542 // 543 private long getIdentifierFromRegistry() 544 { 545 HashMap<Entity,Long> map = m_byEntityRegistry.get(); 546 assert map != null : Thread.currentThread().getName(); 547 Long identifier = map.get(this); 548 549 if (identifier == null) 550 { 551 return IdentifierGenerator.RESERVED_FOR_EMPTY_INDICATOR; 552 } 553 return identifier; 554 } 555 556 // 557 // Register entity in by-identifier registry 558 // 559 private void registerByIdentifier(EntityImpl entityImpl) 560 { 561 HashMap<Long,Holder> map = m_byIdentifierRegistry.get(); 562 assert map != null : Thread.currentThread().getName(); 563 assert (entityImpl == null) || getHandleFromRegistry(getId()) == null : 564 getId() + " already registered"; 565 Holder holder = new Holder(this, entityImpl); 566 map.put(getId(), holder); 567 } 568 569 // 570 // Get the entity from the registry 571 // 572 private static Entity getEntityFromRegistry(long id) 573 { 574 assert id != EMPTY_ENTITY_IDENTIFIER; 575 HashMap<Long,Holder> map = m_byIdentifierRegistry.get(); 576 assert map != null : Thread.currentThread().getName(); 577 Holder holder = map.get(id); 578 579 if (holder == null) 580 { 581 return null; 582 } 583 return holder.getEntity(); 584 } 585 586 // 587 // Get the RETE handle from the registry 588 // 589 private EntityImpl getHandleFromRegistry(long id) 590 { 591 HashMap<Long,Holder> map = m_byIdentifierRegistry.get(); 592 assert map != null : Thread.currentThread().getName(); 593 Holder holder = map.get(id); 594 595 if (holder == null) 596 { 597 return null; 598 } 599 return holder.getHandle(); 600 } 601 602 // 603 // Clear identifier registry 604 // 605 private static void clearRegistry() 606 { 607 m_byIdentifierRegistry.get().clear(); 608 m_byEntityRegistry.get().clear(); 609 } 610 611 // 612 // Holder object for identifier registry 613 // 614 private static class Holder 615 { 616 Holder(final Entity entity, final EntityImpl entityImpl) 617 { 618 m_entity = entity; 619 m_entityImpl = entityImpl; 620 } 621 622 Entity getEntity() 623 { 624 return m_entity; 625 } 626 627 EntityImpl getHandle() 628 { 629 return m_entityImpl; 630 } 631 @Override 632 public String toString() 633 { 634 return m_entity + ":" + m_entityImpl; 635 } 636 private final Entity m_entity; 637 private final EntityImpl m_entityImpl; 638 } 639 640 // 641 // Thread local storage for by-identifier registry 642 // 643 private static class ByIdentifierRegistry extends ThreadLocal<HashMap<Long, Holder>> 644 { 645 646 @Override 647 protected HashMap<Long, Holder> initialValue() 648 { 649 return new HashMap<Long, Holder>(); 650 } 651 @Override 652 public HashMap<Long,Holder> get() 653 { 654 return super.get(); 655 } 656 @Override 657 public void set(HashMap<Long,Holder> value) 658 { 659 super.set(value); 660 } 661 @Override 662 public void remove() 663 { 664 super.remove(); 665 } 666 } 667 668 // 669 // Thread local storage for by-entity registry 670 // 671 private static class ByEntityRegistry extends ThreadLocal<HashMap<Entity, Long>> 672 { 673 @Override 674 protected HashMap<Entity, Long> initialValue() 675 { 676 return new HashMap<Entity, Long>(); 677 } 678 @Override 679 public HashMap<Entity,Long> get() 680 { 681 return super.get(); 682 } 683 @Override 684 public void set(HashMap<Entity,Long> value) 685 { 686 super.set(value); 687 } 688 @Override 689 public void remove() 690 { 691 super.remove(); 692 } 693 } 694 695 // 696 // Thread local storage for the current RTC state 697 // 698 private static class RTCState extends ThreadLocal<Long> 699 { 700 @Override 701 protected Long initialValue() 702 { 703 return 0L; 704 } 705 @Override 706 public Long get() 707 { 708 return super.get(); 709 } 710 @Override 711 public void set(Long value) 712 { 713 super.set(value); 714 } 715 @Override 716 public void remove() 717 { 718 super.remove(); 719 } 720 } 721 // 722 // Thread local storage for the RuleSessions 723 // 724 private static class RuleSessions extends ThreadLocal<RuleSession> 725 { 726 @Override 727 protected RuleSession initialValue() 728 { 729 return createRuleSession(); 730 } 731 @Override 732 public RuleSession get() 733 { 734 return super.get(); 735 } 736 @Override 737 public void set(RuleSession value) 738 { 739 super.set(value); 740 } 741 @Override 742 public void remove() 743 { 744 super.remove(); 745 } 746 747 // 748 // Create a rule session 749 // 750 private RuleSession createRuleSession() 751 { 752 assert RuleServiceProviderManager.getInstance() != null; 753 assert RuleServiceProviderManager.getInstance().getDefaultProvider() != null; 754 assert RuleServiceProviderManager.getInstance().getDefaultProvider() instanceof BexRuleServiceProvider; 755 756 RuleSession session = null; 757 BexRuleServiceProvider rsp 758 = (BexRuleServiceProvider)RuleServiceProviderManager.getInstance().getDefaultProvider(); 759 760 RuleSession[] sessions = rsp.getRuleRuntime().getRuleSessions(); 761 762 assert sessions != null; 763 764 for (RuleSession s : sessions) 765 { 766 session = s; 767 break; 768 } 769 770 return session; 771 } 772 } 773 774 // 775 // Transaction notifier for transaction scope working 776 // memory management 777 // 778 private static class Notifier extends TransactionNotifier 779 { 780 public Notifier() 781 { 782 assert Entity.getRTCStackDepth() == 0 : Entity.getRTCStackDepth(); 783 } 784 @Override 785 public void onPrepare() 786 { 787 assert Entity.getRTCStackDepth() == 1 : Entity.getRTCStackDepth(); 788 reset(); 789 } 790 791 @Override 792 public void onRollback() 793 { 794 assert Entity.getRTCStackDepth() > 0 : Entity.getRTCStackDepth(); 795 reset(); 796 } 797 798 private void reset() 799 { 800 assert TransactionService.getInstance().isTransactionWorkingMemoryScope() == true; 801 802 // 803 // Pop the transaction stack frame 804 // 805 Entity.popRTCStack(); 806 } 807 } 808 809 private static final long serialVersionUID = 1L; 810 private static final RTCState m_rtcState = new RTCState(); 811 private static final ByIdentifierRegistry m_byIdentifierRegistry = new ByIdentifierRegistry(); 812 private static final ByEntityRegistry m_byEntityRegistry = new ByEntityRegistry(); 813 private static final RuleSessions m_ruleSessions = new RuleSessions(); 814}