Class MultiThreadedImpExImportReader
java.lang.Object
de.hybris.platform.impex.jalo.ImpExReader
de.hybris.platform.impex.jalo.imp.ImpExImportReader
de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader
- Direct Known Subclasses:
ImpExImportJob.MyMultiThreadedImpExImportReader
Multi-threaded ImpEx import reader. This implementation starts multiple worker threads for processing lines, plus a separate thread for reading them. As a consequence, line order cannot be guaranteed. Reader methods that rely on line order must not be used: discardNextLine(), dumpNextLine(String) and setDumpingAllowed(boolean).
- Since:
- 3.1-u7
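To illustrate why line order is lost once several workers process lines concurrently, here is a minimal, generic Java sketch. It is not the ImpEx implementation: the class `UnorderedCompletionSketch` and its method `processInParallel` are hypothetical names introduced only for this illustration, and plain strings stand in for value lines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration only; not part of the ImpEx API.
class UnorderedCompletionSketch {

    /** Hands every line to a worker pool and returns the lines in the order they finished. */
    static List<String> processInParallel(List<String> lines, int workers) {
        ConcurrentLinkedQueue<String> completed = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (String line : lines) {
            // Each line is processed by whichever worker thread is free.
            pool.execute(() -> completed.add(line));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("line 1", "line 2", "line 3", "line 4");
        System.out.println("input order:      " + input);
        // The completion order may differ from the input order and from run to run.
        System.out.println("completion order: " + processInParallel(input, 3));
    }
}
```

Every line is processed exactly once, but nothing ties the completion order to the input order, which is why operations that target "the next line" have no well-defined meaning in this mode.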
-
Nested Class Summary
Nested classes/interfaces inherited from class de.hybris.platform.impex.jalo.ImpExReader
ImpExReader.ResultSetCSVReader
-
Field Summary
Fields inherited from class de.hybris.platform.impex.jalo.ImpExReader
FIRST
-
Constructor Summary
Constructors
Constructor, Description
MultiThreadedImpExImportReader(CSVReader reader, boolean legacyMode)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, DocumentIDRegistry docIDRegistry, MultiThreadedImportProcessor processor, EnumerationValue mode)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor)
MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor, boolean legacyMode)
-
Method Summary
Modifier and Type, Method, Description
protected void addNewWorker(PoolableThread poolableThread, int workerIndex)
void addReadingError(ImpExException e, ImpExReaderWorker impExReaderWorker)
protected void addResult(ImpExWorkerResult result)
protected HeaderDescriptor createNewHeader(Map<Integer, String> line)
    Overridden to block execution until all workers have finished previous lines.
protected ImpExWorker createWorker(PoolableThread poolableThread, int number)
void discardNextLine()
    Disallowed in parallel mode since line order cannot be guaranteed.
void dumpNextLine(String reason)
    Disallowed in parallel mode since line order cannot be guaranteed.
protected boolean ensureValidHeaderOrMarkUnresolved(ValueLine valueLine)
protected void execute(AbstractCodeLine line, Map csvLine, boolean forEachMode)
    Called for each code line.
protected ValueLine fetchNextValueLine(ImpExWorker worker)
protected ImpExWorkerResult fetchNextWorkerResult
protected int getAllocatedThreads()
protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreads getInitialThreads()
int getMaxThreads()
protected void handOffToResultProcessorWorker
protected boolean hasUnrecoverableError
protected boolean isAllWorkerFinished()
protected final boolean isInParallelMode()
protected boolean isReaderFinished()
protected boolean isResultProcessorFinished()
protected boolean mustMarkLineAsUnresolved(ImpExWorkerResult result, ValueLine line)
protected final void postProcessValueLine(ValueLine currentValueLine, Item ret, Exception error)
    Now does nothing since at this time the value line is just put into queue and no result is available yet.
protected void postProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, Exception error)
    Real post processing of lines after the worker finished importing.
protected final Item processLine(ValueLine valueLine)
    Now enqueues the value line to be processed by one of our workers.
protected boolean processPendingResult
protected Item processValueLineFromWorker
    Called from ImpExWorker to trigger item data processing e.g. create, update or removal.
readLine()
    Fetches next available result item.
protected boolean readLineFromWorker
    Called from ImpExReaderWorker asynchronously.
void setCurrentHeader(HeaderDescriptor header)
    Overridden to switch on/off parallel mode allowed flag from ImpExConstants.Syntax.Modifier#PARALLEL.
void setDumpingAllowed(boolean dumpingAllowed)
    Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.
void setLogFilter(ImpExLogFilter logFilter)
void setMaxThreads(int requested)
protected int tryAllocateWorkers(int amount)
protected PoolableThread tryToBorrowThread
protected PoolableThread tryToBorrowThread(ThreadPool threadPool)
protected void writeUserRightsLines
    Overridden to wait until line queue is empty.
Methods inherited from class de.hybris.platform.impex.jalo.imp.ImpExImportReader
close, createValueLine, dumpNextLine, dumpUnresolvedLine, findMarker, getAfterEachCode, getCSVWriter, getCurrentValueLine, getCustomImportProcessor, getDumpedLineCount, getDumpedLineCountPerHeader, getForEachCode, getImportProcessor, getLastImportedItem, getLastImportedItemLineNumber, getLastImportedLine, getProcessedItemsCount, getProcessedItemsCountPerHeader, getResolvedItemsCount, getScriptExecutionContext, getValidationMode, getValueLineCount, incDumpCount, incItemsCount, isCreateHMCSavedValues, isDumpingAllowed, isLegacyMode, isSecondPass, preProcessLine, processMarkerCodeLine, readAll, setAfterEachCode, setBeanShellContext, setCreateHMCSavedValues, setCSVWriter, setForEachCode, setIsSecondPass, setValueLinesToSkip
Methods inherited from class de.hybris.platform.impex.jalo.ImpExReader
addDefinition, addExternalDataMedia, addExternalDataMedias, addHeaderExceptionInfoAsComment, addHeaderReplacementRule, addToBeanShellContext, adjustLineIndexes, applyHeaderReplacements, assureBeanShellLoaded, checkDefinitonKey, createCodeLine, createCodeLineLegacyWay, createCodeLineModernWay, createInvalidHeader, debug, debug, enableCodeExecution, enableExternalCodeExecution, enableExternalDataCodeExecution, enableExternalImpExSyntaxParsing, enableExternalSyntaxParsing, enterIfBlock, error, error, execute, exitIfBlock, findExternalDataMedia, findExternalDataMedia, getAllExternalDataMediaCodes, getAttributeConstraintFilter, getBeforeEachCode, getCellDecorators, getCSVReader, getCurrentHeader, getCurrentLocation, getCurrentReader, getDefaultReplacements, getDocumentIDRegistry, getFromBeanShellContext, getInvalidHeaderPolicy, getLineSize, getLocale, getStrictMode, gotInsertedLines, hasCellDecorators, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalData, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeExternalDataMedia, includeSQLData, includeSQLData, includeSQLData, info, info, initDatabase, insertLine, isCodeExecutionEnabled, isCodeLine, isCombinedSearchEnabled, isCommentLine, isDebugEnabled, isDefinition, isEmptyLine, isEndUserRights, isExternalCodeExecutionEnabled, isExternalSyntaxParsingEnabled, isHeaderLine, isIncludingExternalData, isInfoEnabled, isNotInInactiveIfBlock, isStartUserRights, legacyExecute, lineToList, modernExecute, parseHeader, parseHeader, parseHeader, parseHeader, processCodeLine, readNextCSVLine, removeExternalDataMedia, removeExternalDataMedias, replaceDefinitions, setAttributeConstraintFilter, 
setBeforeEachCode, setCellDecorators, setCurrentHeader, setInvalidHeaderPolicy, setLocale, setRelaxedMode, setRelaxedMode, setValidationMode, setValidationMode, sortRulesList, splitDefinitonCell, storeUserRightsLine, warn, warn
-
Constructor Details
-
MultiThreadedImpExImportReader
-
MultiThreadedImpExImportReader
-
MultiThreadedImpExImportReader
-
MultiThreadedImpExImportReader
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, MultiThreadedImportProcessor processor, boolean legacyMode)
-
MultiThreadedImpExImportReader
public MultiThreadedImpExImportReader(CSVReader reader, CSVWriter dumpWriter, DocumentIDRegistry docIDRegistry, MultiThreadedImportProcessor processor, EnumerationValue mode)
-
-
Method Details
-
setDumpingAllowed
public void setDumpingAllowed(boolean dumpingAllowed)
Dumping cannot be switched off in multi-threaded mode since line order cannot be guaranteed.
- Overrides:
setDumpingAllowed in class ImpExImportReader
-
discardNextLine
public void discardNextLine()
Disallowed in parallel mode since line order cannot be guaranteed.
- Overrides:
discardNextLine in class ImpExImportReader
-
dumpNextLine
Disallowed in parallel mode since line order cannot be guaranteed.
- Overrides:
dumpNextLine in class ImpExImportReader
- Parameters:
reason - message stored with the value line to dump, describing the reason for dumping
-
readLine
Fetches the next available result item. Because multiple threads are working in the background, more items may already have been imported by the time this method returns. Single-step importing is therefore not possible with this class.
- Overrides:
readLine in class ImpExImportReader
- Returns:
- the next item which was processed
- Throws:
ImpExException - any import/export error except UnresolvedValueException, which is handled internally
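The "next available result" behaviour described above can be pictured with the standard `java.util.concurrent.ExecutorCompletionService`, which returns results in completion order. This is only a generic sketch under the assumption that results are handed back through some kind of queue; the class `NextAvailableResultSketch` and the method `importAll` are hypothetical and do not reflect how the reader is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for illustration only; not part of the ImpEx API.
class NextAvailableResultSketch {

    /** Submits one task per value and collects results in the order they become available. */
    static List<Integer> importAll(List<Integer> values, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CompletionService<Integer> results = new ExecutorCompletionService<>(pool);
        for (Integer value : values) {
            results.submit(() -> value * 10); // stand-in for importing one line
        }
        List<Integer> fetched = new ArrayList<>();
        try {
            for (int i = 0; i < values.size(); i++) {
                // take() blocks until some result is ready; other tasks may
                // already have completed in the background by this point.
                fetched.add(results.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return fetched;
    }

    public static void main(String[] args) {
        System.out.println(importAll(Arrays.asList(1, 2, 3, 4), 2));
    }
}
```

The caller sees one result per call, but cannot pause the workers between calls, which mirrors why single-step importing is not available.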
-
getInitialThreads
protected de.hybris.platform.impex.jalo.imp.MultiThreadedImpExImportReader.InitialThreads getInitialThreads()
-
postProcessValueLine
protected final void postProcessValueLine(ValueLine currentValueLine, Item ret, Exception error) throws ImpExException
Does nothing in this class: at this point the value line has only been put into the queue and no result is available yet.
- Overrides:
postProcessValueLine in class ImpExImportReader
- Throws:
ImpExException
-
processLine
Enqueues the value line to be processed by one of the workers. Returns a dummy item instance; do not use it.
- Overrides:
processLine in class ImpExImportReader
- Throws:
ImpExException
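The enqueue-and-return-a-placeholder pattern can be sketched in plain Java as follows. This is an illustration under assumptions, not the reader's code: the class `EnqueueWithPlaceholderSketch`, its `PLACEHOLDER` constant, and the single background worker are all hypothetical, and upper-casing a string stands in for importing a line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for illustration only; not part of the ImpEx API.
class EnqueueWithPlaceholderSketch {

    /** Returned immediately by processLine; carries no information about the real result. */
    static final Object PLACEHOLDER = new Object();

    // An empty Optional marks the end of input for the worker.
    private final BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
    private final List<String> imported = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::drainQueue);

    EnqueueWithPlaceholderSketch() {
        worker.start();
    }

    /** Only enqueues the line; the real work happens later on the worker thread. */
    Object processLine(String line) {
        queue.add(Optional.of(line));
        return PLACEHOLDER;
    }

    /** Signals end of input, waits for the worker, and returns the real results. */
    List<String> finish() {
        queue.add(Optional.empty());
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(imported);
    }

    private void drainQueue() {
        try {
            while (true) {
                Optional<String> next = queue.take();
                if (!next.isPresent()) {
                    return; // end-of-input marker
                }
                imported.add(next.get().toUpperCase());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        EnqueueWithPlaceholderSketch sketch = new EnqueueWithPlaceholderSketch();
        sketch.processLine("first");
        sketch.processLine("second");
        System.out.println(sketch.finish());
    }
}
```

The value returned by `processLine` in the sketch says nothing about the imported data, which is the reason the returned dummy item must not be used.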
-
writeUserRightsLines
Overridden to wait until the line queue is empty.
- Overrides:
writeUserRightsLines in class ImpExReader
- Throws:
ImpExException - there were no user rights for import.
-
createNewHeader
protected HeaderDescriptor createNewHeader(Map<Integer, String> line) throws HeaderValidationException
Overridden to block execution until all workers have finished previous lines. This is necessary because header lines may contain default value expressions that refer to previous lines.
- Overrides:
createNewHeader in class ImpExReader
- Parameters:
line - line for which a header object is needed
- Returns:
- created header object
- Throws:
HeaderValidationException - header is not valid
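The idea of draining all in-flight work before a new header takes effect can be sketched like this. It is a generic illustration, not the reader's code: the class `HeaderBarrierSketch` and its method `run` are hypothetical, and treating any line that starts with `INSERT` as a header is a deliberate simplification made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for illustration only; not part of the ImpEx API.
class HeaderBarrierSketch {

    /** Processes value lines in parallel, but waits for all of them before each header. */
    static List<String> run(List<String> script, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<?>> inFlight = new ArrayList<>();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            for (String line : script) {
                if (line.startsWith("INSERT")) { // simplified header detection (assumption)
                    waitForAll(inFlight);        // barrier: previous lines must be done
                    log.add("header: " + line);
                } else {
                    inFlight.add(pool.submit(() -> { log.add("value: " + line); }));
                }
            }
            waitForAll(inFlight);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }

    private static void waitForAll(List<Future<?>> inFlight) {
        try {
            for (Future<?> future : inFlight) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        inFlight.clear();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("INSERT A", "a1", "a2", "INSERT B", "b1"), 3));
    }
}
```

Value lines under one header may still complete in any order relative to each other; the barrier only ensures that none of them is still running when the next header is created.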
-
setCurrentHeader
Overridden to switch the parallel-mode-allowed flag on or off according to ImpExConstants.Syntax.Modifier#PARALLEL.
- Overrides:
setCurrentHeader in class ImpExImportReader
- Parameters:
header - the header to be used from now on for the value lines that are read
-
readLineFromWorker
Called from ImpExReaderWorker asynchronously.
- Returns:
- true if file is not at end
- Throws:
ImpExException
-
processPendingResult
-
mustMarkLineAsUnresolved
-
postProcessValueLineInternal
protected void postProcessValueLineInternal(ValueLine currentValueLine, AbstractProcessResult ret, Exception error) throws ImpExException
Real post processing of lines after the worker finished importing.
- Parameters:
currentValueLine - the value line
ret - the import result
error -
- Throws:
ImpExException
-
ensureValidHeaderOrMarkUnresolved
- Overrides:
ensureValidHeaderOrMarkUnresolved in class ImpExImportReader
-
fetchNextValueLine
-
fetchNextWorkerResult
-
addResult
-
hasUnrecoverableError
-
handOffToResultProcessorWorker
-
processValueLineFromWorker
Called from ImpExWorker to trigger item data processing, e.g. create, update or removal.
- Parameters:
line -
- Returns:
- the imported item
- Throws:
ImpExException
-
isInParallelMode
protected final boolean isInParallelMode()
-
execute
protected void execute(AbstractCodeLine line, Map csvLine, boolean forEachMode) throws ImpExException
Called for each code line. If not in forEach mode, this thread waits for all workers to finish before executing the BeanShell code.
- Overrides:
execute in class ImpExReader
- Parameters:
line - the code line
csvLine - current line object for context variable 'line'
forEachMode - whether the execution is triggered by a forEach marker rather than by normal script processing
- Throws:
ImpExException - error during code execution
-
tryAllocateWorkers
protected int tryAllocateWorkers(int amount)
-
addNewWorker
-
createWorker
-
tryToBorrowThread
-
tryToBorrowThread
-
isReaderFinished
protected boolean isReaderFinished()
-
isResultProcessorFinished
protected boolean isResultProcessorFinished()
-
isAllWorkerFinished
protected boolean isAllWorkerFinished()
-
getLogFilter
-
setLogFilter
-
getMaxThreads
public int getMaxThreads()
-
setMaxThreads
public void setMaxThreads(int requested)
-
getAllocatedThreads
protected int getAllocatedThreads()
- Returns:
- the allocatedThreads
-
addReadingError
-