Package de.hybris.deltadetection.impl
Class DefaultChangeDetectionService
java.lang.Object
de.hybris.deltadetection.impl.DefaultChangeDetectionService
All Implemented Interfaces:
ChangeDetectionService
Default implementation of the change detection service.
-
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and Type   Method
    Description
void collectChangesForType(ComposedTypeModel composedType, StreamConfiguration configuration, ChangesCollector collector)
    Detects all kinds of changes (NEW, MODIFIED, DELETED) on items for the given composed type.
void collectChangesForType(ComposedTypeModel composedType, String streamId, ChangesCollector collector)
    Detects all kinds of changes (NEW, MODIFIED, DELETED) on items for the given composed type.
void consumeChanges(List<ItemChangeDTO> allChanges)
    Consumes all given changes.
protected int consumeChangesBatch(List<ItemChangeDTO> changes, String streamId)
protected Callable<Integer> createConsumeBatchCallable(String streamId, List<ItemChangeDTO> changes, int maxRetries)
void deleteItemVersionMarkersForStream(String streamId)
    Deletes all item version markers belonging to the stream.
protected void fillInitialVersionMarker(ItemVersionMarkerModel marker, Long itemPK, Date version, String versionValue, String info, ComposedTypeModel itemComposedType, String streamId)
protected Map<Long,ItemVersionMarkerModel> findItemVersionMarkersForStreamByPKs(String streamId, List<PK> PKs)
protected ItemVersionMarkerModel findVersionMarkerByItemPK(String streamId, Long itemPk)
    Checks and returns existing item version marker for the item and stream.
ItemChangeDTO getChangeForExistingItem(ItemModel item, String streamId)
    Detects change for a given existing item and stream.
ItemChangeDTO getChangeForRemovedItem(PK pk, String streamId)
    Detects a potential change for an item that no longer exists (change type 'DELETED'), stored under the ItemVersionMarker referencing the given item pk.
protected ItemChangeDTO getChangeFromRow(List<Object> row, String streamId)
List<ItemChangeDTO> getChangesForRemovedItems(String streamId)
    Detects changes for all items that no longer exist (change type 'DELETED'), stored under the ItemVersionMarkers referencing the PKs of those items.
protected List<ComposedTypeModel> getComposedTypesForStream(String streamId)
protected List<ItemVersionMarkerModel> getDeletedVersionMarkers(String streamId)
protected FlexibleSearchService getFlexibleSearchService
protected ModelService getModelService
protected List<ItemVersionMarkerModel> getNewOrModifiedVersionMarkers(String streamId, ComposedTypeModel model)
protected Map<String,List<List<ItemChangeDTO>>> getPartitionedBatchChangesByStreamId(List<ItemChangeDTO> allChanges)
protected String getQueryForDeletedItems(String typePksAsQueryParams, String baseType, String itemSelector)
protected String getQueryForModifiedItems(String typePksAsQueryParams, String baseType, String itemSelector, String versionValuePart)
protected String getQueryForNewItems(String typePksAsQueryParams, String baseType, String itemSelector, String versionValuePart)
protected ItemModel getSavedModel(PK pk)
protected Optional<StreamConfigurationModel> getStreamById(String streamId)
protected TypeService getTypeService
protected List<ItemVersionMarkerModel> getVersionMarkersForRemovedItems(String streamId)
protected boolean isItemVersionMarkerActive
protected boolean isNotSingleResult(List<ItemVersionMarkerModel> result)
protected void logConsumedChanges(int numConsumed)
protected String parseInfoExpression(Long itemPK, StreamConfigurationModel streamConfiguration)
protected Map<Long,ItemVersionMarkerModel> preloadVersionMarkers(List<ItemChangeDTO> changes, String streamId)
prepareBatchExecutionCallables(List<ItemChangeDTO> allChanges)
protected String prepareQueryForFindingChangesByType(ComposedTypeModel composedType, StreamConfiguration configuration)
void resetStream(String streamId)
    Resets the stream.
void setFlexibleSearchService(FlexibleSearchService flexibleSearchService)
void setModelService(ModelService modelService)
void setTypeService(TypeService typeService)
protected void updateVersionMarker(ItemVersionMarkerModel marker, Date version, String versionValue, String info)
-
Field Details
-
VERSION_MARKERS_FOR_STREAM_BY_PKS_QRY
-
VERSION_MARKER_BY_PK
-
-
Constructor Details
-
DefaultChangeDetectionService
public DefaultChangeDetectionService()
-
-
Method Details
-
getChangeForExistingItem
Description copied from interface: ChangeDetectionService
Detects a change for a given existing item and stream. Finds changes of change type 'NEW' or 'MODIFIED'.
Specified by:
getChangeForExistingItem in interface ChangeDetectionService
Parameters:
item - the item to be checked
streamId - the streamId for which the changes should be detected
Returns:
ItemChangeDTO object containing all the information about the change
-
getChangeForRemovedItem
Description copied from interface: ChangeDetectionService
Detects a potential change for an item that no longer exists (change type 'DELETED'), stored under the ItemVersionMarker referencing the given item pk.
Specified by:
getChangeForRemovedItem in interface ChangeDetectionService
Parameters:
pk - the pk of the item to be checked
streamId - the streamId for which the change should be detected
Returns:
ItemChangeDTO object containing all the information about the change
-
isItemVersionMarkerActive
-
getChangesForRemovedItems
Description copied from interface: ChangeDetectionService
Detects changes for all items that no longer exist (change type 'DELETED'), stored under the ItemVersionMarkers referencing the PKs of those items.
Specified by:
getChangesForRemovedItems in interface ChangeDetectionService
Parameters:
streamId - the streamId for which the changes should be detected
Returns:
List of ItemChangeDTO objects containing all the information about the changes
-
collectChangesForType
public void collectChangesForType(ComposedTypeModel composedType, String streamId, ChangesCollector collector)
Description copied from interface: ChangeDetectionService
Detects all kinds of changes (NEW, MODIFIED, DELETED) on items for the given composed type.
Specified by:
collectChangesForType in interface ChangeDetectionService
Parameters:
composedType - the type for which the change detection should be executed (including subtypes)
streamId - the streamId for which the changes should be detected
collector - responsible for collecting the changes in a specified way, e.g. storing them in memory or generating a CSV file
-
collectChangesForType
public void collectChangesForType(ComposedTypeModel composedType, StreamConfiguration configuration, ChangesCollector collector)
Description copied from interface: ChangeDetectionService
Detects all kinds of changes (NEW, MODIFIED, DELETED) on items for the given composed type.
Specified by:
collectChangesForType in interface ChangeDetectionService
Parameters:
composedType - the type for which the change detection should be executed (including subtypes)
configuration - the stream configuration defining which items should be included in the stream
collector - responsible for collecting the changes in a specified way, e.g. storing them in memory or generating a CSV file
-
getChangeFromRow
-
prepareQueryForFindingChangesByType
protected String prepareQueryForFindingChangesByType(ComposedTypeModel composedType, StreamConfiguration configuration)
-
getQueryForDeletedItems
-
getQueryForModifiedItems
-
getQueryForNewItems
-
preloadVersionMarkers
protected Map<Long,ItemVersionMarkerModel> preloadVersionMarkers(List<ItemChangeDTO> changes, String streamId)
-
consumeChangesBatch
-
consumeChanges
Description copied from interface: ChangeDetectionService
Consumes all given changes. This means that for changes of type NEW a new ItemVersionMarker is created, for changes of type MODIFIED the item version marker is updated accordingly, and for changes of type DELETED the ItemVersionMarker is removed. The stream for which the changes are consumed is defined inside the given ItemChangeDTO objects.
Specified by:
consumeChanges in interface ChangeDetectionService
Parameters:
allChanges - list of the changes to be consumed
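The consume semantics described above can be illustrated with a minimal in-memory sketch. It does not use the hybris API: the Change class, the ChangeType enum and the nested map standing in for the ItemVersionMarker table are all hypothetical stand-ins, chosen only to show how NEW, MODIFIED and DELETED changes affect the stored markers of the stream named in each change.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumeChangesSketch {

    // Hypothetical stand-in for the change types named in the documentation.
    enum ChangeType { NEW, MODIFIED, DELETED }

    // Hypothetical stand-in for ItemChangeDTO: only the fields this sketch needs.
    static final class Change {
        final long itemPk;
        final ChangeType type;
        final String versionValue;
        final String streamId;

        Change(long itemPk, ChangeType type, String versionValue, String streamId) {
            this.itemPk = itemPk;
            this.type = type;
            this.versionValue = versionValue;
            this.streamId = streamId;
        }
    }

    // markers: streamId -> (item PK -> stored version value); an assumption
    // replacing the persisted ItemVersionMarker items.
    static void consume(Map<String, Map<Long, String>> markers, List<Change> allChanges) {
        for (Change change : allChanges) {
            // The stream is taken from the change itself, as the description states.
            Map<Long, String> streamMarkers =
                    markers.computeIfAbsent(change.streamId, id -> new HashMap<>());
            switch (change.type) {
                case NEW:
                    // NEW: create a marker for the item.
                    streamMarkers.put(change.itemPk, change.versionValue);
                    break;
                case MODIFIED:
                    // MODIFIED: update the marker if one exists.
                    streamMarkers.replace(change.itemPk, change.versionValue);
                    break;
                case DELETED:
                    // DELETED: remove the marker.
                    streamMarkers.remove(change.itemPk);
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<Long, String>> markers = new HashMap<>();
        consume(markers, Arrays.asList(
                new Change(1L, ChangeType.NEW, "v1", "streamA"),
                new Change(2L, ChangeType.NEW, "v1", "streamA"),
                new Change(1L, ChangeType.MODIFIED, "v2", "streamA"),
                new Change(2L, ChangeType.DELETED, null, "streamA")));
        System.out.println(markers);
    }
}
```

After the four changes in main, only item 1 keeps a marker in streamA, now holding the updated version value.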
-
logConsumedChanges
protected void logConsumedChanges(int numConsumed)
-
getPartitionedBatchChangesByStreamId
protected Map<String,List<List<ItemChangeDTO>>> getPartitionedBatchChangesByStreamId(List<ItemChangeDTO> allChanges) -
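The return type Map<String,List<List<ItemChangeDTO>>> suggests that the changes are grouped by stream id and each group is then split into batches. The page does not document how this is done, so the following is only a sketch of one plausible approach. The Change class and the batchSize parameter are hypothetical; the real method takes only the list of changes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSketch {

    // Hypothetical stand-in for ItemChangeDTO.
    static final class Change {
        final long itemPk;
        final String streamId;

        Change(long itemPk, String streamId) {
            this.itemPk = itemPk;
            this.streamId = streamId;
        }
    }

    // Groups changes by stream id, then cuts each group into batches of at most
    // batchSize elements. batchSize is an assumption made for this sketch.
    static Map<String, List<List<Change>>> partitionByStream(List<Change> allChanges, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // Step 1: group by stream id, preserving encounter order.
        Map<String, List<Change>> byStream = new LinkedHashMap<>();
        for (Change change : allChanges) {
            byStream.computeIfAbsent(change.streamId, id -> new ArrayList<>()).add(change);
        }
        // Step 2: split every group into consecutive batches.
        Map<String, List<List<Change>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Change>> entry : byStream.entrySet()) {
            List<Change> changes = entry.getValue();
            List<List<Change>> batches = new ArrayList<>();
            for (int from = 0; from < changes.size(); from += batchSize) {
                int to = Math.min(from + batchSize, changes.size());
                batches.add(new ArrayList<>(changes.subList(from, to)));
            }
            result.put(entry.getKey(), batches);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Change> all = new ArrayList<>();
        for (long pk = 0; pk < 5; pk++) {
            all.add(new Change(pk, "streamA"));
        }
        all.add(new Change(99L, "streamB"));
        Map<String, List<List<Change>>> partitioned = partitionByStream(all, 2);
        System.out.println("streamA batches: " + partitioned.get("streamA").size());
        System.out.println("streamB batches: " + partitioned.get("streamB").size());
    }
}
```

Grouping by stream first keeps every batch tied to a single stream, which matches the shape of consumeChangesBatch(List<ItemChangeDTO> changes, String streamId).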
prepareBatchExecutionCallables
-
createConsumeBatchCallable
protected Callable<Integer> createConsumeBatchCallable(String streamId, List<ItemChangeDTO> changes, int maxRetries) -
deleteItemVersionMarkersForStream
Description copied from interface: ChangeDetectionService
Deletes all item version markers belonging to the stream.
Specified by:
deleteItemVersionMarkersForStream in interface ChangeDetectionService
Parameters:
streamId - stream id for which item version markers should be deleted
-
resetStream
Description copied from interface: ChangeDetectionService
Resets the stream. This means that all ItemVersionMarkers with status DELETED change their status to ACTIVE, and all ItemVersionMarkers with status ACTIVE have their versionTS set to the beginning of the Unix epoch (1 January 1970).
Specified by:
resetStream in interface ChangeDetectionService
Parameters:
streamId - stream id for which item version markers should be reset
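The reset rule can be sketched on plain Java objects. The Marker class and Status enum below are hypothetical stand-ins for ItemVersionMarkerModel and its status, not the hybris types. The description does not say what happens to the versionTS of a marker that is reactivated from DELETED, so this sketch leaves it unchanged.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class ResetStreamSketch {

    // Hypothetical stand-in for the marker status.
    enum Status { ACTIVE, DELETED }

    // Hypothetical stand-in for ItemVersionMarkerModel.
    static final class Marker {
        final String streamId;
        Status status;
        Date versionTS;

        Marker(String streamId, Status status, Date versionTS) {
            this.streamId = streamId;
            this.status = status;
            this.versionTS = versionTS;
        }
    }

    // Applies the documented reset rule to every marker of the given stream.
    static void resetStream(List<Marker> markers, String streamId) {
        for (Marker marker : markers) {
            if (!marker.streamId.equals(streamId)) {
                continue; // markers of other streams are not touched
            }
            if (marker.status == Status.DELETED) {
                // DELETED markers become ACTIVE again (timestamp left as-is: an assumption).
                marker.status = Status.ACTIVE;
            } else {
                // ACTIVE markers get versionTS set to the Unix epoch start.
                marker.versionTS = new Date(0L);
            }
        }
    }

    public static void main(String[] args) {
        List<Marker> markers = new ArrayList<>();
        markers.add(new Marker("streamA", Status.DELETED, new Date()));
        markers.add(new Marker("streamA", Status.ACTIVE, new Date()));
        markers.add(new Marker("streamB", Status.DELETED, new Date()));
        resetStream(markers, "streamA");
        for (Marker marker : markers) {
            System.out.println(marker.streamId + " " + marker.status + " " + marker.versionTS.getTime());
        }
    }
}
```

Setting versionTS to the epoch makes every existing item look newer than its marker, so the next detection run would presumably report it as changed again.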
-
findItemVersionMarkersForStreamByPKs
-
findVersionMarkerByItemPK
Checks for and returns the existing item version marker for the item and stream. Useful for checking whether the item already has an ItemVersionMarker assigned in the 'DELETED' state; in that case the status is just changed to 'ACTIVE', to avoid having two ItemVersionMarkers ('ACTIVE' and 'DELETED') for the same item.
-
isNotSingleResult
-
getNewOrModifiedVersionMarkers
protected List<ItemVersionMarkerModel> getNewOrModifiedVersionMarkers(String streamId, ComposedTypeModel model)
-
getDeletedVersionMarkers
-
getComposedTypesForStream
-
getStreamById
-
getVersionMarkersForRemovedItems
-
fillInitialVersionMarker
protected void fillInitialVersionMarker(ItemVersionMarkerModel marker, Long itemPK, Date version, String versionValue, String info, ComposedTypeModel itemComposedType, String streamId)
-
updateVersionMarker
protected void updateVersionMarker(ItemVersionMarkerModel marker, Date version, String versionValue, String info)
-
parseInfoExpression
-
getSavedModel
-
setFlexibleSearchService
-
getFlexibleSearchService
-
setModelService
-
getModelService
-
setTypeService
-
getTypeService
-